This skill helps implement streaming patterns in Go using iter.Seq2, enabling backpressure-aware streaming, transforms, and efficient data flow.
npx playbooks add skill lookatitude/beluga-ai --skill streaming-patterns
---
name: streaming-patterns
description: Go 1.23 iter.Seq2 streaming patterns for Beluga AI v2. Use when implementing streaming, transforms, or backpressure.
---
# Streaming Patterns
## Primary Primitive: iter.Seq2[T, error]
```go
// Stream adapts the provider's receive loop to an iter.Seq2 of chunks and errors.
func (m *Model) Stream(ctx context.Context, msgs []schema.Message) iter.Seq2[schema.StreamChunk, error] {
	return func(yield func(schema.StreamChunk, error) bool) {
		stream, err := m.client.Stream(ctx, msgs)
		if err != nil {
			yield(schema.StreamChunk{}, err)
			return
		}
		defer stream.Close()
		for {
			// Non-blocking cancellation check before each receive.
			select {
			case <-ctx.Done():
				yield(schema.StreamChunk{}, ctx.Err())
				return
			default:
			}
			chunk, err := stream.Recv()
			if err == io.EOF {
				return // clean end of stream
			}
			if err != nil {
				yield(schema.StreamChunk{}, err)
				return
			}
			if !yield(convertChunk(chunk), nil) {
				return // consumer stopped
			}
		}
	}
}
```
## Composition
- **Pipe**: `func Pipe[A, B any](first iter.Seq2[A, error], transform func(A) (B, error)) iter.Seq2[B, error]`
- **Collect**: Stream to slice — `func Collect[T any](stream iter.Seq2[T, error]) ([]T, error)`
- **Invoke from Stream**: Run the stream, collect its items, and return the last one.
- **Fan-out**: Call `iter.Pull2` to obtain `next` and `stop`, then broadcast each item to N consumers.
- **BufferedStream**: Channel-backed buffer for backpressure.
## Rules
1. Public API: `iter.Seq2[T, error]` — never `<-chan`.
2. Internal goroutine communication: channels are fine.
3. Always check context cancellation in producers.
4. `yield` returning false = consumer stopped — respect immediately.
5. Use `iter.Pull2` only when pull semantics are genuinely needed.
This skill documents idiomatic streaming patterns for Go 1.23 using iter.Seq2[T, error], tailored for Beluga AI v2 streaming, transforms, and backpressure. It presents a primary primitive implementation, composition helpers, and concrete rules to keep streaming code safe, cancelable, and composable. Use it as a practical reference when implementing producers, transformers, or buffered streams in Go.
The core primitive is iter.Seq2[T, error], a function that accepts a yield callback to push items or an error to a consumer. Producers implement a loop that checks context cancellation, receives or generates chunks, and calls yield; yield returning false signals the consumer stopped and the producer must return. Composition helpers (Pipe, Collect, Fan-out, BufferedStream) wrap or coordinate Seq2 streams without exposing raw channels in the public API.
Why use iter.Seq2 instead of channels?
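iter.Seq2 is a push iterator that runs in the consumer's goroutine, so a consumer that stops early simply makes yield return false and no producer goroutine is left blocked on a channel send. Callers that need pull semantics can convert the stream with iter.Pull2, so the public API does not have to commit to either style.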
How do I handle backpressure?
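A plain iter.Seq2 already applies backpressure, because the producer cannot continue until yield returns. When the producer should run ahead of the consumer, use a BufferedStream backed by a bounded internal channel, so the producer blocks once the buffer is full. In both cases, producers must stop on ctx.Done() and as soon as yield returns false.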