
langgraph skill

/toolchains/ai/frameworks/langgraph

This skill enables building complex multi-agent workflows with state persistence, human-in-the-loop, and time-travel debugging using LangGraph.

npx playbooks add skill bobmatnyc/claude-mpm-skills --skill langgraph

---
name: langgraph
description: LangGraph framework for building stateful, multi-agent AI applications with cyclical workflows, human-in-the-loop patterns, and persistent checkpointing.
progressive_disclosure:
  entry_point:
    - summary
    - when_to_use
    - quick_start
  sections:
    - core_concepts
    - graph_construction
    - state_management
    - conditional_routing
    - multi_agent_patterns
    - human_in_loop
    - checkpointing_persistence
    - time_travel_debugging
    - streaming
    - tool_integration
    - error_handling
    - subgraphs_composition
    - map_reduce
    - production_deployment
    - langsmith_integration
    - real_world_examples
    - performance_optimization
    - comparison_simple_chains
    - migration_langchain
    - best_practices
estimated_tokens:
  entry: 80
  full: 5800
---

# LangGraph Workflows

## Summary

LangGraph is a framework for building **stateful, multi-agent applications** with LLMs. It implements state machines and directed graphs for orchestration, enabling complex workflows with persistent state management, human-in-the-loop support, and time-travel debugging.

**Key Innovation**: Transforms agent coordination from sequential chains into cyclic graphs with persistent state, conditional branching, and production-grade debugging capabilities.

## When to Use

✅ **Use LangGraph When**:
- Multi-agent coordination required
- Complex state management needs
- Human-in-the-loop workflows (approval gates, reviews)
- Need debugging/observability (time-travel, replay)
- Conditional branching based on outputs
- Building production agent systems
- State persistence across sessions

❌ **Don't Use LangGraph When**:
- Simple single-agent tasks
- No state persistence needed
- Prototyping/experimentation phase (use simple chains)
- Team lacks graph/state machine expertise
- Stateless request-response patterns

## Quick Start

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# 1. Define state schema
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]
    current_step: str

# 2. Create graph
workflow = StateGraph(AgentState)

# 3. Add nodes (agents)
def researcher(state):
    return {"messages": ["Research complete"], "current_step": "research"}

def writer(state):
    return {"messages": ["Article written"], "current_step": "writing"}

workflow.add_node("researcher", researcher)
workflow.add_node("writer", writer)

# 4. Add edges (transitions)
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", END)

# 5. Set entry point and compile
workflow.set_entry_point("researcher")
app = workflow.compile()

# 6. Execute
result = app.invoke({"messages": [], "current_step": "start"})
print(result)
```

---

## Core Concepts

### StateGraph

The fundamental building block representing a directed graph of agents with shared state.

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    """State schema shared across all nodes."""
    messages: list
    user_input: str
    final_output: str
    metadata: dict

# Create graph with state schema
workflow = StateGraph(AgentState)
```

**Key Properties**:
- **Nodes**: Agent functions that transform state
- **Edges**: Transitions between nodes (static or conditional)
- **State**: Shared data structure passed between nodes
- **Entry Point**: Starting node of execution
- **END**: Terminal node signaling completion

### Nodes

Nodes are functions that receive current state and return state updates.

```python
def research_agent(state: AgentState) -> dict:
    """Node function: receives state, returns updates."""
    query = state["user_input"]

    # Perform research (simplified)
    results = search_web(query)

    # Return state updates (partial state)
    return {
        "messages": state["messages"] + [f"Research: {results}"],
        "metadata": {"research_complete": True}
    }

# Add node to graph
workflow.add_node("researcher", research_agent)
```

**Node Behavior**:
- Receives **full state** as input
- Returns **partial state** (only fields to update)
- Can be sync or async functions (see the sketch below)
- Can invoke LLMs, call APIs, run computations
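
Async nodes follow the same contract; here is a minimal sketch, assuming an I/O-bound call (`fetch_data` is a hypothetical user-defined coroutine, not a LangGraph API):

```python
async def async_research_agent(state: AgentState) -> dict:
    """Async node: awaited automatically when the graph runs via ainvoke/astream."""
    # fetch_data is a hypothetical coroutine (e.g., an HTTP call)
    results = await fetch_data(state["user_input"])
    return {"messages": state["messages"] + [f"Fetched: {results}"]}

workflow.add_node("async_researcher", async_research_agent)
```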

### Edges

Edges define transitions between nodes.

#### Static Edges

```python
# Direct transition: researcher → writer
workflow.add_edge("researcher", "writer")

# Transition to END
workflow.add_edge("writer", END)
```

#### Conditional Edges

```python
def should_continue(state: AgentState) -> str:
    """Routing function: decides next node based on state."""
    last_message = state["messages"][-1]

    if "APPROVED" in last_message:
        return END
    elif "NEEDS_REVISION" in last_message:
        return "writer"
    else:
        return "reviewer"

workflow.add_conditional_edges(
    "reviewer",  # Source node
    should_continue,  # Routing function
    {
        END: END,
        "writer": "writer",
        "reviewer": "reviewer"
    }
)
```

---

## Graph Construction

### Complete Workflow Example

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_anthropic import ChatAnthropic

# State schema with reducer
class ResearchState(TypedDict):
    topic: str
    research_notes: Annotated[list, operator.add]  # Reducer: appends to list
    draft: str
    revision_count: int
    approved: bool

# Initialize LLM
llm = ChatAnthropic(model="claude-sonnet-4")

# Node 1: Research
def research_node(state: ResearchState) -> dict:
    """Research the topic and gather information."""
    topic = state["topic"]

    prompt = f"Research key points about: {topic}"
    response = llm.invoke(prompt)

    return {
        "research_notes": [response.content]
    }

# Node 2: Write
def write_node(state: ResearchState) -> dict:
    """Write draft based on research."""
    notes = "\n".join(state["research_notes"])

    prompt = f"Write article based on:\n{notes}"
    response = llm.invoke(prompt)

    return {
        "draft": response.content,
        "revision_count": state.get("revision_count", 0)
    }

# Node 3: Review
def review_node(state: ResearchState) -> dict:
    """Review the draft and decide if approved."""
    draft = state["draft"]

    prompt = f"Review this draft. Reply APPROVED or NEEDS_REVISION:\n{draft}"
    response = llm.invoke(prompt)

    approved = "APPROVED" in response.content

    return {
        "approved": approved,
        "revision_count": state["revision_count"] + 1,
        "research_notes": [f"Review feedback: {response.content}"]
    }

# Routing logic
def should_continue_writing(state: ResearchState) -> str:
    """Decide next step after review."""
    if state["approved"]:
        return END
    elif state["revision_count"] >= 3:
        return END  # Max revisions reached
    else:
        return "writer"

# Build graph
workflow = StateGraph(ResearchState)

workflow.add_node("researcher", research_node)
workflow.add_node("writer", write_node)
workflow.add_node("reviewer", review_node)

# Static edges
workflow.add_edge("researcher", "writer")
workflow.add_edge("writer", "reviewer")

# Conditional edge from reviewer
workflow.add_conditional_edges(
    "reviewer",
    should_continue_writing,
    {
        END: END,
        "writer": "writer"
    }
)

workflow.set_entry_point("researcher")

# Compile
app = workflow.compile()

# Execute
result = app.invoke({
    "topic": "AI Safety",
    "research_notes": [],
    "draft": "",
    "revision_count": 0,
    "approved": False
})

print(f"Final draft: {result['draft']}")
print(f"Revisions: {result['revision_count']}")
```

---

## State Management

### State Schema with TypedDict

```python
from typing import TypedDict, Annotated, Literal
import operator

class WorkflowState(TypedDict):
    # Simple fields (replaced on update)
    user_id: str
    request_id: str
    status: Literal["pending", "processing", "complete", "failed"]

    # List with reducer (appends instead of replacing)
    messages: Annotated[list, operator.add]

    # Dict with custom reducer
    metadata: Annotated[dict, lambda x, y: {**x, **y}]

    # Optional fields
    error: str | None
    result: dict | None
```

### State Reducers

Reducers control how state updates are merged.

```python
import operator
from typing import Annotated

# Built-in reducers
Annotated[list, operator.add]  # Append to list
Annotated[set, operator.or_]   # Union of sets
Annotated[int, operator.add]   # Sum integers

# Custom reducer
def merge_dicts(existing: dict, update: dict) -> dict:
    """Deep merge dictionaries."""
    result = existing.copy()
    for key, value in update.items():
        if key in result and isinstance(result[key], dict) and isinstance(value, dict):
            result[key] = merge_dicts(result[key], value)
        else:
            result[key] = value
    return result

class State(TypedDict):
    config: Annotated[dict, merge_dicts]
```

### State Updates

Nodes return **partial state** - only fields to update:

```python
def node_function(state: State) -> dict:
    """Return only fields that should be updated."""
    return {
        "messages": ["New message"],  # Will be appended
        "status": "processing"  # Will be replaced
    }
    # Other fields remain unchanged
```
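
To see the difference at runtime, here is a minimal, self-contained sketch: `messages` carries an `operator.add` reducer and accumulates across nodes, while `status` has no reducer and is overwritten by each update:

```python
from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, END

class DemoState(TypedDict):
    messages: Annotated[list, operator.add]  # reducer: updates are appended
    status: str                              # no reducer: updates replace

def first(state: DemoState) -> dict:
    return {"messages": ["from first"], "status": "first"}

def second(state: DemoState) -> dict:
    return {"messages": ["from second"], "status": "second"}

g = StateGraph(DemoState)
g.add_node("first", first)
g.add_node("second", second)
g.add_edge("first", "second")
g.add_edge("second", END)
g.set_entry_point("first")

result = g.compile().invoke({"messages": [], "status": "start"})
print(result)  # messages == ["from first", "from second"]; status == "second"
```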

---

## Conditional Routing

### Basic Routing Function

```python
def route_based_on_score(state: State) -> str:
    """Route to different nodes based on state."""
    score = state["quality_score"]

    if score >= 0.9:
        return "publish"
    elif score >= 0.6:
        return "review"
    else:
        return "revise"

workflow.add_conditional_edges(
    "evaluator",
    route_based_on_score,
    {
        "publish": "publisher",
        "review": "reviewer",
        "revise": "reviser"
    }
)
```

### Dynamic Routing with LLM

```python
from langchain_anthropic import ChatAnthropic

def llm_router(state: State) -> str:
    """Use LLM to decide next step."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Current state: {state['current_step']}
    User request: {state['user_input']}

    Which agent should handle this next?
    Options: researcher, coder, writer, FINISH

    Return only one word.
    """

    response = llm.invoke(prompt)
    next_agent = response.content.strip().lower()

    if next_agent == "finish":
        return END
    else:
        return next_agent

workflow.add_conditional_edges(
    "supervisor",
    llm_router,
    {
        "researcher": "researcher",
        "coder": "coder",
        "writer": "writer",
        END: END
    }
)
```

### Multi-Condition Routing

```python
def complex_router(state: State) -> str:
    """Route based on multiple conditions."""
    # Check multiple conditions
    has_errors = bool(state.get("errors"))
    is_approved = state.get("approved", False)
    iteration_count = state.get("iterations", 0)

    # Priority-based routing
    if has_errors:
        return "error_handler"
    elif is_approved:
        return END
    elif iteration_count >= 5:
        return "escalation"
    else:
        return "processor"

workflow.add_conditional_edges(
    "validator",
    complex_router,
    {
        "error_handler": "error_handler",
        "processor": "processor",
        "escalation": "escalation",
        END: END
    }
)
```

---

## Multi-Agent Patterns

### Supervisor Pattern

One supervisor coordinates multiple specialized agents.

```python
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from typing import TypedDict, Literal

class SupervisorState(TypedDict):
    task: str
    agent_history: list
    result: dict
    next_agent: str

# Specialized agents
class ResearchAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Research: {state['task']}")
        return {
            "agent_history": [f"Research: {response.content}"],
            "result": {"research": response.content}
        }

class CodingAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def run(self, state: SupervisorState) -> dict:
        response = self.llm.invoke(f"Code: {state['task']}")
        return {
            "agent_history": [f"Code: {response.content}"],
            "result": {"code": response.content}
        }

class SupervisorAgent:
    def __init__(self):
        self.llm = ChatAnthropic(model="claude-sonnet-4")

    def route(self, state: SupervisorState) -> dict:
        """Decide which agent to use next."""
        history = "\n".join(state.get("agent_history", []))

        prompt = f"""
        Task: {state['task']}
        Progress: {history}

        Which agent should handle the next step?
        Options: researcher, coder, FINISH

        Return only one word.
        """

        response = self.llm.invoke(prompt)
        next_agent = response.content.strip().lower()

        return {"next_agent": next_agent}

# Build supervisor workflow
def create_supervisor_workflow():
    workflow = StateGraph(SupervisorState)

    # Initialize agents
    research_agent = ResearchAgent()
    coding_agent = CodingAgent()
    supervisor = SupervisorAgent()

    # Add nodes
    workflow.add_node("supervisor", supervisor.route)
    workflow.add_node("researcher", research_agent.run)
    workflow.add_node("coder", coding_agent.run)

    # Conditional routing from supervisor
    def route_from_supervisor(state: SupervisorState) -> str:
        # Normalize case so the default "FINISH" and the LLM's output both match
        next_agent = state.get("next_agent", "FINISH").lower()
        if next_agent == "finish":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "supervisor",
        route_from_supervisor,
        {
            "researcher": "researcher",
            "coder": "coder",
            END: END
        }
    )

    # Loop back to supervisor after each agent
    workflow.add_edge("researcher", "supervisor")
    workflow.add_edge("coder", "supervisor")

    workflow.set_entry_point("supervisor")

    return workflow.compile()

# Execute
app = create_supervisor_workflow()
result = app.invoke({
    "task": "Build a REST API for user management",
    "agent_history": [],
    "result": {},
    "next_agent": ""
})
```

### Hierarchical Multi-Agent

Nested supervisor pattern with sub-teams.

```python
class TeamState(TypedDict):
    task: str
    team_results: dict

def create_backend_team():
    """Sub-graph for backend development."""
    workflow = StateGraph(TeamState)

    workflow.add_node("api_designer", design_api)
    workflow.add_node("database_designer", design_db)
    workflow.add_node("implementer", implement_backend)

    workflow.add_edge("api_designer", "database_designer")
    workflow.add_edge("database_designer", "implementer")
    workflow.add_edge("implementer", END)

    workflow.set_entry_point("api_designer")

    return workflow.compile()

def create_frontend_team():
    """Sub-graph for frontend development."""
    workflow = StateGraph(TeamState)

    workflow.add_node("ui_designer", design_ui)
    workflow.add_node("component_builder", build_components)
    workflow.add_node("integrator", integrate_frontend)

    workflow.add_edge("ui_designer", "component_builder")
    workflow.add_edge("component_builder", "integrator")
    workflow.add_edge("integrator", END)

    workflow.set_entry_point("ui_designer")

    return workflow.compile()

# Top-level coordinator
def create_project_workflow():
    workflow = StateGraph(TeamState)

    # Add team sub-graphs as nodes
    backend_team = create_backend_team()
    frontend_team = create_frontend_team()

    workflow.add_node("backend_team", backend_team)
    workflow.add_node("frontend_team", frontend_team)
    workflow.add_node("integrator", integrate_teams)

    # Parallel execution of teams: fan out from START (imported from
    # langgraph.graph) so both teams run; a single set_entry_point
    # would leave frontend_team unreachable
    workflow.add_edge(START, "backend_team")
    workflow.add_edge(START, "frontend_team")
    workflow.add_edge("backend_team", "integrator")
    workflow.add_edge("frontend_team", "integrator")
    workflow.add_edge("integrator", END)

    return workflow.compile()
```

### Swarm Pattern (2025)

Dynamic agent hand-offs with collaborative decision-making.

```python
from typing import Literal

class SwarmState(TypedDict):
    task: str
    messages: list
    current_agent: str
    handoff_reason: str

def create_swarm():
    """Agents can dynamically hand off to each other."""

    def research_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")

        # Perform research
        response = llm.invoke(f"Research: {state['task']}")

        # Decide if handoff needed
        needs_code = "implementation" in response.content.lower()

        if needs_code:
            return {
                "messages": [response.content],
                "current_agent": "coder",
                "handoff_reason": "Need implementation"
            }
        else:
            return {
                "messages": [response.content],
                "current_agent": "writer",
                "handoff_reason": "Ready for documentation"
            }

    def coding_agent(state: SwarmState) -> dict:
        llm = ChatAnthropic(model="claude-sonnet-4")

        response = llm.invoke(f"Implement: {state['task']}")

        return {
            "messages": [response.content],
            "current_agent": "tester",
            "handoff_reason": "Code complete, needs testing"
        }

    def tester_agent(state: SwarmState) -> dict:
        # Test the code
        tests_pass = True  # Simplified

        if tests_pass:
            return {
                "messages": ["Tests passed"],
                "current_agent": "DONE",
                "handoff_reason": "All tests passing"
            }
        else:
            return {
                "messages": ["Tests failed"],
                "current_agent": "coder",
                "handoff_reason": "Fix failing tests"
            }

    def writer_agent(state: SwarmState) -> dict:
        """Documentation agent (added so the researcher's 'writer' hand-off resolves)."""
        llm = ChatAnthropic(model="claude-sonnet-4")
        response = llm.invoke(f"Document: {state['task']}")
        return {
            "messages": [response.content],
            "current_agent": "DONE",
            "handoff_reason": "Documentation complete"
        }

    # Build graph
    workflow = StateGraph(SwarmState)

    workflow.add_node("researcher", research_agent)
    workflow.add_node("coder", coding_agent)
    workflow.add_node("tester", tester_agent)
    workflow.add_node("writer", writer_agent)

    # Dynamic routing based on current_agent
    def route_swarm(state: SwarmState) -> str:
        next_agent = state.get("current_agent", "DONE")
        if next_agent == "DONE":
            return END
        return next_agent

    workflow.add_conditional_edges(
        "researcher",
        route_swarm,
        {"coder": "coder", "writer": "writer"}
    )

    workflow.add_conditional_edges(
        "coder",
        route_swarm,
        {"tester": "tester"}
    )

    workflow.add_conditional_edges(
        "tester",
        route_swarm,
        {"coder": "coder", END: END}
    )

    workflow.add_edge("writer", END)

    workflow.set_entry_point("researcher")

    return workflow.compile()
```

---

## Human-in-the-Loop

### Interrupt for Approval

```python
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, END

# Enable checkpointing for interrupts
# NOTE: in recent langgraph-checkpoint-sqlite releases, from_conn_string
# is a context manager (use `with SqliteSaver.from_conn_string(...) as memory:`)
memory = SqliteSaver.from_conn_string(":memory:")

def create_approval_workflow():
    workflow = StateGraph(dict)

    workflow.add_node("draft", create_draft)
    workflow.add_node("approve", human_approval)  # Interrupt point
    workflow.add_node("publish", publish_content)

    workflow.add_edge("draft", "approve")
    workflow.add_edge("approve", "publish")
    workflow.add_edge("publish", END)

    workflow.set_entry_point("draft")

    # Compile with interrupt before "approve"
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approve"]  # Pause here
    )

# Usage
app = create_approval_workflow()
config = {"configurable": {"thread_id": "1"}}

# Step 1: Run until interrupt
for event in app.stream({"content": "Draft article..."}, config):
    print(event)
    # Workflow pauses at "approve" node

# Human reviews draft
print("Draft ready for review. Approve? (y/n)")
approval = input()

# Step 2: Resume with approval decision
if approval == "y":
    # Continue execution
    result = app.invoke(None, config)  # Resume from checkpoint
    print("Published:", result)
else:
    # Optionally update state and retry
    app.update_state(config, {"needs_revision": True})
    result = app.invoke(None, config)
```

### Interactive Approval Gates

```python
class ApprovalState(TypedDict):
    draft: str
    approved: bool
    feedback: str

def approval_node(state: ApprovalState) -> dict:
    """This node requires human interaction."""
    # This is where workflow pauses
    # Human provides input through update_state
    return state  # No automatic changes

def create_interactive_workflow():
    workflow = StateGraph(ApprovalState)

    workflow.add_node("writer", write_draft)
    workflow.add_node("approval_gate", approval_node)
    workflow.add_node("reviser", revise_draft)
    workflow.add_node("publisher", publish_final)

    workflow.add_edge("writer", "approval_gate")

    # Route based on approval
    def route_after_approval(state: ApprovalState) -> str:
        if state.get("approved"):
            return "publisher"
        else:
            return "reviser"

    workflow.add_conditional_edges(
        "approval_gate",
        route_after_approval,
        {"publisher": "publisher", "reviser": "reviser"}
    )

    workflow.add_edge("reviser", "approval_gate")  # Loop back
    workflow.add_edge("publisher", END)

    workflow.set_entry_point("writer")

    memory = SqliteSaver.from_conn_string("approvals.db")
    return workflow.compile(
        checkpointer=memory,
        interrupt_before=["approval_gate"]
    )

# Execute with human intervention
app = create_interactive_workflow()
config = {"configurable": {"thread_id": "article_123"}}

# Step 1: Generate draft (pauses at approval_gate)
for event in app.stream({"draft": "", "approved": False, "feedback": ""}, config):
    print(event)

# Step 2: Human reviews and provides feedback
current_state = app.get_state(config)
print(f"Review this draft: {current_state.values['draft']}")

# Human decides
app.update_state(config, {
    "approved": False,
    "feedback": "Add more examples in section 2"
})

# Step 3: Resume (goes to reviser due to approved=False)
result = app.invoke(None, config)
```

---

## Checkpointing and Persistence

### MemorySaver (In-Memory)

```python
from langgraph.checkpoint.memory import MemorySaver

# In-memory checkpointing (lost on restart)
memory = MemorySaver()

app = workflow.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "conversation_1"}}

# State automatically saved at each step
result = app.invoke(initial_state, config)
```

### SQLite Persistence

```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Persistent checkpointing with SQLite
checkpointer = SqliteSaver.from_conn_string("./workflow_state.db")

app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "user_session_456"}}

# State persists across restarts
result = app.invoke(initial_state, config)

# Later: Resume from same thread
result2 = app.invoke(None, config)  # Continues from last state
```

### PostgreSQL for Production

```python
from langgraph.checkpoint.postgres import PostgresSaver

# Production-grade persistence
# NOTE: recent langgraph-checkpoint-postgres releases expose from_conn_string
# as a context manager and require a one-time checkpointer.setup() call
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph_db"
)

app = workflow.compile(checkpointer=checkpointer)

# Supports concurrent workflows with isolation
config1 = {"configurable": {"thread_id": "user_1"}}
config2 = {"configurable": {"thread_id": "user_2"}}

# Each thread maintains independent state
result1 = app.invoke(state1, config1)
result2 = app.invoke(state2, config2)
```

### Redis for Distributed Systems

```python
from langgraph.checkpoint.redis import RedisSaver

# Distributed checkpointing
# NOTE: the ttl argument shown here is illustrative; TTL configuration
# varies across langgraph-checkpoint-redis versions
checkpointer = RedisSaver.from_conn_string(
    "redis://localhost:6379",
    ttl=3600  # 1 hour TTL
)

app = workflow.compile(checkpointer=checkpointer)

# Supports horizontal scaling
# Multiple workers can process different threads
```

---

## Time-Travel Debugging

### View State History

```python
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("workflow.db")
app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "debug_session"}}

# Run workflow
result = app.invoke(initial_state, config)

# Retrieve full history (snapshots are ordered most-recent-first)
history = app.get_state_history(config)

for i, state_snapshot in enumerate(history):
    print(f"Snapshot {i}:")
    print(f"  Next node(s): {state_snapshot.next}")
    print(f"  State: {state_snapshot.values}")
    print(f"  Metadata: {state_snapshot.metadata}")
    print()
```

### Replay from Checkpoint

```python
# Get state at a specific step (history is ordered most-recent-first)
history = list(app.get_state_history(config))
checkpoint_3 = history[3]  # Fourth-most-recent checkpoint

# Replay from that checkpoint
config_replay = {
    "configurable": {
        "thread_id": "debug_session",
        "checkpoint_id": checkpoint_3.config["configurable"]["checkpoint_id"]
    }
}

# Resume from step 3
result = app.invoke(None, config_replay)
```

### Rewind and Modify

```python
# Rewind: pick an earlier checkpoint (history is ordered most-recent-first)
history = list(app.get_state_history(config))
step_5 = history[5]

# Fork from that checkpoint with modified state; update_state returns
# the config of the new checkpoint
new_config = app.update_state(
    step_5.config,
    {"messages": ["Modified message"]},
    as_node="researcher"  # Apply update as if from this node
)

# Continue execution from the modified checkpoint
result = app.invoke(None, new_config)
```

### Debug Workflow

```python
def debug_workflow():
    """Debug workflow step-by-step."""
    checkpointer = SqliteSaver.from_conn_string("debug.db")
    # interrupt_before="*" pauses before every node, enabling true stepping
    app = workflow.compile(checkpointer=checkpointer, interrupt_before="*")

    config = {"configurable": {"thread_id": "debug"}}

    # Execute step-by-step
    inputs = initial_state  # First step consumes the input; later steps resume with None
    for step in range(10):  # Max 10 steps
        print(f"\n=== Step {step} ===")

        # Execute one step
        for event in app.stream(inputs, config):
            print(f"Event: {event}")
        inputs = None

        # Inspect current state
        current = app.get_state(config)
        print(f"Current state: {current.values}")
        print(f"Next node: {current.next}")

        if not current.next:
            print("Workflow complete")
            break

        # Pause for inspection
        input("Press Enter to continue...")

    # View full history
    print("\n=== Full History ===")
    for i, snapshot in enumerate(app.get_state_history(config)):
        print(f"Step {i}: {snapshot.next} -> {snapshot.values}")

debug_workflow()
```

---

## Streaming

### Token Streaming

```python
from langchain_anthropic import ChatAnthropic

def streaming_node(state: dict) -> dict:
    """Node that streams LLM output."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    # Use streaming
    response = ""
    for chunk in llm.stream(state["prompt"]):
        content = chunk.content
        print(content, end="", flush=True)
        response += content

    return {"response": response}

# Stream events from workflow
for event in app.stream(initial_state):
    print(event)
```

### Event Streaming

```python
# Stream all events (node execution, state updates)
for event in app.stream(initial_state, stream_mode="updates"):
    node_name = next(iter(event))  # dict keys aren't indexable; each event maps node -> update
    state_update = event[node_name]
    print(f"Node '{node_name}' updated state: {state_update}")
```
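
Other stream modes exist as well; for example, `stream_mode="values"` emits the full state after each step instead of per-node deltas:

```python
# Full state snapshot after every step
for state in app.stream(initial_state, stream_mode="values"):
    print(state)
```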

### Custom Streaming

```python
import json
from typing import AsyncGenerator

async def streaming_workflow(input_data: dict) -> AsyncGenerator[str, None]:
    """Stream workflow progress in real-time as server-sent events."""
    config = {"configurable": {"thread_id": "stream"}}

    async for event in app.astream(input_data, config):
        # Serialize each state update as an SSE frame
        # (StreamingResponse needs str/bytes, not raw dicts)
        payload = {"type": "state_update", "data": event}
        yield f"data: {json.dumps(payload, default=str)}\n\n"

    # Stream final result
    final_state = app.get_state(config)
    payload = {"type": "complete", "data": final_state.values}
    yield f"data: {json.dumps(payload, default=str)}\n\n"

# Usage in FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app_api = FastAPI()

@app_api.post("/workflow/stream")
async def stream_workflow(input: dict):
    return StreamingResponse(
        streaming_workflow(input),
        media_type="text/event-stream"
    )
```

---

## Tool Integration

### LangChain Tools

```python
from langchain.tools import Tool
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import AgentExecutor, create_react_agent

# Define tools
search = DuckDuckGoSearchRun()

tools = [
    Tool(
        name="Search",
        func=search.run,
        description="Search the web for information"
    )
]

def agent_with_tools(state: dict) -> dict:
    """Node that uses LangChain tools."""
    from langchain import hub
    from langchain_anthropic import ChatAnthropic

    llm = ChatAnthropic(model="claude-sonnet-4")

    # Create agent with tools, using the standard ReAct prompt from
    # LangChain Hub (requires network access; any ReAct-style prompt works)
    prompt = hub.pull("hwchase17/react")
    agent = create_react_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(agent=agent, tools=tools)

    # Execute with tools
    result = agent_executor.invoke({"input": state["query"]})

    return {"result": result["output"]}

# Add to graph
workflow.add_node("agent_with_tools", agent_with_tools)
```

### Custom Tools

```python
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field

class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculate(expression: str) -> str:
    """Evaluate mathematical expression."""
    try:
        # WARNING: eval on untrusted input is unsafe; use a proper
        # expression parser in production
        result = eval(expression)  # Simplified
        return f"Result: {result}"
    except Exception as e:
        return f"Error: {e}"

calculator_tool = StructuredTool.from_function(
    func=calculate,
    name="Calculator",
    description="Evaluate mathematical expressions",
    args_schema=CalculatorInput
)

tools = [calculator_tool]

def tool_using_node(state: dict) -> dict:
    """Use custom tools."""
    result = calculator_tool.invoke({"expression": state["math_problem"]})
    return {"solution": result}
```

### Anthropic Tool Use

```python
from anthropic import Anthropic
import json

def node_with_anthropic_tools(state: dict) -> dict:
    """Use Anthropic's tool use feature."""
    client = Anthropic()

    # Define tools
    tools = [
        {
            "name": "get_weather",
            "description": "Get weather for a location",
            "input_schema": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    ]

    # First call: Request tool use
    response = client.messages.create(
        model="claude-sonnet-4",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": state["query"]}]
    )

    # Execute tool if requested
    if response.stop_reason == "tool_use":
        tool_use = next(
            block for block in response.content
            if block.type == "tool_use"
        )

        # Execute tool (execute_tool is a user-supplied dispatcher, not shown)
        tool_result = execute_tool(tool_use.name, tool_use.input)

        # Second call: Provide tool result
        final_response = client.messages.create(
            model="claude-sonnet-4",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": state["query"]},
                {"role": "assistant", "content": response.content},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_use.id,
                            "content": str(tool_result)
                        }
                    ]
                }
            ]
        )

        return {"response": final_response.content[0].text}

    return {"response": response.content[0].text}
```

---

## Error Handling

### Try-Catch in Nodes

```python
def robust_node(state: dict) -> dict:
    """Node with error handling."""
    try:
        # Main logic
        result = risky_operation(state["input"])

        return {
            "result": result,
            "error": None,
            "status": "success"
        }

    except ValueError as e:
        # Handle specific error
        return {
            "error": f"Validation error: {e}",
            "status": "validation_failed"
        }

    except Exception as e:
        # Handle unexpected errors
        return {
            "error": f"Unexpected error: {e}",
            "status": "failed"
        }

# Route based on error status
def error_router(state: dict) -> str:
    status = state.get("status")

    if status == "success":
        return "next_step"
    elif status == "validation_failed":
        return "validator"
    else:
        return "error_handler"

workflow.add_conditional_edges(
    "robust_node",
    error_router,
    {
        "next_step": "next_step",
        "validator": "validator",
        "error_handler": "error_handler"
    }
)
```

### Retry Logic

```python
from typing import TypedDict

class RetryState(TypedDict):
    input: str
    result: str
    error: str | None
    retry_count: int
    max_retries: int

def node_with_retry(state: RetryState) -> dict:
    """Node that can be retried on failure."""
    try:
        result = unstable_operation(state["input"])

        return {
            "result": result,
            "error": None,
            "retry_count": 0
        }

    except Exception as e:
        return {
            "error": str(e),
            "retry_count": state.get("retry_count", 0) + 1
        }

def should_retry(state: RetryState) -> str:
    """Decide whether to retry or fail."""
    if state.get("error") is None:
        return "success"

    retry_count = state.get("retry_count", 0)
    max_retries = state.get("max_retries", 3)

    if retry_count < max_retries:
        return "retry"
    else:
        return "failed"

# Build retry loop
workflow.add_node("operation", node_with_retry)
workflow.add_node("success_handler", handle_success)
workflow.add_node("failure_handler", handle_failure)

workflow.add_conditional_edges(
    "operation",
    should_retry,
    {
        "success": "success_handler",
        "retry": "operation",  # Loop back
        "failed": "failure_handler"
    }
)
```

### Fallback Strategies

```python
def node_with_fallback(state: dict) -> dict:
    """Try primary method, fall back to secondary."""
    # Try primary LLM
    try:
        result = primary_llm(state["prompt"])
        return {
            "result": result,
            "method": "primary"
        }
    except Exception as e_primary:
        # Fallback to secondary LLM
        try:
            result = secondary_llm(state["prompt"])
            return {
                "result": result,
                "method": "fallback",
                "primary_error": str(e_primary)
            }
        except Exception as e_secondary:
            # Both failed
            return {
                "error": f"Both methods failed: {e_primary}, {e_secondary}",
                "method": "failed"
            }
```

---

## Subgraphs and Composition

### Subgraphs as Nodes

```python
def create_subgraph():
    """Create reusable subgraph."""
    subgraph = StateGraph(dict)

    subgraph.add_node("step1", step1_func)
    subgraph.add_node("step2", step2_func)

    subgraph.add_edge("step1", "step2")
    subgraph.add_edge("step2", END)

    subgraph.set_entry_point("step1")

    return subgraph.compile()

# Use subgraph in main graph
def create_main_graph():
    workflow = StateGraph(dict)

    # Add subgraph as a node
    subgraph_compiled = create_subgraph()
    workflow.add_node("subgraph", subgraph_compiled)

    workflow.add_node("before", before_func)
    workflow.add_node("after", after_func)

    workflow.add_edge("before", "subgraph")
    workflow.add_edge("subgraph", "after")
    workflow.add_edge("after", END)

    workflow.set_entry_point("before")

    return workflow.compile()
```

### Parallel Subgraphs

```python
from langgraph.graph import StateGraph, START, END

def create_parallel_workflow():
    """Execute multiple subgraphs in parallel."""

    # Create subgraphs
    backend_graph = create_backend_subgraph()
    frontend_graph = create_frontend_subgraph()
    database_graph = create_database_subgraph()

    # Main graph
    workflow = StateGraph(dict)

    # Add subgraphs as parallel nodes
    workflow.add_node("backend", backend_graph)
    workflow.add_node("frontend", frontend_graph)
    workflow.add_node("database", database_graph)

    # Merge results
    workflow.add_node("merge", merge_results)

    # All subgraphs lead to merge
    workflow.add_edge("backend", "merge")
    workflow.add_edge("frontend", "merge")
    workflow.add_edge("database", "merge")

    workflow.add_edge("merge", END)

    # Fan out from START for parallel execution (set_entry_point keeps
    # only a single entry point; add_edge(START, ...) supports several)
    workflow.add_edge(START, "backend")
    workflow.add_edge(START, "frontend")
    workflow.add_edge(START, "database")

    return workflow.compile()
```

---

## Map-Reduce Patterns

### Map-Reduce with LangGraph

```python
from typing import TypedDict, List

class MapReduceState(TypedDict):
    documents: List[str]
    summaries: List[str]
    final_summary: str

def map_node(state: MapReduceState) -> dict:
    """Map: Summarize each document."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    summaries = []
    for doc in state["documents"]:
        summary = llm.invoke(f"Summarize: {doc}")
        summaries.append(summary.content)

    return {"summaries": summaries}

def reduce_node(state: MapReduceState) -> dict:
    """Reduce: Combine summaries into final summary."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    combined = "\n".join(state["summaries"])
    final = llm.invoke(f"Combine these summaries:\n{combined}")

    return {"final_summary": final.content}

# Build map-reduce workflow
workflow = StateGraph(MapReduceState)

workflow.add_node("map", map_node)
workflow.add_node("reduce", reduce_node)

workflow.add_edge("map", "reduce")
workflow.add_edge("reduce", END)

workflow.set_entry_point("map")

app = workflow.compile()

# Execute
result = app.invoke({
    "documents": ["Doc 1...", "Doc 2...", "Doc 3..."],
    "summaries": [],
    "final_summary": ""
})
```

### Parallel Map with Batching

```python
import asyncio
from typing import List

async def parallel_map_node(state: dict) -> dict:
    """Map: Process documents in parallel."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    async def summarize(doc: str) -> str:
        response = await llm.ainvoke(f"Summarize: {doc}")
        return response.content

    # Process all documents in parallel
    summaries = await asyncio.gather(
        *[summarize(doc) for doc in state["documents"]]
    )

    return {"summaries": list(summaries)}

# Use async workflow
app = workflow.compile()

# Execute with asyncio
result = asyncio.run(app.ainvoke({
    "documents": ["Doc 1", "Doc 2", "Doc 3"],
    "summaries": [],
    "final_summary": ""
}))
```

---

## Production Deployment

### LangGraph Cloud

```python
# Deploy to LangGraph Cloud (managed service)

# 1. Install LangGraph CLI
# pip install langgraph-cli

# 2. Create langgraph.json config
langgraph_config = {
    "dependencies": ["langchain", "langchain-anthropic"],
    "graphs": {
        "agent": "./graph.py:app"
    },
    "env": {
        "ANTHROPIC_API_KEY": "$ANTHROPIC_API_KEY"
    }
}

# 3. Deploy (check the LangGraph Platform docs for the current CLI/UI flow)
# langgraph deploy

# 4. Use deployed graph via API (hypothetical deployment URL)
import requests

response = requests.post(
    "https://your-deployment.langgraph.cloud/invoke",
    json={
        "input": {"messages": ["Hello"]},
        "config": {"thread_id": "user_123"}
    },
    headers={"Authorization": "Bearer YOUR_API_KEY"}
)

result = response.json()
```

### Self-Hosted with FastAPI

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langgraph.checkpoint.postgres import PostgresSaver
import json  # used by the /stream endpoint below
import uvicorn

# Create FastAPI app
app_api = FastAPI()

# Initialize workflow
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/langgraph"
)
workflow_app = workflow.compile(checkpointer=checkpointer)

class WorkflowRequest(BaseModel):
    input: dict
    thread_id: str

class WorkflowResponse(BaseModel):
    output: dict
    thread_id: str

@app_api.post("/invoke", response_model=WorkflowResponse)
async def invoke_workflow(request: WorkflowRequest):
    """Invoke workflow synchronously."""
    try:
        config = {"configurable": {"thread_id": request.thread_id}}
        result = workflow_app.invoke(request.input, config)

        return WorkflowResponse(
            output=result,
            thread_id=request.thread_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app_api.post("/stream")
async def stream_workflow(request: WorkflowRequest):
    """Stream workflow execution."""
    from fastapi.responses import StreamingResponse

    config = {"configurable": {"thread_id": request.thread_id}}

    async def generate():
        async for event in workflow_app.astream(request.input, config):
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

# Run server
if __name__ == "__main__":
    uvicorn.run(app_api, host="0.0.0.0", port=8000)
```

### Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app_api", "--host", "0.0.0.0", "--port", "8000"]
```

```yaml
# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - POSTGRES_URL=postgresql://user:pass@db:5432/langgraph
    depends_on:
      - db
      - redis

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=langgraph
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"

volumes:
  postgres_data:
```

---

## LangSmith Integration

### Automatic Tracing

```python
import os
from langchain_anthropic import ChatAnthropic

# Enable LangSmith tracing
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-key"
os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-project"

# All LangGraph executions automatically traced
app = workflow.compile()

result = app.invoke(initial_state)
# View trace in LangSmith UI: https://smith.langchain.com
```

### Custom Metadata

```python
from langsmith import traceable

@traceable(
    run_type="chain",
    name="research_agent",
    metadata={"version": "1.0.0"}
)
def research_agent(state: dict) -> dict:
    """Node with custom LangSmith metadata."""
    # Function execution automatically traced
    return {"research": "results"}

workflow.add_node("researcher", research_agent)
```

### Evaluation with LangSmith

```python
from langsmith import Client
from langsmith.evaluation import evaluate

client = Client()

# Create test dataset
examples = [
    {
        "inputs": {"topic": "AI Safety"},
        "outputs": {"quality_score": 0.9}
    }
]

dataset = client.create_dataset("langgraph_tests")
for example in examples:
    client.create_example(
        dataset_id=dataset.id,
        inputs=example["inputs"],
        outputs=example["outputs"]
    )

# Evaluate workflow
def workflow_wrapper(inputs: dict) -> dict:
    result = app.invoke(inputs)
    return result

def quality_evaluator(run, example):
    """Custom evaluator."""
    score = calculate_quality(run.outputs)
    return {"key": "quality", "score": score}

results = evaluate(
    workflow_wrapper,
    data="langgraph_tests",
    evaluators=[quality_evaluator],
    experiment_prefix="langgraph_v1"
)

print(f"Average quality: {results['results']['quality']:.2f}")
```

---

## Real-World Examples

### Customer Support Agent

```python
from typing import TypedDict, Literal
from langchain_anthropic import ChatAnthropic

class SupportState(TypedDict):
    customer_query: str
    category: Literal["billing", "technical", "general"]
    priority: Literal["low", "medium", "high"]
    resolution: str
    escalated: bool

def categorize(state: SupportState) -> dict:
    """Categorize customer query."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Categorize this support query:
    "{state['customer_query']}"

    Categories: billing, technical, general
    Priority: low, medium, high

    Return format: category|priority
    """

    response = llm.invoke(prompt)
    category, priority = response.content.strip().split("|")

    return {
        "category": category,
        "priority": priority
    }

def handle_billing(state: SupportState) -> dict:
    """Handle billing issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Resolve billing issue: {state['customer_query']}")

    return {"resolution": response.content}

def handle_technical(state: SupportState) -> dict:
    """Handle technical issues."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Resolve technical issue: {state['customer_query']}")

    return {"resolution": response.content}

def escalate(state: SupportState) -> dict:
    """Escalate to human agent."""
    return {
        "escalated": True,
        "resolution": "Escalated to senior support team"
    }

def route_by_category(state: SupportState) -> str:
    """Route based on category and priority."""
    if state["priority"] == "high":
        return "escalate"

    category = state["category"]
    if category == "billing":
        return "billing"
    elif category == "technical":
        return "technical"
    else:
        return "general"

# Build support workflow
workflow = StateGraph(SupportState)

workflow.add_node("categorize", categorize)
workflow.add_node("billing", handle_billing)
workflow.add_node("technical", handle_technical)
workflow.add_node("general", handle_technical)  # Reuse
workflow.add_node("escalate", escalate)

workflow.add_edge("categorize", "router")

workflow.add_conditional_edges(
    "categorize",
    route_by_category,
    {
        "billing": "billing",
        "technical": "technical",
        "general": "general",
        "escalate": "escalate"
    }
)

workflow.add_edge("billing", END)
workflow.add_edge("technical", END)
workflow.add_edge("general", END)
workflow.add_edge("escalate", END)

workflow.set_entry_point("categorize")

support_app = workflow.compile()

# Usage
result = support_app.invoke({
    "customer_query": "I was charged twice for my subscription",
    "category": "",
    "priority": "",
    "resolution": "",
    "escalated": False
})
```

### Research Assistant

```python
class ResearchState(TypedDict):
    topic: str
    research_notes: list
    outline: str
    draft: str
    final_article: str
    revision_count: int

def research(state: ResearchState) -> dict:
    """Gather information on topic."""
    # Use search tools
    results = search_web(state["topic"])

    return {
        "research_notes": [f"Research: {results}"]
    }

def create_outline(state: ResearchState) -> dict:
    """Create article outline."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    notes = "\n".join(state["research_notes"])
    response = llm.invoke(f"Create outline for: {notes}")

    return {"outline": response.content}

def write_draft(state: ResearchState) -> dict:
    """Write article draft."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Topic: {state['topic']}
    Outline: {state['outline']}
    Research: {state['research_notes']}

    Write comprehensive article.
    """

    response = llm.invoke(prompt)

    return {"draft": response.content}

def review_and_revise(state: ResearchState) -> dict:
    """Review draft quality."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    response = llm.invoke(f"Review and improve:\n{state['draft']}")

    return {
        "final_article": response.content,
        "revision_count": state.get("revision_count", 0) + 1
    }

# Build research workflow
workflow = StateGraph(ResearchState)

workflow.add_node("research", research)
workflow.add_node("outline", create_outline)
workflow.add_node("write", write_draft)
workflow.add_node("review", review_and_revise)

workflow.add_edge("research", "outline")
workflow.add_edge("outline", "write")
workflow.add_edge("write", "review")

def check_quality(state: ResearchState) -> str:
    if state["revision_count"] >= 2:
        return END

    # Simple quality check
    if len(state.get("final_article", "")) > 1000:
        return END
    else:
        return "write"  # Revise

workflow.add_conditional_edges(
    "review",
    check_quality,
    {END: END, "write": "write"}
)

workflow.set_entry_point("research")

research_app = workflow.compile()
```

### Code Review Workflow

```python
class CodeReviewState(TypedDict):
    code: str
    language: str
    lint_results: list
    test_results: dict
    security_scan: dict
    review_comments: list
    approved: bool

def lint_code(state: CodeReviewState) -> dict:
    """Run linters on code."""
    # Run appropriate linter
    issues = run_linter(state["code"], state["language"])

    return {"lint_results": issues}

def run_tests(state: CodeReviewState) -> dict:
    """Execute test suite."""
    results = execute_tests(state["code"])

    return {"test_results": results}

def security_scan(state: CodeReviewState) -> dict:
    """Scan for security vulnerabilities."""
    scan_results = scan_for_vulnerabilities(state["code"])

    return {"security_scan": scan_results}

def ai_review(state: CodeReviewState) -> dict:
    """AI-powered code review."""
    llm = ChatAnthropic(model="claude-sonnet-4")

    prompt = f"""
    Review this {state['language']} code:

    {state['code']}

    Lint issues: {state['lint_results']}
    Test results: {state['test_results']}
    Security: {state['security_scan']}

    Provide review comments.
    """

    response = llm.invoke(prompt)

    return {"review_comments": [response.content]}

def approve_or_reject(state: CodeReviewState) -> dict:
    """Final approval decision."""
    # Auto-approve if all checks pass
    lint_ok = len(state["lint_results"]) == 0
    tests_ok = state["test_results"].get("passed", 0) == state["test_results"].get("total", 0)
    security_ok = len(state["security_scan"].get("vulnerabilities", [])) == 0

    approved = lint_ok and tests_ok and security_ok

    return {"approved": approved}

# Build code review workflow
workflow = StateGraph(CodeReviewState)

workflow.add_node("lint", lint_code)
workflow.add_node("test", run_tests)
workflow.add_node("security", security_scan)
workflow.add_node("ai_review", ai_review)
workflow.add_node("decision", approve_or_reject)

# Parallel execution of checks: fan out from START (imported from
# langgraph.graph); set_entry_point keeps only a single entry point
workflow.add_edge(START, "lint")
workflow.add_edge(START, "test")
workflow.add_edge(START, "security")

workflow.add_edge("lint", "ai_review")
workflow.add_edge("test", "ai_review")
workflow.add_edge("security", "ai_review")
workflow.add_edge("ai_review", "decision")
workflow.add_edge("decision", END)

code_review_app = workflow.compile()
```

---

## Performance Optimization

### Parallel Execution

```python
# Multiple nodes with same parent execute in parallel
workflow.add_edge("start", "task_a")
workflow.add_edge("start", "task_b")
workflow.add_edge("start", "task_c")

# task_a, task_b, task_c run concurrently
```
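
A runnable sketch of this fan-out, using `START` edges for the parallel branches and an `operator.add` reducer so concurrent writes to `results` merge instead of colliding:

```python
from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, START, END

class FanState(TypedDict):
    results: Annotated[list, operator.add]  # merges parallel branch outputs

def task_a(state: FanState) -> dict: return {"results": ["a"]}
def task_b(state: FanState) -> dict: return {"results": ["b"]}
def task_c(state: FanState) -> dict: return {"results": ["c"]}

g = StateGraph(FanState)
for name, fn in [("task_a", task_a), ("task_b", task_b), ("task_c", task_c)]:
    g.add_node(name, fn)
    g.add_edge(START, name)  # fan out: all three run in the same superstep
    g.add_edge(name, END)

print(g.compile().invoke({"results": []}))  # results contains "a", "b", "c"
```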

### Caching with Checkpointers

```python
from langgraph.checkpoint.sqlite import SqliteSaver

# Checkpoint results for reuse
checkpointer = SqliteSaver.from_conn_string("cache.db")

app = workflow.compile(checkpointer=checkpointer)

# Same thread_id reuses cached results
config = {"configurable": {"thread_id": "cached_session"}}

# First run: executes all nodes
result1 = app.invoke(input_data, config)

# Second run: resumes from the last checkpoint instead of re-executing
# completed nodes (near-instant if the thread already finished)
result2 = app.invoke(None, config)
```

### Batching

```python
def batch_processing_node(state: dict) -> dict:
    """Process items in batches."""
    items = state["items"]
    batch_size = 10

    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        batch_result = process_batch(batch)
        results.extend(batch_result)

    return {"results": results}
```

### Async Execution

```python
from langgraph.graph import StateGraph
import asyncio

async def async_node(state: dict) -> dict:
    """Async node for I/O-bound operations."""
    results = await asyncio.gather(
        fetch_api_1(state["input"]),
        fetch_api_2(state["input"]),
        fetch_api_3(state["input"])
    )

    return {"results": results}

# Use async invoke
result = await app.ainvoke(input_data)
```

---

## Comparison with Simple Chains

### LangChain LCEL (Simple Chain)

```python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

# Simple linear chain
chain = (
    RunnablePassthrough()
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Hello")
```

**Limitations**:
- ❌ No state persistence
- ❌ No conditional branching
- ❌ No cycles/loops
- ❌ No human-in-the-loop
- ❌ Limited debugging

### LangGraph (Cyclic Workflow)

```python
from langgraph.graph import StateGraph, END

workflow = StateGraph(dict)

workflow.add_node("step1", step1_func)
workflow.add_node("step2", step2_func)

# Conditional loop
workflow.add_conditional_edges(
    "step2",
    should_continue,
    {"step1": "step1", END: END}
)

app = workflow.compile(checkpointer=memory)
```

**Advantages**:
- ✅ Persistent state across steps
- ✅ Conditional branching
- ✅ Cycles and loops
- ✅ Human-in-the-loop support
- ✅ Time-travel debugging
- ✅ Production-ready

---

## Migration from LangChain LCEL

### Before: LCEL Chain

```python
from langchain_core.runnables import RunnablePassthrough
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-sonnet-4")

chain = (
    {"input": RunnablePassthrough()}
    | llm
    | {"output": lambda x: x.content}
)

result = chain.invoke("Question")
```

### After: LangGraph

```python
from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
    input: str
    output: str

def llm_node(state: State) -> dict:
    llm = ChatAnthropic(model="claude-sonnet-4")
    response = llm.invoke(state["input"])
    return {"output": response.content}

workflow = StateGraph(State)
workflow.add_node("llm", llm_node)
workflow.add_edge("llm", END)
workflow.set_entry_point("llm")

app = workflow.compile()

result = app.invoke({"input": "Question", "output": ""})
```

### Migration Checklist

- [ ] Identify stateful vs stateless operations
- [ ] Convert chain steps to graph nodes
- [ ] Define state schema (TypedDict)
- [ ] Add edges between nodes
- [ ] Implement conditional routing if needed
- [ ] Add checkpointing for state persistence
- [ ] Test with same inputs
- [ ] Verify output matches original chain

---

## Best Practices

### State Design

```python
# ✅ GOOD: Flat, explicit state
class GoodState(TypedDict):
    user_id: str
    messages: list
    current_step: str
    result: dict

# ❌ BAD: Nested, ambiguous state
class BadState(TypedDict):
    data: dict  # Too generic
    context: Any  # No type safety
```

### Node Granularity

```python
# ✅ GOOD: Single responsibility per node
def validate_input(state): ...
def process_data(state): ...
def format_output(state): ...

# ❌ BAD: Monolithic node
def do_everything(state): ...
```

### Error Handling

```python
# ✅ GOOD: Explicit error states
class State(TypedDict):
    result: dict | None
    error: str | None
    status: Literal["pending", "success", "failed"]

def robust_node(state):
    try:
        result = operation()
        return {"result": result, "status": "success"}
    except Exception as e:
        return {"error": str(e), "status": "failed"}

# ❌ BAD: Silent failures
def fragile_node(state):
    try:
        return operation()
    except:
        return {}  # Error lost
```

### Testing

```python
# Test individual nodes
def test_node():
    state = {"input": "test"}
    result = my_node(state)
    assert result["output"] == "expected"

# Test full workflow
def test_workflow():
    app = workflow.compile()
    result = app.invoke(initial_state)
    assert result["final_output"] == "expected"

# Test error paths
def test_error_handling():
    state = {"input": "invalid"}
    result = my_node(state)
    assert result["error"] is not None
```

### Monitoring

```python
import logging

logger = logging.getLogger(__name__)

def monitored_node(state: dict) -> dict:
    """Node with logging."""
    logger.info(f"Executing node with state: {state.keys()}")

    try:
        result = operation()
        logger.info("Node succeeded")
        return {"result": result}
    except Exception as e:
        logger.error(f"Node failed: {e}")
        raise
```

---

## Summary

LangGraph transforms agent development from linear chains into **cyclic, stateful workflows** with production-grade capabilities:

**Core Strengths**:
- 🔄 Cyclic workflows with loops and branches
- 💾 Persistent state across sessions
- 🕐 Time-travel debugging and replay
- 👤 Human-in-the-loop approval gates
- 🔧 Tool integration (LangChain, Anthropic, custom)
- 📊 LangSmith integration for tracing
- 🏭 Production deployment (cloud or self-hosted)

**When to Choose LangGraph**:
- Multi-agent coordination required
- Complex branching logic
- State persistence needed
- Human approvals in workflow
- Production systems with debugging needs

**Learning Path**:
1. Start with StateGraph basics
2. Add conditional routing
3. Implement checkpointing
4. Build multi-agent patterns
5. Deploy to production

**Production Checklist**:
- [ ] State schema with TypedDict
- [ ] Error handling in all nodes
- [ ] Checkpointing configured
- [ ] LangSmith tracing enabled
- [ ] Monitoring and logging
- [ ] Testing (unit + integration)
- [ ] Deployment strategy (cloud/self-hosted)

For simple request-response patterns, use LangChain LCEL. For complex stateful workflows, LangGraph is the industry-standard solution.

## Related Skills

When using Langgraph, these skills enhance your workflow:
- **dspy**: DSPy for LLM prompt optimization (complementary to LangGraph orchestration)
- **test-driven-development**: Testing LangGraph workflows and state machines
- **systematic-debugging**: Debugging multi-agent workflows and state transitions

[Full documentation available in these skills if deployed in your bundle]

Overview

This skill covers LangGraph, a framework for building stateful, multi-agent AI applications with cyclical workflows, human-in-the-loop gates, and persistent checkpointing. It helps you orchestrate agents as nodes in directed graphs, enabling conditional branching, retries, and time-travel debugging for production-grade agent systems.

How this skill works

You declare a TypedDict state schema and compose a StateGraph of node functions that receive the full state and return partial updates. Edges link nodes statically or via routing functions (including LLM-powered routers). The graph executes from an entry point, applies reducers to merge updates, persists state across runs, and supports loops, conditional transitions, and terminal END nodes.

When to use it

  • Coordinating multiple specialized agents that share and evolve state
  • Building human-in-the-loop flows with approval, review, or gating steps
  • Implementing workflows that require conditional branching or retries
  • Creating production agent systems with observability and replay
  • Persisting state across sessions or long-running workflows

Best practices

  • Design a clear TypedDict state schema and use Annotated reducers for lists, sets, counters, and dict merges
  • Keep node functions focused: accept full state, return only fields to update
  • Use conditional edges for routing decisions; prefer deterministic routers for critical paths and LLM routers for exploratory routing
  • Limit loop counts with counters or max-revision logic to avoid infinite cycles
  • Add logging and checkpointing around nodes to enable time-travel debugging and replay

Example use cases

  • Research → Draft → Review loop for automated article production with iterative revisions
  • Supervisor pattern where a coordinator routes tasks to researcher, coder, or writer agents and loops until completion
  • Multi-step data pipelines that accumulate results across nodes and persist intermediate state
  • Human approval gates where reviewers can approve, request revisions, or escalate
  • Nested team workflows: sub-graphs for backend/frontend teams coordinated by a higher-level supervisor

FAQ

How do I persist state between runs?

Compile the graph with a checkpointer (MemorySaver, SqliteSaver, PostgresSaver, or RedisSaver) and pass a thread_id in the config. LangGraph snapshots state after every step, so a later invocation with the same thread_id resumes from the persisted checkpoint.
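
A minimal sketch, mirroring the checkpointing section above:

```python
from langgraph.checkpoint.sqlite import SqliteSaver

checkpointer = SqliteSaver.from_conn_string("./state.db")
app = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "session_1"}}
app.invoke(initial_state, config)  # state is snapshotted after each step
app.invoke(None, config)           # later: resume the thread from its last checkpoint
```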

Can routing logic use an LLM?

Yes. Routing functions can invoke an LLM to decide the next node; add deterministic fallbacks and map LLM outputs explicitly to node identifiers to avoid unexpected transitions.