home / skills / yonatangross / orchestkit / langgraph-parallel
This skill enables you to design and optimize parallel execution patterns for fan out fan in workflows, map reduce, and independent agents.
npx playbooks add skill yonatangross/orchestkit --skill langgraph-parallelReview the files below or copy the command above to add this skill to your agents.
---
name: langgraph-parallel
description: LangGraph parallel execution patterns. Use when implementing fan-out/fan-in workflows, map-reduce over tasks, or running independent agents concurrently.
tags: [langgraph, parallel, concurrency, fan-out]
context: fork
agent: workflow-architect
version: 1.0.0
author: OrchestKit
user-invocable: false
---
# LangGraph Parallel Execution
Run independent nodes concurrently for performance.
## Fan-Out/Fan-In Pattern
```python
from langgraph.graph import StateGraph
def fan_out(state):
"""Split work into parallel tasks."""
state["tasks"] = [{"id": 1}, {"id": 2}, {"id": 3}]
return state
def worker(state):
"""Process one task."""
task = state["current_task"]
result = process(task)
return {"results": [result]}
def fan_in(state):
"""Combine parallel results."""
combined = aggregate(state["results"])
return {"final": combined}
workflow = StateGraph(State)
workflow.add_node("fan_out", fan_out)
workflow.add_node("worker", worker)
workflow.add_node("fan_in", fan_in)
workflow.add_edge("fan_out", "worker")
workflow.add_edge("worker", "fan_in") # Waits for all workers
```
## Using Send API
```python
from langgraph.constants import Send
def router(state):
"""Route to multiple workers in parallel."""
return [
Send("worker", {"task": task})
for task in state["tasks"]
]
workflow.add_conditional_edges("router", router)
```
## Parallel Agent Analysis
```python
from typing import Annotated
from operator import add
class AnalysisState(TypedDict):
content: str
findings: Annotated[list[dict], add] # Accumulates
async def run_parallel_agents(state: AnalysisState):
"""Run multiple agents in parallel."""
agents = [security_agent, tech_agent, quality_agent]
# Run all concurrently
tasks = [agent.analyze(state["content"]) for agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter successful results
findings = [r for r in results if not isinstance(r, Exception)]
return {"findings": findings}
```
## Map-Reduce Pattern
```python
def map_node(state):
"""Map: Process each item independently."""
items = state["items"]
results = []
for item in items:
result = process_item(item)
results.append(result)
return {"mapped_results": results}
def reduce_node(state):
"""Reduce: Combine all results."""
results = state["mapped_results"]
summary = {
"total": len(results),
"passed": sum(1 for r in results if r["passed"]),
"failed": sum(1 for r in results if not r["passed"])
}
return {"summary": summary}
```
## Error Isolation
```python
async def parallel_with_isolation(tasks: list):
"""Run parallel tasks, isolate failures."""
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = []
failures = []
for task, result in zip(tasks, results):
if isinstance(result, Exception):
failures.append({"task": task, "error": str(result)})
else:
successes.append(result)
return {"successes": successes, "failures": failures}
```
## Timeout per Branch
```python
import asyncio
async def parallel_with_timeout(agents: list, content: str, timeout: int = 30):
"""Run agents with per-agent timeout."""
async def run_with_timeout(agent):
try:
return await asyncio.wait_for(
agent.analyze(content),
timeout=timeout
)
except asyncio.TimeoutError:
return {"agent": agent.name, "error": "timeout"}
tasks = [run_with_timeout(a) for a in agents]
return await asyncio.gather(*tasks)
```
## Key Decisions
| Decision | Recommendation |
|----------|----------------|
| Max parallel | 5-10 concurrent (avoid overwhelming APIs) |
| Error handling | return_exceptions=True (don't fail all) |
| Timeout | 30-60s per branch |
| Accumulator | Use `Annotated[list, add]` for results |
## Common Mistakes
- No error isolation (one failure kills all)
- No timeout (one slow branch blocks)
- Sequential where parallel possible
- Forgetting to wait for all branches
## Related Skills
- `langgraph-state` - Accumulating state
- `multi-agent-orchestration` - Coordination patterns
- `langgraph-supervisor` - Supervised parallel execution
## Capability Details
### fanout-pattern
**Keywords:** fanout, parallel, concurrent, scatter
**Solves:**
- Run agents in parallel
- Implement fan-out pattern
- Distribute work across workers
### fanin-pattern
**Keywords:** fanin, gather, aggregate, collect
**Solves:**
- Aggregate parallel results
- Implement fan-in pattern
- Collect worker outputs
### parallel-template
**Keywords:** template, implementation, parallel, agent
**Solves:**
- Parallel agent fanout template
- Production-ready code
- Copy-paste implementation
This skill provides production-ready LangGraph patterns for parallel execution: fan-out/fan-in, map-reduce, and concurrent agent runs. It is built for TypeScript projects using LangGraph and focuses on safe, performant parallelism for agents and workers. Use it to scale independent tasks, run multiple analyzers concurrently, or aggregate results reliably.
The skill supplies node patterns and helper functions that split work into parallel branches, run workers concurrently, and then gather results into a single state. It demonstrates using Send-style routing, asyncio-style concurrent execution with timeout and isolation, and accumulator patterns for safe merging. Decisions like max concurrency, per-branch timeout, and return_exceptions are applied to avoid cascading failures and API overload.
How many parallel workers should I run?
Aim for 5–10 concurrent branches as a practical default. Increase only after load-testing and verifying API rate limits and cost impact.
What happens if one parallel branch fails?
Use error isolation (gather with return_exceptions) to capture failures per-branch. Collect successes and failures separately and decide whether to retry, skip, or include partial results in aggregation.