home / skills / ancoleman / ai-design-components / ingesting-data

ingesting-data skill

/skills/ingesting-data

This skill guides data ingestion across cloud storage, files, APIs, and streaming sources, enabling reliable ETL pipelines and scalable data loading.

npx playbooks add skill ancoleman/ai-design-components --skill ingesting-data

Review the files below or copy the command above to add this skill to your agents.

Files (11)
SKILL.md
7.6 KB
---
name: ingesting-data
description: Data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases. Use when importing CSV/JSON/Parquet files, pulling from S3/GCS buckets, consuming API feeds, or building ETL pipelines.
---

# Data Ingestion Patterns

This skill provides patterns for getting data INTO systems from external sources.

## When to Use This Skill

- Importing CSV, JSON, Parquet, or Excel files
- Loading data from S3, GCS, or Azure Blob storage
- Consuming REST/GraphQL API feeds
- Building ETL/ELT pipelines
- Database migration and CDC (Change Data Capture)
- Streaming data ingestion from Kafka/Kinesis

## Ingestion Pattern Decision Tree

```
What is your data source?
├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md
├── Files (CSV, JSON, Parquet) → See file-formats.md
├── REST/GraphQL APIs → See api-feeds.md
├── Streaming (Kafka, Kinesis) → See streaming-sources.md
├── Legacy Database → See database-migration.md
└── Need full ETL framework → See etl-tools.md
```

## Quick Start by Language

### Python (Recommended for ETL)

**dlt (data load tool) - Modern Python ETL:**
```python
import dlt

# Define a source
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        response = requests.get(f"https://api.github.com/repos/{repo}/issues")
        yield response.json()
    return issues

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data"
)

load_info = pipeline.run(github_source("owner/repo"))
print(load_info)
```

**Polars for file processing (faster than pandas):**
```python
import polars as pl

# Read CSV with schema inference
df = pl.read_csv("data.csv")

# Read Parquet (columnar, efficient)
df = pl.read_parquet("s3://bucket/data.parquet")

# Read JSON lines
df = pl.read_ndjson("events.jsonl")

# Write to database
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append"
)
```

### TypeScript/Node.js

**S3 ingestion:**
```typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

const s3 = new S3Client({ region: "us-east-1" });

async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const body = await response.Body?.transformToString();

  // Parse CSV
  const records = parse(body, { columns: true, skip_empty_lines: true });

  // Insert to database
  await db.insert(eventsTable).values(records);
}
```

**API feed polling:**
```typescript
import { Hono } from "hono";

// Webhook receiver for real-time ingestion
const app = new Hono();

app.post("/webhooks/stripe", async (c) => {
  const event = await c.req.json();

  // Validate webhook signature
  const signature = c.req.header("stripe-signature");
  // ... validation logic

  // Ingest event
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date()
  });

  return c.json({ received: true });
});
```

### Rust

**High-performance file ingestion:**
```rust
use polars::prelude::*;
use aws_sdk_s3::Client;

async fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> Result<DataFrame> {
    // Download from S3
    let resp = client.get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;

    let bytes = resp.body.collect().await?.into_bytes();

    // Parse with Polars
    let df = ParquetReader::new(Cursor::new(bytes))
        .finish()?;

    Ok(df)
}
```

### Go

**Concurrent file processing:**
```go
package main

import (
    "context"
    "encoding/csv"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
    resp, err := client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: &bucket,
        Key:    &key,
    })
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    reader := csv.NewReader(resp.Body)
    records, err := reader.ReadAll()
    if err != nil {
        return err
    }

    // Batch insert to database
    return batchInsert(ctx, records)
}
```

## Ingestion Patterns

### 1. Batch Ingestion (Files/Storage)

For periodic bulk loads:

```
Source → Extract → Transform → Load → Validate
  ↓         ↓          ↓         ↓        ↓
 S3      Download   Clean/Map  Insert   Count check
```

**Key considerations:**
- Use chunked reading for large files (>100MB)
- Implement idempotency with checksums
- Track file processing state
- Handle partial failures

### 2. Streaming Ingestion (Real-time)

For continuous data flow:

```
Source → Buffer → Process → Load → Ack
  ↓        ↓         ↓        ↓      ↓
Kafka   In-memory  Transform  DB   Commit offset
```

**Key considerations:**
- At-least-once vs exactly-once semantics
- Backpressure handling
- Dead letter queues for failures
- Checkpoint management

### 3. API Polling (Feeds)

For external API data:

```
Schedule → Fetch → Dedupe → Load → Update cursor
   ↓         ↓        ↓       ↓         ↓
 Cron     API call  By ID   Insert   Last timestamp
```

**Key considerations:**
- Rate limiting and backoff
- Incremental loading (cursors, timestamps)
- API pagination handling
- Retry with exponential backoff

