Airflow e AWS EMR
Nesta aula, vamos explorar como o Apache Airflow pode orquestrar tarefas de processamento de dados em grande escala utilizando o Amazon EMR (anteriormente chamado de Amazon Elastic MapReduce). Esta combinação é muito poderosa para pipelines de dados que precisam de recursos computacionais elásticos e sob demanda.
O que é o Amazon EMR?
O Amazon EMR é um serviço gerenciado da AWS que facilita o processamento de grandes volumes de dados utilizando frameworks como Apache Spark, Apache Hadoop, Apache Hive, Apache HBase
EMR em Engenharia de Dados
O EMR é amplamente utilizado em Engenharia de Dados para:
- Processamento Big Data: Transformar e analisar terabytes ou petabytes de dados
- ETL em Escala: Executar transformações complexas que seriam inviáveis em máquinas individuais
- Machine Learning: Treinar modelos com grandes datasets utilizando Spark MLlib
- Análise de Logs: Processar logs de aplicações, servidores web, etc.
Como principais características do EMR, podemos citar:
- Elasticidade: Você pode dimensionar clusters (escalonamento horizontal) com base na demanda
- Gerenciado: A AWS cuida da infraestrutura, instalação e configuração dos frameworks
- Custo-efetivo: Pague apenas pelos recursos que usar, quando usar
- Integração: Funciona nativamente com outros serviços AWS (S3, RDS, Redshift, etc.)
Por que Airflow + EMR?
A combinação do Airflow com o EMR oferece vários benefícios:
- Orquestração Inteligente: O Airflow pode criar clusters EMR sob demanda, executar jobs e depois destruir os clusters, otimizando custos. Uma outra opção é utilizar clusters persistentes, mas que podem ser escalados conforme a necessidade
- Gestão de Dependências: Coordenar múltiplos jobs Spark que dependem uns dos outros
- Monitoramento Centralizado: Acompanhar todo o pipeline utilizando a interface do Airflow
- Recuperação de Falhas: Retry automático e notificações em caso de problemas
- Escalabilidade: Ajustar o tamanho do cluster baseado na carga de trabalho de cada etapa
Preparando o Ambiente
Vamos configurar nosso ambiente para trabalhar com Airflow e EMR.
Iremos utilizar um ambiente muito parecido com o que usamos na aula anterior de Airflow, com algumas modificações para suportar a integração com a AWS.
Exercício
Path!
Considere que o restante da aula será feito dentro da pasta airflow-emr
.
Exercício
Criar AWS_ACCESS_KEY_ID
e AWS_SECRET_ACCESS_KEY
Para criar as credenciais da AWS, siga o passo a passo:
-
Acesse o AWS Management Console e vá para o serviço IAM.
-
No menu lateral, clique em Users e depois em Create user.
-
Dê um nome ao usuário (ex:
u_airflow_emr
) e selecione a opção Programmatic access. -
Clique em Next e selecione Attach policies directly. Procure pelas políticas gerenciadas
AmazonS3FullAccess
,AmazonElasticMapReduceFullAccess
,CloudWatchFullAccess
eAmazonEMRFullAccessPolicy_v2
e selecione-as.Atenção
Em um ambiente de produção, é recomendável criar uma política personalizada com permissões mínimas necessárias para o usuário.
Aqui, estamos utilizando políticas gerenciadas para simplificar o processo.
-
Clique em Next. Você deve ver as políticas selecionadas.
-
Clique em Create user.
-
Com o usuário criado, clique no nome de usuário. Na seção superior, você deve ver uma opção Access key 1 Create access key. Clique nela e selecione a opção Other.
-
Na próxima página você terá a opção de adicionar tags (opcional). Clique em Next.
-
Na última página, você verá a Access Key ID e a Secret Access Key. Copie ambas e utilize no arquivo
.env
.
docker-compose.yml
:
# Basic Airflow cluster configuration for LocalExecutor.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:3.0.6
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed.
# Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Use this option ONLY for quick checks. Installing requirements at container
# startup is done EVERY TIME the service is started.
# A better way is to build a custom image or extend the official image
# as described in https://airflow.apache.org/docs/docker-stack/build.html.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider distributions you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.6}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
# yamllint enable rule:line-length
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
# WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
# for other purpose (development, test and especially production usage) build/extend Airflow image.
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-amazon>=8.0.0 boto3>=1.26.0}
# The following line can be used to set a custom config file, stored in the local config folder
AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
postgres:
condition: service_healthy
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
airflow-apiserver:
<<: *airflow-common
command: api-server
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-dag-processor:
<<: *airflow-common
command: dag-processor
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"']
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
export AIRFLOW_UID=$$(id -u)
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
echo
echo "Creating missing opt dirs if missing:"
echo
mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
echo
echo "Airflow version:"
/entrypoint airflow version
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
echo
echo "Running airflow config list to create default config file if missing."
echo
/entrypoint airflow config list >/dev/null
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
echo
echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
echo
chown -R "${AIRFLOW_UID}:0" /opt/airflow/
echo
echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
echo
chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
depends_on:
<<: *airflow-common-depends-on
volumes:
postgres-db-volume:
Exercício
Principais modificações no docker-compose.yml
Em relação à aula passada, as principais modificações foram:
- Utilização do
LocalExecutor
ao invés doCeleryExecutor
: Como não precisamos de alta disponibilidade ou escalabilidade horizontal para este exemplo, oLocalExecutor
é mais simples e fácil de configurar. O principal fornecedor de poder de processamento será o EMR, não o Airflow (que será executado localmente e ficará responsável pela orquestração). - Remoção dos serviços Flower e Redis: Como não estamos utilizando o
CeleryExecutor
, não precisamos do Redis como broker de mensagens e nem do Flower para monitorar tarefas. - Configuração do
environment
do&airflow-common-env
, contendo as variáveis de ambiente para as credenciais AWS:
AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
_PIP_ADDITIONAL_REQUIREMENTS
para incluir o provedor Amazon, necessário para interagir com o EMR.
- Atualização da versão da imagem postgres
para 16-alpine
(mais recente): poderíamos manter a 13-alpine
, mas é sempre bom usar versões mais recentes quando possível!
Exercício
Preparando Recursos AWS
Antes de criar nosso DAG, precisamos preparar alguns recursos na AWS.
Criando um Bucket S3
Exercício
Criando Dados de Exemplo
Exercício
Exercício
Criando Script Spark
Exercício
Exercício
Exercício
Criando DAG Airflow + EMR
Agora vamos criar um DAG que orquestra todo o processo.
As etapas serão:
- Criar cluster EMR,
- Executar job Spark,
- Destruir o cluster.
Exercício
Entendendo o DAG EMR
Este DAG demonstra um padrão comum ao trabalhar com EMR:
- EmrCreateJobFlowOperator: Cria um cluster EMR com a configuração especificada
- EmrAddStepsOperator: Adiciona steps (jobs) ao cluster. No nosso caso, um job Spark
- EmrStepSensor: Monitora a execução do step até sua conclusão
- EmrTerminateJobFlowOperator: Termina o cluster para evitar custos desnecessários
XComs: Note o uso intensivo de XComs (xcom_pull
) para passar o ID do cluster entre as tarefas.
Configurando Roles IAM
Para que o EMR funcione, precisamos de IAM roles específicas.
Exercício
Exercício
Answer
Executando o Pipeline
Vamos executar nosso DAG e monitorar a execução.
Atenção
Antes de prosseguir, garanta que você está ciente dos custos associados ao uso do EMR.
Clusters EMR podem gerar custos significativos dependendo do tamanho e duração.
Não faça o restante da aula pela metade nem deixe clusters rodando ao final da aula.
Aviso
Acesse o menu EMR do AWS Console e verifique que não há clusters EMR rodando antes de iniciar o DAG.
Utilizaremos o Console para monitorar a criação e destruição do cluster.
Exercício
Exercício
Exercício
Answer
O cluster deve estar em estado Terminated.
Exercício
Exercício
Otimizações
Uso de Spot Instances
As Spot Instances são uma maneira de economizar custos ao executar workloads tolerantes a falhas. Graças à escala da AWS, é possível aproveitar a capacidade ociosa do EC2 com descontos que chegam a 90% em comparação ao preço sob demanda, por meio das instâncias spot do Amazon EC2.
Info!
Embora a AWS possa retomar essa capacidade com um aviso prévio de dois minutos, menos de 5% das cargas de trabalho sofrem interrupções.
Exercício
Parametrização
Exercício
Limpeza e Finalização
Exercício
Conclusão
Nesta aula, exploramos como combinar a orquestração do Apache Airflow com a capacidade de processamento em escala do Amazon EMR. Esta combinação é especialmente útil para casos de uso de Big Data em produção!
Arquitetura híbrida
Esta arquitetura é um exemplo de uma solução que poderia ser escolhida para ambientes híbridos, onde o Airflow gerencia a orquestração on-premises, enquanto o EMR lida com o processamento pesado na nuvem.
Isso permite aproveitar o melhor dos dois mundos: controle local e poder de processamento escalável.
Uma opção alternativa seria rodar o Airflow também na nuvem (ex: EC2 ou MWAA - Managed Workflows for Apache Airflow)!
Caso queira se aprofundar, considere estudar a integração do Airflow com outros serviços: Lambda functions, Step Functions, Glue.