home / skills / mjunaidca / mjs-agent-skills / building-rag-systems

building-rag-systems skill

/.claude/skills/building-rag-systems

This skill helps you build production-grade RAG systems with semantic chunking, incremental indexing, and filtered retrieval for scalable context-aware search.

npx playbooks add skill mjunaidca/mjs-agent-skills --skill building-rag-systems

Review the files below or copy the command above to add this skill to your agents.

Files (4)
SKILL.md
10.8 KB
---
name: building-rag-systems
description: |
  Build production RAG systems with semantic chunking, incremental indexing, and filtered retrieval.
  Use when implementing document ingestion pipelines, vector search with Qdrant, or context-aware
  retrieval. Covers chunking strategies, change detection, payload indexing, and context expansion.
  NOT when doing simple similarity search without production requirements.
---

# Building RAG Systems

Production-grade RAG with semantic chunking, incremental updates, and filtered retrieval.

## Quick Start

```bash
# Dependencies
pip install qdrant-client openai pydantic python-frontmatter

# Core components
# 1. Crawler → discovers files, extracts path metadata
# 2. Parser → extracts frontmatter, computes file hash
# 3. Chunker → semantic split on ## headers, 400 tokens, 15% overlap
# 4. Embedder → batched OpenAI embeddings
# 5. Uploader → Qdrant upsert with indexed payloads
```

---

## Ingestion Pipeline

### Architecture

```
┌──────────┐    ┌────────┐    ┌─────────┐    ┌──────────┐    ┌──────────┐
│ Crawler  │ -> │ Parser │ -> │ Chunker │ -> │ Embedder │ -> │ Uploader │
└──────────┘    └────────┘    └─────────┘    └──────────┘    └──────────┘
     │              │              │              │              │
Discovers      Extracts       Splits by     Generates      Upserts to
files          frontmatter    semantic      vectors        Qdrant
               + file hash    boundaries    (batched)      (batched)
```

### Semantic Chunking (NOT Fixed-Size)

```python
class SemanticChunker:
    """
    Production chunking:
    - Split on ## headers (semantic boundaries)
    - Target 400 tokens (NVIDIA benchmark optimal)
    - 15% overlap for context continuity
    - Track prev/next for context expansion
    """
    SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
    TOKENS_PER_WORD = 1.3

    def __init__(
        self,
        target_tokens: int = 400,
        max_tokens: int = 512,
        overlap_percent: float = 0.15,
    ):
        self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
        self.overlap_words = int(self.target_words * overlap_percent)

    def chunk(self, content: str, file_hash: str) -> list[Chunk]:
        sections = self.SECTION_PATTERN.split(content)
        chunks = []

        for idx, section in enumerate(sections):
            content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
            chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"

            chunks.append(Chunk(
                id=chunk_id,
                text=section,
                chunk_index=idx,
                total_chunks=len(sections),
                prev_chunk_id=chunks[-1].id if chunks else None,
                content_hash=content_hash,
                source_file_hash=file_hash,
            ))

            # Set next_chunk_id on previous
            if len(chunks) > 1:
                chunks[-2].next_chunk_id = chunk_id

        return chunks
```

### Change Detection (Incremental Updates)

```python
def compute_file_hash(file_path: str) -> str:
    """SHA-256 for change detection."""
    with open(file_path, 'rb') as f:
        return hashlib.sha256(f.read()).hexdigest()

class QdrantStateTracker:
    """Query Qdrant payloads directly - no external state DB needed."""

    def get_indexed_files(self, book_id: str) -> dict[str, str]:
        """Returns {file_path: file_hash} from Qdrant."""
        indexed = {}
        offset = None

        while True:
            points, next_offset = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(must=[
                    FieldCondition(key="book_id", match=MatchValue(value=book_id))
                ]),
                limit=100,
                offset=offset,
                with_payload=["source_file", "source_file_hash"],
                with_vectors=False,
            )

            for point in points:
                indexed[point.payload["source_file"]] = point.payload["source_file_hash"]

            if next_offset is None:
                break
            offset = next_offset

        return indexed

    def detect_changes(self, current: dict[str, str], indexed: dict[str, str]):
        """Compare filesystem vs index."""
        new = [p for p in current if p not in indexed]
        deleted = [p for p in indexed if p not in current]
        modified = [p for p in current if p in indexed and current[p] != indexed[p]]
        return new, modified, deleted
```

