
streams skill


This skill helps you understand and work with Effect's Streams API, including creation, transformation, backpressure, and consumption of real-time data.

npx playbooks add skill andrueandersoncs/claude-skill-effect-ts --skill streams

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
7.1 KB
---
name: Streams
description: This skill should be used when the user asks about "Effect Stream", "Stream.from", "Stream.map", "Stream.filter", "Stream.run", "streaming data", "async iteration", "Sink", "Channel", "Stream.concat", "Stream.merge", "backpressure", "Stream.fromIterable", "chunked processing", "real-time data", or needs to understand how Effect handles streaming data processing.
version: 1.0.0
---

# Streams in Effect

## Overview

Effect Streams provide:

- **Lazy evaluation** - Elements produced on demand
- **Resource safety** - Automatic cleanup
- **Backpressure** - Producer/consumer coordination
- **Composition** - Transform, filter, merge streams
- **Error handling** - Typed errors in stream pipeline

```typescript
Stream<A, E, R>
// Produces values of type A
// May fail with error E
// Requires environment R
```
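
As a rough end-to-end sketch of the type above, the following builds a small stream, transforms it, and runs it. The names `evensDoubled` and `overviewResult` are illustrative only, and `Effect.runSync` is used on the assumption that nothing in the pipeline is asynchronous.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Stream<number, never, never>: emits numbers, cannot fail, needs no environment.
const evensDoubled = Stream.make(1, 2, 3, 4, 5, 6).pipe(
  Stream.filter((n) => n % 2 === 0),
  Stream.map((n) => n * 2)
)

// runCollect produces a Chunk; convert it to a plain array for printing.
const overviewProgram = Stream.runCollect(evensDoubled).pipe(
  Effect.map((chunk) => Chunk.toArray(chunk))
)

// Nothing runs until the Effect is executed.
const overviewResult = Effect.runSync(overviewProgram)
console.log(overviewResult) // [ 4, 8, 12 ]
```

Defining the stream does no work by itself; elements are only produced once `runSync` executes the collected Effect.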

## Creating Streams

### From Values

```typescript
import { Stream } from "effect"

const numbers = Stream.make(1, 2, 3, 4, 5)

const fromArray = Stream.fromIterable([1, 2, 3])

const empty = Stream.empty

const single = Stream.succeed(42)

const infinite = Stream.iterate(1, (n) => n + 1)
```

### From Effects

```typescript
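import { Schedule, Stream } from "effect"

// fetchData and checkStatus stand for any functions that return an Effect
const fromEffect = Stream.fromEffect(fetchData())

// Repeats the effect forever with no delay between runs
const polling = Stream.repeatEffect(checkStatus())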

const scheduled = Stream.repeatEffectWithSchedule(
  checkStatus(),
  Schedule.spaced("5 seconds")
)
```

### From Async Sources

```typescript
import { Effect, Stream } from "effect"

// From async iterable (the second argument maps a thrown value to a typed error)
const fromAsyncIterable = Stream.fromAsyncIterable(
  asyncGenerator(),
  (error) => new StreamError({ cause: error })
)

// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
  const handler = (value: number) => emit.single(value)
  eventEmitter.on("data", handler)
  return Effect.sync(() => eventEmitter.off("data", handler))
})

// From queue
const fromQueue = Stream.fromQueue(queue)
```

### Generating Streams

```typescript
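import { Option, Schedule, Stream } from "effect"

const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]))

const range = Stream.range(1, 100)

// Stream.repeat requires a Schedule; Schedule.forever repeats without delay
const repeated = Stream.repeat(Stream.succeed("ping"), Schedule.forever).pipe(
  Stream.take(5)
)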
```

## Transforming Streams

### map - Transform Elements

```typescript
const doubled = numbers.pipe(
  Stream.map((n) => n * 2)
)

const enriched = users.pipe(
  Stream.mapEffect((user) => fetchProfile(user.id))
)

const parallel = items.pipe(
  Stream.mapEffect(process, { concurrency: 10 })
)
```

### filter - Select Elements

```typescript
const evens = numbers.pipe(
  Stream.filter((n) => n % 2 === 0)
)

const valid = items.pipe(
  Stream.filterEffect((item) => validate(item))
)
```

### flatMap - Nested Streams

```typescript
const expanded = numbers.pipe(
  Stream.flatMap((n) => Stream.make(n, n * 10, n * 100))
)
// 1, 10, 100, 2, 20, 200, ...
```

### take/drop

```typescript
const first5 = numbers.pipe(Stream.take(5))
const skip5 = numbers.pipe(Stream.drop(5))
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10))
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10))
```

## Combining Streams

### concat - Sequential

```typescript
const combined = Stream.concat(stream1, stream2)
// or, in pipeable form
const combinedPiped = stream1.pipe(Stream.concat(stream2))
```

### merge - Interleaved

```typescript
// Interleave elements from both
const merged = Stream.merge(stream1, stream2)

// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 })
```

### zip - Pair Elements

```typescript
const zipped = Stream.zip(names, ages)
// Stream<[string, number]>

// With function
const combined = Stream.zipWith(
  names,
  ages,
  (name, age) => ({ name, age })
)
```

### interleave

```typescript
const interleaved = Stream.interleave(stream1, stream2)
// a1, b1, a2, b2, ...
```

## Consuming Streams

### Running to Collection

```typescript
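// These snippets assume they run inside Effect.gen(function* () { ... })

// runCollect returns a Chunk<number>, not a plain array
const collected = yield* Stream.runCollect(numbers)

// runHead returns an Option<number> (Option.none() for an empty stream)
const first = yield* Stream.runHead(numbers)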

const sum = yield* Stream.runFold(
  numbers,
  0,
  (acc, n) => acc + n
)
```
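
The `yield*` snippets above need an enclosing generator. A minimal sketch of that wrapper follows, assuming a purely synchronous stream so that `Effect.runSync` can execute it; `collectProgram` and `summary` are illustrative names.

```typescript
import { Chunk, Effect, Option, Stream } from "effect"

const numbers = Stream.make(1, 2, 3, 4, 5)

const collectProgram = Effect.gen(function* () {
  // runCollect gathers every element into a Chunk.
  const collected = yield* Stream.runCollect(numbers)
  // runHead returns an Option because the stream might be empty.
  const first = yield* Stream.runHead(numbers)
  // runFold reduces the stream to a single value.
  const sum = yield* Stream.runFold(numbers, 0, (acc, n) => acc + n)
  return {
    all: Chunk.toArray(collected),
    first: Option.getOrNull(first),
    sum
  }
})

const summary = Effect.runSync(collectProgram)
console.log(summary) // { all: [ 1, 2, 3, 4, 5 ], first: 1, sum: 15 }
```

The same stream value is run three times here; a stream is a description, so each run starts from the beginning.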

### Running for Effects

```typescript
yield* numbers.pipe(
  Stream.runForEach((n) => Effect.log(`Got: ${n}`))
)

yield* numbers.pipe(Stream.runDrain)
```

### Running to Sink

```typescript
import { Sink } from "effect"

const sum = yield* numbers.pipe(
  Stream.run(Sink.sum)
)

// Sink.collectAll yields a Chunk<number>, not a plain array
const collected = yield* numbers.pipe(
  Stream.run(Sink.collectAll<number>())
)
```

## Chunking

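Streams process elements in chunks internally for efficiency. You can also group elements into explicit chunks, transform whole chunks, or change the internal chunk size: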

```typescript
import { Chunk, Stream } from "effect"

const chunked = numbers.pipe(
  Stream.grouped(10)
)

const processed = numbers.pipe(
  Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2))
)

const rechunked = numbers.pipe(
  Stream.rechunk(100)
)
```
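
To make the difference concrete, here is a small sketch comparing `grouped` and `rechunk` on the same input. It assumes a synchronous source; `groups` and `rechunkedValues` are illustrative names.

```typescript
import { Chunk, Effect, Stream } from "effect"

const source = Stream.make(1, 2, 3, 4, 5)

// grouped(2) changes the element type: each element is now a Chunk of up to 2 numbers.
const groups = Effect.runSync(
  source.pipe(
    Stream.grouped(2),
    Stream.map((chunk) => Chunk.toArray(chunk)),
    Stream.runCollect,
    Effect.map((chunks) => Chunk.toArray(chunks))
  )
)
console.log(groups) // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]

// rechunk(2) only changes internal batching; the elements seen downstream are unchanged.
const rechunkedValues = Effect.runSync(
  source.pipe(
    Stream.rechunk(2),
    Stream.runCollect,
    Effect.map((chunk) => Chunk.toArray(chunk))
  )
)
console.log(rechunkedValues) // [ 1, 2, 3, 4, 5 ]
```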

## Error Handling

```typescript
const safe = stream.pipe(
  Stream.catchAll((error) => Stream.succeed(fallbackValue))
)

const handled = stream.pipe(
  Stream.catchTag("NetworkError", (error) =>
    Stream.succeed(cachedValue)
  )
)

const resilient = stream.pipe(
  Stream.retry(Schedule.exponential("1 second"))
)

const withFallback = stream.pipe(
  Stream.orElse(() => fallbackStream)
)
```
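
A runnable sketch of tag-based recovery follows. `NetworkError` is a hypothetical error class defined only for this illustration, and the stream is built to fail after two elements.

```typescript
import { Chunk, Data, Effect, Stream } from "effect"

// Hypothetical tagged error, used only for this example.
class NetworkError extends Data.TaggedError("NetworkError")<{
  readonly reason: string
}> {}

// Emits 1 and 2, then fails with a NetworkError.
const flaky = Stream.make(1, 2).pipe(
  Stream.concat(Stream.fail(new NetworkError({ reason: "timeout" })))
)

// Elements emitted before the failure are kept; the fallback stream continues from there.
const recovered = flaky.pipe(
  Stream.catchTag("NetworkError", () => Stream.make(-1))
)

const recoveredValues = Effect.runSync(
  Stream.runCollect(recovered).pipe(Effect.map((chunk) => Chunk.toArray(chunk)))
)
console.log(recoveredValues) // [ 1, 2, -1 ]
```

After `catchTag`, the error type of `recovered` no longer includes `NetworkError`, so the compiler tracks which failures remain unhandled.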

## Resource Management

```typescript
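import * as fs from "node:fs"
import { Effect, Option, Stream } from "effect"

// Stream with resource lifecycle: the descriptor is closed when the stream ends or fails
const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd))
).pipe(
  Stream.flatMap((fd) =>
    // repeatEffectOption ends the stream when the effect fails with Option.none()
    Stream.repeatEffectOption(
      Effect.suspend(() => {
        const buffer = Buffer.alloc(1024)
        const bytes = fs.readSync(fd, buffer)
        return bytes > 0
          ? Effect.succeed(buffer.subarray(0, bytes))
          : Effect.fail(Option.none())
      })
    )
  )
)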

// Scoped streams
const scoped = Stream.scoped(
  Effect.acquireRelease(openConnection, closeConnection)
)
```
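
The acquire/release ordering can be observed with an in-memory stand-in for a real resource. This is a sketch under that assumption; the `events` log and the array used as a "handle" are hypothetical.

```typescript
import { Effect, Stream } from "effect"

const events: Array<string> = []

// Hypothetical in-memory resource standing in for a file handle or connection.
const resource = Stream.acquireRelease(
  Effect.sync(() => {
    events.push("open")
    return ["line 1", "line 2"]
  }),
  () => Effect.sync(() => { events.push("close") })
)

const lines = resource.pipe(
  Stream.flatMap((handle) =>
    Stream.fromIterable(handle).pipe(
      // Record each read so the ordering relative to open/close is visible.
      Stream.tap((line) => Effect.sync(() => { events.push(`read ${line}`) }))
    )
  )
)

Effect.runSync(Stream.runDrain(lines))
console.log(events) // [ 'open', 'read line 1', 'read line 2', 'close' ]
```

The release step runs once the stream is done, so the consumer never has to close the resource manually.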

## Sinks

Sinks consume stream elements:

```typescript
import { Sink } from "effect"

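Sink.sum          // Sum numeric elements
Sink.count        // Count elements
Sink.head()       // First element, as an Option
Sink.last()       // Last element, as an Option
Sink.collectAll() // Collect all elements into a Chunk
Sink.forEach(f)   // Run an effectful function f for each element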

const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max, n: number) => Math.max(max, n)
)
```
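
A sink only does work once a stream is run into it. The sketch below runs the `maxSink` fold and the built-in `Sink.sum` over a small sample stream; `readings` is an illustrative name.

```typescript
import { Effect, Sink, Stream } from "effect"

// Custom sink: keeps the largest number seen so far.
const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max: number, n: number) => Math.max(max, n)
)

const readings = Stream.make(3, 9, 4)

// Stream.run feeds every element of the stream into the sink.
const maxReading = Effect.runSync(Stream.run(readings, maxSink))
const totalReading = Effect.runSync(Stream.run(readings, Sink.sum))
console.log(maxReading, totalReading) // 9 16
```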

## Common Patterns

### Batched Processing

```typescript
const batchProcess = stream.pipe(
  Stream.grouped(100),
  Stream.mapEffect((batch) =>
    Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))
  )
)
```
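
For a version of this pattern that runs without an external API, the sketch below swaps the remote call for a hypothetical local `processBatch` that sums each batch.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical batch handler standing in for a remote call such as api.processBatch.
const processBatch = (batch: ReadonlyArray<number>) =>
  Effect.sync(() => batch.reduce((sum, n) => sum + n, 0))

const batchTotals = Effect.runSync(
  Stream.fromIterable([1, 2, 3, 4, 5, 6, 7]).pipe(
    // Group into batches of at most 3 elements; the last batch may be smaller.
    Stream.grouped(3),
    Stream.mapEffect((batch) => processBatch(Chunk.toReadonlyArray(batch))),
    Stream.runCollect,
    Effect.map((chunk) => Chunk.toArray(chunk))
  )
)
console.log(batchTotals) // [ 6, 15, 7 ]
```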

### Rate Limiting

```typescript
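// Requires: import { Chunk, Stream } from "effect"
const rateLimited = stream.pipe(
  Stream.throttle({
    cost: Chunk.size, // each element costs one unit
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)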
```

### Debouncing

```typescript
const debounced = stream.pipe(
  Stream.debounce("500 millis")
)
```

### Windowing

```typescript
// Emit a chunk after 1000 elements or 1 second, whichever comes first
const windows = stream.pipe(
  Stream.groupedWithin(1000, "1 second")
)
```

## Best Practices

1. **Use chunking for efficiency** - Batch operations when possible
2. **Handle backpressure** - Use appropriate buffer strategies
3. **Clean up resources** - Use acquireRelease for external resources
4. **Process in parallel** - Use concurrency option in mapEffect
5. **Handle errors early** - Catch/retry before final consumption

## Additional Resources

For comprehensive stream documentation, consult `${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt`.

Search for these sections:
- "Creating Streams" for stream construction
- "Consuming Streams" for running streams
- "Operations" for transformations
- "Error Handling in Streams" for error patterns
- "Resourceful Streams" for resource management
- "Sink" for custom sinks

Overview

This skill explains how Effect handles streaming data processing, including creating, transforming, composing, and consuming streams with resource safety and backpressure. It summarizes core primitives like Stream, Sink, Channel, and common operations such as map, filter, flatMap, concat, and merge. The guidance highlights chunking, error handling, and patterns for real-time and batched processing.

How this skill works

Streams produce elements lazily and on demand, carrying types for values, errors, and required environment (Stream<A, E, R>). They coordinate producer and consumer via backpressure and process elements in chunks for efficiency. Streams can be built from values, effects, async iterables, callbacks, or queues, and are consumed by running to collections, running for side effects, or piping into Sinks.
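
Laziness is easiest to see with an infinite stream. In this minimal sketch the source never ends, yet the program terminates because the consumer asks for only three elements; `firstThreeSum` is an illustrative name.

```typescript
import { Effect, Stream } from "effect"

const firstThreeSum = Effect.runSync(
  // Infinite stream 1, 2, 3, ... computed only as elements are requested.
  Stream.iterate(1, (n) => n + 1).pipe(
    Stream.take(3),
    Stream.runFold(0, (acc, n) => acc + n)
  )
)
console.log(firstThreeSum) // 6
```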

When to use it

  • When processing continuous or real-time data sources (events, sockets, sensors).
  • When you need resource-safe reading of files, network connections, or DB cursors.
  • When composing multiple asynchronous producers (merge, concat, zip).
  • When batching or chunked processing improves throughput or reduces per-element overhead.
  • When you must control concurrency, backpressure, or retry and error recovery.

Best practices

  • Batch work with grouped/rechunk to reduce per-element overhead and improve throughput.
  • Use acquireRelease or scoped streams to ensure external resources are opened and closed safely.
  • Apply backpressure and buffer strategies; prefer bounded queues to avoid unbounded memory growth.
  • Use mapEffect with concurrency options for parallel processing when side-effects are independent.
  • Catch, retry, or fallback earlier in the pipeline rather than at final consumption.

Example use cases

  • Consume a remote event stream, filter and enrich events, then write to a database sink with batching.
  • Poll an API on a schedule and stream status updates into a real-time dashboard with throttle or debounce.
  • Stream file contents safely using acquireRelease and process in fixed-size chunks for upload or parsing.
  • Merge telemetry streams from multiple sources and aggregate metrics with Sink.foldLeft or Sink.sum.
  • Implement backpressure-aware webhook handling using a queue-backed Stream and bounded concurrency.

FAQ

How do I create a stream from an async callback or event emitter?

Use Stream.async to register a handler that emits elements and returns a cleanup Effect to remove listeners.

When should I use merge vs concat?

Use concat to run streams sequentially (finish one, then start the next). Use merge to run two streams concurrently and interleave their elements as they arrive; for more than two streams, use mergeAll with its concurrency option.

How can I handle errors without terminating the whole pipeline?

Use Stream.catchAll, catchTag, or orElse to convert errors into fallback values or fallback streams; use retry with a Schedule for transient failures.