
testing-dags skill

/skills/testing-dags

This skill helps you test, debug, and fix Airflow DAGs iteratively, guiding you through cycles of triggering, diagnosing, and retesting.

npx playbooks add skill astronomer/agents --skill testing-dags

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
9.4 KB
---
name: testing-dags
description: Complex DAG testing workflows with debugging and fixing cycles. Use for multi-step testing requests like "test this dag and fix it if it fails", "test and debug", "run the pipeline and troubleshoot issues". For simple test requests ("test dag", "run dag"), the airflow entrypoint skill handles them directly. This skill is for iterative test-debug-fix cycles.
---

# DAG Testing Skill

Use `af` commands to test, debug, and fix DAGs in iterative cycles.

## Running the CLI

Run all `af` commands using uvx (no installation required):

```bash
uvx --from astro-airflow-mcp af <command>
```

Throughout this document, `af` is shorthand for `uvx --from astro-airflow-mcp af`.
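
For interactive use, you can also define a shell alias so the bare `af` shorthand works directly (optional; assumes a POSIX-style shell):

```bash
# Optional convenience: make `af` expand to the full uvx invocation
alias af='uvx --from astro-airflow-mcp af'
```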

---

## FIRST ACTION: Just Trigger the DAG

When the user asks to test a DAG, your **FIRST AND ONLY action** should be:

```bash
af runs trigger-wait <dag_id>
```

**DO NOT:**
- Call `af dags list` first
- Call `af dags get` first
- Call `af dags errors` first
- Use `grep` or `ls` or any other bash command
- Do any "pre-flight checks"

**Just trigger the DAG.** If it fails, THEN debug.

---

## Testing Workflow Overview

```
┌─────────────────────────────────────┐
│ 1. TRIGGER AND WAIT                 │
│    Run DAG, wait for completion     │
└─────────────────────────────────────┘
                 ↓
        ┌───────┴───────┐
        ↓               ↓
   ┌─────────┐    ┌──────────┐
   │ SUCCESS │    │ FAILED   │
   │ Done!   │    │ Debug... │
   └─────────┘    └──────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 2. DEBUG (only if failed)           │
        │    Get logs, identify root cause    │
        └─────────────────────────────────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 3. FIX AND RETEST                   │
        │    Apply fix, restart from step 1   │
        └─────────────────────────────────────┘
```

**Philosophy: Try first, debug on failure.** Don't waste time on pre-flight checks — just run the DAG and diagnose if something goes wrong.

---

## Phase 1: Trigger and Wait

Use `af runs trigger-wait` to test the DAG:

### Primary Method: Trigger and Wait

```bash
af runs trigger-wait <dag_id> --timeout 300
```

**Example:**

```bash
af runs trigger-wait my_dag --timeout 300
```

**Why this is the preferred method:**
- Single command handles trigger + monitoring
- Returns immediately when DAG completes (success or failure)
- Includes failed task details if run fails
- No manual polling required

### Response Interpretation

**Success:**
```json
{
  "dag_run": {
    "dag_id": "my_dag",
    "dag_run_id": "manual__2025-01-14T...",
    "state": "success",
    "start_date": "...",
    "end_date": "..."
  },
  "timed_out": false,
  "elapsed_seconds": 45.2
}
```

**Failure:**
```json
{
  "dag_run": {
    "state": "failed"
  },
  "timed_out": false,
  "elapsed_seconds": 30.1,
  "failed_tasks": [
    {
      "task_id": "extract_data",
      "state": "failed",
      "try_number": 2
    }
  ]
}
```

**Timeout:**
```json
{
  "dag_id": "my_dag",
  "dag_run_id": "manual__...",
  "state": "running",
  "timed_out": true,
  "elapsed_seconds": 300.0,
  "message": "Timed out after 300 seconds. DAG run is still running."
}
```
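
Because the responses are JSON, a script can branch on the final state directly. A minimal sketch (assumes `jq` is installed and that the state appears under `dag_run` as in the success/failure examples, or at the top level on timeout):

```bash
# Trigger, wait, and capture the final state (sketch; requires jq)
state=$(af runs trigger-wait my_dag --timeout 300 | jq -r '.dag_run.state // .state')
if [ "$state" = "success" ]; then
  echo "DAG succeeded"
else
  echo "DAG ended in state: $state, moving on to debugging"
fi
```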

### Alternative: Trigger and Monitor Separately

Use this only when you need more control:

```bash
# Step 1: Trigger
af runs trigger my_dag
# Returns: {"dag_run_id": "manual__...", "state": "queued"}

# Step 2: Check status
af runs get my_dag manual__2025-01-14T...
# Returns current state
```
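
With the two-step approach you do the waiting yourself. A minimal polling loop might look like this (a sketch; assumes `af runs get` prints JSON with a top-level `state` field and that `jq` is installed):

```bash
# Poll the run until it leaves queued/running (sketch; requires jq)
dag_id=my_dag
run_id='manual__2025-01-14T...'   # from the trigger response
while true; do
  state=$(af runs get "$dag_id" "$run_id" | jq -r '.state')
  echo "current state: $state"
  case "$state" in
    queued|running) sleep 10 ;;
    *) break ;;
  esac
done
```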

---

## Handling Results

### If Success

The DAG ran successfully. Summarize for the user:
- Total elapsed time
- Number of tasks completed
- Any notable outputs (if visible in logs)

**You're done!**

### If Timed Out

The DAG is still running. Options:
1. Check current status: `af runs get <dag_id> <dag_run_id>`
2. Ask user if they want to continue waiting
3. Increase timeout and try again

### If Failed

Move to Phase 2 (Debug) to identify the root cause.

---

## Phase 2: Debug Failures (Only If Needed)

When a DAG run fails, use these commands to diagnose:

### Get Comprehensive Diagnosis

```bash
af runs diagnose <dag_id> <dag_run_id>
```

Returns in one call:
- Run metadata (state, timing)
- All task instances with states
- Summary of failed tasks
- State counts (success, failed, skipped, etc.)

### Get Task Logs

```bash
af tasks logs <dag_id> <dag_run_id> <task_id>
```

**Example:**

```bash
af tasks logs my_dag manual__2025-01-14T... extract_data
```

**For specific retry attempt:**

```bash
af tasks logs my_dag manual__2025-01-14T... extract_data --try 2
```

**Look for:**
- Exception messages and stack traces
- Connection errors (database, API, S3)
- Permission errors
- Timeout errors
- Missing dependencies

### Check Upstream Tasks

If a task shows `upstream_failed`, the root cause is in an upstream task. Use `af runs diagnose` to find which task actually failed.
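
For example, if a downstream task shows `upstream_failed`, the drill-down uses only the two commands above (the run ID and task name are placeholders):

```bash
# 1. Find the task that actually failed, not the ones marked upstream_failed
af runs diagnose my_dag manual__2025-01-14T...

# 2. Pull logs for the root-cause task reported by diagnose
af tasks logs my_dag manual__2025-01-14T... extract_data
```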

### Check Import Errors (If DAG Didn't Run)

If the trigger failed because the DAG doesn't exist:

```bash
af dags errors
```

This reveals syntax errors or missing dependencies that prevented the DAG from loading.

---

## Phase 3: Fix and Retest

Once you identify the issue:

### Common Fixes

| Issue | Fix |
|-------|-----|
| Missing import | Add to DAG file |
| Missing package | Add to `requirements.txt` |
| Connection error | Check `af config connections`, verify credentials |
| Variable missing | Check `af config variables`, create if needed |
| Timeout | Increase task timeout or optimize query |
| Permission error | Check credentials in connection |
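
For instance, a missing package is usually fixed by adding it to `requirements.txt` and retesting (the package name below is only an example, and the environment may need to pick up the new dependency before the retest):

```bash
# Add the missing package (example name), then rerun the test
echo "requests" >> requirements.txt
af runs trigger-wait my_dag
```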

### After Fixing

1. Save the file
2. **Retest:** `af runs trigger-wait <dag_id>`

**Repeat the test → debug → fix loop until the DAG succeeds.**

---

## CLI Quick Reference

| Phase | Command | Purpose |
|-------|---------|---------|
| Test | `af runs trigger-wait <dag_id>` | **Primary test method — start here** |
| Test | `af runs trigger <dag_id>` | Start run (alternative) |
| Test | `af runs get <dag_id> <run_id>` | Check run status |
| Debug | `af runs diagnose <dag_id> <run_id>` | Comprehensive failure diagnosis |
| Debug | `af tasks logs <dag_id> <run_id> <task_id>` | Get task output/errors |
| Debug | `af dags errors` | Check for parse errors (if DAG won't load) |
| Debug | `af dags get <dag_id>` | Verify DAG config |
| Debug | `af dags explore <dag_id>` | Full DAG inspection |
| Config | `af config connections` | List connections |
| Config | `af config variables` | List variables |

---

## Testing Scenarios

### Scenario 1: Test a DAG (Happy Path)

```bash
af runs trigger-wait my_dag
# Success! Done.
```

### Scenario 2: Test a DAG (With Failure)

```bash
# 1. Run and wait
af runs trigger-wait my_dag
# Failed...

# 2. Find failed tasks
af runs diagnose my_dag manual__2025-01-14T...

# 3. Get error details
af tasks logs my_dag manual__2025-01-14T... extract_data

# 4. [Fix the issue in DAG code]

# 5. Retest
af runs trigger-wait my_dag
```

### Scenario 3: DAG Doesn't Exist / Won't Load

```bash
# 1. Trigger fails - DAG not found
af runs trigger-wait my_dag
# Error: DAG not found

# 2. Find parse error
af dags errors

# 3. [Fix the issue in DAG code]

# 4. Retest
af runs trigger-wait my_dag
```

### Scenario 4: Debug a Failed Scheduled Run

```bash
# 1. Get failure summary
af runs diagnose my_dag scheduled__2025-01-14T...

# 2. Get error from failed task
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id

# 3. [Fix the issue]

# 4. Retest
af runs trigger-wait my_dag
```

### Scenario 5: Test with Custom Configuration

```bash
af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600
```

### Scenario 6: Long-Running DAG

```bash
# Wait up to 1 hour
af runs trigger-wait my_dag --timeout 3600

# If timed out, check current state
af runs get my_dag manual__2025-01-14T...
```

---

## Debugging Tips

### Common Error Patterns

**Connection Refused / Timeout:**
- Check `af config connections` for correct host/port
- Verify network connectivity to external system
- Check if connection credentials are correct

**ModuleNotFoundError:**
- Package missing from `requirements.txt`
- After adding it, the environment may need a restart

**PermissionError:**
- Check IAM roles, database grants, API keys
- Verify connection has correct credentials

**Task Timeout:**
- Query or operation taking too long
- Consider adding a timeout parameter to the task
- Optimize underlying query/operation

### Reading Task Logs

Task logs typically show:
1. Task start timestamp
2. Any print/log statements from task code
3. Return value (for @task decorated functions)
4. Exception + full stack trace (if failed)
5. Task end timestamp and duration

**Focus on the exception at the bottom of failed task logs.**
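
If the log output is long, jumping to the end is often enough to find that exception (a sketch; assumes the logs are printed to stdout):

```bash
# Show only the last lines of the task log, where the final exception usually appears
af tasks logs my_dag manual__2025-01-14T... extract_data | tail -n 50
```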

---

## Related Skills

- **authoring-dags**: For creating new DAGs (includes validation before testing)
- **debugging-dags**: For general Airflow troubleshooting

Overview

This skill runs iterative test–debug–fix cycles for complex Airflow DAGs. It is designed for multi-step requests like “test this DAG and fix it if it fails,” where a single-run tool is insufficient. The workflow prioritizes triggering the run first, then diagnosing and applying fixes only when failures occur.

How this skill works

Start by triggering the DAG run with a single command that both launches and waits for completion. If the run succeeds, report elapsed time and completed tasks. If it fails or times out, gather diagnostics and task logs to identify the root cause, apply a code or configuration fix, and then re-run the DAG. Repeat until the DAG succeeds.
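
A typical cycle, using the commands documented in SKILL.md above (the DAG ID, run ID, and task ID are placeholders):

  af runs trigger-wait my_dag
  af runs diagnose my_dag <run_id>            (only if the run failed)
  af tasks logs my_dag <run_id> <task_id>     (inspect the failing task)
  af runs trigger-wait my_dag                 (retest after applying the fix)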

When to use it

  • You need an iterative test-debug-fix loop for a DAG (not a simple one-off run).
  • The DAG is complex and may fail for code, dependency, or infra reasons that require inspection and fixes.
  • You expect multiple cycles of testing and repairs rather than a single status check.
  • You want scripted, reproducible debugging commands for failed runs.

Best practices

  • Always start with af runs trigger-wait <dag_id> as the first and only action.
  • Only run diagnostics if the trigger-wait indicates failure or timeout.
  • Use af runs diagnose to get a comprehensive failure summary before inspecting individual logs.
  • Inspect task logs with af tasks logs <dag_id> <run_id> <task_id> and check the bottom of the trace for the exception.
  • After fixes, retest using the same trigger-wait command and iterate until success.

Example use cases

  • "Test this DAG and fix it if it fails" — run trigger-wait, diagnose failures, patch code/config, retest.
  • Debug intermittent upstream failures: trigger, find upstream_failed, inspect the real failing upstream task with diagnose and logs.
  • Resolve parse/import issues: trigger-wait shows DAG missing, run af dags errors, fix imports, retest.
  • Long-running pipeline troubleshooting: run trigger-wait with increased --timeout, or diagnose after a timeout then continue or extend timeout.

FAQ

What is the very first command I should run?

Always run af runs trigger-wait <dag_id> as your first and only action. Only debug if that run fails or times out.

How do I get full context when a run fails?

Use af runs diagnose <dag_id> <run_id> for a full summary, then af tasks logs <dag_id> <run_id> <task_id> for task-level stack traces and errors.