### Batched Embeddings

```python
class OpenAIEmbedder:
    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
        self.client = OpenAI()
        self.model = model
        self.batch_size = batch_size  # OpenAI recommendation

    def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        embedded = []
        for i in range(0, len(chunks), self.batch_size):
            batch = chunks[i:i + self.batch_size]
            response = self.client.embeddings.create(
                input=[c.text for c in batch],
                model=self.model,
            )
            for chunk, data in zip(batch, response.data):
                embedded.append(EmbeddedChunk(**chunk.dict(), embedding=data.embedding))
        return embedded
```

### Qdrant Collection with Payload Indexes

```python
def create_collection(self, recreate: bool = False):
    """Create collection with proper indexes for filtered retrieval."""
    self.client.create_collection(
        collection_name=self.collection,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )

    # Index ALL fields you filter by
    indexes = [
        ("book_id", PayloadSchemaType.KEYWORD),      # Tenant isolation
        ("module", PayloadSchemaType.KEYWORD),       # Content filter
        ("chapter", PayloadSchemaType.INTEGER),      # Range filter
        ("hardware_tier", PayloadSchemaType.INTEGER),# Personalization
        ("proficiency_level", PayloadSchemaType.KEYWORD),
        ("parent_doc_id", PayloadSchemaType.KEYWORD),# Context expansion
        ("source_file_hash", PayloadSchemaType.KEYWORD),  # Change detection
    ]

    for field, schema in indexes:
        self.client.create_payload_index(
            collection_name=self.collection,
            field_name=field,
            field_schema=schema,
        )
```

---

## Retrieval Patterns

### Comprehensive Filter Builder

```python
def build_filter(self, query: SearchQuery) -> Filter:
    """Build Qdrant filter with all conditions (AND logic)."""
    conditions = []

    # Required: Tenant isolation
    conditions.append(FieldCondition(
        key="book_id", match=MatchValue(value=query.book_id)
    ))

    # Required: Hardware tier (lte = "tier X or lower")
    conditions.append(FieldCondition(
        key="hardware_tier", range=Range(lte=query.hardware_tier)
    ))

    # Optional: Module exact match
    if query.module:
        conditions.append(FieldCondition(
            key="module", match=MatchValue(value=query.module)
        ))

    # Optional: Chapter range
    if query.chapter_min or query.chapter_max:
        chapter_range = Range()
        if query.chapter_min:
            chapter_range.gte = query.chapter_min
        if query.chapter_max:
            chapter_range.lte = query.chapter_max
        conditions.append(FieldCondition(key="chapter", range=chapter_range))

    # Optional: Proficiency OR logic
    if query.proficiency_levels:
        conditions.append(FieldCondition(
            key="proficiency_level",
            match=MatchAny(any=query.proficiency_levels),
        ))

    return Filter(must=conditions)
```

### Context Expansion (Walk Chunk Chain)

```python
def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
    """Walk prev_chunk_id/next_chunk_id chain for surrounding context."""
    current = self.get_chunk_by_id(chunk_id)
    if not current:
        return []

    # Walk backwards
    prev_chunks = []
    prev_id = current.prev_chunk_id
    for _ in range(prev):
        if not prev_id:
            break
        chunk = self.get_chunk_by_id(prev_id)
        if not chunk:
            break
        prev_chunks.insert(0, chunk)
        prev_id = chunk.prev_chunk_id

    # Walk forwards
    next_chunks = []
    next_id = current.next_chunk_id
    for _ in range(next):
        if not next_id:
            break
        chunk = self.get_chunk_by_id(next_id)
        if not chunk:
            break
        next_chunks.append(chunk)
        next_id = chunk.next_chunk_id

    return prev_chunks + [current] + next_chunks
```

### Full Document Retrieval