### 4. Change Data Capture (CDC)

For database replication:

```
Source DB → Capture changes → Transform → Target DB
    ↓             ↓               ↓            ↓
 Postgres    Debezium/WAL      Map schema   Insert/Update
```

**Key considerations:**
- Initial snapshot + streaming changes
- Schema evolution handling
- Ordering guarantees
- Conflict resolution

## Library Recommendations

| Use Case | Python | TypeScript | Rust | Go |
|----------|--------|------------|------|-----|
| **ETL Framework** | dlt, Meltano, Dagster | - | - | - |
| **Cloud Storage** | boto3, gcsfs, adlfs | @aws-sdk/*, @google-cloud/* | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| **File Processing** | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs | encoding/csv, parquet-go |
| **Streaming** | confluent-kafka, aiokafka | kafkajs | rdkafka-rs | franz-go, sarama |
| **CDC** | Debezium, pg_logical | - | - | - |

## Reference Documentation

- `references/cloud-storage.md` - S3, GCS, Azure Blob patterns
- `references/file-formats.md` - CSV, JSON, Parquet, Excel handling
- `references/api-feeds.md` - REST polling, webhooks, GraphQL subscriptions
- `references/streaming-sources.md` - Kafka, Kinesis, Pub/Sub
- `references/database-migration.md` - Schema migration, CDC patterns
- `references/etl-tools.md` - dlt, Meltano, Airbyte, Fivetran

## Scripts

- `scripts/validate_csv_schema.py` - Validate CSV against expected schema
- `scripts/test_s3_connection.py` - Test S3 bucket connectivity
- `scripts/generate_dlt_pipeline.py` - Generate dlt pipeline scaffold

## Chaining with Database Skills

After ingestion, chain to appropriate database skill:

| Destination | Chain to Skill |
|-------------|----------------|
| PostgreSQL, MySQL | `databases-relational` |
| MongoDB, DynamoDB | `databases-document` |
| Qdrant, Pinecone | `databases-vector` (after embedding) |
| ClickHouse, TimescaleDB | `databases-timeseries` |
| Neo4j | `databases-graph` |

For vector databases, chain through `ai-data-engineering` for embedding:
```
ingesting-data → ai-data-engineering → databases-vector
```

Overview

This skill documents data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases. It provides practical recipes, library recommendations, and decision guidance for batch, streaming, API polling, and CDC workflows. Use these patterns to design reliable, idempotent, and scalable ETL/ELT pipelines across Python, TypeScript, Rust, and Go.

How this skill works

The skill describes source-specific flows (cloud storage, file formats, REST/GraphQL, streaming, legacy databases) and a decision tree to pick the right pattern. It includes code patterns and tooling advice for extracting, transforming, loading, and validating data, plus operational concerns like checkpoints, retries, and schema evolution. Language-specific examples (Python dlt/polars, Node S3, Rust/Go readers) show concrete implementations and common libraries.

When to use it

  • Importing CSV, JSON, Parquet, or Excel files into a database or data lake
  • Loading objects from S3, GCS, or Azure Blob storage for batch processing
  • Consuming REST/GraphQL APIs or webhook feeds for incremental ingestion
  • Building real-time streaming pipelines from Kafka, Kinesis, or Pub/Sub
  • Migrating databases or implementing change data capture (CDC) workflows

Best practices

  • Chunk large files and use streaming readers to avoid memory spikes
  • Implement idempotency (checksums, processed-state tracking) and safe replays
  • Use incremental loading (cursors, timestamps) and respect API rate limits
  • Provide retries with exponential backoff, dead-letter queues, and monitoring
  • Design for schema evolution: mapping, validation, and backward-compatible changes

Example use cases

  • Daily batch load: download Parquet from S3, transform with Polars, insert to Postgres
  • Webhook ingestion: receive Stripe events, validate signatures, persist to events table
  • Streaming ETL: consume Kafka topic, apply transformations, write to analytical DB with checkpointing
  • API polling: scheduled fetch of paginated REST feed, dedupe by ID, update incremental cursor
  • CDC pipeline: initial snapshot + Debezium streaming to replicate Postgres into a target DB

FAQ

Which language is recommended for ETL?

Python is recommended for ETL tasks due to rich tooling (dlt, Polars, PyArrow) and a large ecosystem for connectors and transformations.

How do I guarantee exactly-once delivery?

Exactly-once requires end-to-end idempotency and transactional writes or deduplication keys; implement checkpoints, use transactional sinks if supported, and handle retries carefully.

When should I use streaming vs batch?

Use streaming for low-latency, continuous data flows and event-driven use cases; use batch for periodic bulk ingestion, large file processing, or cost-effective analytics loads.