home / skills / yonatangross / orchestkit / llm-streaming
/plugins/ork/skills/llm-streaming
This skill helps you implement real-time LLM streaming with token-by-token updates, SSE endpoints, and tool-call coordination for responsive apps.
npx playbooks add skill yonatangross/orchestkit --skill llm-streamingReview the files below or copy the command above to add this skill to your agents.
---
name: llm-streaming
description: LLM streaming response patterns. Use when implementing real-time token streaming, Server-Sent Events for AI responses, or streaming with tool calls.
tags: [llm, streaming, sse, real-time]
context: fork
agent: llm-integrator
version: 1.0.0
author: OrchestKit
user-invocable: false
---
# LLM Streaming
Deliver LLM responses in real-time for better UX.
## Basic Streaming (OpenAI)
```python
from openai import OpenAI
client = OpenAI()
async def stream_response(prompt: str):
"""Stream tokens as they're generated."""
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
```
## Streaming with Async
```python
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def async_stream(prompt: str):
"""Async streaming for better concurrency."""
stream = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
```
## FastAPI SSE Endpoint
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
@app.get("/chat/stream")
async def stream_chat(prompt: str):
"""Server-Sent Events endpoint for streaming."""
async def generate():
async for token in async_stream(prompt):
yield {
"event": "token",
"data": token
}
yield {"event": "done", "data": ""}
return EventSourceResponse(generate())
```
## Frontend SSE Consumer
```typescript
async function streamChat(prompt: string, onToken: (t: string) => void) {
const response = await fetch("/chat/stream?prompt=" + encodeURIComponent(prompt));
const reader = response.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data !== '[DONE]') {
onToken(data);
}
}
}
}
}
// Usage
let fullResponse = '';
await streamChat('Hello', (token) => {
fullResponse += token;
setDisplayText(fullResponse); // Update UI incrementally
});
```
## Streaming with Tool Calls
```python
async def stream_with_tools(messages: list, tools: list):
"""Handle streaming responses that include tool calls."""
stream = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=tools,
stream=True
)
collected_content = ""
collected_tool_calls = []
async for chunk in stream:
delta = chunk.choices[0].delta
# Collect content tokens
if delta.content:
collected_content += delta.content
yield {"type": "content", "data": delta.content}
# Collect tool call chunks
if delta.tool_calls:
for tc in delta.tool_calls:
# Tool calls come in chunks, accumulate them
if tc.index >= len(collected_tool_calls):
collected_tool_calls.append({
"id": tc.id,
"function": {"name": "", "arguments": ""}
})
if tc.function.name:
collected_tool_calls[tc.index]["function"]["name"] += tc.function.name
if tc.function.arguments:
collected_tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments
# If tool calls, execute them
if collected_tool_calls:
yield {"type": "tool_calls", "data": collected_tool_calls}
```
## Backpressure Handling
```python
import asyncio
async def stream_with_backpressure(prompt: str, max_buffer: int = 100):
"""Handle slow consumers with backpressure."""
buffer = asyncio.Queue(maxsize=max_buffer)
async def producer():
async for token in async_stream(prompt):
await buffer.put(token) # Blocks if buffer full
await buffer.put(None) # Signal completion
async def consumer():
while True:
token = await buffer.get()
if token is None:
break
yield token
await asyncio.sleep(0) # Yield control
# Start producer in background
asyncio.create_task(producer())
# Return consumer generator
async for token in consumer():
yield token
```
## Key Decisions
| Decision | Recommendation |
|----------|----------------|
| Protocol | SSE for web, WebSocket for bidirectional |
| Buffer size | 50-200 tokens |
| Timeout | 30-60s for long responses |
| Retry | Reconnect on disconnect |
## Common Mistakes
- No timeout (hangs on network issues)
- Missing error handling in stream
- Not closing connections properly
- Buffering entire response (defeats purpose)
## Related Skills
- `streaming-api-patterns` - SSE/WebSocket deep dive
- `function-calling` - Tool calls in streams
- `react-streaming-ui` - React streaming components
## Capability Details
### token-streaming
**Keywords:** streaming, token, stream response, real-time, incremental
**Solves:**
- Stream tokens as they're generated
- Display real-time LLM output
- Reduce time to first byte
### sse-responses
**Keywords:** SSE, Server-Sent Events, event stream, text/event-stream
**Solves:**
- Implement SSE for streaming
- Handle SSE reconnection
- Parse SSE event data
### streaming-with-tools
**Keywords:** stream tools, tool streaming, function call stream
**Solves:**
- Stream responses with tool calls
- Handle partial tool call data
- Coordinate streaming and tool execution
### partial-json-parsing
**Keywords:** partial JSON, incremental parse, streaming JSON
**Solves:**
- Parse JSON as it streams
- Handle incomplete JSON safely
- Display partial structured data
### stream-cancellation
**Keywords:** cancel, abort, stop stream, AbortController
**Solves:**
- Cancel ongoing streams
- Handle user interrupts
- Clean up stream resources
This skill documents production-ready patterns for streaming LLM responses in real time. It covers token-level streaming, Server-Sent Events (SSE), frontend consumers, tool-call streaming, backpressure, and key operational decisions for reliable delivery.
The skill shows how to request streaming from an LLM client and yield tokens as they arrive, both synchronously and asynchronously. It demonstrates wrapping streams in a FastAPI SSE endpoint, consuming stream bytes in a browser, accumulating partial tool-call payloads, and applying backpressure via an internal buffer to protect slow consumers.
Should I use SSE or WebSocket?
Use SSE for simple one-way streams to browsers. Choose WebSocket for bidirectional workflows or lower-latency interactive features.
How do I handle incomplete tool-call data?
Accumulate tool-call chunks by index, append partial name/arguments, and only invoke the tool once the full payload is assembled.
What buffer size is recommended?
Start with 50–200 tokens based on traffic and latency characteristics; tune for memory and consumer speed.