
concurrency skill


This skill helps you understand and apply concurrent and fiber-based execution in Effect, including all, race, fibers, queues, and synchronization.

npx playbooks add skill andrueandersoncs/claude-skill-effect-ts --skill concurrency

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
6.8 KB
---
name: Concurrency
description: This skill should be used when the user asks about "Effect concurrency", "fibers", "Fiber", "forking", "Effect.fork", "Effect.forkDaemon", "parallel execution", "Effect.all concurrency", "Deferred", "Queue", "PubSub", "Semaphore", "Latch", "fiber interruption", "Effect.race", "Effect.raceAll", "concurrent effects", or needs to understand how Effect handles parallel and concurrent execution.
version: 1.0.0
---

# Concurrency in Effect

## Overview

Effect provides lightweight fiber-based concurrency:

- **Fibers** - Lightweight virtual threads managed by the Effect runtime
- **Structured concurrency** - Parent fibers supervise children
- **Safe interruption** - Clean cancellation with resource cleanup
- **Concurrent primitives** - Queue, Deferred, Semaphore, PubSub

## Basic Parallel Execution

### Effect.all with Concurrency

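```typescript
import { Effect } from "effect"

// Snippets below run inside Effect.gen(function* () { ... })

// Run every effect at the same time
const all = yield* Effect.all(
  [fetchUser(1), fetchUser(2), fetchUser(3)],
  { concurrency: "unbounded" }
)

// Run at most 5 effects at a time
const limited = yield* Effect.all(tasks, { concurrency: 5 })

// Default: run sequentially, one after another
const sequential = yield* Effect.all(tasks)
```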

### Effect.forEach with Concurrency

```typescript
const users = yield* Effect.forEach(
  userIds,
  (id) => fetchUser(id),
  { concurrency: 10 }
)
```

## Fibers

### Creating Fibers with fork

```typescript
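import { Effect, Fiber } from "effect"

const program = Effect.gen(function* () {
  // Start the task in a new fiber without waiting for it
  const fiber = yield* Effect.fork(longRunningTask)

  yield* doOtherWork()

  // Wait for the forked fiber and take its result
  const result = yield* Fiber.join(fiber)
})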
```

### Fork Variants

```typescript
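// Child of the current fiber: interrupted when the parent ends
const child = yield* Effect.fork(task)

// Daemon: attached to the global scope, outlives the parent
const daemon = yield* Effect.forkDaemon(task)

// Attached to an explicit scope: interrupted when that scope closes
const scoped = yield* Effect.forkIn(scope)(task)

// Like fork, but runs onError if the forked effect fails
const handled = yield* Effect.forkWithErrorHandler(task, onError)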
```

### Fiber Operations

```typescript
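import { Fiber } from "effect"

// Wait for the fiber and return its value (its failure is re-raised)
const result = yield* Fiber.join(fiber)

// Wait for the fiber and get its Exit (success or failure) as a value
const exit = yield* Fiber.await(fiber)

// Interrupt the fiber and wait until it has stopped
yield* Fiber.interrupt(fiber)

// Non-blocking check: none while still running, some(exit) once done
const maybeExit = yield* Fiber.poll(fiber)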
```

## Racing

### Effect.race - First to Succeed

```typescript
const fastest = yield* Effect.race(
  fetchFromServer1(),
  fetchFromServer2()
)
```

### Effect.raceAll - Race Many

```typescript
const fastest = yield* Effect.raceAll([
  fetchFromCDN1(),
  fetchFromCDN2(),
  fetchFromCDN3()
])
```

### Effect.raceFirst - First to Finish, Success or Failure

```typescript
const first = yield* Effect.raceFirst(task1, task2)
```
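
The difference between `Effect.race` and `Effect.raceFirst` is easiest to see when one side fails quickly. The following is a minimal sketch; `fastFailure`, `slowSuccess`, and `demo` are hypothetical names invented for the illustration, and the delays are arbitrary.

```typescript
import { Effect, Exit } from "effect"

// Hypothetical effects: one fails quickly, the other succeeds slowly
const fastFailure = Effect.fail("boom").pipe(Effect.delay("10 millis"))
const slowSuccess = Effect.succeed("ok").pipe(Effect.delay("50 millis"))

// race waits for the first success, so the quick failure does not win
const viaRace = Effect.race(fastFailure, slowSuccess)

// raceFirst settles with whichever side finishes first, success or failure
const viaRaceFirst = Effect.raceFirst(fastFailure, slowSuccess)

const demo = Effect.gen(function* () {
  // Effect.exit captures the outcome as a value instead of failing
  const raceExit = yield* Effect.exit(viaRace)
  const raceFirstExit = yield* Effect.exit(viaRaceFirst)
  return {
    raceSucceeded: Exit.isSuccess(raceExit),
    raceFirstSucceeded: Exit.isSuccess(raceFirstExit)
  }
})

Effect.runPromise(demo).then(console.log)
```

Under these assumptions `viaRace` should succeed with the slow value while `viaRaceFirst` should fail with the quick error.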

## Deferred - One-Time Promise

```typescript
import { Deferred, Effect, Fiber } from "effect"

const program = Effect.gen(function* () {
  const deferred = yield* Deferred.make<string, never>()

  const fiber = yield* Effect.fork(
    Effect.gen(function* () {
      const value = yield* Deferred.await(deferred)
      yield* Effect.log(`Got: ${value}`)
    })
  )

  yield* Deferred.succeed(deferred, "Hello!")

  yield* Fiber.join(fiber)
})
```

## Queue - Concurrent Queue

```typescript
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(100)

  yield* Effect.fork(
    Effect.forEach(
      [1, 2, 3, 4, 5],
      (n) => Queue.offer(queue, n)
    )
  )

  const items = yield* Effect.forEach(
    Array.from({ length: 5 }),
    () => Queue.take(queue)
  )
})
```

### Queue Variants

```typescript
const bounded = yield* Queue.bounded<number>(100)

const unbounded = yield* Queue.unbounded<number>()

const dropping = yield* Queue.dropping<number>(100)

const sliding = yield* Queue.sliding<number>(100)
```
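
The variants differ mainly in what happens when the queue is full. As a rough sketch (the capacity of 2 and the name `demo` are chosen only for illustration), offering three items to a dropping queue and to a sliding queue shows the two overflow strategies side by side:

```typescript
import { Chunk, Effect, Queue } from "effect"

const demo = Effect.gen(function* () {
  const dropping = yield* Queue.dropping<number>(2)
  const sliding = yield* Queue.sliding<number>(2)

  // Offer three items, one at a time, to queues that hold only two
  yield* Effect.forEach([1, 2, 3], (n) => Queue.offer(dropping, n))
  yield* Effect.forEach([1, 2, 3], (n) => Queue.offer(sliding, n))

  // Drain whatever each queue kept
  const kept = yield* Queue.takeAll(dropping)
  const slid = yield* Queue.takeAll(sliding)

  return {
    dropping: Chunk.toReadonlyArray(kept), // newest item was discarded
    sliding: Chunk.toReadonlyArray(slid) // oldest item was discarded
  }
})

Effect.runPromise(demo).then(console.log)
```

A bounded queue would instead suspend the third `offer` until space is available, and an unbounded queue would accept all three.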

## PubSub - Publish/Subscribe

```typescript
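import { Effect, PubSub, Queue } from "effect"

// subscribe requires a Scope; Effect.scoped closes the subscriptions on exit
const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(100)

    // Subscribe before publishing: subscribers only see later messages
    const sub1 = yield* PubSub.subscribe(pubsub)
    const sub2 = yield* PubSub.subscribe(pubsub)

    yield* PubSub.publish(pubsub, "Hello!")

    // Every subscriber receives its own copy of the message
    const msg1 = yield* Queue.take(sub1)
    const msg2 = yield* Queue.take(sub2)
  })
)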
```

## Semaphore - Limit Concurrency

```typescript
import { Effect } from "effect"

const program = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(3)

  yield* Effect.forEach(
    tasks,
    (task) => semaphore.withPermits(1)(task),
    { concurrency: "unbounded" }
  )
})
```

## Latch - Coordination Point

```typescript
import { Effect } from "effect"

const program = Effect.gen(function* () {
  // Create a latch that starts closed
  const latch = yield* Effect.makeLatch(false)

  yield* Effect.fork(
    Effect.forEach(
      workers,
      (worker) =>
        Effect.gen(function* () {
          // Each worker waits here until the latch opens
          yield* latch.await
          yield* worker.start()
        }),
      { concurrency: "unbounded" }
    )
  )

  // Release all waiting workers at once
  yield* latch.open
})
```

## Interruption

### Interrupting Fibers

```typescript
const fiber = yield* Effect.fork(longTask)

yield* Fiber.interrupt(fiber)
```

### Uninterruptible Regions

```typescript
const critical = Effect.uninterruptible(
  Effect.gen(function* () {
    yield* beginTransaction()
    yield* performOperations()
    yield* commitTransaction()
  })
)
```

### Interruptible Within Uninterruptible

```typescript
const program = Effect.uninterruptible(
  Effect.gen(function* () {
    yield* criticalSetup()

    // This part can be interrupted
    yield* Effect.interruptible(longOperation)

    yield* criticalTeardown()
  })
)
```

## Supervision

Structured concurrency ties each child fiber's lifetime to the fiber that forked it:

```typescript
const parent = Effect.gen(function* () {
  const child1 = yield* Effect.fork(task1)
  const child2 = yield* Effect.fork(task2)

  // When the parent ends for any reason, its children are interrupted
  yield* failingOperation()
})
// child1 and child2 are interrupted automatically
```
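
To observe this behaviour directly, the sketch below records whether a child was interrupted when its parent finished. The names `wasInterrupted`, `child`, `parent`, and `demo` are hypothetical, and the short sleep only gives the child time to start.

```typescript
import { Effect, Fiber, Ref } from "effect"

const demo = Effect.gen(function* () {
  const wasInterrupted = yield* Ref.make(false)

  // A child that never finishes on its own and records its interruption
  const child = Effect.never.pipe(
    Effect.onInterrupt(() => Ref.set(wasInterrupted, true))
  )

  const parent = Effect.gen(function* () {
    yield* Effect.fork(child)
    // Let the child start running, then finish without joining it
    yield* Effect.sleep("10 millis")
  })

  // Run the parent in its own fiber so the child belongs to it
  const parentFiber = yield* Effect.fork(parent)
  yield* Fiber.join(parentFiber)

  return yield* Ref.get(wasInterrupted)
})

Effect.runPromise(demo).then(console.log)
```

The parent never calls `Fiber.interrupt`; the runtime interrupts the child as part of ending the parent fiber.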

### Daemon Fibers

A daemon fiber escapes supervision: it is attached to the global scope and keeps running after its parent ends:

```typescript
const daemon = yield* Effect.forkDaemon(backgroundTask)
```

## Common Patterns

### Timeout with Fallback

```typescript
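import { Effect, Option } from "effect"

// timeoutOption yields Option.none() when the task exceeds the limit
const withTimeout = task.pipe(
  Effect.timeoutOption("5 seconds"),
  Effect.map(Option.getOrElse(() => defaultValue))
)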
```
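
A runnable version of this pattern, assuming Effect 3.x where `Effect.timeoutOption` yields an `Option` (while `Effect.timeout` fails with a `TimeoutException`), might look like the sketch below. The helper `withFallback`, the two sample tasks, and the durations are hypothetical.

```typescript
import { Effect, Option } from "effect"

// Hypothetical helper: run a task, or use the fallback if it takes too long
const withFallback = <A, E, R>(task: Effect.Effect<A, E, R>, fallback: A) =>
  task.pipe(
    Effect.timeoutOption("50 millis"),
    Effect.map(Option.getOrElse(() => fallback))
  )

// Sample tasks: one finishes immediately, one is far too slow
const fastTask = Effect.succeed("fresh")
const slowTask = Effect.succeed("fresh").pipe(Effect.delay("1 second"))

const demo = Effect.gen(function* () {
  const fast = yield* withFallback(fastTask, "cached")
  const slow = yield* withFallback(slowTask, "cached")
  return { fast, slow }
})

Effect.runPromise(demo).then(console.log)
```

The slow task is interrupted when the timeout fires, so the program does not wait the full second.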

### Worker Pool

```typescript
const workerPool = Effect.gen(function* () {
  const semaphore = yield* Effect.makeSemaphore(numWorkers)

  // Returns a function that runs a task once a permit is free
  return <A, E, R>(task: Effect.Effect<A, E, R>) =>
    semaphore.withPermits(1)(task)
})
```

### Parallel with Error Collection

```typescript
const results = yield* Effect.all(
  tasks,
  {
    concurrency: "unbounded",
    mode: "either" // Each result is an Either; a failure does not stop the rest
  }
)
```
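
With `mode: "either"` the results still need to be split into successes and failures. One way to do that is sketched below; the sample `tasks` array and the name `demo` are hypothetical.

```typescript
import { Effect, Either } from "effect"

// Hypothetical tasks: two succeed and one fails
const tasks: Array<Effect.Effect<number, string>> = [
  Effect.succeed(1),
  Effect.fail("bad"),
  Effect.succeed(3)
]

const demo = Effect.gen(function* () {
  // Every task runs to completion; each outcome is wrapped in an Either
  const results = yield* Effect.all(tasks, {
    concurrency: "unbounded",
    mode: "either"
  })

  const successes: Array<number> = []
  const failures: Array<string> = []
  for (const result of results) {
    if (Either.isRight(result)) successes.push(result.right)
    else failures.push(result.left)
  }
  return { successes, failures }
})

Effect.runPromise(demo).then(console.log)
```

Results keep the order of the input tasks, so positions can be matched back to their inputs if needed.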

## Best Practices

1. **Use Effect.all concurrency** for simple parallelism
2. **Use Semaphore** to limit concurrent operations
3. **Prefer structured concurrency** over daemon fibers
4. **Handle interruption** in long-running effects
5. **Use Queue for producer/consumer** patterns
6. **Use Deferred for one-time coordination**

## Additional Resources

For comprehensive concurrency documentation, consult `${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt`.

Search for these sections:
- "Fibers" for fiber management
- "Basic Concurrency" for parallel execution
- "Deferred" for synchronization primitives
- "Queue" for concurrent queues
- "PubSub" for publish/subscribe
- "Semaphore" for concurrency limiting

Overview

This skill explains how Effect handles parallel and concurrent execution using lightweight fibers and concurrency primitives. It focuses on creating and supervising fibers, structured concurrency, safe interruption, and common primitives like Deferred, Queue, Semaphore, PubSub, and Latch. Use it to understand forking, racing, parallel combinators, and coordinating concurrent workflows.

How this skill works

The skill inspects Effect's concurrency primitives and combinators: Effect.fork variants, Effect.all/forEach with concurrency options, racing operators, and synchronization types (Deferred, Queue, Semaphore, PubSub, Latch). It explains fiber operations (join, await, interrupt, poll), supervision semantics, uninterruptible regions, and patterns for timeouts, worker pools, and error collection. Examples show typical usage and recommended patterns for safe parallelism and cancellation handling.

When to use it

  • When you need to run multiple Effect tasks in parallel (Effect.all, Effect.forEach).
  • When you want to spawn background work or supervised children (Effect.fork, forkDaemon).
  • When coordinating one-time or multi-consumer synchronization (Deferred, Queue, PubSub).
  • When limiting concurrency or building a worker pool (Semaphore).
  • When you need racing semantics or timeouts (Effect.race, Effect.raceAll, Effect.timeout).

Best practices

  • Prefer structured concurrency and supervised forks over daemon fibers for predictable lifecycle and cleanup.
  • Use Effect.all/forEach with explicit concurrency limits or semaphores to avoid resource exhaustion.
  • Design long-running operations to handle interruption; use uninterruptible regions only for critical setup/teardown.
  • Use Deferred for single-result handoff, Queue or PubSub for producer/consumer patterns, and Semaphore for limiting parallelism.
  • Collect errors intentionally (e.g., mode: "either") when you need to inspect failures across parallel tasks.

Example use cases

  • Parallel fetching of resources with Effect.all and a concurrency cap to avoid overload.
  • Spawning background workers with Effect.fork and joining results or interrupting on shutdown.
  • Producer/consumer pipeline using Queue: producers offer items, consumers take and process them concurrently.
  • One-time coordination with Deferred: waiter fiber blocks until a value is supplied by another fiber.
  • Concurrency-limited worker pool using a Semaphore's withPermits to cap the number of tasks running at once.

FAQ

When should I use forkDaemon instead of fork?

Use forkDaemon when work must outlive the creating fiber and you explicitly want to opt out of supervision; prefer regular fork for structured concurrency and predictable cleanup.
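
As a rough illustration of that difference, the sketch below forks the same hypothetical `job` both ways from a parent that ends immediately. The names and durations are invented for the example.

```typescript
import { Effect, Fiber, Ref } from "effect"

const demo = Effect.gen(function* () {
  const finished = yield* Ref.make<Array<string>>([])

  // Hypothetical background job that records its name once it completes
  const job = (name: string) =>
    Effect.sleep("10 millis").pipe(
      Effect.zipRight(Ref.update(finished, (names) => [...names, name]))
    )

  // The parent forks both jobs and then ends right away
  const parent = Effect.gen(function* () {
    yield* Effect.fork(job("child"))
    yield* Effect.forkDaemon(job("daemon"))
  })

  const parentFiber = yield* Effect.fork(parent)
  yield* Fiber.join(parentFiber)

  // Give the daemon time to finish after the parent is gone
  yield* Effect.sleep("100 millis")
  return yield* Ref.get(finished)
})

Effect.runPromise(demo).then(console.log)
```

Only the daemon job should be recorded, because the regular child is interrupted as soon as the parent ends.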

How do I cancel a running fiber?

Call Fiber.interrupt(fiber) to request cancellation; design effects to be interruptible or wrap only critical sections with Effect.uninterruptible as needed.
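
A minimal sketch of cancellation with cleanup follows. The `longTask` here is a hypothetical stand-in for real work, and `Effect.ensuring` is one of several ways to attach a finalizer.

```typescript
import { Effect, Exit, Fiber, Ref } from "effect"

const demo = Effect.gen(function* () {
  const cleanedUp = yield* Ref.make(false)

  // Hypothetical long-running task whose finalizer always runs
  const longTask = Effect.sleep("1 hour").pipe(
    Effect.ensuring(Ref.set(cleanedUp, true))
  )

  const fiber = yield* Effect.fork(longTask)
  // Let the task start before cancelling it
  yield* Effect.sleep("10 millis")

  // interrupt returns once the fiber has stopped and finalizers have run
  const exit = yield* Fiber.interrupt(fiber)
  const didCleanUp = yield* Ref.get(cleanedUp)

  return { interrupted: Exit.isInterrupted(exit), cleanedUp: didCleanUp }
})

Effect.runPromise(demo).then(console.log)
```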

How do I limit parallelism for many tasks?

Use Effect.all/forEach with a concurrency option, or wrap tasks with a Semaphore and use withPermits to cap simultaneous executions.
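
Both approaches can be compared by tracking how many tasks run at once. The sketch below is illustrative only: the counters, the task body, and the limit of 3 are assumptions made for the example.

```typescript
import { Effect, Ref } from "effect"

const demo = Effect.gen(function* () {
  const running = yield* Ref.make(0)
  const peak = yield* Ref.make(0)

  // Hypothetical task that tracks how many copies run at the same time
  const task = Effect.gen(function* () {
    const now = yield* Ref.updateAndGet(running, (n) => n + 1)
    yield* Ref.update(peak, (p) => Math.max(p, now))
    yield* Effect.sleep("10 millis")
    yield* Ref.update(running, (n) => n - 1)
  })
  const inputs = Array.from({ length: 10 })

  // Option 1: cap parallelism with the concurrency option
  yield* Effect.forEach(inputs, () => task, { concurrency: 3, discard: true })
  const peakWithOption = yield* Ref.getAndSet(peak, 0)

  // Option 2: start everything, but gate each task behind a semaphore permit
  const semaphore = yield* Effect.makeSemaphore(3)
  yield* Effect.forEach(inputs, () => semaphore.withPermits(1)(task), {
    concurrency: "unbounded",
    discard: true
  })
  const peakWithSemaphore = yield* Ref.get(peak)

  return { peakWithOption, peakWithSemaphore }
})

Effect.runPromise(demo).then(console.log)
```

Neither peak should exceed 3. The semaphore variant is the more flexible of the two when the same limit has to be shared across several call sites.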