home / skills / lookatitude / beluga-ai / streaming-patterns

streaming-patterns skill

/.claude/skills/streaming-patterns

This skill helps implement streaming patterns in Go using iter.Seq2, enabling backpressure-aware streaming, transforms, and efficient data flow.

npx playbooks add skill lookatitude/beluga-ai --skill streaming-patterns

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
1.6 KB
---
name: streaming-patterns
description: Go 1.23 iter.Seq2 streaming patterns for Beluga AI v2. Use when implementing streaming, transforms, or backpressure.
---

# Streaming Patterns

## Primary Primitive: iter.Seq2[T, error]

```go
func (m *Model) Stream(ctx context.Context, msgs []schema.Message) iter.Seq2[schema.StreamChunk, error] {
    return func(yield func(schema.StreamChunk, error) bool) {
        stream, err := m.client.Stream(ctx, msgs)
        if err != nil { yield(schema.StreamChunk{}, err); return }
        defer stream.Close()
        for {
            select {
            case <-ctx.Done(): yield(schema.StreamChunk{}, ctx.Err()); return
            default:
            }
            chunk, err := stream.Recv()
            if err == io.EOF { return }
            if err != nil { yield(schema.StreamChunk{}, err); return }
            if !yield(convertChunk(chunk), nil) { return } // consumer stopped
        }
    }
}
```

## Composition

- **Pipe**: `func Pipe[A, B any](first iter.Seq2[A, error], transform func(A) (B, error)) iter.Seq2[B, error]`
- **Collect**: Stream to slice — `func Collect[T any](stream iter.Seq2[T, error]) ([]T, error)`
- **Invoke from Stream**: Stream, collect, return last.
- **Fan-out**: `iter.Pull2()` to get next/stop, broadcast to N consumers.
- **BufferedStream**: Channel-backed buffer for backpressure.

## Rules

1. Public API: `iter.Seq2[T, error]` — never `<-chan`.
2. Internal goroutine communication: channels are fine.
3. Always check context cancellation in producers.
4. `yield` returning false = consumer stopped — respect immediately.
5. Use `iter.Pull2` only when pull semantics are genuinely needed.

Overview

This skill documents idiomatic streaming patterns for Go 1.23 using iter.Seq2[T, error], tailored for Beluga AI v2 streaming, transforms, and backpressure. It presents a primary primitive implementation, composition helpers, and concrete rules to keep streaming code safe, cancelable, and composable. Use it as a practical reference when implementing producers, transformers, or buffered streams in Go.

How this skill works

The core primitive is iter.Seq2[T, error], a function that accepts a yield callback to push items or an error to a consumer. Producers implement a loop that checks context cancellation, receives or generates chunks, and calls yield; yield returning false signals the consumer stopped and the producer must return. Composition helpers (Pipe, Collect, Fan-out, BufferedStream) wrap or coordinate Seq2 streams without exposing raw channels in the public API.

When to use it

  • Implementing a network or model inference stream that must send data incrementally to a consumer.
  • Transforming or mapping streamed items with backpressure awareness.
  • Collecting streamed results into a slice or returning the last received item.
  • Broadcasting a single producer to multiple consumers with fan-out semantics.
  • Adding buffering to smooth rate mismatches while respecting cancellation and stop signals.

Best practices

  • Expose iter.Seq2[T, error] in public APIs; avoid returning <-chan to callers.
  • Always check ctx.Done() in producers and yield a context error if canceled.
  • Respect yield returning false: stop work and return immediately to avoid leaks.
  • Use internal channels and goroutines freely, but keep public boundaries as Seq2 functions.
  • Reserve iter.Pull2 and explicit pull semantics for cases that truly need on-demand consumption.

Example use cases

  • Stream model token chunks from a remote client and transform each chunk before yielding to the caller.
  • Pipe a generator stream through a sanitizer transform that can return errors per item.
  • Collect a stream into a slice to run a post-processing step on all chunks once the stream completes.
  • Implement a buffered stream that decouples producer and consumer rates while propagating context cancellation.

FAQ

Why use iter.Seq2 instead of channels?

iter.Seq2 keeps public APIs pull/push-agnostic, simplifies cancellation handling via the yield return value, and prevents misuse that leads to goroutine leaks or uncontrolled concurrency when consumers stop early.

How do I handle backpressure?

Use a BufferedStream implemented with internal channels to buffer a bounded number of items, and ensure producers respect ctx.Done() and the yield false signal to stop when consumers back out.