
This skill guides building and deploying robust Apache Airflow data pipelines with DAGs, operators, sensors, dependencies, and production best practices.

npx playbooks add skill manutej/luxor-claude-marketplace --skill apache-airflow-orchestration

Review the files below or copy the command above to add this skill to your agents.

Files (4)
SKILL.md
39.3 KB
---
name: apache-airflow-orchestration
description: Complete guide for Apache Airflow orchestration including DAGs, operators, sensors, XComs, task dependencies, dynamic workflows, and production deployment
tags: [airflow, workflow, orchestration, dags, operators, sensors, data-pipelines, scheduling]
tier: tier-1
---

# Apache Airflow Orchestration

A comprehensive skill for mastering Apache Airflow workflow orchestration. This skill covers DAG development, operators, sensors, task dependencies, dynamic workflows, XCom communication, scheduling patterns, and production deployment strategies.

## When to Use This Skill

Use this skill when:

- Building and managing complex data pipelines with task dependencies
- Orchestrating ETL/ELT workflows across multiple systems
- Scheduling and monitoring batch processing jobs
- Coordinating multi-step data transformations
- Managing workflows with conditional execution and branching
- Implementing event-driven or asset-based workflows
- Deploying production-grade workflow automation
- Creating dynamic workflows that generate tasks programmatically
- Coordinating distributed task execution across clusters
- Building data engineering platforms with workflow orchestration

## Core Concepts

### What is Apache Airflow?

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It allows you to define workflows as Directed Acyclic Graphs (DAGs) using Python code, making complex workflow orchestration maintainable and version-controlled.

**Key Principles:**
- **Dynamic**: Workflows are defined in Python, enabling dynamic generation
- **Extensible**: Rich ecosystem of operators, sensors, and hooks
- **Scalable**: Can scale from single machine to large clusters
- **Observable**: Comprehensive UI for monitoring and troubleshooting

### DAGs (Directed Acyclic Graphs)

A DAG is a collection of tasks organized to reflect their relationships and dependencies.

**DAG Properties:**
- **dag_id**: Unique identifier for the DAG
- **start_date**: When the DAG should start being scheduled
- **schedule**: How often to run (cron, timedelta, or asset-based)
- **catchup**: Whether to run missed intervals on DAG activation
- **tags**: Labels for organization and filtering
- **default_args**: Default parameters for all tasks in the DAG

**DAG Definition Example:**
```python
from datetime import datetime
from airflow.sdk import DAG

with DAG(
    dag_id="example_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",  # Daily at midnight
    catchup=False,
    tags=["example", "tutorial"],
) as dag:
    # Tasks defined here
    pass
```

### Tasks and Operators

**Tasks** are the basic units of execution in Airflow. **Operators** are templates for creating tasks.

**Common Operator Types:**

1. **BashOperator**: Execute bash commands
2. **PythonOperator**: Execute Python functions
3. **EmailOperator**: Send emails
4. **EmptyOperator**: Placeholder/dummy tasks
5. **Custom Operators**: User-defined operators for specific needs

**Operator vs. Task:**
- Operator: Template/class definition
- Task: Instantiation of an operator with specific parameters (see the sketch below)
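
The distinction in code, as a minimal sketch (assumed to live inside a `with DAG(...)` block): `BashOperator` is the operator (template), and each instantiation with its own `task_id` is a task.

```python
from airflow.providers.standard.operators.bash import BashOperator

# BashOperator is the template (operator); each instantiation below is a concrete task
say_hello = BashOperator(task_id="say_hello", bash_command="echo 'hello'")
say_goodbye = BashOperator(task_id="say_goodbye", bash_command="echo 'goodbye'")
```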

### Task Dependencies

Task dependencies define the execution order and workflow structure.

**Dependency Operators:**
- `>>`: Sets downstream dependency (task1 >> task2)
- `<<`: Sets upstream dependency (task2 << task1)
- `chain()`: Sequential dependencies for multiple tasks
- `cross_downstream()`: Many-to-many relationships

**Dependency Examples:**
```python
# Simple linear flow
task1 >> task2 >> task3

# Fan-out pattern
task1 >> [task2, task3, task4]

# Fan-in pattern
[task1, task2, task3] >> task4

# Complex dependencies
first_task >> [second_task, third_task]
third_task << fourth_task
```

### Executors

Executors determine how and where tasks run.

**Executor Types:**
- **SequentialExecutor**: Single-threaded, local (default with SQLite; not for production)
- **LocalExecutor**: Multi-threaded, single machine
- **CeleryExecutor**: Distributed execution using Celery
- **KubernetesExecutor**: Each task runs in a separate Kubernetes pod
- **DaskExecutor**: Distributed execution using Dask
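
The executor is a deployment-level setting (in `airflow.cfg` or the `AIRFLOW__CORE__EXECUTOR` environment variable) rather than something chosen per DAG. As a small sketch, you can confirm which executor a deployment is configured to use via Airflow's configuration API:

```python
from airflow.configuration import conf

# Reads the [core] executor setting from airflow.cfg (or its environment override)
executor = conf.get("core", "executor")
print(f"Configured executor: {executor}")
```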

### Scheduler

The Airflow scheduler:
- Monitors all DAGs and their tasks
- Triggers task instances based on dependencies and schedules
- Submits tasks to executors for execution
- Handles retries and task state management

**Starting the Scheduler:**
```bash
airflow scheduler
```

## DAG Development Patterns

### Basic DAG Structure

Every DAG follows this structure:

```python
from datetime import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="basic_dag",
    start_date=datetime(2022, 1, 1),
    schedule="0 0 * * *",
    catchup=False,
) as dag:

    task1 = BashOperator(
        task_id="task1",
        bash_command="echo 'Task 1 executed'"
    )

    task2 = BashOperator(
        task_id="task2",
        bash_command="echo 'Task 2 executed'"
    )

    task1 >> task2
```

### Task Dependencies and Chains

**Linear Chain:**
```python
from airflow.sdk import chain

# These are equivalent:
task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)
```

**Dynamic Chain:**
```python
from airflow.sdk import chain
from airflow.operators.empty import EmptyOperator

# Dynamically generate and chain tasks
chain(*[EmptyOperator(task_id=f"task_{i}") for i in range(1, 6)])
```

**Pairwise Chain:**
```python
from airflow.sdk import chain

# Creates paired dependencies:
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
```

**Cross Downstream:**
```python
from airflow.sdk import cross_downstream

# Both op1 and op2 feed into both op3 and op4
cross_downstream([op1, op2], [op3, op4])
```

### Branching and Conditional Execution

**BranchPythonOperator:**
```python
from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    if context['data_interval_start'].day == 1:
        return 'monthly_task'
    return 'daily_task'

branch = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch
)

daily_task = BashOperator(task_id='daily_task', bash_command='echo daily')
monthly_task = BashOperator(task_id='monthly_task', bash_command='echo monthly')

branch >> [daily_task, monthly_task]
```

**Custom Branch Operator:**
```python
from airflow.operators.branch import BaseBranchOperator

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run extra branch on first day of month
        """
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None  # Skip all downstream tasks
```

### TaskGroups for Organization

TaskGroups help organize related tasks hierarchically:

```python
from airflow.sdk import task_group
from airflow.operators.empty import EmptyOperator

@task_group()
def data_processing_group():
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    extract >> transform >> load

@task_group()
def validation_group():
    validate_schema = EmptyOperator(task_id="validate_schema")
    validate_data = EmptyOperator(task_id="validate_data")

    validate_schema >> validate_data

start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

start >> data_processing_group() >> validation_group() >> end
```

### Edge Labeling

Add labels to dependency edges for clarity:

```python
from airflow.sdk import Label

# Inline labeling
my_task >> Label("When empty") >> other_task

# Method-based labeling
my_task.set_downstream(other_task, Label("When empty"))
```

### LatestOnlyOperator

Skip tasks if not the latest DAG run:

```python
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.operators.empty import EmptyOperator
import pendulum

with DAG(
    dag_id='latest_only_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=True,
    schedule="@daily",
) as dag:
    latest_only = LatestOnlyOperator(task_id='latest_only')
    task1 = EmptyOperator(task_id='task1')
    task2 = EmptyOperator(task_id='task2')
    task3 = EmptyOperator(task_id='task3')
    task4 = EmptyOperator(task_id='task4', trigger_rule='all_done')  # Runs once upstream tasks finish, even if they were skipped

    latest_only >> task1 >> task3
    latest_only >> task4
    task2 >> task3
    task2 >> task4
```

## Operators Deep Dive

### BashOperator

Execute bash commands:

```python
from airflow.providers.standard.operators.bash import BashOperator

bash_task = BashOperator(
    task_id="bash_example",
    bash_command="echo 'Hello from Bash'; date",
    env={'MY_VAR': 'value'},  # Environment variables
    append_env=True,  # Append to existing env vars
    output_encoding='utf-8'
)
```

**Complex Bash Command:**
```python
bash_complex = BashOperator(
    task_id="complex_bash",
    bash_command="""
        cd /path/to/dir
        python process_data.py --input {{ ds }} --output {{ tomorrow_ds }}
        if [ $? -eq 0 ]; then
            echo "Success"
        else
            echo "Failed" && exit 1
        fi
    """,
)
```

### PythonOperator

Execute Python functions:

```python
from airflow.providers.standard.operators.python import PythonOperator

def my_python_function(name, **context):
    print(f"Hello {name}!")
    print(f"Execution date: {context['ds']}")
    return "Success"

python_task = PythonOperator(
    task_id="python_example",
    python_callable=my_python_function,
    op_kwargs={'name': 'Airflow'},  # runtime context is passed to **context automatically
)
```

**Traditional ETL with PythonOperator:**
```python
import json
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator

def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

def transform(ti):
    # Pull from XCom
    order_data_dict = ti.xcom_pull(task_ids="extract")
    total_order_value = sum(order_data_dict.values())
    return {"total_order_value": total_order_value}

def load(ti):
    # Pull from XCom
    total = ti.xcom_pull(task_ids="transform")["total_order_value"]
    print(f"Total order value is: {total:.2f}")

with DAG(
    dag_id="legacy_etl_pipeline",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    extract_task >> transform_task >> load_task
```

### EmailOperator

Send email notifications:

```python
from airflow.providers.smtp.operators.smtp import EmailOperator

email_task = EmailOperator(
    task_id='send_email',
    to='user@example.com',
    subject='Airflow Notification',
    html_content='<h3>Task completed successfully!</h3>',
    cc=['manager@example.com'],
    bcc=['archive@example.com']
)
```

### EmptyOperator

Placeholder for workflow structure:

```python
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end')

# Useful for organizing complex DAGs
start >> [task1, task2, task3] >> end
```

### Custom Operators

Create reusable custom operators:

```python
from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, my_param, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_param = my_param

    def execute(self, context):
        self.log.info(f"Executing with param: {self.my_param}")
        # Custom logic here
        return "Result"

# Usage
custom_task = MyCustomOperator(
    task_id="custom",
    my_param="value"
)
```

## Sensors Deep Dive

Sensors are a special type of operator that wait for a certain condition to be met before proceeding.

### ExternalTaskSensor

Wait for tasks in other DAGs:

```python
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
import pendulum

with DAG(
    dag_id="example_external_task_sensor",
    start_date=pendulum.datetime(2021, 10, 20, tz="UTC"),
    catchup=False,
    schedule=None,
) as dag:

    wait_for_task = ExternalTaskSensor(
        task_id="wait_for_task",
        external_dag_id="upstream_dag",
        external_task_id="upstream_task",
        allowed_states=["success"],
        failed_states=["failed"],
        execution_delta=None,  # Same execution_date
        timeout=600,  # 10 minutes
        poke_interval=60,  # Check every 60 seconds
    )
```

**Deferrable ExternalTaskSensor:**
```python
# More efficient - releases worker slot while waiting
wait_for_task_async = ExternalTaskSensor(
    task_id="wait_for_task_async",
    external_dag_id="upstream_dag",
    external_task_id="upstream_task",
    allowed_states=["success"],
    failed_states=["failed"],
    deferrable=True,  # Use async mode
)
```

### FileSensor

Wait for files to appear:

```python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_file",
    filepath="/path/to/file.csv",
    poke_interval=30,
    timeout=600,
    mode='poke'  # or 'reschedule' for long waits
)
```
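
For long or unpredictable waits, `mode='reschedule'` releases the worker slot between checks instead of occupying it for the whole wait. A minimal variant of the sensor above:

```python
from airflow.sensors.filesystem import FileSensor

# Reschedule mode frees the worker slot between pokes; suited to long waits
wait_for_file_reschedule = FileSensor(
    task_id="wait_for_file_reschedule",
    filepath="/path/to/file.csv",
    poke_interval=300,    # Check every 5 minutes
    timeout=6 * 60 * 60,  # Give up after 6 hours
    mode="reschedule",
)
```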

### TimeDeltaSensor

Wait for a specific time period:

```python
from datetime import timedelta
from airflow.sensors.time_delta import TimeDeltaSensor

wait_one_hour = TimeDeltaSensor(
    task_id="wait_one_hour",
    delta=timedelta(hours=1)
)
```

### BigQuery Table Sensor

Wait for BigQuery table to exist:

```python
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor
import pendulum

with DAG(
    dag_id="bigquery_sensor_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
) as dag:

    wait_for_table = BigQueryTableExistenceSensor(
        task_id="wait_for_table",
        project_id="your-project-id",
        dataset_id="your_dataset",
        table_id="your_table",
        gcp_conn_id="google_cloud_default",
        location="US",
        poke_interval=60,
        timeout=3600,
    )
```

### Custom Sensors

Create custom sensors for specific conditions:

```python
from airflow.sensors.base import BaseSensorOperator

class MyCustomSensor(BaseSensorOperator):
    def __init__(self, my_condition, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_condition = my_condition

    def poke(self, context):
        # Return True when condition is met
        self.log.info(f"Checking condition: {self.my_condition}")
        # Custom logic to check condition
        return check_condition(self.my_condition)
```

### Deferrable Sensors

Deferrable sensors release worker slots while waiting:

```python
from datetime import timedelta
from airflow.sdk import BaseSensorOperator, StartTriggerArgs

class WaitHoursSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def __init__(self, *args, trigger_kwargs=None, start_from_trigger=True, **kwargs):
        super().__init__(*args, **kwargs)
        if trigger_kwargs:
            self.start_trigger_args.trigger_kwargs = trigger_kwargs
        self.start_from_trigger = start_from_trigger

    def execute_complete(self, context, event=None):
        return  # Task complete
```

## XComs (Cross-Communication)

XComs enable task-to-task communication by storing and retrieving data.

### Basic XCom Usage

**Pushing to XCom:**
```python
def push_function(**context):
    value = "Important data"
    context['ti'].xcom_push(key='my_key', value=value)
    # Or simply return (uses 'return_value' key)
    return value

push_task = PythonOperator(
    task_id='push',
    python_callable=push_function,
)
```

**Pulling from XCom:**
```python
def pull_function(**context):
    # Pull by task_id (uses 'return_value' key)
    value = context['ti'].xcom_pull(task_ids='push')

    # Pull with specific key
    value = context['ti'].xcom_pull(task_ids='push', key='my_key')

    print(f"Pulled value: {value}")

pull_task = PythonOperator(
    task_id='pull',
    python_callable=pull_function,
)
```

### XCom with TaskFlow API

TaskFlow API automatically manages XComs:

```python
from airflow.decorators import task

@task
def extract():
    return {"data": [1, 2, 3, 4, 5]}

@task
def transform(data_dict):
    # Automatically receives XCom from extract
    total = sum(data_dict['data'])
    return {"total": total}

@task
def load(summary):
    print(f"Total: {summary['total']}")

# Automatic XCom handling
data = extract()
summary = transform(data)
load(summary)
```

### XCom Best Practices

**Size Limitations:**
- XComs are stored in the metadata database
- Keep XCom data small (< 1MB recommended)
- For large data, store in external systems and pass references

**Example with External Storage:**
```python
@task
def process_large_data():
    # Process data
    large_result = compute_large_dataset()

    # Store in S3/GCS
    file_path = save_to_s3(large_result, "s3://bucket/result.parquet")

    # Return only the path
    return {"result_path": file_path}

@task
def consume_large_data(metadata):
    # Load from S3/GCS
    data = load_from_s3(metadata['result_path'])
    process(data)
```

### XCom with Operators

**Reading XCom in Templates:**
```python
from airflow.providers.standard.operators.bash import BashOperator

process_file = BashOperator(
    task_id="process",
    bash_command="python process.py {{ ti.xcom_pull(task_ids='extract') }}",
)
```

**XCom with EmailOperator:**
```python
from airflow.sdk import task
from airflow.providers.smtp.operators.smtp import EmailOperator

@task
def get_ip():
    return "192.168.1.1"

@task(multiple_outputs=True)
def compose_email(external_ip):
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Your server is connected from {external_ip}<br>'
    }

email_info = compose_email(get_ip())

EmailOperator(
    task_id='send_email',
    to='admin@example.com',
    subject=email_info['subject'],
    html_content=email_info['body']
)
```

## Dynamic Workflows

Create tasks dynamically based on runtime conditions or external data.

### Dynamic Task Generation with Loops

```python
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator

with DAG("dynamic_loop_example", ...) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # Dynamically create tasks
    options = ["branch_a", "branch_b", "branch_c", "branch_d"]
    for option in options:
        task = EmptyOperator(task_id=option)
        start >> task >> end
```

### Dynamic Task Mapping

Map over task outputs to create dynamic parallel tasks:

```python
from airflow.decorators import task

@task
def extract():
    # Returns list of items to process
    return [1, 2, 3, 4, 5]

@task
def transform(item):
    # Processes single item
    return item * 2

@task
def load(items):
    # Receives all transformed items
    print(f"Loaded {len(items)} items: {items}")

# Dynamic mapping
data = extract()
transformed = transform.expand(item=data)  # Creates 5 parallel tasks
load(transformed)
```

**Mapping with Classic Operators:**
```python
from airflow.models import BaseOperator

class ExtractOperator(BaseOperator):
    def execute(self, context):
        return ["file1.csv", "file2.csv", "file3.csv"]

class TransformOperator(BaseOperator):
    def __init__(self, input, **kwargs):
        super().__init__(**kwargs)
        self.input = input

    def execute(self, context):
        # Process single file
        return f"processed_{self.input}"

extract = ExtractOperator(task_id="extract")
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)
```

### Task Group Mapping

Map over entire task groups:

```python
from airflow.decorators import task, task_group

@task
def add_one(value):
    return value + 1

@task
def double(value):
    return value * 2

@task_group
def process_group(value):
    incremented = add_one(value)
    return double(incremented)

@task
def aggregate(results):
    print(f"Results: {results}")

# Map task group over values
results = process_group.expand(value=[1, 2, 3, 4, 5])
aggregate(results)
```

### Partial Parameters with Mapping

Mix static and dynamic parameters:

```python
@task
def process(base_path, filename):
    full_path = f"{base_path}/{filename}"
    return f"Processed {full_path}"

# Static parameter 'base_path', dynamic 'filename'
results = process.partial(base_path="/data").expand(
    filename=["file1.csv", "file2.csv", "file3.csv"]
)
```

## TaskFlow API

The modern way to write Airflow DAGs with automatic XCom handling and cleaner syntax.

### Basic TaskFlow Example

```python
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="taskflow_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
    schedule=None,
    catchup=False,
)
def my_taskflow_dag():

    @task
    def extract():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        import json
        return json.loads(data_string)

    @task
    def transform(order_data_dict):
        total = sum(order_data_dict.values())
        return {"total_order_value": total}

    @task
    def load(summary):
        print(f"Total order value: {summary['total_order_value']:.2f}")

    # Function calls create task dependencies automatically
    order_data = extract()
    summary = transform(order_data)
    load(summary)

# Instantiate the DAG
my_taskflow_dag()
```

### Multiple Outputs

Return multiple values from tasks:

```python
@task(multiple_outputs=True)
def extract_data():
    return {
        'orders': [1, 2, 3],
        'customers': ['A', 'B', 'C'],
        'revenue': 1000.50
    }

@task
def process_orders(orders):
    print(f"Processing {len(orders)} orders")

@task
def process_customers(customers):
    print(f"Processing {len(customers)} customers")

# Access individual outputs
data = extract_data()
process_orders(data['orders'])
process_customers(data['customers'])
```

### Mixing TaskFlow with Traditional Operators

```python
from airflow.decorators import dag, task
from airflow.providers.standard.operators.bash import BashOperator
import pendulum

@dag(
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
)
def mixed_dag():

    @task
    def get_date():
        from datetime import datetime
        return datetime.now().strftime("%Y%m%d")

    # Traditional operator
    bash_task = BashOperator(
        task_id="print_date",
        bash_command="echo Processing data for {{ ti.xcom_pull(task_ids='get_date') }}"
    )

    @task
    def process_results():
        print("Processing complete")

    # Mix task types
    date = get_date()
    date >> bash_task >> process_results()

mixed_dag()
```

### Virtual Environment for Tasks

Isolate task dependencies:

```python
from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="virtualenv_example",
    start_date=pendulum.datetime(2023, 10, 26, tz="UTC"),
)
def virtualenv_dag():

    @task.virtualenv(
        requirements=["pandas==1.5.0", "numpy==1.23.0"],
        system_site_packages=False
    )
    def analyze_data():
        import pandas as pd
        import numpy as np

        df = pd.DataFrame({'col1': [1, 2, 3], 'col2': [4, 5, 6]})
        result = np.mean(df['col1'])
        return float(result)

    @task
    def report_results(mean_value):
        print(f"Mean value: {mean_value}")

    result = analyze_data()
    report_results(result)

virtualenv_dag()
```

## Asset-Based Scheduling

Schedule DAGs based on data assets (formerly datasets) rather than time.

### Producer-Consumer Pattern

```python
from airflow.sdk import DAG, Asset
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define asset
customer_data = Asset("s3://my-bucket/customers.parquet")

# Producer DAG
with DAG(
    dag_id="producer_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
) as producer:

    BashOperator(
        task_id="generate_data",
        bash_command="python generate_customers.py",
        outlets=[customer_data]  # Marks asset as updated
    )

# Consumer DAG - triggered when asset updates
with DAG(
    dag_id="consumer_dag",
    schedule=[customer_data],  # Triggered by asset
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as consumer:

    BashOperator(
        task_id="process_data",
        bash_command="python process_customers.py"
    )
```

### Multiple Asset Dependencies

**AND Logic (all assets must update):**
```python
from airflow.datasets import Dataset

asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")

with DAG(
    dag_id="wait_for_both",
    schedule=(asset_1 & asset_2),  # Both must update
    start_date=datetime(2023, 1, 1),
):
    pass
```

**OR Logic (any asset update triggers):**
```python
asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")

with DAG(
    dag_id="triggered_by_either",
    schedule=(asset_1 | asset_2),  # Either can trigger
    start_date=datetime(2023, 1, 1),
):
    pass
```

**Complex Logic:**
```python
asset_1 = Dataset("s3://bucket/file1.csv")
asset_2 = Dataset("s3://bucket/file2.csv")
asset_3 = Dataset("s3://bucket/file3.csv")

with DAG(
    dag_id="complex_condition",
    schedule=(asset_1 | (asset_2 & asset_3)),  # asset_1 OR (asset_2 AND asset_3)
    start_date=datetime(2023, 1, 1),
):
    pass
```

### Asset Aliases

Use aliases for flexible asset references:

```python
from airflow.sdk import Asset, AssetAlias
from airflow.decorators import task

# Producer with alias
with DAG(dag_id="alias_producer", start_date=datetime(2023, 1, 1)):
    @task(outlets=[AssetAlias("my-alias")])
    def produce_data(*, outlet_events):
        # Dynamically add actual asset
        outlet_events[AssetAlias("my-alias")].add(
            Dataset("s3://bucket/my-file.csv")
        )

# Consumer depending on alias
with DAG(
    dag_id="alias_consumer",
    schedule=AssetAlias("my-alias"),
    start_date=datetime(2023, 1, 1),
):
    pass
```

### Accessing Asset Event Information

```python
@task
def process_asset_data(*, triggering_asset_events):
    # triggering_asset_events maps each asset to the events that triggered this run
    for uri, events in triggering_asset_events.items():
        for event in events:
            print(f"Asset: {uri}")
            print(f"Timestamp: {event.timestamp}")
            print(f"Extra: {event.extra}")
```

## Scheduling Patterns

### Cron Expressions

```python
# Every day at midnight
schedule="0 0 * * *"

# Every Monday at 9 AM
schedule="0 9 * * 1"

# Every 15 minutes
schedule="*/15 * * * *"

# First day of month at noon
schedule="0 12 1 * *"

# Weekdays at 6 PM
schedule="0 18 * * 1-5"
```

### Timedelta Scheduling

```python
from datetime import timedelta

with DAG(
    dag_id="timedelta_schedule",
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(hours=6),  # Every 6 hours
):
    pass
```

### Preset Schedules

```python
# Common presets
schedule="@once"      # Run once
schedule="@hourly"    # Every hour
schedule="@daily"     # Daily at midnight
schedule="@weekly"    # Every Sunday at midnight
schedule="@monthly"   # First day of month at midnight
schedule="@yearly"    # January 1st at midnight
schedule=None         # Manual trigger only
```

### Catchup and Backfilling

**Catchup:**
```python
with DAG(
    dag_id="catchup_example",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=True,  # Run all missed intervals
):
    pass

with DAG(
    dag_id="no_catchup",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,  # Only run latest interval
):
    pass
```

**Manual Backfilling:**
```bash
# Backfill specific date range
airflow dags backfill \
    --start-date 2023-01-01 \
    --end-date 2023-01-31 \
    my_dag_id

# Backfill with marking success (no execution)
airflow dags backfill \
    --start-date 2023-01-01 \
    --end-date 2023-01-31 \
    --mark-success \
    my_dag_id
```

## Production Patterns

### Error Handling and Retries

**Task-Level Retries:**
```python
from airflow.operators.bash import BashOperator
from datetime import timedelta

task_with_retry = BashOperator(
    task_id="retry_task",
    bash_command="python might_fail.py",
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(minutes=30),
)
```

**DAG-Level Default Args:**
```python
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id="production_dag",
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
):
    pass
```

### Task Concurrency Control

**Per-Task Concurrency:**
```python
from airflow.operators.bash import BashOperator
from datetime import timedelta

# Limit concurrent instances of this task
limited_task = BashOperator(
    task_id="limited_task",
    bash_command="echo 'Processing'",
    max_active_tis_per_dag=3  # Max 3 instances running
)
```

**DAG-Level Concurrency:**
```python
with DAG(
    dag_id="concurrent_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    max_active_runs=5,  # Max 5 DAG runs simultaneously
    max_active_tasks=10,  # Max 10 task instances across all runs of this DAG
):
    pass
```

### Idempotency

Make tasks idempotent for safe retries:

```python
@task
def idempotent_load(**context):
    execution_date = context['ds']

    # Delete existing data for this date first
    delete_query = f"""
        DELETE FROM target_table
        WHERE date = '{execution_date}'
    """
    execute_sql(delete_query)

    # Insert new data
    insert_query = f"""
        INSERT INTO target_table
        SELECT * FROM source
        WHERE date = '{execution_date}'
    """
    execute_sql(insert_query)
```

### SLAs and Alerts

```python
from datetime import timedelta

def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(f"SLA missed for {task_list}")
    # Send alert to monitoring system

with DAG(
    dag_id="sla_dag",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    default_args={
        'sla': timedelta(hours=2),  # Task should complete in 2 hours
    },
    sla_miss_callback=sla_miss_callback,
):
    pass
```

### Task Callbacks

```python
def on_failure_callback(context):
    print(f"Task {context['task_instance'].task_id} failed")
    # Send to Slack, PagerDuty, etc.

def on_success_callback(context):
    print(f"Task {context['task_instance'].task_id} succeeded")

def on_retry_callback(context):
    print(f"Task {context['task_instance'].task_id} retrying")

task_with_callbacks = BashOperator(
    task_id="monitored_task",
    bash_command="python my_script.py",
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
    on_retry_callback=on_retry_callback,
)
```

### Docker Deployment

**Docker Compose for Local Development:**
```yaml
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  webserver:
    image: apache/airflow:2.7.0
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.7.0
    depends_on:
      - postgres
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    command: scheduler
```

### Kubernetes Executor

**KubernetesExecutor Configuration:**
```ini
# In airflow.cfg
[kubernetes]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0
delete_worker_pods = True
delete_worker_pods_on_failure = False

[core]
executor = KubernetesExecutor
```

**Pod Override for Specific Task:**
```python
from kubernetes.client import models as k8s

task_with_gpu = BashOperator(
    task_id="gpu_task",
    bash_command="python train_model.py",
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            limits={"nvidia.com/gpu": "1"}
                        )
                    )
                ]
            )
        )
    }
)
```

### Monitoring and Logging

**Structured Logging:**
```python
from airflow.decorators import task
import logging

@task
def monitored_task():
    logger = logging.getLogger(__name__)

    logger.info("Starting data processing", extra={
        'process_id': 'abc123',
        'record_count': 1000
    })

    try:
        process_data()
        logger.info("Processing complete")
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", extra={
            'error_type': type(e).__name__
        })
        raise
```

**StatsD Metrics:**
```python
import time

from airflow.decorators import task
from airflow.stats import Stats

@task
def task_with_metrics():
    Stats.incr('my_dag.task_started')

    start_time = time.time()
    process_data()
    duration = time.time() - start_time

    Stats.timing('my_dag.task_duration', duration)
    Stats.incr('my_dag.task_completed')
```

## Best Practices

### DAG Design

1. **Keep DAGs Simple**: Break complex workflows into multiple DAGs
2. **Use Descriptive Names**: dag_id and task_id should be self-explanatory
3. **Idempotent Tasks**: Tasks should produce same result when re-run
4. **Small XComs**: Keep XCom data under 1MB
5. **External Storage**: Use S3/GCS for large data, pass references
6. **Proper Dependencies**: Model true dependencies, avoid unnecessary ones
7. **Error Handling**: Use retries, callbacks, and proper error logging
8. **Resource Management**: Set appropriate task concurrency limits

### Code Organization

```
dags/
├── common/
│   ├── __init__.py
│   ├── operators.py      # Custom operators
│   ├── sensors.py        # Custom sensors
│   └── utils.py          # Utility functions
├── etl/
│   ├── customer_pipeline.py
│   ├── order_pipeline.py
│   └── product_pipeline.py
├── ml/
│   ├── training_dag.py
│   └── inference_dag.py
└── maintenance/
    ├── cleanup_dag.py
    └── backup_dag.py
```

### Testing DAGs

**Unit Testing:**
```python
import pytest
from airflow.models import DagBag

def test_dag_loaded():
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert len(dagbag.import_errors) == 0

def test_task_count():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')
    assert len(dag.tasks) == 5

def test_task_dependencies():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')

    extract = dag.get_task('extract')
    transform = dag.get_task('transform')

    assert transform in extract.downstream_list
```

**Integration Testing:**
```python
from datetime import datetime

from airflow.models import DagBag
from airflow.utils.state import State

def test_dag_runs():
    dagbag = DagBag(dag_folder='dags/')
    dag = dagbag.get_dag('my_dag')

    # Test DAG run
    dag_run = dag.create_dagrun(
        state=State.RUNNING,
        execution_date=datetime(2023, 1, 1),
        run_type='manual'
    )

    # Run specific task
    task_instance = dag_run.get_task_instance('extract')
    task_instance.run()

    assert task_instance.state == State.SUCCESS
```

### Performance Optimization

1. **Use Deferrable Operators**: For sensors and long-running waits
2. **Dynamic Task Mapping**: For parallel processing
3. **Appropriate Executor**: Choose based on scale (Local, Celery, Kubernetes)
4. **Connection Pooling**: Reuse database connections
5. **Task Parallelism**: Set max_active_runs and concurrency appropriately
6. **Lazy Loading**: Don't execute heavy logic at DAG parse time
7. **External Storage**: Keep metadata database light

### Security

1. **Secrets Management**: Use Airflow Secrets Backend (not hardcoded)
2. **Connection Encryption**: Use encrypted connections for databases
3. **RBAC**: Enable role-based access control
4. **Audit Logging**: Enable audit logs for compliance
5. **Network Isolation**: Restrict worker network access
6. **Credential Rotation**: Regularly rotate credentials

### Configuration Management

```python
# Use Variables for configuration
from airflow.models import Variable

config = Variable.get("my_config", deserialize_json=True)
api_key = Variable.get("api_key")

# Use Connections for external services
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection('my_postgres')
db_url = f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"
```

## Common Patterns and Examples

See EXAMPLES.md for 18+ detailed real-world examples including:
- ETL pipelines
- Machine learning workflows
- Data quality checks
- Multi-cloud orchestration
- Event-driven architectures
- Complex branching logic
- Dynamic task generation
- Asset-based scheduling
- Sensor patterns
- Error handling strategies

## Troubleshooting

### DAG Not Appearing in UI

1. Check for Python syntax errors in DAG file
2. Verify DAG file is in correct directory
3. Check dag_id is unique
4. Ensure the DAG is not paused and has a schedule if you expect automatic runs
5. Check the scheduler logs for import errors (see the quick check below)
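
As a quick programmatic check for items 1 and 5, loading a `DagBag` surfaces the same import errors the scheduler sees (a minimal sketch):

```python
from airflow.models import DagBag

# Any entry in import_errors explains why a DAG file failed to load
dagbag = DagBag(dag_folder="dags/", include_examples=False)
for file_path, error in dagbag.import_errors.items():
    print(f"{file_path}: {error}")
```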

### Tasks Not Running

1. Check task dependencies are correct
2. Verify upstream tasks succeeded
3. Check task concurrency limits
4. Ensure executor has available slots
5. Review task logs for errors

### Performance Issues

1. Reduce DAG complexity (break into multiple DAGs)
2. Optimize SQL queries in tasks
3. Use appropriate executor for scale
4. Enable task parallelism
5. Check for slow sensors (use deferrable mode)
6. Monitor metadata database performance

### Common Errors

**Import Errors:**
```python
# Bad - imports at DAG level slow parsing
from heavy_library import process

with DAG(...):
    pass

# Good - imports inside tasks
with DAG(...):
    @task
    def my_task():
        from heavy_library import process
        process()
```

**Circular Dependencies:**
```python
# This will fail
task1 >> task2 >> task3 >> task1  # Circular!

# Must be acyclic
task1 >> task2 >> task3
```

**Large XComs:**
```python
# Bad - storing large data in XCom
@task
def process():
    large_df = pd.read_csv('big_file.csv')
    return large_df  # Too large!

# Good - store reference
@task
def process():
    large_df = pd.read_csv('big_file.csv')
    path = save_to_s3(large_df)
    return path  # Just the path
```

## Resources

- **Official Documentation**: https://airflow.apache.org/docs/
- **Airflow GitHub**: https://github.com/apache/airflow
- **Astronomer Guides**: https://docs.astronomer.io/learn
- **Community Slack**: https://apache-airflow.slack.com
- **Stack Overflow**: Tag `apache-airflow`
- **Awesome Airflow**: https://github.com/jghoman/awesome-apache-airflow

---

**Skill Version**: 1.0.0
**Last Updated**: January 2025
**Apache Airflow Version**: 2.7+
**Skill Category**: Data Engineering, Workflow Orchestration, Pipeline Management

Overview

This skill is a complete guide to Apache Airflow orchestration that covers DAG design, operators, sensors, XComs, task dependencies, dynamic workflows, and production deployment. It focuses on practical patterns and production-ready choices so you can build, test, and operate reliable data pipelines. The content is hands-on and organized around real orchestration needs.

How this skill works

The skill explains core Airflow concepts: DAG definitions, operator types (Bash, Python, Email, Empty, custom), sensors, executors, and the scheduler. It describes dependency patterns (chains, fan-in/fan-out, branching), XCom usage for task communication, and deployment patterns for executors such as Celery and Kubernetes. Examples and snippets illustrate common workflows, task grouping, edge labeling, and sensor strategies for event-driven pipelines.

When to use it

  • Building and scheduling ETL/ELT pipelines with complex dependencies
  • Coordinating multi-step data transformations across services
  • Implementing event-driven or asset-based workflows that wait on external signals
  • Creating dynamic DAGs that generate tasks programmatically
  • Scaling execution across clusters using Celery or Kubernetes executors

Best practices

  • Define DAGs programmatically in Python and keep logic testable and modular
  • Prefer deferrable sensors or reschedule mode for long waits to free worker slots
  • Use TaskGroups and clear task_ids for readability and easier troubleshooting
  • Keep side-effectful code out of DAG parse time; perform work inside operators
  • Propagate small, structured XCom payloads and avoid large binary transfers

Example use cases

  • Daily ETL pipeline: extract -> transform -> load with XCom-based handoff
  • Event-driven load: FileSensor waits for file, then triggers processing tasks
  • Cross-DAG dependency: ExternalTaskSensor coordinating upstream DAG runs
  • Dynamic task generation: create per-file tasks at scale with dynamic task mapping
  • Production deployment: KubernetesExecutor for isolated, scalable task pods

FAQ

When should I use CeleryExecutor vs KubernetesExecutor?

Use CeleryExecutor for mature distributed setups on VMs with a worker fleet; use KubernetesExecutor when you want per-task isolation, dynamic scaling, and container-native deployments.

How do I prevent long-running sensors from tying up workers?

Use deferrable sensors or set mode='reschedule' so the sensor releases the worker slot while waiting, and configure reasonable poke_interval and timeout values.