
langgraph-parallel skill

/plugins/ork/skills/langgraph-parallel

This skill enables you to design and optimize parallel execution patterns for fan-out/fan-in workflows, map-reduce over tasks, and independent agents.

npx playbooks add skill yonatangross/orchestkit --skill langgraph-parallel

Review the files below or copy the command above to add this skill to your agents.

Files (6)
SKILL.md
4.8 KB
---
name: langgraph-parallel
description: LangGraph parallel execution patterns. Use when implementing fan-out/fan-in workflows, map-reduce over tasks, or running independent agents concurrently.
tags: [langgraph, parallel, concurrency, fan-out]
context: fork
agent: workflow-architect
version: 1.0.0
author: OrchestKit
user-invocable: false
---

# LangGraph Parallel Execution

Run independent nodes concurrently for performance.

## Fan-Out/Fan-In Pattern

```python
from operator import add
from typing import Annotated, TypedDict

from langgraph.graph import StateGraph

class State(TypedDict):
    tasks: list[dict]
    current_task: dict
    results: Annotated[list, add]  # reducer merges parallel writes
    final: dict

def fan_out(state):
    """Split work into parallel tasks."""
    return {"tasks": [{"id": 1}, {"id": 2}, {"id": 3}]}

def worker(state):
    """Process one task."""
    task = state["current_task"]
    result = process(task)  # `process` is your task handler
    return {"results": [result]}  # appended via the `add` reducer

def fan_in(state):
    """Combine parallel results."""
    combined = aggregate(state["results"])  # `aggregate` is your merge step
    return {"final": combined}

workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)

workflow.add_edge("fan_out", "worker")
workflow.add_edge("worker", "fan_in")  # fan_in waits for all workers
```

## Using Send API

```python
from langgraph.constants import Send

def router(state):
    """Route to multiple workers in parallel."""
    return [
        Send("worker", {"task": task})
        for task in state["tasks"]
    ]

workflow.add_node("router", lambda state: state)  # pass-through entry node
workflow.add_conditional_edges("router", router)  # each Send spawns one "worker" run
```

## Parallel Agent Analysis

```python
import asyncio
from operator import add
from typing import Annotated, TypedDict

class AnalysisState(TypedDict):
    content: str
    findings: Annotated[list[dict], add]  # accumulates across branches

async def run_parallel_agents(state: AnalysisState):
    """Run multiple agents in parallel."""
    agents = [security_agent, tech_agent, quality_agent]

    # Run all concurrently; exceptions are returned, not raised
    tasks = [agent.analyze(state["content"]) for agent in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Keep only the successful results
    findings = [r for r in results if not isinstance(r, Exception)]

    return {"findings": findings}
```

## Map-Reduce Pattern

```python
def map_node(state):
    """Map: process each item independently.

    Note: this loop runs sequentially within a single node; use the
    Send API to fan items out to truly parallel workers.
    """
    results = [process_item(item) for item in state["items"]]
    return {"mapped_results": results}

def reduce_node(state):
    """Reduce: Combine all results."""
    results = state["mapped_results"]

    summary = {
        "total": len(results),
        "passed": sum(1 for r in results if r["passed"]),
        "failed": sum(1 for r in results if not r["passed"])
    }

    return {"summary": summary}
```
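The map loop above runs inside a single node. As a minimal sketch using plain asyncio (the item shape and the `map_item` check are illustrative assumptions, not part of the skill), the same map-reduce can run one concurrent branch per item:

```python
import asyncio

async def map_item(item: dict) -> dict:
    # Illustrative mapper; a real one would call a model or validator
    await asyncio.sleep(0)
    return {"id": item["id"], "passed": item["id"] % 2 == 0}

async def map_reduce(items: list[dict]) -> dict:
    # Map: one concurrent branch per item
    mapped = await asyncio.gather(*(map_item(i) for i in items))
    # Reduce: combine branch outputs into summary metrics
    return {
        "total": len(mapped),
        "passed": sum(1 for r in mapped if r["passed"]),
        "failed": sum(1 for r in mapped if not r["passed"]),
    }

summary = asyncio.run(map_reduce([{"id": i} for i in range(4)]))
```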

## Error Isolation

```python
import asyncio

async def parallel_with_isolation(tasks: list):
    """Run parallel tasks, isolate failures."""
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = []
    failures = []

    for task, result in zip(tasks, results):
        if isinstance(result, Exception):
            failures.append({"task": task, "error": str(result)})
        else:
            successes.append(result)

    return {"successes": successes, "failures": failures}
```

## Timeout per Branch

```python
import asyncio

async def parallel_with_timeout(agents: list, content: str, timeout: int = 30):
    """Run agents with per-agent timeout."""
    async def run_with_timeout(agent):
        try:
            return await asyncio.wait_for(
                agent.analyze(content),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return {"agent": agent.name, "error": "timeout"}

    tasks = [run_with_timeout(a) for a in agents]
    return await asyncio.gather(*tasks)
```

## Key Decisions

| Decision | Recommendation |
|----------|----------------|
| Max parallel | 5-10 concurrent (avoid overwhelming APIs) |
| Error handling | `return_exceptions=True` (don't fail all) |
| Timeout | 30-60s per branch |
| Accumulator | Use `Annotated[list, add]` for results |

## Common Mistakes

- No error isolation (one failure kills all)
- No timeout (one slow branch blocks)
- Sequential where parallel possible
- Forgetting to wait for all branches

## Related Skills

- `langgraph-state` - Accumulating state
- `multi-agent-orchestration` - Coordination patterns
- `langgraph-supervisor` - Supervised parallel execution

## Capability Details

### fanout-pattern
**Keywords:** fanout, parallel, concurrent, scatter
**Solves:**
- Run agents in parallel
- Implement fan-out pattern
- Distribute work across workers

### fanin-pattern
**Keywords:** fanin, gather, aggregate, collect
**Solves:**
- Aggregate parallel results
- Implement fan-in pattern
- Collect worker outputs

### parallel-template
**Keywords:** template, implementation, parallel, agent
**Solves:**
- Parallel agent fanout template
- Production-ready code
- Copy-paste implementation

Overview

This skill provides production-ready LangGraph patterns for parallel execution: fan-out/fan-in, map-reduce, and concurrent agent runs. It is built for Python projects using LangGraph and focuses on safe, performant parallelism for agents and workers. Use it to scale independent tasks, run multiple analyzers concurrently, or aggregate results reliably.

How this skill works

The skill supplies node patterns and helper functions that split work into parallel branches, run workers concurrently, and then gather results into a single state. It demonstrates using Send-style routing, asyncio-style concurrent execution with timeout and isolation, and accumulator patterns for safe merging. Decisions like max concurrency, per-branch timeout, and return_exceptions are applied to avoid cascading failures and API overload.
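In outline, the scatter/gather flow described above can be sketched with plain asyncio (the `analyze` worker here is a stand-in, not part of the skill):

```python
import asyncio

async def analyze(task_id: int) -> dict:
    # Stand-in worker; a real branch would call an agent or external API
    await asyncio.sleep(0)
    return {"id": task_id, "ok": True}

async def scatter_gather(task_ids: list[int]) -> list[dict]:
    """Fan out one branch per task, then gather into one result list."""
    branches = [analyze(tid) for tid in task_ids]
    results = await asyncio.gather(*branches, return_exceptions=True)
    # Merge step: keep successes only, mirroring the accumulator pattern
    return [r for r in results if not isinstance(r, Exception)]

results = asyncio.run(scatter_gather([1, 2, 3]))
```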

When to use it

  • Implement fan-out/fan-in workflows to parallelize independent work
  • Run multiple analysis agents (security, quality, tech) concurrently
  • Perform map-reduce over a collection of items for faster processing
  • Isolate and aggregate partial failures without aborting the whole flow
  • Enforce per-branch timeouts when integrating with external LLM APIs

Best practices

  • Limit concurrent branches (recommend 5–10) to avoid throttling and cost spikes
  • Use error isolation (gather with return_exceptions) so single failures don’t kill the workflow
  • Apply per-branch timeouts (30–60s) to prevent slow branches from blocking aggregation
  • Accumulate results with an explicit accumulator (e.g., Annotated[list, add]) to safely merge outputs
  • Validate and sanitize worker outputs before reduce/aggregate steps
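The concurrency-limit practice above can be enforced with a semaphore. A minimal sketch, assuming each branch is wrapped as a zero-argument coroutine factory (`branch` is illustrative):

```python
import asyncio

async def bounded_gather(coro_factories: list, limit: int = 5):
    """Run coroutine factories with at most `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:  # blocks while `limit` branches are active
            return await factory()

    return await asyncio.gather(*(run_one(f) for f in coro_factories))

async def branch(i: int) -> int:
    # Stand-in for an agent call; a real branch would hit an LLM API
    await asyncio.sleep(0)
    return i * 2

results = asyncio.run(
    bounded_gather([lambda i=i: branch(i) for i in range(10)], limit=5)
)
```

Factories (rather than pre-created coroutines) matter here: work starts only when the semaphore admits the branch.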

Example use cases

  • Security, technical, and quality agents run in parallel to produce independent findings, then aggregated into a single report
  • Fan-out a list of tasks to worker nodes, process concurrently, and fan-in to compute summary metrics
  • Map over thousands of items in chunks using parallel workers and then reduce to aggregated statistics
  • Run multiple generative models or prompt variants in parallel with timeouts and choose the best response based on scoring
  • Execute supervised parallel runs where a supervisor node restarts failed branches or applies retries

FAQ

How many parallel workers should I run?

Aim for 5–10 concurrent branches as a practical default. Increase only after load-testing and verifying API rate limits and cost impact.

What happens if one parallel branch fails?

Use error isolation (gather with return_exceptions) to capture failures per-branch. Collect successes and failures separately and decide whether to retry, skip, or include partial results in aggregation.
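One way to implement the retry option is a second gather pass over only the failed branches. A sketch under the same factory-based assumption as above (the `flaky`/`stable` branches are illustrative):

```python
import asyncio

async def gather_with_retry(factories: list, retries: int = 1):
    """Gather branch results; re-run failed branches up to `retries` times."""
    pending = list(enumerate(factories))
    results: dict[int, object] = {}

    for _ in range(retries + 1):
        outcomes = await asyncio.gather(
            *(factory() for _, factory in pending), return_exceptions=True
        )
        still_failed = []
        for (idx, factory), outcome in zip(pending, outcomes):
            if isinstance(outcome, Exception):
                still_failed.append((idx, factory))  # retry next pass
            else:
                results[idx] = outcome
        pending = still_failed
        if not pending:
            break

    failed_indices = [idx for idx, _ in pending]
    return [results.get(i) for i in range(len(factories))], failed_indices

calls = {"n": 0}

async def flaky():
    # Fails on the first call, succeeds on retry
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")
    return "ok"

async def stable():
    return "fine"

values, failed = asyncio.run(gather_with_retry([flaky, stable], retries=1))
```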