
tracing-upstream-lineage skill


This skill traces upstream data lineage by identifying sources, DAGs, and transformations to reveal data origins and feeding paths.

npx playbooks add skill astronomer/agents --skill tracing-upstream-lineage

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
4.1 KB
---
name: tracing-upstream-lineage
description: Trace upstream data lineage. Use when the user asks where data comes from, what feeds a table, upstream dependencies, data sources, or needs to understand data origins.
---

# Upstream Lineage: Sources

Trace the origins of data - answer "Where does this data come from?"

## Lineage Investigation

### Step 1: Identify the Target Type

Determine what we're tracing:
- **Table**: Trace what populates this table
- **Column**: Trace where this specific column comes from
- **DAG**: Trace what data sources this DAG reads from

### Step 2: Find the Producing DAG

Tables are typically populated by Airflow DAGs. Find the connection:

1. **Search DAGs by name**: Use `af dags list` and look for DAG names matching the table name
   - `load_customers` -> `customers` table
   - `etl_daily_orders` -> `orders` table

2. **Explore DAG source code**: Use `af dags source <dag_id>` to read the DAG definition
   - Look for INSERT, MERGE, CREATE TABLE statements
   - Find the target table in the code (see the sketch below)

3. **Check DAG tasks**: Use `af tasks list <dag_id>` to see what operations the DAG performs
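
For example, a producing DAG usually names its target table directly in the SQL it runs. A minimal sketch of the kind of file `af dags source etl_daily_orders` might return, assuming hypothetical connection and table names; the `INSERT INTO` statement is what ties the DAG to the table it populates:

```python
# Hypothetical DAG definition -- the INSERT INTO statement names the table
# this DAG populates, and the FROM/JOIN clauses name its upstream sources.
from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id="etl_daily_orders",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    build_orders_daily = SQLExecuteQueryOperator(
        task_id="build_orders_daily",
        conn_id="warehouse",  # assumed warehouse connection name
        sql="""
            INSERT INTO analytics.orders_daily            -- target table
            SELECT o.order_date, SUM(o.amount) AS total_amount
            FROM raw.orders o                             -- upstream source
            JOIN dim.customers c ON c.id = o.customer_id  -- upstream source
            GROUP BY o.order_date
        """,
    )
```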

### Step 3: Trace Data Sources

From the DAG code, identify source tables and systems:

**SQL Sources** (look for FROM clauses):
```python
# In DAG code, the SQL is usually a string handed to an operator:
sql = """
    SELECT *
    FROM source_schema.source_table  -- <- this is an upstream source
"""
```

**External Sources** (look for connection references; see the example after this list):
- `S3Operator` -> S3 bucket source
- `PostgresOperator` -> Postgres database source
- `SalesforceOperator` -> Salesforce API source
- `HttpOperator` -> REST API source

**File Sources**:
- CSV/Parquet files in object storage
- SFTP drops
- Local file paths
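
Connection id arguments are where these external and file sources show up in code. A hedged sketch with made-up task ids, connection names, and bucket paths; only the argument names (`aws_conn_id`, `postgres_conn_id`, `bucket_name`) are the real signal to look for:

```python
# Hypothetical ingestion tasks (these would sit inside a `with DAG(...)` block).
# Connection ids and bucket names are assumptions; the argument names are what
# reveal the upstream systems when reading real DAG code.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.postgres.operators.postgres import PostgresOperator

wait_for_export = S3KeySensor(
    task_id="wait_for_export",
    bucket_name="vendor-exports",          # <- S3 bucket source
    bucket_key="orders/{{ ds }}.parquet",  # <- object storage file drop
    aws_conn_id="aws_default",
)

pull_customers = PostgresOperator(
    task_id="pull_customers",
    postgres_conn_id="crm_replica",        # <- external Postgres source
    sql="SELECT * FROM public.customers",  # <- FROM clause names the source table
)
```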

### Step 4: Build the Lineage Chain

Recursively trace each source (a conceptual sketch of this loop follows the diagram):

```
TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)
```
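
A minimal sketch of the recursion, with stub lookup tables standing in for the `af dags list` / `af dags source` inspection above (the sample data simply mirrors the diagram); the important part is the stop condition, where a branch ends at an external system or raw file:

```python
# Conceptual sketch only -- the dictionaries are stand-ins for what you learn
# by reading DAG source code; nothing here is a real Airflow or `af` API.
PRODUCING_DAG = {
    "analytics.orders_daily": "etl_daily_orders",
    "raw.orders": "ingest_orders",
    "dim.customers": "load_customers",
}
DAG_SOURCES = {
    "etl_daily_orders": ["raw.orders", "dim.customers"],
    "ingest_orders": ["Salesforce API"],
    "load_customers": ["PostgreSQL (external DB)"],
}

def trace_upstream(target: str, depth: int = 0) -> None:
    indent = "    " * depth
    dag_id = PRODUCING_DAG.get(target)
    if dag_id is None:
        print(f"{indent}{target}  (external source -- stop)")  # end of this branch
        return
    print(f"{indent}{target}  <- DAG: {dag_id}")
    for source in DAG_SOURCES.get(dag_id, []):
        trace_upstream(source, depth + 1)  # recurse into each upstream source

trace_upstream("analytics.orders_daily")
```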

### Step 5: Check Source Health

For each upstream source:
- **Tables**: Check freshness with the **checking-freshness** skill
- **DAGs**: Check recent run status with `af dags stats`
- **External systems**: Note connection info from DAG code

## Lineage for Columns

When tracing a specific column:

1. Find the column in the target table schema
2. Search DAG source code for references to that column name
3. Trace through transformations (an annotated example follows this list):
   - Direct mappings: `source.col AS target_col`
   - Transformations: `COALESCE(a.col, b.col) AS target_col`
   - Aggregations: `SUM(detail.amount) AS total_amount`
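
As an annotated illustration (table and column names are reused from the examples above, not from a real pipeline), commenting the SQL inside the producing DAG is usually enough to record a column's lineage:

```python
# Hypothetical transformation SQL from a DAG -- the trailing comments trace
# total_amount and customer_id back to their contributing source columns.
build_order_facts_sql = """
    SELECT
        o.order_id,
        COALESCE(o.customer_id, c.legacy_id) AS customer_id,   -- COALESCE of two source columns
        SUM(d.amount)                        AS total_amount   -- aggregation of raw.order_details.amount
    FROM raw.orders o
    JOIN raw.order_details d ON d.order_id = o.order_id
    LEFT JOIN dim.customers c ON c.id = o.customer_id
    GROUP BY o.order_id, COALESCE(o.customer_id, c.legacy_id)
"""
```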

## Output: Lineage Report

### Summary
One-line answer: "This table is populated by DAG X from sources Y and Z"

### Lineage Diagram
```
[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG: ingest_sfdc         DAG: transform_sales
```

### Source Details

| Source | Type | Connection | Freshness | Owner |
|--------|------|------------|-----------|-------|
| raw.orders | Table | Internal | 2h ago | data-team |
| Salesforce | API | salesforce_conn | Real-time | sales-ops |

### Transformation Chain
Describe how data flows and transforms:
1. Raw data lands in `raw.orders` via Salesforce API sync
2. DAG `transform_orders` cleans and dedupes into `stg.orders`
3. DAG `build_order_facts` joins with dimensions into `fct.orders`

### Data Quality Implications
- Single points of failure?
- Stale upstream sources?
- Complex transformation chains that could break?

### Related Skills
- Check source freshness: **checking-freshness** skill
- Debug source DAG: **debugging-dags** skill
- Trace downstream impacts: **tracing-downstream-lineage** skill
- Add manual lineage annotations: **annotating-task-lineage** skill
- Build custom lineage extractors: **creating-openlineage-extractors** skill

Overview

This skill traces upstream data lineage to answer where data comes from and what feeds a table, column, or DAG. It maps producing DAGs, source systems, and transformation chains into a concise lineage report that highlights sources, connections, and potential health issues.

How this skill works

The skill identifies the target type (table, column, or DAG), locates the producing Airflow DAGs, and inspects DAG source code and tasks for SQL and operator references. It extracts SQL FROM clauses, operator connection names (S3, Postgres, API), and file paths, then recursively follows each upstream source to build a lineage chain. The output includes a one-line summary, an ASCII lineage diagram, source details, and transformation steps.

When to use it

  • You need to know which DAG populates a given table or column.
  • You want to list upstream systems feeding a dataset (databases, S3, APIs).
  • You must audit transformation chains and understand how a value is computed.
  • You need to assess upstream health or identify single points of failure.
  • You are preparing impact or root-cause analysis for data issues.

Best practices

  • Start by confirming the target type (table, column, or DAG) to narrow the search.
  • Search DAG names and inspect DAG source code for INSERT/MERGE/SELECT and operator connections.
  • Recursively trace each discovered source until you hit an external system or raw file.
  • Annotate found connections and owners; record freshness and recent DAG run status.
  • Use related checks (freshness, DAG run stats) to evaluate upstream reliability.

Example use cases

  • Explain where analytics.orders_daily comes from and which DAGs feed it.
  • Trace the origin and transformations behind a specific column like total_amount.
  • Map all external systems and files that land data into a target table.
  • Produce a lineage diagram and source detail table for a data governance review.
  • Identify upstream DAG failures or stale sources causing downstream errors.

FAQ

How do you handle column-level lineage with complex transformations?

Search DAG SQL and transformation code for direct mappings, COALESCE/CASE, aggregations, and joins; trace each contributing column back to its source and describe the transformation step.

What health checks are recommended for upstream sources?

Check table freshness, recent DAG run status, and external connection metadata; flag stale tables, failed DAG runs, and single points of failure.