This skill helps you understand and work with Effect's Streams API, including creation, transformation, backpressure, and consumption of real-time data.
npx playbooks add skill andrueandersoncs/claude-skill-effect-ts --skill streams
---
name: Streams
description: This skill should be used when the user asks about "Effect Stream", "Stream.from", "Stream.map", "Stream.filter", "Stream.run", "streaming data", "async iteration", "Sink", "Channel", "Stream.concat", "Stream.merge", "backpressure", "Stream.fromIterable", "chunked processing", "real-time data", or needs to understand how Effect handles streaming data processing.
version: 1.0.0
---
# Streams in Effect
## Overview
Effect Streams provide:
- **Lazy evaluation** - Elements produced on demand
- **Resource safety** - Automatic cleanup
- **Backpressure** - Producer/consumer coordination
- **Composition** - Transform, filter, merge streams
- **Error handling** - Typed errors in stream pipeline
```typescript
Stream<A, E, R>
// Produces values of type A
// May fail with error E
// Requires environment R
```
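To make the "lazy evaluation" point concrete, here is a minimal sketch (the names `naturals`, `firstFiveSquares`, and `squares` are illustrative, not part of the skill). It builds an infinite stream and shows that nothing runs until the stream is consumed, so taking five elements is enough to make the pipeline finite. It assumes a recent `effect` release and that every step is synchronous, which is why `Effect.runSync` is sufficient here.

```typescript
import { Chunk, Effect, Stream } from "effect"

// An infinite stream: 1, 2, 3, ... Defining it computes nothing.
const naturals = Stream.iterate(1, (n) => n + 1)

// Elements are produced on demand, so take(5) bounds the work.
const firstFiveSquares = naturals.pipe(
  Stream.map((n) => n * n),
  Stream.take(5)
)

// runCollect turns the Stream into an Effect that yields a Chunk.
const program = Stream.runCollect(firstFiveSquares)

// Every step above is synchronous, so runSync can execute it.
const squares = Chunk.toReadonlyArray(Effect.runSync(program))
console.log(squares) // the values 1, 4, 9, 16, 25
```

For pipelines that include asynchronous steps (timers, promises, callbacks), `Effect.runPromise` would be the runner to reach for instead.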
## Creating Streams
### From Values
```typescript
import { Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
const fromArray = Stream.fromIterable([1, 2, 3])
const empty = Stream.empty
const single = Stream.succeed(42)
const infinite = Stream.iterate(1, (n) => n + 1)
```
### From Effects
```typescript
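import { Schedule, Stream } from "effect"
// fetchData and checkStatus are placeholders for functions that return an Effect
const fromEffect = Stream.fromEffect(fetchData())
const polling = Stream.repeatEffect(checkStatus())
const scheduled = Stream.repeatEffectWithSchedule(
  checkStatus(),
  Schedule.spaced("5 seconds")
)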
```
### From Async Sources
```typescript
import { Effect, Stream } from "effect"
// From async iterable; StreamError is an application-defined error class
const fromAsyncIterable = Stream.fromAsyncIterable(
  asyncGenerator(),
  (error) => new StreamError({ cause: error })
)
// From callback/event emitter; the returned Effect runs as cleanup when the stream ends
const fromCallback = Stream.async<number, never>((emit) => {
  const handler = (value: number) => emit.single(value)
  eventEmitter.on("data", handler)
  return Effect.sync(() => eventEmitter.off("data", handler))
})
// From queue
const fromQueue = Stream.fromQueue(queue)
```
### Generating Streams
```typescript
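import { Option, Stream } from "effect"
// Return Option.none() from the step function to end the stream
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]))
const range = Stream.range(1, 100)
// Stream.repeat requires a Schedule; repeatValue emits one value forever
const repeated = Stream.repeatValue("ping").pipe(
  Stream.take(5)
)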
```
## Transforming Streams
### map - Transform Elements
```typescript
const doubled = numbers.pipe(
  Stream.map((n) => n * 2)
)
const enriched = users.pipe(
  Stream.mapEffect((user) => fetchProfile(user.id))
)
const parallel = items.pipe(
  Stream.mapEffect(process, { concurrency: 10 })
)
```
### filter - Select Elements
```typescript
const evens = numbers.pipe(
  Stream.filter((n) => n % 2 === 0)
)
const valid = items.pipe(
  Stream.filterEffect((item) => validate(item))
)
```
### flatMap - Nested Streams
```typescript
const expanded = numbers.pipe(
  Stream.flatMap((n) => Stream.make(n, n * 10, n * 100))
)
// 1, 10, 100, 2, 20, 200, ...
```
### take/drop
```typescript
const first5 = numbers.pipe(Stream.take(5))
const skip5 = numbers.pipe(Stream.drop(5))
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10))
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10))
```
## Combining Streams
### concat - Sequential
```typescript
const combined = Stream.concat(stream1, stream2)
// or, equivalently, in pipeable form
const combinedPiped = stream1.pipe(Stream.concat(stream2))
```
### merge - Interleaved
```typescript
// Interleave elements from both
const merged = Stream.merge(stream1, stream2)
// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 })
```
### zip - Pair Elements
```typescript
const zipped = Stream.zip(names, ages)
// Stream<[string, number]>
// With function
const combined = Stream.zipWith(
  names,
  ages,
  (name, age) => ({ name, age })
)
```
### interleave
```typescript
const interleaved = Stream.interleave(stream1, stream2)
// a1, b1, a2, b2, ...
```
## Consuming Streams
### Running to Collection
```typescript
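// These snippets run inside Effect.gen(function* () { ... })
const chunk = yield* Stream.runCollect(numbers) // Chunk<number>, not a plain array
const first = yield* Stream.runHead(numbers) // Option<number>
const sum = yield* Stream.runFold(
  numbers,
  0,
  (acc, n) => acc + n
)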
```
### Running for Effects
```typescript
yield* numbers.pipe(
  Stream.runForEach((n) => Effect.log(`Got: ${n}`))
)
yield* numbers.pipe(Stream.runDrain)
```
### Running to Sink
```typescript
import { Sink } from "effect"
const sum = yield* numbers.pipe(
  Stream.run(Sink.sum)
)
const array = yield* numbers.pipe(
  Stream.run(Sink.collectAll())
)
```
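The consuming snippets above use `yield*`, which only works inside a generator passed to `Effect.gen`. A minimal self-contained sketch of how they might fit together follows; the names `program` and `result` are illustrative, and the `-1` fallback for an empty stream is an arbitrary choice for this example.

```typescript
import { Chunk, Effect, Option, Sink, Stream } from "effect"

const numbers = Stream.make(1, 2, 3, 4, 5)

const program = Effect.gen(function* () {
  // runCollect yields a Chunk, so convert it when an array is needed
  const all = yield* Stream.runCollect(numbers)
  // runHead yields an Option because the stream may be empty
  const first = yield* Stream.runHead(numbers)
  // run feeds every element into the given Sink
  const total = yield* Stream.run(numbers, Sink.sum)
  return {
    all: Chunk.toReadonlyArray(all),
    first: Option.getOrElse(first, () => -1),
    total
  }
})

// The stream is pure and synchronous, so runSync is enough here
const result = Effect.runSync(program)
console.log(result)
```

A stream is a description, not a running process, which is why `numbers` can be consumed three times in the same program.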
## Chunking
Streams process elements in chunks for efficiency:
```typescript
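import { Chunk, Stream } from "effect"
// Emit elements in groups of 10 (each group is a Chunk)
const chunked = numbers.pipe(
  Stream.grouped(10)
)
// Transform a whole chunk at a time
const processed = numbers.pipe(
  Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2))
)
// Change the internal chunk size without changing the elements
const rechunked = numbers.pipe(
  Stream.rechunk(100)
)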
```
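As a small illustration of what `Stream.grouped` produces, the sketch below groups seven elements into batches of three and converts each batch to a plain array for inspection. The variable names are illustrative only.

```typescript
import { Chunk, Effect, Stream } from "effect"

const batches = Stream.make(1, 2, 3, 4, 5, 6, 7).pipe(
  // Each emitted element is now a Chunk of up to 3 numbers
  Stream.grouped(3),
  // Convert each Chunk to a readonly array so it is easy to print
  Stream.map((chunk) => Chunk.toReadonlyArray(chunk))
)

const grouped = Chunk.toReadonlyArray(
  Effect.runSync(Stream.runCollect(batches))
)
console.log(grouped) // three batches: 1-3, 4-6, and the leftover 7
```

The final batch is smaller than the requested size when the element count is not a multiple of it, so downstream code should not assume full batches.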
## Error Handling
```typescript
const safe = stream.pipe(
  Stream.catchAll((error) => Stream.succeed(fallbackValue))
)
const handled = stream.pipe(
  Stream.catchTag("NetworkError", (error) =>
    Stream.succeed(cachedValue)
  )
)
const resilient = stream.pipe(
  Stream.retry(Schedule.exponential("1 second"))
)
const withFallback = stream.pipe(
  Stream.orElse(() => fallbackStream)
)
```
## Resource Management
```typescript
// Stream with resource lifecycle (fs is node:fs; Effect, Option, Stream come from "effect")
const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd))
).pipe(
  Stream.flatMap((fd) =>
    Stream.repeatEffectOption(
      Effect.suspend(() => {
        const buffer = Buffer.alloc(1024)
        const bytes = fs.readSync(fd, buffer)
        // Failing with Option.none() signals end of stream; a success emits one element
        return bytes > 0
          ? Effect.succeed(buffer.subarray(0, bytes))
          : Effect.fail(Option.none())
      })
    )
  )
)
// Scoped streams
const scoped = Stream.scoped(
  Effect.acquireRelease(openConnection, closeConnection)
)
```
## Sinks
Sinks consume stream elements:
```typescript
import { Sink } from "effect"
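Sink.sum // total of numeric elements
Sink.count // number of elements
Sink.head() // Option of the first element
Sink.last() // Option of the last element
Sink.collectAll() // Chunk of all elements
Sink.forEach(f) // run an effectful function f for each element
const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max, n: number) => Math.max(max, n)
)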
```
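A sink does nothing until a stream is run into it with `Stream.run`. The following sketch, with illustrative names and sample data, runs the `maxSink` fold from above alongside the built-in `Sink.count`.

```typescript
import { Effect, Sink, Stream } from "effect"

// A fold-based sink that keeps the largest number seen so far
const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max, n: number) => Math.max(max, n)
)

const readings = Stream.make(3, 9, 4)

// Stream.run connects a stream to a sink and yields the sink's result
const max = Effect.runSync(readings.pipe(Stream.run(maxSink)))
const count = Effect.runSync(readings.pipe(Stream.run(Sink.count)))

console.log({ max, count })
```

Starting the fold at `Number.NEGATIVE_INFINITY` means an empty stream yields that sentinel; a sink based on `Option` might be a better fit when emptiness needs to be distinguished.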
## Common Patterns
### Batched Processing
```typescript
const batchProcess = stream.pipe(
  Stream.grouped(100),
  Stream.mapEffect((batch) =>
    Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))
  )
)
```
### Rate Limiting
```typescript
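// Chunk must also be imported from "effect"
const rateLimited = stream.pipe(
  Stream.throttle({
    cost: Chunk.size, // required: each element costs one unit
    units: 1,
    duration: "100 millis",
    strategy: "shape" // delay chunks instead of dropping them
  })
)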
```
### Debouncing
```typescript
const debounced = stream.pipe(
  Stream.debounce("500 millis")
)
```
### Windowing
```typescript
// Emit a chunk after 1000 elements or 1 second, whichever comes first
const windows = stream.pipe(
  Stream.groupedWithin(1000, "1 second")
)
```
## Best Practices
1. **Use chunking for efficiency** - Batch operations when possible
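2. **Handle backpressure** - Streams are pull-based, so a slow consumer slows the producer; add `Stream.buffer` with a bounded capacity when the producer should be allowed to run ahead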
3. **Clean up resources** - Use acquireRelease for external resources
4. **Process in parallel** - Use concurrency option in mapEffect
5. **Handle errors early** - Catch/retry before final consumption
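To illustrate the resource cleanup practice, the sketch below records when a resource is opened and closed. The `events` log and the `"handle"` value are hypothetical stand-ins for a real resource such as a file descriptor or connection. It suggests that the release step still runs when the consumer stops early with `Stream.take`.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical log used only to observe the acquire/release order
const events: Array<string> = []

const resource = Stream.acquireRelease(
  Effect.sync(() => {
    events.push("open")
    return "handle" // stand-in for a real resource
  }),
  () => Effect.sync(() => {
    events.push("close")
  })
)

const lines = resource.pipe(
  // Produce elements while the resource is held
  Stream.flatMap(() => Stream.make("a", "b", "c")),
  // Stop early; the release effect should still run
  Stream.take(2)
)

const collected = Chunk.toReadonlyArray(
  Effect.runSync(Stream.runCollect(lines))
)
console.log(collected, events)
```

The same shape applies to failures and interruption: the release effect is registered in the stream's scope, so it does not depend on the stream running to completion.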
## Additional Resources
For comprehensive stream documentation, consult `${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt`.
Search for these sections:
- "Creating Streams" for stream construction
- "Consuming Streams" for running streams
- "Operations" for transformations
- "Error Handling in Streams" for error patterns
- "Resourceful Streams" for resource management
- "Sink" for custom sinks
This skill explains how Effect handles streaming data processing, including creating, transforming, composing, and consuming streams with resource safety and backpressure. It summarizes core primitives like Stream, Sink, Channel, and common operations such as map, filter, flatMap, concat, and merge. The guidance highlights chunking, error handling, and patterns for real-time and batched processing.
Streams produce elements lazily and on demand, carrying types for values, errors, and required environment (Stream<A, E, R>). They coordinate producer and consumer via backpressure and process elements in chunks for efficiency. Streams can be built from values, effects, async iterables, callbacks, or queues, and are consumed by running to collections, running for side effects, or piping into Sinks.
How do I create a stream from an async callback or event emitter?
Use Stream.async to register a handler that emits elements and returns a cleanup Effect to remove listeners.
When should I use merge vs concat?
Use concat to run streams sequentially (finish one, then start the next). Use merge to interleave elements from two streams as they arrive; for more than two streams, use mergeAll, which takes a concurrency option.
How can I handle errors without terminating the whole pipeline?
Use Stream.catchAll, catchTag, or orElse to convert errors into fallback values or fallback streams; use retry with a Schedule for transient failures.
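As a rough sketch of recovering with `catchTag`, the example below defines a hypothetical `NetworkError` (built with `Data.TaggedError` purely for illustration) and a stream that emits two elements before failing. The elements emitted before the failure are kept, and the fallback stream supplies the rest.

```typescript
import { Chunk, Data, Effect, Stream } from "effect"

// Hypothetical tagged error, defined only for this example
class NetworkError extends Data.TaggedError("NetworkError")<{
  readonly reason: string
}> {}

// Emits 1 and 2, then fails with a NetworkError
const flaky = Stream.make(1, 2).pipe(
  Stream.concat(Stream.fail(new NetworkError({ reason: "timeout" })))
)

// Replace the failure with a fallback stream; other error tags would pass through
const recovered = flaky.pipe(
  Stream.catchTag("NetworkError", () => Stream.make(-1))
)

const values = Chunk.toReadonlyArray(
  Effect.runSync(Stream.runCollect(recovered))
)
console.log(values) // 1 and 2 from the original stream, then the fallback -1
```

Recovery switches to the fallback stream rather than resuming the original one, so anything the failed stream would have emitted after the error is lost; `Stream.retry` restarts the stream instead.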