---
name: creating-openlineage-extractors
description: Create custom OpenLineage extractors for Airflow operators. Use when the user needs lineage from unsupported or third-party operators, wants column-level lineage, or needs complex extraction logic beyond what inlets/outlets provide.
---
# Creating OpenLineage Extractors
This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.
> **Reference:** See the [OpenLineage provider developer guide](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/developer.html) for the latest patterns and list of supported operators/hooks.
## When to Use Each Approach
| Scenario | Approach |
|----------|----------|
| Operator you own/maintain | **OpenLineage Methods** (recommended) |
| Third-party operator you can't modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, but lowest priority) |
> **Important:** Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, harder to debug, and can silently diverge from the operator's behavior when the operator changes.
---
## Two Approaches
### 1. OpenLineage Methods (Recommended)
Use when you can add methods directly to your custom operator. This is the **go-to solution** for operators you own.
### 2. Custom Extractors
Use when you need lineage from third-party or provider operators that you **cannot modify**.
---
## Approach 1: OpenLineage Methods (Recommended)
When you own the operator, add OpenLineage methods directly:
```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when the task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from airflow.providers.openlineage.extractors import OperatorLineage
        from openlineage.client.event_v2 import Dataset

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from airflow.providers.openlineage.extractors import OperatorLineage
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional, for partial lineage."""
        return None
```
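Because these methods are plain Python, you can sanity-check them without running a DAG. A minimal sketch, continuing from the `MyCustomOperator` definition above (the table names are illustrative):

```python
# Call the OpenLineage methods directly; no scheduler or listener required.
op = MyCustomOperator(
    task_id="copy_orders",
    source_table="raw.orders",
    target_table="mart.orders",
)

lineage = op.get_openlineage_facets_on_start()
assert lineage.inputs[0].name == "raw.orders"
assert lineage.outputs[0].name == "mart.orders"
```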
### OpenLineage Methods Reference
| Method | When Called | Required |
|--------|-------------|----------|
| `get_openlineage_facets_on_start()` | Task enters RUNNING | No |
| `get_openlineage_facets_on_complete(ti)` | Task succeeds | No |
| `get_openlineage_facets_on_failure(ti)` | Task fails | No |
> Implement only the methods you need. When none are implemented, lineage collection falls back to Hook-Level Lineage or the task's inlets/outlets.
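For illustration, this is what the inlets/outlets fallback looks like on an operator with no OpenLineage support at all. A minimal sketch using Airflow's built-in `Table` lineage entity; `copy_orders` is a hypothetical callable:

```python
# Fallback sketch: with no OL methods or extractor, declared inlets/outlets are used.
from airflow.lineage.entities import Table
from airflow.operators.python import PythonOperator

copy_task = PythonOperator(
    task_id="copy_orders",
    python_callable=copy_orders,  # hypothetical callable that does the copy
    inlets=[Table(database="db", cluster="postgres", name="raw.orders")],
    outlets=[Table(database="db", cluster="postgres", name="mart.orders")],
)
```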
---
## Approach 2: Custom Extractors
Use this approach only when you **cannot modify** the operator (e.g., third-party or provider operators).
### Basic Structure
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return the operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE the operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER the operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution.
        # Useful for operators that determine outputs at runtime.
        return None
```
### OperatorLineage Structure
```python
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],   # Input datasets
    outputs=[Dataset(namespace="...", name="...")],  # Output datasets
    run_facets={},  # Run metadata (per-run details)
    job_facets={"sql": sql_job.SQLJobFacet(query="SELECT ...")},  # Job metadata
)
```
### Extraction Methods
| Method | When Called | Use For |
|--------|-------------|---------|
| `_execute_extraction()` | Before operator runs | Static/known lineage |
| `extract_on_complete(task_instance)` | After success | Runtime-determined lineage |
| `extract_on_failure(task_instance)` | After failure | Partial lineage on errors |
### Registering Extractors
**Option 1: Configuration file (`airflow.cfg`)**
```ini
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor
```
**Option 2: Environment variable**
```bash
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'
```
> **Important:** The path must be importable from the Airflow worker. Place extractors in your DAGs folder or in an installed package.
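A quick way to confirm the configured path actually resolves on a worker is to import it by hand. A minimal sketch, reusing the hypothetical `mypackage.extractors` path from the examples above:

```python
# Run on the worker: the configured extractor path must be importable there.
import importlib

path = "mypackage.extractors.MyOperatorExtractor"  # value from the config above
module_name, class_name = path.rsplit(".", 1)
extractor_cls = getattr(importlib.import_module(module_name), class_name)
print(extractor_cls.get_operator_classnames())  # e.g. ['MyCustomOperator']
```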
---
## Common Patterns
### SQL Operator Extractor
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job


class MySqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MySqlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        sql = self.operator.sql
        conn_id = self.operator.conn_id

        # Parse SQL to find tables (simplified example).
        # In practice, use a SQL parser like sqlglot; see the sketch below.
        inputs, outputs = self._parse_sql(sql)

        namespace = f"postgres://{conn_id}"
        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
            outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
            job_facets={"sql": sql_job.SQLJobFacet(query=sql)},
        )

    def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
        """Parse SQL to extract table names. Use sqlglot for real parsing."""
        # Simplified example; use a proper SQL parser in production
        inputs = []
        outputs = []
        # ... parsing logic ...
        return inputs, outputs
```
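For real parsing, a library such as sqlglot can back `_parse_sql`. A minimal sketch, assuming single-statement `INSERT ... SELECT`-style SQL (dialects, CTEs, and multi-statement scripts need more care):

```python
# Sketch: naive input/output table extraction with sqlglot.
import sqlglot
from sqlglot import exp


def parse_sql_tables(sql: str) -> tuple[list[str], list[str]]:
    """Return (input_tables, output_tables) for one simple statement."""
    tree = sqlglot.parse_one(sql)
    all_tables = {table.sql() for table in tree.find_all(exp.Table)}

    # Tables under INSERT/CREATE targets are treated as outputs.
    outputs: set[str] = set()
    for node in tree.find_all(exp.Insert, exp.Create):
        if node.this is not None:
            outputs.update(table.sql() for table in node.this.find_all(exp.Table))

    return sorted(all_tables - outputs), sorted(outputs)
```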
### File Transfer Extractor
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["S3ToSnowflakeOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        s3_bucket = self.operator.s3_bucket
        s3_key = self.operator.s3_key
        table = self.operator.table
        schema = self.operator.schema

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{s3_bucket}",
                    name=s3_key,
                )
            ],
            outputs=[
                Dataset(
                    namespace="snowflake://myaccount.snowflakecomputing.com",
                    name=f"{schema}.{table}",
                )
            ],
        )
```
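Hardcoding the Snowflake namespace is brittle. It can instead be derived from the operator's Airflow connection; a sketch, assuming the account identifier is stored in the connection extras under `account`:

```python
# Sketch: build the namespace from the connection rather than hardcoding it.
from airflow.hooks.base import BaseHook


def snowflake_namespace(conn_id: str) -> str:
    conn = BaseHook.get_connection(conn_id)
    # Assumption: the account identifier lives in extras; fall back to host.
    account = conn.extra_dejson.get("account", conn.host)
    return f"snowflake://{account}"
```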
### Dynamic Lineage from Execution
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class DynamicOutputExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["DynamicOutputOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Only the inputs are known before execution
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Outputs are determined during execution.
        # Access them via operator properties set in execute().
        outputs = self.operator.created_tables  # Set during execute()
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
            outputs=[Dataset(namespace="...", name=t) for t in outputs],
        )
```
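This pattern only works if the operator records its runtime results on `self`. A sketch of the matching operator side (the `created_tables` attribute name mirrors the extractor above; the table names are illustrative):

```python
# The operator stores runtime outputs so the extractor can read them on complete.
from airflow.models import BaseOperator


class DynamicOutputOperator(BaseOperator):
    def __init__(self, source: str, **kwargs):
        super().__init__(**kwargs)
        self.source = source
        self.created_tables: list[str] = []

    def execute(self, context):
        # ... work that decides which tables to create ...
        self.created_tables = ["public.out_a", "public.out_b"]  # illustrative
```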
---
## Common Pitfalls
### 1. Circular Imports
**Problem:** Importing Airflow or OpenLineage modules at the top level of the module can cause circular imports.
```python
# ❌ BAD - can cause circular import issues
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset
class MyExtractor(BaseExtractor):
    ...
```
```python
# ✅ GOOD - import inside methods
class MyExtractor(BaseExtractor):
    def _execute_extraction(self):
        from openlineage.client.event_v2 import Dataset
        # ...
```
### 2. Wrong Import Path
**Problem:** Extractor path doesn't match actual module location.
```bash
# ❌ Wrong - path doesn't exist
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'
# ✅ Correct - full importable path
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'
```
### 3. Not Handling None
**Problem:** Extraction fails when operator properties are None.
```python
# ✅ Handle optional properties
def _execute_extraction(self) -> OperatorLineage | None:
    if not self.operator.source_table:
        return None  # Skip extraction
    return OperatorLineage(...)
```
---
## Testing Extractors
### Unit Testing
```python
from unittest.mock import MagicMock

from mypackage.extractors import MyOperatorExtractor


def test_extractor():
    # Mock the operator
    operator = MagicMock()
    operator.source_table = "input_table"
    operator.target_table = "output_table"

    # Create the extractor
    extractor = MyOperatorExtractor(operator)

    # Test extraction
    lineage = extractor._execute_extraction()

    assert len(lineage.inputs) == 1
    assert lineage.inputs[0].name == "input_table"
    assert len(lineage.outputs) == 1
    assert lineage.outputs[0].name == "output_table"
```
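The post-execution path can be tested the same way by pre-setting the attributes that `execute()` would have written. A sketch continuing the test module above, against the `DynamicOutputExtractor` shown earlier:

```python
def test_extract_on_complete_with_runtime_outputs():
    operator = MagicMock()
    operator.source = "raw.events"
    operator.created_tables = ["analytics.daily", "analytics.weekly"]

    extractor = DynamicOutputExtractor(operator)
    lineage = extractor.extract_on_complete(task_instance=MagicMock())

    assert [d.name for d in lineage.outputs] == ["analytics.daily", "analytics.weekly"]
```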
---
## Precedence Rules
OpenLineage checks for lineage in this order:
1. **Custom Extractors** (highest priority)
2. **OpenLineage Methods** on operator
3. **Hook-Level Lineage** (from `HookLineageCollector`)
4. **Inlets/Outlets** (lowest priority)
If a custom extractor is registered for an operator, it takes precedence over the operator's OpenLineage methods and over any inlets/outlets.
---
## Related Skills
- **annotating-task-lineage**: For simple table-level lineage with inlets/outlets
- **tracing-upstream-lineage**: Investigate data origins
- **tracing-downstream-lineage**: Investigate data dependencies