Orchestrating ETL with Apache Airflow and Workflows
Apache Airflow is a workflow orchestration platform that schedules, monitors, and retries ETL pipelines. Instead of cron jobs scattered across servers, Airflow provides a unified control plane. You define pipelines as code (Directed Acyclic Graphs, or DAGs) and get explicit task dependencies, automatic retries, data-aware scheduling, and a web UI for monitoring. Without orchestration, managing hundreds of interdependent jobs is chaotic. Suppose task A depends on task B, which depends on tasks C and D. If C fails, cron still starts B and then A at their usual times. They run on missing or stale data, and the failure cascades. Airflow prevents this by running a task only after its upstream tasks have succeeded. According to a 2026 Statista survey, 67% of enterprises use workflow orchestration (Airflow, Prefect, Dagster) for production pipelines; 89% reported reduced pipeline failures and 45% improved time-to-insight.
Airflow Concepts: DAGs, Tasks, Operators
DAG (Directed Acyclic Graph) is a workflow definition: a set of tasks with explicit dependencies, e.g., "extract" → "transform" → "load". Each task is a node; dependencies are edges.
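The dependency idea can be illustrated without Airflow at all. The sketch below uses Python's standard-library `graphlib` (3.9+), not Airflow's scheduler. It shows that any valid execution order of a DAG is a topological order and that a cycle makes the graph invalid. `run_order`, `is_acyclic`, and the task names are illustrative.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of upstream tasks it depends on.
etl_dag = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

# The A/B/C/D example from the introduction: A needs B, and B needs C and D.
diamond = {"A": {"B"}, "B": {"C", "D"}, "C": set(), "D": set()}

def run_order(graph):
    """Return one valid execution order (a topological order) for the graph."""
    return list(TopologicalSorter(graph).static_order())

def is_acyclic(graph):
    """Return True if the graph has no cycle, i.e. it is a valid DAG."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return False
    return True

print(run_order(etl_dag))  # ['extract', 'transform', 'load']
print(run_order(diamond))  # C and D (in either order), then B, then A
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # False
```

Airflow applies the same constraint when it parses a DAG file. A definition whose dependencies form a cycle is rejected, because no run order would exist.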
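Operators are templates for units of work, and each task in a DAG is an instance of an operator. PythonOperator runs Python code, BashOperator runs shell commands, and HttpOperator (from the HTTP provider package) calls APIs. The Amazon provider's S3 operators (e.g., S3CreateObjectOperator) interact with S3. Airflow offers 200+ operators, most of them in separately installed provider packages, and you can write custom ones.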
Here is a minimal ETL DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Define DAG metadata
default_args = {
    'owner': 'data_engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2026, 1, 1),
    'email': ['data-team@example.com'],  # placeholder address
    'email_on_failure': True,  # boolean flag; requires SMTP to be configured
}
dag = DAG(
    'unstructured_etl',
    default_args=default_args,
    description='ETL pipeline for unstructured documents',
    schedule='@daily',  # Daily at midnight UTC (Airflow 2.4+; older versions use schedule_interval)
    catchup=False,  # Don't backfill runs between start_date and today
)
# Define Python tasks
def extract_documents():
    """Extract documents from S3."""
    print("Extracting documents...")
    # Call API or list S3 objects
    return 1000  # Return count of documents
def transform_documents():
    """Transform and chunk documents."""
    print("Transforming documents...")
    # Clean, chunk, embed
    return 800  # Return count of transformed documents
def load_to_vector_db():
    """Load embeddings to Pinecone."""
    print("Loading to vector database...")
    # Upsert embeddings
# Create task objects
extract = PythonOperator(
    task_id='extract',
    python_callable=extract_documents,
    dag=dag,
)
transform = PythonOperator(
    task_id='transform',
    python_callable=transform_documents,
    dag=dag,
)
load = PythonOperator(
    task_id='load',
    python_callable=load_to_vector_db,
    dag=dag,
)
# Define task dependencies
extract >> transform >> load # extract runs first, then transform, then load
This DAG runs daily, retrying each task up to 2 times if it fails, and sends an alert email on failure.
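One detail often surprises newcomers: Airflow starts a run at the end of the data interval it covers. With `@daily` and `start_date=datetime(2026, 1, 1)`, the run for 1 January (its logical date) is triggered around midnight UTC on 2 January. The plain-Python sketch below only computes those intervals to make the timing concrete. It does not call Airflow, and `daily_intervals` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

def daily_intervals(start, count):
    """Yield (data_interval_start, data_interval_end) pairs for an @daily schedule."""
    for i in range(count):
        begin = start + timedelta(days=i)
        yield begin, begin + timedelta(days=1)

start = datetime(2026, 1, 1, tzinfo=timezone.utc)
for begin, end in daily_intervals(start, 3):
    # The run covering [begin, end) is triggered once that interval has ended, at `end`.
    print(f"interval {begin:%Y-%m-%d} -> triggered at {end:%Y-%m-%d %H:%M} UTC")
```

With `catchup=False`, only the most recent interval is scheduled when the DAG is first enabled. Earlier intervals since `start_date` are not backfilled.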
Passing Data Between Tasks: Context and XCom
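Airflow passes small pieces of data between tasks using XCom (cross-communication). In Airflow 2, PythonOperator passes the task's runtime context (data interval, task instance, and more) to any callable that accepts **context: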
def extract_with_checkpoint(**context):
    """Extract documents modified during this run's data interval."""
    # For a regular schedule, data_interval_start equals the end of the previous interval
    interval_start = context['data_interval_start']
    interval_end = context['data_interval_end']
    print(f"Extracting documents modified between {interval_start} and {interval_end}")
    # fetch_documents_since is a hypothetical helper (e.g., list S3 objects by LastModified)
    documents = fetch_documents_since(interval_start)
    # Push results to XCom for downstream tasks (values must be serializable)
    ti = context['ti']
    ti.xcom_push(key='doc_count', value=len(documents))
    ti.xcom_push(key='documents', value=documents)  # acceptable for small lists only
    # The return value is also pushed to XCom, under the key 'return_value'
    return {"status": "success", "count": len(documents)}
def transform_with_context(**context):
    """Get extracted documents from the upstream task."""
    # Pull from XCom (data pushed by the extract task)
    ti = context['ti']
    documents = ti.xcom_pull(task_ids='extract', key='documents')
    doc_count = ti.xcom_pull(task_ids='extract', key='doc_count')
    print(f"Transforming {doc_count} documents...")
    # transform_document is a hypothetical per-document helper (clean, chunk, embed)
    transformed = [transform_document(doc) for doc in documents]
    ti.xcom_push(key='transformed_count', value=len(transformed))
    return transformed
# provide_context=True is not needed: it has been deprecated since Airflow 2.0
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_with_checkpoint,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_with_context,
    dag=dag,
)
extract_task >> transform_task
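XCom enables stateful pipelines: extract hands the document list to transform, and transform hands its output to load. XCom values are stored in Airflow's metadata database, so keep them small (counts, IDs, file paths) rather than full documents or embeddings.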
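For large payloads such as full documents or embeddings, a common pattern is to write the data to shared storage and push only its location to XCom, which is designed for small values. The sketch below simulates this with a local temporary directory. `extract_to_staging`, `transform_from_staging`, and `STAGING_DIR` are hypothetical. In a real deployment, the path would point to storage that every worker can reach, such as an S3 prefix.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical staging location; in production, typically object storage such as S3.
STAGING_DIR = Path(tempfile.mkdtemp())

def extract_to_staging(run_id, documents):
    """Write the full payload to staging and return only a small reference."""
    path = STAGING_DIR / f"{run_id}_documents.json"
    path.write_text(json.dumps(documents))
    # Returning this dict from a PythonOperator callable pushes it to XCom.
    return {"path": str(path), "count": len(documents)}

def transform_from_staging(ref):
    """Load the payload using the reference pulled from XCom."""
    documents = json.loads(Path(ref["path"]).read_text())
    return [doc.strip().lower() for doc in documents]

ref = extract_to_staging("manual__2026-01-01", ["  Doc A ", "Doc B"])
print(ref["count"])                 # 2
print(transform_from_staging(ref))  # ['doc a', 'doc b']
```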
Error Handling and Retries
Airflow automatically retries failed tasks:
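from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
    'robust_etl',
    start_date=datetime(2026, 1, 1),
    default_args={
        'owner': 'data_engineering',
        'retries': 3,  # Total attempts = retries + 1 = 4
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,  # Grow the delay between successive retries
        'max_retry_delay': timedelta(minutes=30),  # Upper bound on the backoff delay
    },
    schedule='@daily',
    catchup=False,
)
# Task-specific retry overrides
extract = PythonOperator(
    task_id='extract',
    python_callable=extract_documents,  # defined in the first example
    retries=5,  # Override the DAG default: up to 6 attempts in total
    retry_delay=timedelta(minutes=1),
    dag=dag,
)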
# Exponential backoff for flaky APIs (retries inside a single task attempt)
import functools
import time
import requests
def backoff_decorator(max_retries=5, backoff_factor=2):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        raise  # Out of attempts: let Airflow mark the task failed
                    wait_time = backoff_factor ** attempt
                    print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s")
                    time.sleep(wait_time)
        return wrapper
    return decorator
@backoff_decorator(max_retries=5)
def fetch_api_with_retry():
    # Up to 5 attempts, waiting 1s, 2s, 4s, then 8s between them
    response = requests.get("https://api.example.com/documents", timeout=30)
    response.raise_for_status()
    return response.json()
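A common refinement of the decorator above caps the delay and adds random "full jitter". Many workers retrying the same API then do not hit it in lockstep. This is a swapped-in variant, not part of the original pipeline. `retry_with_backoff` is a hypothetical name, and its `sleep` parameter is injectable only so the behavior can be tested without waiting. In-task retries also multiply with Airflow's own `retries`: five in-task attempts inside a task with `retries=3` can mean up to twenty calls.

```python
import functools
import random
import time

def retry_with_backoff(max_attempts=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Retry with capped exponential backoff plus "full jitter".

    After failed attempt n (0-based), wait a random time in
    [0, min(max_delay, base_delay * 2**n)] seconds before trying again.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # Out of attempts: let the task fail
                    cap = min(max_delay, base_delay * 2 ** attempt)
                    sleep(random.uniform(0, cap))
        return wrapper
    return decorator

# Usage: a function that fails twice, then succeeds.
calls = {"n": 0}
waits = []  # Records requested delays instead of sleeping

@retry_with_backoff(max_attempts=4, sleep=waits.append)
def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"

print(flaky_fetch())  # ok
```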
Monitoring and Alerts
Airflow's web UI shows pipeline status, logs, and XCom data. For production, configure alerts:
from datetime import datetime
from airflow import DAG
from airflow.operators.email import EmailOperator
dag = DAG(
    'monitored_etl',
    start_date=datetime(2026, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args={
        'owner': 'data_engineering',
        'email': ['data-team@example.com'],  # placeholder address
        'email_on_failure': True,
        'email_on_retry': True,
    },
)
# Tasks...
# Alert on failure
send_alert = EmailOperator(
    task_id='send_failure_alert',
    to='data-team@example.com',  # placeholder address
    subject='ETL Pipeline Failed: {{ ds }}',
    html_content="""
    Pipeline {{ dag.dag_id }} failed on {{ ds }} (run {{ run_id }}).
    Check the failed task's logs in the Airflow UI.
    """,
    trigger_rule='one_failed',  # Run as soon as any upstream task fails
    dag=dag,
)
# Success notification
notify_success = EmailOperator(
    task_id='notify_success',
    to='data-team@example.com',  # placeholder address
    subject='ETL Success: {{ ds }}',
    html_content="""
    Pipeline {{ dag.dag_id }} completed successfully on {{ ds }}.
    Documents processed: {{ ti.xcom_pull(task_ids='extract', key='doc_count') }}
    """,
    trigger_rule='all_success',  # The default: run only if all upstream tasks succeeded
    dag=dag,
)
# Conditional branching based on task results
from airflow.operators.python import BranchPythonOperator, PythonOperator
def check_doc_count(**context):
    doc_count = context['ti'].xcom_pull(task_ids='extract', key='doc_count')
    # Treat a missing XCom value as zero documents
    if (doc_count or 0) > 0:
        return 'transform'  # task_id of the branch to follow
    return 'skip_transform'
branch = BranchPythonOperator(
    task_id='check_docs',
    python_callable=check_doc_count,
    dag=dag,
)
transform = PythonOperator(task_id='transform', python_callable=lambda: None, dag=dag)
skip = PythonOperator(task_id='skip_transform', python_callable=lambda: None, dag=dag)
extract >> branch  # extract is one of the tasks elided above
branch >> [transform, skip]
transform >> notify_success
[extract, transform] >> send_alert  # Alert if either task fails
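Trigger rules decide whether a task runs, based on its upstream tasks' final states. The toy function below is a simplified model of four of Airflow's rules, not Airflow's implementation. For example, the real `one_failed` fires as soon as a single upstream task fails, without waiting for the others. The model shows why a join task placed after `check_docs` needs `none_failed_min_one_success`. With the default `all_success`, the skipped branch would cause the join to be skipped as well.

```python
# Final states an upstream task can end in (a subset of Airflow's task states).
SUCCESS, FAILED, UPSTREAM_FAILED, SKIPPED = "success", "failed", "upstream_failed", "skipped"

def should_run(trigger_rule, upstream_states):
    """Toy model: decide whether a task runs once all its upstream tasks have finished."""
    failed = any(s in (FAILED, UPSTREAM_FAILED) for s in upstream_states)
    succeeded = any(s == SUCCESS for s in upstream_states)
    if trigger_rule == "all_success":
        return all(s == SUCCESS for s in upstream_states)
    if trigger_rule == "one_failed":
        return failed
    if trigger_rule == "all_done":
        return True  # Every upstream task has finished, whatever its state
    if trigger_rule == "none_failed_min_one_success":
        return not failed and succeeded
    raise ValueError(f"rule not modeled: {trigger_rule}")

# After a branch, the chosen path succeeds and the other one is skipped.
after_branch = [SUCCESS, SKIPPED]
print(should_run("all_success", after_branch))                  # False
print(should_run("none_failed_min_one_success", after_branch))  # True
```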
Production Deployment: Multi-Machine Airflow
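For production, run Airflow's components as separate services. The Docker Compose file below runs them on a single host. On a real cluster, Celery workers run on additional machines that connect to the same Postgres database and Redis broker: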
# docker-compose.yml for an Airflow cluster (CeleryExecutor)
# Initialize the metadata database once before the first start (Airflow 2.7+: airflow db migrate)
version: '3.8'
x-airflow-common: &airflow-common
  image: apache/airflow:2.7.0-python3.10
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
  volumes:
    - ./dags:/opt/airflow/dags  # every component needs the same DAG files
  depends_on:
    - postgres
    - redis
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres_data:/var/lib/postgresql/data
  redis:
    image: redis:7
  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
  scheduler:
    <<: *airflow-common
    command: scheduler
  worker:
    <<: *airflow-common
    command: celery worker  # scale out with: docker compose up --scale worker=3
volumes:
  postgres_data:
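This deploys:
- Webserver: UI at http://localhost:8080
- Scheduler: triggers DAG runs on schedule and queues their tasks
- Workers: Celery workers that execute queued tasks in parallel; add more workers, on this host or on other machines, to scale out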
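To verify the deployment from a script or monitoring probe, you can poll the webserver's `/health` endpoint. In Airflow 2.x it reports the status of the metadata database and the scheduler as JSON. The sketch below assumes the port mapping from the Compose file above. `check_health` and `fetch_health` are hypothetical helpers, and the sample payload only mimics the response shape.

```python
import json
import urllib.request

HEALTH_URL = "http://localhost:8080/health"  # webserver port mapped in the Compose file

def check_health(payload, required=("metadatabase", "scheduler")):
    """Return the required components that are not reported as healthy."""
    return [name for name in required
            if (payload.get(name) or {}).get("status") != "healthy"]

def fetch_health(url=HEALTH_URL, timeout=5):
    """Fetch and decode the health JSON (needs a running webserver)."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)

# Sample payload shaped like the endpoint's response
sample = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy", "latest_scheduler_heartbeat": "2026-01-02T00:00:00+00:00"},
}
print(check_health(sample))  # ['scheduler']
```

A probe would call `check_health(fetch_health())` and alert whenever the returned list is non-empty.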
Key Takeaways
- Airflow is a workflow orchestrator: define pipelines as DAGs (directed acyclic graphs), set task dependencies, and let Airflow retry failed tasks automatically.
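- Use the task context (passed automatically to callables in Airflow 2; provide_context=True is no longer needed) and XCom to pass small pieces of data between tasks.
- Configure retries, exponential backoff, and alerts for production robustness.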
- Deploy on a cluster with Celery workers for horizontal scalability.
- Monitor pipeline health via web UI, logs, and email alerts.
Frequently Asked Questions
Should I use Airflow or Prefect or Dagster?
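Airflow is the most mature option (10+ years), with the largest community and the most operators. Prefect offers a more Python-native API, in which flows and tasks are decorated functions, plus a polished UI. Dagster is newer and built around an asset-oriented design. Airflow is a safe default for production workflows. Consider Prefect if you want lighter-weight, Python-first flows and prefer its UI, or Dagster if you prefer asset-based thinking.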
How do I test DAGs locally before deploying?
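Test task callables as plain Python functions with pytest, mocking API calls and the task context. Add a DAG integrity test that loads your DAG folder with DagBag and asserts there are no import errors. To exercise a whole DAG, use the dag.test() method (Airflow 2.5+), which runs every task in a single Python process without a scheduler. To run a single task, use airflow tasks test <dag_id> <task_id> <date>.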
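Task callables are ordinary functions, so most logic can be unit-tested without a scheduler by faking the context. The pytest-style sketch below uses `unittest.mock.Mock` as a stand-in for the TaskInstance (`ti`). It re-declares the `check_doc_count` branch callable from the monitoring example, with a guard for a missing XCom value. The test names are illustrative.

```python
from unittest import mock

def check_doc_count(**context):
    """Branch callable: follow 'transform' only if extract reported documents."""
    doc_count = context["ti"].xcom_pull(task_ids="extract", key="doc_count")
    return "transform" if (doc_count or 0) > 0 else "skip_transform"

def test_branch_follows_transform_when_docs_exist():
    ti = mock.Mock()  # Stand-in for Airflow's TaskInstance
    ti.xcom_pull.return_value = 42
    assert check_doc_count(ti=ti) == "transform"
    ti.xcom_pull.assert_called_once_with(task_ids="extract", key="doc_count")

def test_branch_skips_when_nothing_extracted():
    ti = mock.Mock()
    ti.xcom_pull.return_value = None  # Upstream pushed nothing
    assert check_doc_count(ti=ti) == "skip_transform"
```

pytest collects the `test_` functions automatically. The same approach works for extract and transform callables: patch the HTTP or S3 client they use with `mock.patch`.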
What if a task takes longer than expected?
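Set execution_timeout=timedelta(hours=2) on the task. If an attempt runs longer than two hours, Airflow raises AirflowTaskTimeout and the attempt fails, and it is retried if retries remain. Also monitor task duration trends: if the average duration keeps increasing, investigate causes such as a slow API, larger datasets, or resource contention.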
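To monitor duration trends, a simple heuristic compares the latest run against the mean and standard deviation of recent runs. The sketch below is a hypothetical helper, not an Airflow feature. The durations are illustrative numbers; in practice you would read them from Airflow's task-instance history or your metrics system. The threshold `k=3.0` and `min_runs=5` are assumptions to tune.

```python
from statistics import mean, stdev

def is_duration_regression(history_s, latest_s, k=3.0, min_runs=5):
    """Flag latest_s if it exceeds the historical mean by more than k standard deviations."""
    if len(history_s) < min_runs:
        return False  # Not enough history to judge
    threshold = mean(history_s) + k * stdev(history_s)
    return latest_s > threshold

history = [610, 590, 605, 598, 612, 600]  # Past task durations in seconds (illustrative)
print(is_duration_regression(history, 620))  # False
print(is_duration_regression(history, 900))  # True
```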
Can I run DAGs on Kubernetes instead of VMs?
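Yes. KubernetesPodOperator (from the cncf.kubernetes provider) launches an individual task in its own pod, while KubernetesExecutor runs every task in a pod. Each task gets its own container, enabling fine-grained resource requests per task. Combined with a cluster autoscaler, capacity then scales with the workload.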
How do I handle data quality checks in Airflow?
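Add a "validate" task between extract and transform, using Great Expectations or custom validators. If validation fails, let the validate task fail. Downstream tasks with the default trigger_rule='all_success' then do not run (they are marked upstream_failed), and an alert task with trigger_rule='one_failed' can send the notification. If you would rather skip downstream tasks than fail the run, use ShortCircuitOperator for the check instead.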
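A custom validator can be as simple as a function that raises when a batch breaks a rule. Called from a PythonOperator, the uncaught exception fails the validate task and blocks downstream tasks that use the default trigger rule. The checks and thresholds below are hypothetical examples, not Great Expectations' API. They include `min_count`, `max_empty_ratio`, and the assumed `id`/`text` fields.

```python
class ValidationError(Exception):
    """Raised when a batch fails quality checks; uncaught, it fails the Airflow task."""

def validate_documents(documents, min_count=1, max_empty_ratio=0.05):
    """Hypothetical quality checks for extracted documents (dicts with 'id' and 'text')."""
    problems = []
    if len(documents) < min_count:
        problems.append(f"expected at least {min_count} documents, got {len(documents)}")
    missing_id = [i for i, doc in enumerate(documents) if not doc.get("id")]
    if missing_id:
        problems.append(f"{len(missing_id)} documents missing 'id'")
    empty = sum(1 for doc in documents if not doc.get("text", "").strip())
    if documents and empty / len(documents) > max_empty_ratio:
        problems.append(f"{empty}/{len(documents)} documents have empty text")
    if problems:
        raise ValidationError("; ".join(problems))
    return len(documents)  # Returned value lands in XCom for downstream tasks

good = [{"id": "a", "text": "hello"}, {"id": "b", "text": "world"}]
print(validate_documents(good))  # 2
```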