Apache Airflow Advanced Guide: From DAG Optimization to Monitoring
⏱️ 25 min
📊 Advanced
Learn advanced features and best practices of Apache Airflow commonly used in production environments and apply them to real projects.
📖 Table of Contents
- Deep Understanding of Airflow Architecture
- DAG Optimization Strategies
- Advanced Scheduling and Triggers
- Error Handling and Retry Strategies
- Monitoring and Alerting Systems
- Hands-on: Building Production-Grade Data Pipeline
- Performance Tuning and Scalability
- Security and Access Control
- Learning Summary
🏗️ Deep Understanding of Airflow Architecture
Core Component Analysis
Apache Airflow consists of the following core components:
1. Scheduler
- Role: DAG parsing, Task instance creation, execution scheduling
- Operation: Periodically checks DAG status in metadata database
- Performance Optimization: Parallel DAG file parsing and parsing caching, tuned via configuration (see the sketch after this component list)
2. Executor
- LocalExecutor: Parallel execution on a single machine (SequentialExecutor is the strictly sequential option)
- CeleryExecutor: Parallel execution in distributed environment
- KubernetesExecutor: Execution on Kubernetes cluster
- LocalKubernetesExecutor: Hybrid execution mode
3. Webserver
- UI Provision: DAG monitoring, log viewing, manual execution
- REST API: Integration with external systems
- Authentication/Authorization: Security management
4. Metadata Database
- PostgreSQL/MySQL: Metadata storage
- Connection Info: Connection, Variable, XCom
- Execution History: Task instances, log information
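Both the scheduler's parsing throughput and the choice of executor are driven by configuration. As a minimal sketch (the option names assume Airflow 2.x defaults), you can check what a running installation uses via `airflow.configuration.conf`:

```python
from airflow.configuration import conf

# Which executor this deployment is configured to use (e.g. CeleryExecutor)
executor = conf.get("core", "executor")

# How many processes the scheduler uses to parse DAG files in parallel
parsing_processes = conf.getint("scheduler", "parsing_processes")

# Minimum number of seconds between re-parses of the same DAG file
min_file_interval = conf.getint("scheduler", "min_file_process_interval")

print(executor, parsing_processes, min_file_interval)
```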
Understanding Metadata Database Schema
-- Key table structures
-- dag: DAG metadata
-- dag_run: DAG execution instances
-- task_instance: Task execution instances
-- log: Execution logs
-- connection: Database connection information
-- variable: Global variables
-- xcom: Data transfer between tasks
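Because this state lives in ordinary tables, it can also be inspected programmatically. A minimal, read-only sketch using the Airflow ORM (run it where the metadata database is reachable):

```python
from sqlalchemy import func

from airflow.models import TaskInstance
from airflow.utils.session import create_session

def count_task_instances_by_state():
    """Return a {state: count} summary straight from the task_instance table."""
    with create_session() as session:
        rows = (
            session.query(TaskInstance.state, func.count())
            .group_by(TaskInstance.state)
            .all()
        )
    return dict(rows)
```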
⚡ DAG Optimization Strategies
1. DAG Structure Optimization
Efficient Task Definition
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.task_group import TaskGroup
# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# DAG definition (catchup and max_active_runs are DAG-level arguments,
# not task-level default_args)
with DAG(
    'advanced_data_pipeline',
    default_args=default_args,
    description='Advanced data pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False,                  # Skip past executions
    max_active_runs=1,              # Limit concurrent DAG runs
    tags=['data-engineering', 'etl'],
) as dag:

    # Structured approach using Task Group
    with TaskGroup("data_extraction", tooltip="Data extraction") as extraction_group:
        # File existence check
        file_sensor = FileSensor(
            task_id='check_source_file',
            filepath='/data/input/sales_data.csv',
            poke_interval=30,
            timeout=300,
        )

        # Data validation
        validate_data = PythonOperator(
            task_id='validate_data',
            python_callable=validate_source_data,
            op_kwargs={'file_path': '/data/input/sales_data.csv'},
        )

        file_sensor >> validate_data

    # Main processing logic
    with TaskGroup("data_processing", tooltip="Data processing") as processing_group:
        # Data transformation
        transform_data = PythonOperator(
            task_id='transform_data',
            python_callable=transform_sales_data,
            pool='data_processing_pool',  # Use resource pool
            pool_slots=2,
        )

        # Data loading
        load_data = PythonOperator(
            task_id='load_data',
            python_callable=load_to_warehouse,
            trigger_rule='all_success',
        )

        transform_data >> load_data

    # Dependency setup
    extraction_group >> processing_group
2. Resource Management Optimization
Pool and Queue Utilization
# airflow.cfg settings
[celery]
worker_concurrency = 16
worker_precheck = True

# Create resource pools via the CLI (or Admin > Pools in the UI)
airflow pools set data_processing_pool 4 "Dedicated pool for data processing"
airflow pools set heavy_computation_pool 2 "Dedicated pool for heavy computations"
# (Pools can also be managed programmatically through airflow.models.Pool.)

# Use the pool in tasks to cap how many heavy task instances run at once
heavy_task = PythonOperator(
    task_id='heavy_computation',
    python_callable=heavy_computation_function,
    pool='heavy_computation_pool',
    pool_slots=1,
)
3. Memory Usage Optimization
# Chunk-based processing for large datasets
import gc
import pickle

import pandas as pd

def process_large_dataset(**context):
    """Process a large dataset in chunks to keep memory usage bounded"""
    chunk_size = 10000
    for chunk in pd.read_csv(
        '/data/large_dataset.csv',
        chunksize=chunk_size
    ):
        # Process each chunk
        processed_chunk = process_chunk(chunk)
        # Save intermediate results
        save_intermediate_result(processed_chunk)
        # Clean up memory
        del processed_chunk
        gc.collect()

# File-based data transfer instead of pushing large payloads through XCom
def save_large_data(**context):
    """Persist large data to a file and pass only the path via XCom"""
    large_data = context['task_instance'].xcom_pull(task_ids='previous_task')
    # Save to file
    with open('/tmp/large_data.pkl', 'wb') as f:
        pickle.dump(large_data, f)
    return '/tmp/large_data.pkl'

def load_large_data(**context):
    """Load large data back from the file path stored in XCom"""
    file_path = context['task_instance'].xcom_pull(task_ids='save_task')
    with open(file_path, 'rb') as f:
        return pickle.load(f)
⏰ Advanced Scheduling and Triggers
1. Dynamic Scheduling
# Conditional execution: a branch decides whether the rest of the DAG runs
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def choose_branch(**context):
    """Return the task_id to follow; BranchPythonOperator expects task ids, not booleans"""
    if datetime.now().weekday() in [0, 2, 4]:  # Mon, Wed, Fri
        return 'run_pipeline'
    return 'skip_pipeline'

# Conditional DAG
dag = DAG(
    'conditional_dag',
    schedule_interval='0 9 * * *',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

# Conditional execution
conditional_task = BranchPythonOperator(
    task_id='check_conditions',
    python_callable=choose_branch,
    dag=dag,
)

run_pipeline = EmptyOperator(task_id='run_pipeline', dag=dag)
skip_pipeline = EmptyOperator(task_id='skip_pipeline', dag=dag)
conditional_task >> [run_pipeline, skip_pipeline]

# Dynamic task creation: build tasks in a loop when the DAG file is parsed
def create_dynamic_tasks(dag, task_list):
    """Create one task per entry in task_list (evaluated at parse time)"""
    tasks = []
    for task_info in task_list:
        tasks.append(
            PythonOperator(
                task_id=f"dynamic_task_{task_info['id']}",
                python_callable=execute_dynamic_task,
                op_kwargs={'task_info': task_info},
                dag=dag,
            )
        )
    return tasks

dynamic_tasks = create_dynamic_tasks(dag, get_external_task_list())
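On Airflow 2.3+ the same idea can be expressed with dynamic task mapping, where the number of task instances is decided at runtime rather than at parse time. A minimal, standalone TaskFlow sketch (the item list and processing logic are placeholders):

```python
from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule_interval=None, start_date=datetime(2025, 1, 1), catchup=False)
def mapped_pipeline():
    @task
    def get_items():
        # In practice this might come from an external system
        return [{'id': 1}, {'id': 2}, {'id': 3}]

    @task
    def process_item(item: dict):
        print(f"processing {item['id']}")

    # One mapped task instance is created per returned item at runtime
    process_item.expand(item=get_items())

mapped_pipeline()
```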
2. External Trigger Utilization
# External trigger via the stable REST API
import requests

def trigger_dag_externally(dag_id, conf=None):
    """Trigger a DAG run from an external system through Airflow's REST API"""
    response = requests.post(
        f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns",
        json={"conf": conf or {}},
        auth=("admin", "admin"),  # replace with your API auth mechanism
    )
    response.raise_for_status()
    return response.json()

# Trigger via webhook from within a task
from airflow.providers.http.operators.http import SimpleHttpOperator

webhook_trigger = SimpleHttpOperator(
    task_id='webhook_trigger',
    http_conn_id='webhook_connection',
    endpoint='trigger',
    method='POST',
    data='{"dag_id": "target_dag"}',
    dag=dag,
)
3. Schedule Optimization
# Custom timetable (Airflow 2.2+)
from airflow.timetables.trigger import CronTriggerTimetable

# Complex scheduling
custom_schedule = CronTriggerTimetable(
    '0 2 * * 1-5',  # Weekdays at 2 AM
    timezone='Asia/Seoul',
)

dag = DAG(
    'business_hours_dag',
    start_date=datetime(2025, 1, 1),
    timetable=custom_schedule,
    catchup=False,
)

# Data-driven scheduling (conceptual helper; see the sensor sketch below)
def get_next_run_time(**context):
    """Determine the next run time based on data availability"""
    last_processed = get_last_processed_time()
    data_available = check_data_availability()
    if data_available:
        return datetime.now()
    else:
        return datetime.now() + timedelta(hours=1)
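A DAG cannot easily reschedule itself at runtime, so the usual way to get this data-driven behaviour is to let the schedule fire normally and gate the work behind a sensor. A minimal sketch, reusing the hypothetical check_data_availability helper from above:

```python
from airflow.sensors.python import PythonSensor

# Blocks downstream tasks until the data is actually available;
# mode='reschedule' frees the worker slot between pokes.
wait_for_data = PythonSensor(
    task_id='wait_for_data',
    python_callable=check_data_availability,  # must return True/False
    poke_interval=600,       # check every 10 minutes
    timeout=6 * 60 * 60,     # give up after 6 hours
    mode='reschedule',
    dag=dag,
)
```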
🚨 Error Handling and Retry Strategies
1. Advanced Retry Logic
# Selective retry with exponential backoff
import random
from datetime import timedelta

from airflow.exceptions import AirflowFailException
from airflow.models import BaseOperator

class SmartRetryOperator(BaseOperator):
    """Intelligent retry operator: backs off exponentially, but only for transient errors"""

    def __init__(
        self,
        base_delay=60,
        max_delay=300,
        exponential_base=2,
        jitter=True,
        **kwargs
    ):
        # apply_defaults is no longer needed in Airflow 2; retries and other
        # standard retry arguments are passed through **kwargs to BaseOperator.
        super().__init__(**kwargs)
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def execute(self, context):
        """Execution logic"""
        try:
            return self._execute_logic(context)
        except Exception as e:
            if self._should_retry(e, context):
                # Airflow reads task.retry_delay when scheduling the next try,
                # so adjust it to the computed backoff and re-raise.
                self.retry_delay = timedelta(seconds=self._compute_delay(context))
                raise
            # Non-retryable errors fail immediately and skip the remaining retries
            raise AirflowFailException(str(e)) from e

    def _should_retry(self, exception, context):
        """Determine retry necessity: retry only transient exception types"""
        retryable_exceptions = (ConnectionError, TimeoutError)
        return isinstance(exception, retryable_exceptions)

    def _compute_delay(self, context):
        """Exponential backoff with optional jitter"""
        retry_count = context['task_instance'].try_number - 1
        delay = min(
            self.base_delay * (self.exponential_base ** retry_count),
            self.max_delay,
        )
        if self.jitter:
            delay += random.uniform(0, delay * 0.1)
        return delay
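For many cases a custom operator is unnecessary, because BaseOperator already supports exponential backoff natively. A minimal sketch using only built-in arguments (the callable name is a placeholder):

```python
from datetime import timedelta

from airflow.operators.python import PythonOperator

resilient_task = PythonOperator(
    task_id='call_flaky_api',
    python_callable=call_flaky_api,      # hypothetical callable
    retries=5,
    retry_delay=timedelta(minutes=1),    # initial delay
    retry_exponential_backoff=True,      # 1m, 2m, 4m, ... between attempts
    max_retry_delay=timedelta(minutes=30),
)
```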
2. Error Alert System
# Slack notification via an on_failure_callback
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def send_slack_alert(context):
    """Send an error alert to Slack; register via on_failure_callback"""
    task_instance = context['task_instance']
    message = f"""
🚨 Airflow Task Failure Alert
DAG: {task_instance.dag_id}
Task: {task_instance.task_id}
Execution Time: {task_instance.start_date}
Error: {context.get('exception', 'Unknown error')}
Log: {task_instance.log_url}
"""
    # Returning an operator from a callback does nothing; execute it explicitly
    alert = SlackWebhookOperator(
        task_id='slack_alert',
        http_conn_id='slack_webhook',
        message=message,
    )
    return alert.execute(context=context)

# Email notification
from airflow.operators.email import EmailOperator

email_alert = EmailOperator(
    task_id='email_alert',
    to=['data-team@company.com'],
    subject='Airflow Task Failure - ',
    html_content="""
<h2>Task Failure Alert</h2>
<p><strong>DAG:</strong> {{ ti.dag_id }}</p>
<p><strong>Task:</strong> {{ ti.task_id }}</p>
<p><strong>Execution Time:</strong> {{ ti.start_date }}</p>
<p><strong>Error:</strong> see the task log ({{ ti.log_url }})</p>
""",
    trigger_rule='one_failed',
)
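To make the Slack callback fire automatically, attach it at the DAG or default_args level. A small sketch (send_slack_alert is the function defined above):

```python
default_args = {
    # ... other default arguments ...
    # Called once per task instance that ends up in the failed state
    'on_failure_callback': send_slack_alert,
}
```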
3. Circuit Breaker Pattern
class CircuitBreakerOperator(BaseOperator):
    """Circuit breaker pattern implementation"""

    def __init__(
        self,
        failure_threshold=5,
        recovery_timeout=300,
        expected_exception=Exception,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

    def execute(self, context):
        """Circuit breaker logic"""
        circuit_state = self._get_circuit_state()
        if circuit_state == 'OPEN':
            if self._should_attempt_reset():
                circuit_state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = self._execute_task(context)
            self._record_success()
            return result
        except self.expected_exception as e:
            self._record_failure()
            raise e

    def _get_circuit_state(self):
        """Check circuit state"""
        # Query state from a persistent store
        # Implementation omitted
        pass
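One lightweight way to persist the circuit state between task runs is to keep a small JSON blob in an Airflow Variable, one per circuit. A sketch under that assumption (the helper names and Variable key format are illustrative):

```python
import json
import time

from airflow.models import Variable

def load_circuit_state(circuit_name: str) -> dict:
    """Read the circuit's failure count and last-open timestamp from a Variable."""
    raw = Variable.get(
        f"circuit_{circuit_name}",
        default_var='{"failures": 0, "opened_at": 0}',
    )
    return json.loads(raw)

def record_failure(circuit_name: str, failure_threshold: int) -> str:
    """Increment the failure counter and open the circuit once the threshold is hit."""
    state = load_circuit_state(circuit_name)
    state["failures"] += 1
    if state["failures"] >= failure_threshold:
        state["opened_at"] = time.time()
    Variable.set(f"circuit_{circuit_name}", json.dumps(state))
    return "OPEN" if state["opened_at"] else "CLOSED"
```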
📊 Monitoring and Alerting Systems
1. Custom Metrics Collection
# Custom metrics via the Airflow Stats API (StatsD, exportable to Prometheus)
import psutil

from airflow.stats import Stats

def collect_custom_metrics(**context):
    """Collect custom metrics"""
    task_instance = context['task_instance']

    # Execution time metrics
    execution_time = (task_instance.end_date - task_instance.start_date).total_seconds()
    Stats.gauge('task_execution_time', execution_time, tags={
        'dag_id': task_instance.dag_id,
        'task_id': task_instance.task_id,
    })

    # Memory usage metrics
    memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MB
    Stats.gauge('task_memory_usage', memory_usage, tags={
        'dag_id': task_instance.dag_id,
        'task_id': task_instance.task_id,
    })

    # Data processing volume metrics
    processed_records = context.get('processed_records', 0)
    Stats.gauge('processed_records', processed_records, tags={
        'dag_id': task_instance.dag_id,
        'task_id': task_instance.task_id,
    })

# Metrics collection task
metrics_task = PythonOperator(
    task_id='collect_metrics',
    python_callable=collect_custom_metrics,
    trigger_rule='all_done',  # Execute regardless of success/failure
)
2. Dashboard Construction
# Grafana dashboard queries (run against the Airflow metadata database)
def get_dag_performance_metrics():
    """Query DAG performance metrics"""
    query = """
    SELECT
        dag_id,
        COUNT(*) AS total_runs,
        AVG(EXTRACT(EPOCH FROM (end_date - start_date))) AS avg_duration,
        COUNT(CASE WHEN state = 'success' THEN 1 END) AS success_count,
        COUNT(CASE WHEN state = 'failed' THEN 1 END) AS failed_count
    FROM dag_run
    WHERE start_date >= NOW() - INTERVAL '7 days'
    GROUP BY dag_id
    ORDER BY avg_duration DESC
    """
    return query

# Airflow UI custom view (Flask-AppBuilder plugin)
from airflow.plugins_manager import AirflowPlugin
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

class CustomDashboardView(AppBuilderBaseView):
    default_view = "custom_dashboard"

    @expose("/custom-dashboard")
    def custom_dashboard(self):
        """Custom dashboard"""
        # Query metrics data
        metrics = get_dag_performance_metrics()
        return self.render_template(
            "custom_dashboard.html",
            metrics=metrics,
        )

class CustomViewPlugin(AirflowPlugin):
    name = "custom_view_plugin"
    # appbuilder_views is a class attribute, not something to set in __init__
    appbuilder_views = [
        {
            "name": "Custom Dashboard",
            "category": "Custom",
            "view": CustomDashboardView(),
        }
    ]
3. Real-time Monitoring
# Real-time alerting system
import time

import psutil

from airflow.models import BaseOperator

class RealTimeMonitorOperator(BaseOperator):
    """Real-time monitoring operator (long-running; give it a dedicated pool slot)"""

    def __init__(
        self,
        monitoring_interval=60,
        alert_thresholds=None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.monitoring_interval = monitoring_interval
        self.alert_thresholds = alert_thresholds or {}

    def execute(self, context):
        """Execute real-time monitoring"""
        while True:
            try:
                # Check system status
                system_health = self._check_system_health()
                # Check thresholds
                alerts = self._check_thresholds(system_health)
                # Send alerts
                for alert in alerts:
                    self._send_alert(alert)
                # Wait
                time.sleep(self.monitoring_interval)
            except KeyboardInterrupt:
                break
            except Exception as e:
                self.log.error(f"Monitoring error: {e}")
                time.sleep(self.monitoring_interval)

    def _check_system_health(self):
        """Check system status"""
        return {
            'cpu_usage': psutil.cpu_percent(),
            'memory_usage': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage('/').percent,
            'active_dags': self._count_active_dags(),
            'failed_tasks': self._count_failed_tasks(),
        }

    def _check_thresholds(self, health_data):
        """Check thresholds"""
        alerts = []
        if health_data['cpu_usage'] > self.alert_thresholds.get('cpu', 80):
            alerts.append({
                'type': 'cpu_high',
                'message': f"High CPU usage: {health_data['cpu_usage']}%",
            })
        if health_data['memory_usage'] > self.alert_thresholds.get('memory', 85):
            alerts.append({
                'type': 'memory_high',
                'message': f"High memory usage: {health_data['memory_usage']}%",
            })
        return alerts
🛠️ Hands-on: Building Production-Grade Data Pipeline
1. Environment Setup
# Airflow installation and configuration
pip install apache-airflow[postgres,celery,redis,slack]
pip install apache-airflow-providers-postgres
pip install apache-airflow-providers-slack
pip install apache-airflow-providers-http
# Database configuration (use AIRFLOW__CORE__SQL_ALCHEMY_CONN on Airflow < 2.3)
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://airflow:airflow@localhost/airflow"
export AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
export AIRFLOW__CELERY__BROKER_URL="redis://localhost:6379/0"
export AIRFLOW__CELERY__RESULT_BACKEND="redis://localhost:6379/0"
# Airflow initialization
airflow db init
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
2. Production-Grade ETL Pipeline
# dags/advanced_etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.task_group import TaskGroup
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# Load configuration values
BATCH_SIZE = int(Variable.get("batch_size", default_var=1000))
MAX_RETRIES = int(Variable.get("max_retries", default_var=3))

# Source files consumed by the pipeline
SOURCE_FILES = [
    '/data/input/sales_data.csv',
    '/data/input/customer_data.csv',
    '/data/input/product_data.csv',
]
default_args = {
    'owner': 'data-engineering-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': MAX_RETRIES,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

# catchup and max_active_runs are DAG-level settings, so they go on the DAG itself
with DAG(
    'advanced_etl_pipeline',
    default_args=default_args,
    description='Advanced ETL pipeline - Extract, Transform, Load',
    schedule_interval='0 2 * * *',
    catchup=False,
    max_active_runs=1,
    tags=['etl', 'data-engineering', 'production'],
    doc_md=__doc__,
) as dag:
    # Data extraction group
    with TaskGroup("data_extraction", tooltip="Data extraction") as extraction_group:
        # Check source files (FileSensor takes a single path, so create one sensor per file)
        check_source_files = [
            FileSensor(
                task_id=f"check_source_file_{i}",
                filepath=path,
                poke_interval=30,
                timeout=300,
                mode='reschedule',
            )
            for i, path in enumerate(SOURCE_FILES)
        ]

        # Data quality validation
        validate_quality = PythonOperator(
            task_id='validate_data_quality',
            python_callable=validate_data_quality,
            op_kwargs={'source_files': SOURCE_FILES},
            pool='data_validation_pool',
        )

        # Data extraction
        extract_data = PythonOperator(
            task_id='extract_data',
            python_callable=extract_data_from_sources,
            op_kwargs={'batch_size': BATCH_SIZE},
            pool='data_extraction_pool',
        )

        check_source_files >> validate_quality >> extract_data
    # Data transformation group
    with TaskGroup("data_transformation", tooltip="Data transformation") as transformation_group:
        # Data cleaning
        clean_data = PythonOperator(
            task_id='clean_data',
            python_callable=clean_and_validate_data,
            op_kwargs={'batch_size': BATCH_SIZE},
            pool='data_processing_pool',
        )

        # Data transformation
        transform_data = PythonOperator(
            task_id='transform_data',
            python_callable=transform_business_logic,
            op_kwargs={'batch_size': BATCH_SIZE},
            pool='data_processing_pool',
        )

        # Data aggregation
        aggregate_data = PythonOperator(
            task_id='aggregate_data',
            python_callable=create_aggregated_metrics,
            pool='data_processing_pool',
        )

        clean_data >> transform_data >> aggregate_data
    # Data loading group
    with TaskGroup("data_loading", tooltip="Data loading") as loading_group:
        # Load to staging tables
        load_to_staging = PythonOperator(
            task_id='load_to_staging',
            python_callable=load_to_staging_tables,
            pool='data_loading_pool',
        )

        # Load to data warehouse
        load_to_warehouse = PythonOperator(
            task_id='load_to_warehouse',
            python_callable=load_to_data_warehouse,
            pool='data_loading_pool',
        )

        # Data validation
        validate_loaded = PythonOperator(
            task_id='validate_loaded_data',
            python_callable=validate_loaded_data,
            trigger_rule='all_success',
        )

        load_to_staging >> load_to_warehouse >> validate_loaded
    # Notifications and monitoring
    with TaskGroup("monitoring", tooltip="Monitoring") as monitoring_group:
        # Success notification
        success_notification = SlackWebhookOperator(
            task_id='success_notification',
            http_conn_id='slack_webhook',
            message="""
✅ ETL Pipeline Successfully Completed
Execution Time:
Processed Records: {{ ti.xcom_pull(task_ids='data_extraction.extract_data') }}
""",
            trigger_rule='all_success',
        )

        # Failure notification
        failure_notification = SlackWebhookOperator(
            task_id='failure_notification',
            http_conn_id='slack_webhook',
            message="""
❌ ETL Pipeline Failed
Execution Time:
Failed Task:
Error Log:
""",
            trigger_rule='one_failed',
        )
    # Dependency setup
    extraction_group >> transformation_group >> loading_group >> monitoring_group
# DAG documentation
dag.doc_md = """
# Advanced ETL Pipeline
This DAG implements a production-grade data pipeline.
## Key Features
- Data quality validation
- Batch processing optimization
- Error handling and retry
- Real-time monitoring
- Alerting system
## Execution Schedule
Daily at 2 AM
## Monitoring
- Slack notifications
- Performance metrics collection
- Error tracking
"""
📚 Learning Summary
What We Learned in This Post
- Deep Understanding of Airflow Architecture
- Core component analysis
- Metadata database schema
- Executor types and features
- DAG Optimization Strategies
- Efficient task definition
- Resource management optimization
- Memory usage optimization
- Advanced Scheduling and Triggers
- Dynamic scheduling
- External trigger utilization
- Schedule optimization
- Error Handling and Retry Strategies
- Advanced retry logic
- Error alert system
- Circuit breaker pattern
- Monitoring and Alerting Systems
- Custom metrics collection
- Dashboard construction
- Real-time monitoring
- Building Production-Grade Data Pipeline
- Complete ETL pipeline
- Data quality validation
- Error handling and alerts
Key Concepts Summary
| Concept | Description | Importance |
|---|---|---|
| DAG Optimization | Improving performance and resource efficiency | ⭐⭐⭐⭐⭐ |
| Error Handling | Stable pipeline operation | ⭐⭐⭐⭐⭐ |
| Monitoring | Real-time status tracking | ⭐⭐⭐⭐⭐ |
| Scheduling | Flexible execution control | ⭐⭐⭐⭐ |
Practical Application Considerations
- Performance Optimization: Pool, Queue, memory management
- Stability: Error handling, retry, Circuit Breaker
- Monitoring: Metrics collection, alerts, dashboard
- Scalability: Distributed environment, resource management
With this guide, you can master the advanced features of Apache Airflow and build stable and efficient data pipelines in production! 🚀