cosmos-dbt-core skill

This skill helps convert a dbt Core project into an Airflow DAG or TaskGroup using Cosmos, with setup validation and execution configuration.

npx playbooks add skill astronomer/agents --skill cosmos-dbt-core

---
name: cosmos-dbt-core
description: Use when turning a dbt Core project into an Airflow DAG/TaskGroup using Astronomer Cosmos. Does not cover dbt Fusion. Before implementing, verify dbt engine, warehouse, Airflow version, execution environment, DAG vs TaskGroup, and manifest availability.
---

# Cosmos + dbt Core: Implementation Checklist

Execute steps in order. Prefer the simplest configuration that meets the user's constraints.

> **Version note**: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).
>
> **Reference**: Latest stable: https://pypi.org/project/astronomer-cosmos/

> **Before starting**, confirm: (1) dbt engine = Core (not Fusion → use **cosmos-dbt-fusion**), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs individual operators, (6) manifest availability.

---

## 1. Configure Project (ProjectConfig)

| Approach | When to use | Required param |
|----------|-------------|----------------|
| Project path | Files available locally | `dbt_project_path` |
| Manifest only | No local project files; `dbt_manifest` load mode | `manifest_path` + `project_name` |

```python
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)
```
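
For the manifest-only row above, a minimal sketch — the manifest path and project name are illustrative placeholders:

```python
from cosmos import ProjectConfig

# Manifest-only setup: Cosmos parses manifest.json instead of the project files.
_project_config = ProjectConfig(
    manifest_path="/usr/local/airflow/dbt/my_project/target/manifest.json",
    project_name="my_project",  # required because dbt_project_path is not set
)
```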

## 2. Choose Parsing Strategy (RenderConfig)

Pick ONE load mode based on constraints:

| Load mode | When to use | Required inputs | Constraints |
|-----------|-------------|-----------------|-------------|
| `dbt_manifest` | Large projects; containerized execution; fastest | `ProjectConfig.manifest_path` | Remote manifest needs `manifest_conn_id` |
| `dbt_ls` | Complex selectors; need dbt-native selection | dbt installed OR `dbt_executable_path` | Works with containerized execution, but dbt must be available at parse time |
| `dbt_ls_file` | dbt_ls selection without running dbt_ls every parse | `RenderConfig.dbt_ls_path` | `select`/`exclude` won't work |
| `automatic` (default) | Simple setups; let Cosmos pick | (none) | Falls back: manifest → dbt_ls → custom |

> **CRITICAL**: For containerized execution (`DOCKER`/`KUBERNETES`/etc.), use the `dbt_manifest` load mode and supply `ProjectConfig.manifest_path` (plus `project_name` if no project path is set); see the FAQ below.

```python
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)
```
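
When dbt-native selection is required, a sketch of `dbt_ls` parsing — the selector values and executable path are illustrative:

```python
from cosmos import RenderConfig, LoadMode

# dbt_ls parsing with dbt-style selectors; values below are examples only.
_render_config = RenderConfig(
    load_method=LoadMode.DBT_LS,
    select=["tag:daily"],
    exclude=["path:models/legacy"],
    # dbt_executable_path="/usr/local/airflow/dbt_venv/bin/dbt",  # if dbt is not on PATH
)
```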

---

## 3. Choose Execution Mode (ExecutionConfig)

> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md#execution-modes-executionconfig)** for detailed configuration examples per mode.

Pick ONE execution mode:

| Execution mode | When to use | Speed | Required setup |
|----------------|-------------|-------|----------------|
| `WATCHER` | Fastest; single `dbt build` with per-model visibility | Fastest | dbt adapter in env, `dbt_executable_path`, or dbt Fusion |
| `WATCHER_KUBERNETES` | Fastest isolated method; single `dbt build` with per-model visibility | Fast | dbt installed in container |
| `LOCAL` + `DBT_RUNNER` | dbt + adapter in the same Python installation as Airflow | Fast | dbt 1.5+ in `requirements.txt` |
| `LOCAL` + `SUBPROCESS` | dbt + adapter available in the Airflow deployment, in an isolated Python installation | Medium | `dbt_executable_path` |
| `AIRFLOW_ASYNC` | BigQuery + long-running transforms | Fast | Airflow ≥2.8; provider deps |
| `KUBERNETES` | Isolation between Airflow and dbt | Medium | Airflow ≥2.8; provider deps |
| `VIRTUALENV` | Can't modify image; runtime venv | Slower | `py_requirements` in operator_args |
| Other containerized approaches | Isolation between Airflow and dbt | Medium | container config |

```python
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)
```
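
For the two `LOCAL` rows above, the invocation mode is what differs; a sketch assuming `InvocationMode` is importable from the top-level `cosmos` package (the executable path is a placeholder):

```python
from cosmos import ExecutionConfig, ExecutionMode, InvocationMode

# LOCAL + SUBPROCESS: dbt lives in its own venv inside the Airflow deployment.
_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.LOCAL,
    invocation_mode=InvocationMode.SUBPROCESS,  # or InvocationMode.DBT_RUNNER when dbt 1.5+ shares Airflow's Python
    dbt_executable_path="/usr/local/airflow/dbt_venv/bin/dbt",
)
```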

---

## 4. Configure Warehouse Connection (ProfileConfig)

> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md#profileconfig-warehouse-connection)** for detailed ProfileConfig options and all ProfileMapping classes.

### Option A: Airflow Connection + ProfileMapping (Recommended)

```python
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)
```

### Option B: Existing profiles.yml

> **CRITICAL**: Do not hardcode secrets; use environment variables.

```python
from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)
```

---

## 5. Configure Testing Behavior (RenderConfig)

> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md#testing-behavior-renderconfig)** for detailed testing options.

| TestBehavior | Behavior |
|--------------|----------|
| `AFTER_EACH` (default) | Tests run immediately after each model |
| `BUILD` | Combine run + test into single `dbt build` |
| `AFTER_ALL` | All tests after all models complete |
| `NONE` | Skip tests |

```python
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)
```

---

## 6. Configure operator_args

> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md#operator_args-configuration)** for detailed operator_args options.

```python
_operator_args = {
    # BaseOperator params
    "retries": 3,

    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}
```
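
These args are then passed to the constructor in step 7; a sketch assuming the configs from steps 1–4 are in scope:

```python
from pendulum import datetime
from cosmos import DbtDag

# operator_args applies to every dbt task Cosmos generates from the project.
my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    operator_args=_operator_args,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```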

---

## 7. Assemble DAG / TaskGroup

### Option A: DbtDag (Standalone)

```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```

### Option B: DbtTaskGroup (Inside Existing DAG)

```python
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.sdk import chain  # Airflow 3.x
# from airflow.models.baseoperator import chain  # Airflow 2.x
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath="/usr/local/airflow/dbt/my_project/profiles.yml",  # or a profile_mapping, as in step 4
)
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()
```

### Option C: Use Cosmos operators directly

```python
import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
        # Upload dbt artifacts (e.g. target/run_results.json) to S3 after the run;
        # check_file_uploaded_task below verifies this upload.
        callback=upload_to_aws_s3,
        callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
    )

    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )

    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task
```

### Setting Dependencies on Individual Cosmos Tasks

```python
from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)
```

---

## 8. Safety Checks

Before finalizing, verify:

- [ ] Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only)
- [ ] Warehouse adapter installed for chosen execution mode
- [ ] Secrets via Airflow connections or env vars, NOT plaintext
- [ ] Load mode matches execution (complex selectors → dbt_ls)
- [ ] Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)

---

## Appendix A: Airflow 3 Compatibility

### Import Differences

| Airflow 3.x | Airflow 2.x |
|-------------|-------------|
| `from airflow.sdk import dag, task` | `from airflow.decorators import dag, task` |
| `from airflow.sdk import chain` | `from airflow.models.baseoperator import chain` |

### Asset/Dataset URI Format Change

Cosmos ≤1.9 (Airflow 2 Datasets):
```
postgres://0.0.0.0:5434/postgres.public.orders
```

Cosmos ≥1.10 (Airflow 3 Assets):
```
postgres://0.0.0.0:5434/postgres/public/orders
```

> **CRITICAL**: Update asset URIs when upgrading to Airflow 3.
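
For example, a downstream DAG scheduled on a Cosmos-emitted asset must use the new URI form; a minimal Airflow 3 sketch (the URI matches the example above and is otherwise a placeholder):

```python
from airflow.sdk import Asset, dag, task

# Runs whenever Cosmos updates the orders model; the URI must use the
# Airflow 3 slash-separated format shown above.
@dag(schedule=[Asset("postgres://0.0.0.0:5434/postgres/public/orders")])
def downstream_of_orders():
    @task
    def consume():
        pass

    consume()

downstream_of_orders()
```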

---

## Appendix B: Operational Extras

### Caching

Cosmos caches artifacts to speed up parsing. Enabled by default.

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html

### Memory-Optimized Imports

```bash
AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True
```

When enabled:
```python
from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag
```

### Artifact Upload to Object Storage

```bash
AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
```

```python
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)
```

### dbt Docs Hosting (Airflow 3.1+ / Cosmos 1.11+)

```bash
AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'
```

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html

---

## Related Skills

- **cosmos-dbt-fusion**: For dbt Fusion projects (not dbt Core)
- **authoring-dags**: General DAG authoring patterns
- **testing-dags**: Testing DAGs after creation

Overview

This skill converts a dbt Core project into an Airflow DAG or TaskGroup using Astronomer Cosmos. It guides selection of parsing and execution strategies, configures warehouse/profile/project details, and assembles a runnable DbtDag or DbtTaskGroup with safe operator settings. Use it to produce predictable, production-ready Airflow integrations for dbt Core (not dbt Fusion).

How this skill works

The skill inspects project constraints (dbt engine, warehouse, Airflow version, execution environment, DAG vs TaskGroup, and manifest availability) and chooses a RenderConfig load mode and ExecutionConfig execution mode accordingly. It configures ProfileConfig and ProjectConfig, sets test behavior and operator_args, then builds either a standalone DbtDag or an embedded DbtTaskGroup. Finally it runs safety checks to ensure compatibility (container requirements, adapters, secret handling, and Airflow 3 asset URI differences).

When to use it

  • You have a dbt Core project and want an Airflow-native DAG/TaskGroup via Cosmos
  • You run Airflow 3.x with Cosmos >=1.11 (or can adapt imports for Airflow 2.x)
  • You need explicit control over parsing for large projects or containerized execution
  • You require integrated dbt test behavior, operator tuning, or artifact/cloud uploads
  • You must enforce connection-based secrets (no plaintext profiles)

Best practices

  • Confirm dbt engine is Core (not Fusion) before starting
  • For containerized execution, always use the dbt_manifest load mode and supply a manifest path
  • Prefer Airflow connections or environment variables for secrets; never hardcode credentials
  • Pick the simplest execution mode that meets constraints (WATCHER/LOCAL first, containers only when isolation required)
  • Validate Airflow version to use correct imports and update asset URIs when migrating to Airflow 3

Example use cases

  • Create a standalone DbtDag for nightly dbt builds with Snowflake credentials mapped via ProfileMapping
  • Embed a DbtTaskGroup inside an existing DAG to run dbt models between upstream and downstream tasks
  • Use dbt_manifest load mode for very large projects or CI-produced manifests in containerized Airflow workers
  • Configure test_behavior to AFTER_EACH for lineage and quick failure feedback in development

FAQ

What load mode should I pick for containerized execution?

Always use dbt_manifest load mode for containerized execution; manifest_path (and optionally project_name) must be provided.

How do I avoid exposing secrets?

Use Airflow Connections or environment variables mapped into the runtime. Do not put credentials or secrets directly in profiles.yml or operator_args.