home / skills / yonatangross / orchestkit / llm-streaming

llm-streaming skill

/plugins/ork/skills/llm-streaming

This skill helps you implement real-time LLM streaming with token-by-token updates, SSE endpoints, and tool-call coordination for responsive apps.

npx playbooks add skill yonatangross/orchestkit --skill llm-streaming

Review the files below or copy the command above to add this skill to your agents.

Files (2)
SKILL.md
6.3 KB
---
name: llm-streaming
description: LLM streaming response patterns. Use when implementing real-time token streaming, Server-Sent Events for AI responses, or streaming with tool calls.
tags: [llm, streaming, sse, real-time]
context: fork
agent: llm-integrator
version: 1.0.0
author: OrchestKit
user-invocable: false
---

# LLM Streaming

Deliver LLM responses in real-time for better UX.

## Basic Streaming (OpenAI)

```python
from openai import OpenAI

client = OpenAI()

async def stream_response(prompt: str):
    """Stream tokens as they're generated."""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```

## Streaming with Async

```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def async_stream(prompt: str):
    """Async streaming for better concurrency."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
```

## FastAPI SSE Endpoint

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse

app = FastAPI()

@app.get("/chat/stream")
async def stream_chat(prompt: str):
    """Server-Sent Events endpoint for streaming."""
    async def generate():
        async for token in async_stream(prompt):
            yield {
                "event": "token",
                "data": token
            }
        yield {"event": "done", "data": ""}

    return EventSourceResponse(generate())
```

## Frontend SSE Consumer

```typescript
async function streamChat(prompt: string, onToken: (t: string) => void) {
  const response = await fetch("/chat/stream?prompt=" + encodeURIComponent(prompt));
  const reader = response.body?.getReader();
  const decoder = new TextDecoder();

  while (reader) {
    const { done, value } = await reader.read();
    if (done) break;

    const text = decoder.decode(value);
    const lines = text.split('\n');

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data !== '[DONE]') {
          onToken(data);
        }
      }
    }
  }
}

// Usage
let fullResponse = '';
await streamChat('Hello', (token) => {
  fullResponse += token;
  setDisplayText(fullResponse);  // Update UI incrementally
});
```

## Streaming with Tool Calls

```python
async def stream_with_tools(messages: list, tools: list):
    """Handle streaming responses that include tool calls."""
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True
    )

    collected_content = ""
    collected_tool_calls = []

    async for chunk in stream:
        delta = chunk.choices[0].delta

        # Collect content tokens
        if delta.content:
            collected_content += delta.content
            yield {"type": "content", "data": delta.content}

        # Collect tool call chunks
        if delta.tool_calls:
            for tc in delta.tool_calls:
                # Tool calls come in chunks, accumulate them
                if tc.index >= len(collected_tool_calls):
                    collected_tool_calls.append({
                        "id": tc.id,
                        "function": {"name": "", "arguments": ""}
                    })

                if tc.function.name:
                    collected_tool_calls[tc.index]["function"]["name"] += tc.function.name
                if tc.function.arguments:
                    collected_tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

    # If tool calls, execute them
    if collected_tool_calls:
        yield {"type": "tool_calls", "data": collected_tool_calls}
```

## Backpressure Handling

```python
import asyncio

async def stream_with_backpressure(prompt: str, max_buffer: int = 100):
    """Handle slow consumers with backpressure."""
    buffer = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for token in async_stream(prompt):
            await buffer.put(token)  # Blocks if buffer full
        await buffer.put(None)  # Signal completion

    async def consumer():
        while True:
            token = await buffer.get()
            if token is None:
                break
            yield token
            await asyncio.sleep(0)  # Yield control

    # Start producer in background
    asyncio.create_task(producer())

    # Return consumer generator
    async for token in consumer():
        yield token
```

## Key Decisions

| Decision | Recommendation |
|----------|----------------|
| Protocol | SSE for web, WebSocket for bidirectional |
| Buffer size | 50-200 tokens |
| Timeout | 30-60s for long responses |
| Retry | Reconnect on disconnect |

## Common Mistakes

- No timeout (hangs on network issues)
- Missing error handling in stream
- Not closing connections properly
- Buffering entire response (defeats purpose)

## Related Skills

- `streaming-api-patterns` - SSE/WebSocket deep dive
- `function-calling` - Tool calls in streams
- `react-streaming-ui` - React streaming components

## Capability Details

### token-streaming
**Keywords:** streaming, token, stream response, real-time, incremental
**Solves:**
- Stream tokens as they're generated
- Display real-time LLM output
- Reduce time to first byte

### sse-responses
**Keywords:** SSE, Server-Sent Events, event stream, text/event-stream
**Solves:**
- Implement SSE for streaming
- Handle SSE reconnection
- Parse SSE event data

### streaming-with-tools
**Keywords:** stream tools, tool streaming, function call stream
**Solves:**
- Stream responses with tool calls
- Handle partial tool call data
- Coordinate streaming and tool execution

### partial-json-parsing
**Keywords:** partial JSON, incremental parse, streaming JSON
**Solves:**
- Parse JSON as it streams
- Handle incomplete JSON safely
- Display partial structured data

### stream-cancellation
**Keywords:** cancel, abort, stop stream, AbortController
**Solves:**
- Cancel ongoing streams
- Handle user interrupts
- Clean up stream resources

Overview

This skill documents production-ready patterns for streaming LLM responses in real time. It covers token-level streaming, Server-Sent Events (SSE), frontend consumers, tool-call streaming, backpressure, and key operational decisions for reliable delivery.

How this skill works

The skill shows how to request streaming from an LLM client and yield tokens as they arrive, both synchronously and asynchronously. It demonstrates wrapping streams in a FastAPI SSE endpoint, consuming stream bytes in a browser, accumulating partial tool-call payloads, and applying backpressure via an internal buffer to protect slow consumers.

When to use it

  • When you need low time-to-first-byte UX for chat or assistant interfaces
  • When responses must be rendered incrementally in the browser or CLI
  • When integrating tool/function calls that may arrive incrementally
  • When building server endpoints that push tokens (SSE or WebSocket)
  • When you expect slow consumers and must manage backpressure

Best practices

  • Use SSE for one-way browser streaming and WebSocket for bidirectional interactions
  • Enforce timeouts and reconnect/retry logic to avoid hanging streams
  • Keep a bounded buffer (e.g., 50–200 tokens) and apply backpressure for slow clients
  • Accumulate partial tool-call chunks and only execute once the call payload is complete
  • Stream errors and finalization events explicitly, and always close connections cleanly

Example use cases

  • A chat UI that displays tokens as they arrive to improve perceived responsiveness
  • A FastAPI SSE endpoint that streams LLM tokens to multiple frontends
  • A pipeline that parses partial JSON from an LLM output while streaming
  • Executing tool calls whose arguments are received incrementally from the model
  • A server that buffers tokens and applies backpressure to protect downstream services

FAQ

Should I use SSE or WebSocket?

Use SSE for simple one-way streams to browsers. Choose WebSocket for bidirectional workflows or lower-latency interactive features.

How do I handle incomplete tool-call data?

Accumulate tool-call chunks by index, append partial name/arguments, and only invoke the tool once the full payload is assembled.

What buffer size is recommended?

Start with 50–200 tokens based on traffic and latency characteristics; tune for memory and consumer speed.