This skill enables human-in-the-loop decision gates in Airflow by managing approvals, forms, and branching through HITL operators for deferrable workflows.
Add this skill to your agents with `npx playbooks add skill astronomer/agents --skill airflow-hitl`.
---
name: airflow-hitl
description: Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).
---
# Airflow Human-in-the-Loop Operators
Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API.
## Implementation Checklist
Execute steps in order. Prefer deferrable HITL operators over custom sensors/polling loops.
> **CRITICAL**: Requires Airflow 3.1+. NOT available in Airflow 2.x.
>
> **Deferrable**: All HITL operators are deferrable—they release their worker slot while waiting for human input.
>
> **UI Location**: View pending actions at **Browse → Required Actions** in the Airflow UI. Respond via the **Required Actions** tab on the task instance page or via the REST API.
>
> **Cross-reference**: For AI/LLM calls, see the **airflow-ai** skill.
---
## Step 1: Choose operator
| Operator | Human action | Outcome |
|----------|--------------|---------|
| `ApprovalOperator` | Approve or Reject | Reject causes downstream tasks to be skipped (approval task itself succeeds) |
| `HITLOperator` | Select option(s) + form | Returns selections |
| `HITLBranchOperator` | Select downstream task(s) | Runs selected, skips others |
| `HITLEntryOperator` | Submit form | Returns form data |
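
The table above can be read as a small decision rule. The sketch below encodes it as a plain Python function. The function name and its boolean flags are hypothetical, introduced only for illustration; they are not part of the Airflow API.

```python
def choose_hitl_operator(
    approve_or_reject: bool = False,
    picks_downstream_tasks: bool = False,
    picks_options: bool = False,
) -> str:
    """Return the name of the HITL operator that matches the need.

    Hypothetical helper mirroring the operator table; not an Airflow API.
    """
    if approve_or_reject:
        return "ApprovalOperator"
    if picks_downstream_tasks:
        return "HITLBranchOperator"
    if picks_options:
        return "HITLOperator"
    # Only form input is needed, with no option selection.
    return "HITLEntryOperator"
```

For example, `choose_hitl_operator(picks_downstream_tasks=True)` returns `"HITLBranchOperator"`.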
---
## Step 2: Implement operator
### ApprovalOperator
```python
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",  # Optional: auto on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)


approval_example()
```
### HITLOperator
> **Required parameters**: `subject` and `options`.
```python
from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, chain, Param
from datetime import timedelta
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def hitl_example():
    hitl = HITLOperator(
        task_id="select_option",
        subject="Select Payment Method",
        body="Choose how to process payment",
        options=["ACH", "Wire", "Check"],  # REQUIRED
        defaults=["ACH"],
        multiple=False,
        execution_timeout=timedelta(hours=4),
        params={"amount": Param(1000, type="number")},
    )

    @task
    def process(result):
        print(f"Selected: {result['chosen_options']}")
        print(f"Amount: {result['params_input']['amount']}")

    process(hitl.output)


hitl_example()
```
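The downstream tasks in the examples above read `chosen_options` and `params_input` from the operator output. The following is a minimal sketch of parsing that output defensively, assuming it is a dict with those two keys as the examples suggest. The helper name `summarize_hitl_result` is hypothetical and not part of Airflow.

```python
from typing import Any


def summarize_hitl_result(result: dict[str, Any]) -> str:
    """Build a one-line summary from a HITL operator result.

    Assumes the result carries "chosen_options" (list of str) and
    "params_input" (dict), as in the examples above. Missing keys are
    treated as empty instead of raising KeyError.
    """
    chosen = result.get("chosen_options") or []
    params = result.get("params_input") or {}
    options_part = ", ".join(chosen) if chosen else "(no option chosen)"
    params_part = ", ".join(f"{key}={value!r}" for key, value in sorted(params.items()))
    return f"{options_part} [{params_part}]" if params_part else options_part
```

Such a function could be called from a `@task` that receives `hitl.output`, keeping the parsing logic testable without a running Airflow instance.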
### HITLBranchOperator
> **IMPORTANT**: Options can either:
> 1. **Directly match downstream task IDs** - simpler approach
> 2. **Use `options_mapping`** - for human-friendly labels that map to task IDs
```python
from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain
from pendulum import datetime

DEPTS = ["marketing", "engineering", "sales"]


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def branch_example():
    branch = HITLBranchOperator(
        task_id="select_dept",
        subject="Select Departments",
        options=[f"Fund {d}" for d in DEPTS],
        options_mapping={f"Fund {d}": d for d in DEPTS},
        multiple=True,
    )

    for dept in DEPTS:

        @task(task_id=dept)
        def handle(dept_name: str = dept):
            # Bind the loop variable at definition time to avoid late-binding bugs
            print(f"Processing {dept_name}")

        chain(branch, handle())


branch_example()
```
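To make the two option styles concrete, here is a plain-Python illustration of how human-friendly labels relate to downstream task IDs. It is a sketch of the idea only, not how Airflow resolves branches internally, and `resolve_branch_targets` is a hypothetical name.

```python
DEPTS = ["marketing", "engineering", "sales"]


def resolve_branch_targets(chosen_options, options_mapping=None):
    """Translate the labels a human picked into downstream task IDs.

    Illustrative only: when no mapping is given (or a label is missing from
    it), the option is assumed to already be a task ID.
    """
    mapping = options_mapping or {}
    return [mapping.get(option, option) for option in chosen_options]


# Same mapping as in the DAG above: "Fund sales" -> "sales", and so on.
mapping = {f"Fund {d}": d for d in DEPTS}
targets = resolve_branch_targets(["Fund sales", "Fund marketing"], mapping)
```

With the mapping above, `targets` is `["sales", "marketing"]`. Without a mapping, the options must be the task IDs themselves.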
### HITLEntryOperator
```python
from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def entry_example():
    entry = HITLEntryOperator(
        task_id="get_input",
        subject="Enter Details",
        body="Provide response",
        params={
            "response": Param("", type="string"),
            "priority": Param("p3", type="string"),
        },
    )

    @task
    def process(result):
        print(f"Response: {result['params_input']['response']}")

    process(entry.output)


entry_example()
```
---
## Step 3: Optional features
### Notifiers
```python
from airflow.sdk import BaseNotifier, Context
from airflow.providers.standard.operators.hitl import HITLOperator


class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message=""):
        self.message = message

    def notify(self, context: Context):
        if context["ti"].state == "running":
            url = HITLOperator.generate_link_to_ui_from_context(
                context, base_url="https://airflow.example.com"
            )
            self.log.info(f"Action needed: {url}")


hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")])
```
### Restrict respondents
Format depends on your auth manager:
| Auth Manager | Format | Example |
|--------------|--------|--------|
| SimpleAuthManager | Username | `["admin", "manager"]` |
| FabAuthManager | Email | `["manager@example.com"]` |
| Astro | Astro ID | `["cl1a2b3cd456789ef1gh2ijkl3"]` |
> **Astro Users**: Find your Astro ID under **Organization → Access Management**.
```python
hitl = HITLOperator(..., respondents=["manager@example.com"])  # FabAuthManager
```
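If the same DAG code is deployed under different auth managers, the identifier format can be chosen in one place. The sketch below assumes a hypothetical user-record shape with `username`, `email`, and `astro_id` keys; neither the record shape nor the helper is part of Airflow.

```python
# Which identifier each auth manager expects, per the table above.
RESPONDENT_FIELD = {
    "SimpleAuthManager": "username",
    "FabAuthManager": "email",
    "Astro": "astro_id",
}


def respondents_for(auth_manager: str, users: list[dict]) -> list[str]:
    """Pick the identifier that matches the auth manager for each user.

    The user-record keys are hypothetical and exist only for this example.
    """
    try:
        field = RESPONDENT_FIELD[auth_manager]
    except KeyError:
        raise ValueError(f"Unknown auth manager: {auth_manager!r}") from None
    return [user[field] for user in users]
```

The returned list can then be passed as the `respondents` argument of a HITL operator.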
### Timeout behavior
- **With `defaults`**: Task succeeds, default option(s) selected
- **Without `defaults`**: Task fails on timeout
```python
from datetime import timedelta

hitl = HITLOperator(
    ...,
    options=["Option A", "Option B"],
    defaults=["Option A"],  # Auto-selected on timeout
    execution_timeout=timedelta(hours=4),
)
```
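The two timeout outcomes can be modeled in a few lines of plain Python. This is a sketch of the behavior described above, not Airflow's implementation, and the function name and returned dict shape are hypothetical.

```python
from typing import Optional


def outcome_on_timeout(defaults: Optional[list[str]]) -> dict:
    """Model what happens when nobody responds before the timeout.

    With defaults the task succeeds and the defaults are selected;
    without defaults the task fails.
    """
    if defaults:
        return {"state": "success", "chosen_options": list(defaults)}
    return {"state": "failed", "chosen_options": []}
```

A model like this is useful in unit tests for downstream logic that must handle both a defaulted decision and a failed gate.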
### Markdown in body
The `body` parameter supports **markdown formatting** and is **Jinja templatable**:
```python
hitl = HITLOperator(
    ...,
    body="""**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
""",
)
```
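When the table rows come from data rather than a hand-written string, the markdown can be generated. The helper below is a minimal sketch with a hypothetical name, `markdown_table`. The Jinja expression is left as literal text here, on the assumption that Airflow renders it later because `body` is templated.

```python
def markdown_table(headers: list[str], rows: list[list[str]]) -> str:
    """Render a GitHub-style markdown table as a single string."""
    lines = [
        "| " + " | ".join(headers) + " |",
        "|" + "|".join("---" for _ in headers) + "|",
    ]
    for row in rows:
        lines.append("| " + " | ".join(str(cell) for cell in row) + " |")
    return "\n".join(lines)


# The template expression stays unrendered in this string.
body = (
    "**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}\n\n"
    + markdown_table(["Category", "Amount"], [["Marketing", "$1M"]])
)
```

The resulting `body` string can be passed to any HITL operator in place of a hand-written markdown literal.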
### Callbacks
All HITL operators support standard Airflow callbacks:
```python
def on_hitl_failure(context):
    print(f"HITL task failed: {context['task_instance'].task_id}")


def on_hitl_success(context):
    print(f"HITL task succeeded with: {context['task_instance'].xcom_pull()}")


hitl = HITLOperator(
    task_id="approval_required",
    subject="Review needed",
    options=["Approve", "Reject"],
    on_failure_callback=on_hitl_failure,
    on_success_callback=on_hitl_success,
)
```
---
## Step 4: API integration
For external responders (Slack, custom app):
```python
import os

import requests

HOST = os.getenv("AIRFLOW_HOST")
TOKEN = os.getenv("AIRFLOW_API_TOKEN")

# Get pending actions
r = requests.get(
    f"{HOST}/api/v2/hitlDetails/?state=pending",
    headers={"Authorization": f"Bearer {TOKEN}"},
)

# Respond (dag_id, run_id, and task_id must identify the task instance awaiting input)
requests.patch(
    f"{HOST}/api/v2/hitlDetails/{dag_id}/{run_id}/{task_id}",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"chosen_options": ["ACH"], "params_input": {"amount": 1500}},
)
```
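Run IDs such as `scheduled__2025-01-01T00:00:00+00:00` contain `:` and `+`, which should be percent-encoded when placed in a URL path. The sketch below builds the request pieces without sending anything. The helper name is hypothetical, and the path layout is copied from the example above, so verify it against the REST API reference for your Airflow version.

```python
from urllib.parse import quote


def build_hitl_response(host, token, dag_id, run_id, task_id,
                        chosen_options, params_input=None):
    """Return (url, headers, payload) for responding to a pending action.

    Hypothetical helper. The endpoint path mirrors the example above and is
    an assumption, not a guarantee about any specific Airflow release.
    """
    # Percent-encode every path segment, including "/" inside a segment.
    path = "/".join(quote(part, safe="") for part in (dag_id, run_id, task_id))
    url = f"{host.rstrip('/')}/api/v2/hitlDetails/{path}"
    headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "chosen_options": list(chosen_options),
        "params_input": dict(params_input or {}),
    }
    return url, headers, payload
```

The three return values map directly onto `requests.patch(url, headers=headers, json=payload)`.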
---
## Step 5: Safety checks
Before finalizing, verify:
- [ ] Airflow 3.1+ installed
- [ ] For `HITLBranchOperator`: options map to downstream task IDs
- [ ] `defaults` values are in `options` list
- [ ] API token configured if using external responders
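
Two of the checks above (defaults within options, branch options resolving to downstream task IDs) can be automated before a DAG is deployed. The following is a minimal sketch in plain Python. `check_hitl_config` is a hypothetical helper, not an Airflow function, and it works on plain lists and dicts rather than on operator objects.

```python
def check_hitl_config(options, defaults=None, options_mapping=None,
                      downstream_task_ids=None):
    """Return a list of problems found; an empty list means the checks passed."""
    problems = []

    # Accept a single default given as a string, as in the approval example.
    if isinstance(defaults, str):
        defaults = [defaults]
    for default in defaults or []:
        if default not in options:
            problems.append(f"default {default!r} is not in options")

    # Branch check: only runs when downstream task IDs are supplied.
    if downstream_task_ids is not None:
        mapping = options_mapping or {}
        for option in options:
            target = mapping.get(option, option)
            if target not in downstream_task_ids:
                problems.append(f"option {option!r} maps to unknown task {target!r}")
    return problems
```

For instance, `check_hitl_config(["ACH"], defaults=["Check"])` reports that the default is not among the options.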
---
## Reference
- Airflow HITL Operators: https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/hitl.html
---
## Related Skills
- **airflow-ai**: For AI/LLM task decorators and GenAI patterns
- **authoring-dags**: For general DAG writing best practices
- **testing-dags**: For testing DAGs with debugging cycles

---
This skill provides human-in-the-loop (HITL) operators for Apache Airflow 3.1+ to implement approval gates, form inputs, and human-driven branching inside DAGs. Use the provided operators to pause execution until a human responds via the Airflow UI or the REST API. Operators are deferrable so they free worker slots while waiting.

The skill exposes four deferrable operators: ApprovalOperator, HITLOperator, HITLBranchOperator, and HITLEntryOperator. Each operator creates a pending action visible in Airflow’s Browse → Required Actions or via the REST API; execution resumes when a human submits a response, and results are returned via the operator output XCom. Operators support templated markdown bodies, defaults/timeouts, notifiers, respondent restrictions, and standard Airflow callbacks.

**Which Airflow versions are supported?**
This skill requires Airflow 3.1 or later; HITL operators are not available in Airflow 2.x.

**What happens on timeout?**
If defaults are provided, the defaults are selected and the task succeeds; without defaults, the task fails on execution timeout.

**How do I restrict who can respond?**
Pass a respondents list formatted for your auth manager (username for SimpleAuthManager, email for FabAuthManager, Astro ID for Astro).

**Can I surface a link to the task UI in notifications?**
Yes. Notifiers can call operator helpers like generate_link_to_ui_from_context to include the Required Actions URL in messages.