```python
def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
    """Get all chunks for a document, ordered by chunk_index."""
    points, _ = self.client.scroll(
        collection_name=self.collection,
        scroll_filter=Filter(must=[
            FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
        ]),
        limit=100,
        with_payload=True,
        with_vectors=False,
    )

    chunks = [self._to_chunk(p) for p in points]
    chunks.sort(key=lambda c: c.chunk_index)
    return chunks
```

---

## Payload Schema

```python
class ChunkPayload(BaseModel):
    """Complete payload for filtered retrieval and context expansion."""

    # Tenant isolation
    book_id: str

    # Content filters (all indexed)
    module: str
    chapter: int
    lesson: int
    hardware_tier: int
    proficiency_level: str

    # Display content
    text: str
    section_title: Optional[str]
    source_file: str

    # Context expansion
    parent_doc_id: str
    chunk_index: int
    total_chunks: int
    prev_chunk_id: Optional[str]
    next_chunk_id: Optional[str]

    # Change detection
    content_hash: str
    source_file_hash: str
```

---

## Anti-Patterns

| Don't | Do Instead |
|-------|------------|
| Fixed character chunking | Semantic boundaries (## headers) |
| Position-based chunk IDs | Content hash for stable IDs |
| No overlap between chunks | 10-20% overlap for continuity |
| Full re-index on every change | Incremental with file hash detection |
| Missing payload indexes | Index every field you filter by |
| Synchronous embedding | Batch with background jobs |
| External state database | Qdrant-native state tracking |

---

## Verification

Run: `python scripts/verify.py`

## Related Skills

- `scaffolding-fastapi-dapr` - API patterns for search endpoints
- `streaming-llm-responses` - Streaming RAG responses

## References

- [references/ingestion-patterns.md](references/ingestion-patterns.md) - Full ingestion pipeline
- [references/retrieval-patterns.md](references/retrieval-patterns.md) - Filter strategies, context expansion

Overview

This skill teaches how to build production-ready Retrieval-Augmented Generation (RAG) systems with semantic chunking, incremental indexing, and filtered retrieval. It focuses on robust document ingestion pipelines, batched embedding, and Qdrant collections with payload indexes for context-aware search. The guidance targets production constraints like change detection, tenant isolation, and context expansion.

How this skill works

The pipeline discovers files, parses frontmatter and file hashes, semantically chunks content on header boundaries, and generates batched embeddings. Chunks include stable IDs based on content hashes and linked prev/next pointers to enable context expansion. Uploads are batched into Qdrant with indexed payload fields to support efficient filtered retrieval and incremental updates using file-hash change detection.

When to use it

  • Implementing document ingestion pipelines for knowledge bases or product docs.
  • Building vector search with Qdrant that requires tenant isolation and filters.
  • Needing production features: incremental reindexing, chunk linking, and context expansion.
  • Creating personalized retrieval (hardware tiers, proficiency levels, modules).
  • When full document context or ordered chunk retrieval is required.

Best practices

  • Chunk on semantic boundaries (e.g., ## headers) rather than fixed characters.
  • Target ~400 tokens per chunk with 10–20% overlap to preserve continuity.
  • Use content-hash-based chunk IDs for stable identifiers across edits.
  • Batch embeddings (OpenAI recommended batch sizes) and run asynchronously.
  • Index every payload field you plan to filter on inside Qdrant.
  • Detect changes by comparing filesystem SHA-256 hashes to indexed source_file_hash.

Example use cases

  • Courseware search where filters constrain by module, chapter range, and proficiency.
  • Tenant-isolated knowledge bases that require per-tenant book_id filtering.
  • Documentation search with context expansion to assemble surrounding sections.
  • Incremental indexing workflow that avoids full re-index on small edits.
  • Personalized retrieval that respects hardware tier or user capability constraints.

FAQ

Why semantic chunking instead of fixed-size chunks?

Semantic chunking preserves logical boundaries, yields better embeddings, and reduces irrelevant splits; overlap ensures context continuity.

How do I avoid re-indexing everything on every change?

Compute and store SHA-256 file hashes, compare filesystem vs indexed hashes, and only upsert new or modified files while deleting removed ones.