using-workflows skill

/.agents/skills/using-workflows

This skill helps you design and run durable workflows with steps, streaming, and agent execution for resilient, resumable automation.

`npx playbooks add skill andrelandgraf/fullstackrecipes --skill using-workflows`

Run the command above to add this skill to your agents. The skill ships as a single file, `SKILL.md`, reproduced below.
---
name: using-workflows
description: Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
---

# Working with Workflows

Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.

**See:**

- Resource: `using-workflows` in Fullstack Recipes
- URL: https://fullstackrecipes.com/recipes/using-workflows

---

### Workflow Folder Structure

Each workflow has its own subfolder in `src/workflows/`:

```
src/workflows/
  steps/           # Shared step functions
    stream.ts      # UI message stream helpers
  chat/
    index.ts       # Workflow orchestration function ("use workflow")
    steps/         # Workflow-specific steps ("use step")
      history.ts
      logger.ts
      name-chat.ts
    types.ts       # Workflow-specific types
```

- **`workflows/steps/`** - Shared step functions reusable across workflows (e.g., stream helpers).
- **`<workflow>/index.ts`** - The main workflow function with the `"use workflow"` directive. Orchestrates the run by calling step functions.
- **`<workflow>/steps/`** - Workflow-specific step functions with the `"use step"` directive. Each step is a durable checkpoint.
- **`<workflow>/types.ts`** - Type definitions for the workflow's UI messages.

---

### Creating a Workflow

Define workflows with the `"use workflow"` directive:

```typescript
// src/workflows/chat/index.ts
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
// Workflow-specific "use step" functions (grouping them in ./steps/history.ts is assumed)
import {
  persistUserMessage,
  createAssistantMessage,
  getMessageHistory,
  persistMessageParts,
  removeRunId,
} from "./steps/history";

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  // Persist user message
  await persistUserMessage({ chatId, message: userMessage });

  // Create assistant placeholder with runId for resumption
  const messageId = await createAssistantMessage({
    chatId,
    runId: workflowRunId,
  });

  // Get message history
  const history = await getMessageHistory(chatId);

  // Start the UI message stream
  await startStream(messageId);

  // Run agent with streaming
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  // Persist and finalize
  await persistMessageParts({ chatId, messageId, parts });

  // Finish the UI message stream
  await finishStream();

  await removeRunId(messageId);
}
```

### Starting a Workflow

Use the `start` function from `workflow/api`:

```typescript
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";

const run = await start(chatWorkflow, [{ chatId, userMessage }]);

// run.runId - unique identifier for this run
// run.readable - stream of UI message chunks
```
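
For example, an HTTP endpoint can start the run and let the client reconnect to the stream later. A minimal sketch, assuming a Next.js-style route handler; the route path, request shape, and JSON response are illustrative choices, not prescribed by the recipe:

```typescript
// app/api/chat/route.ts (illustrative path, assuming a Next.js-style route handler)
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";

export async function POST(request: Request) {
  const { chatId, userMessage } = await request.json();

  // Start the durable workflow run
  const run = await start(chatWorkflow, [{ chatId, userMessage }]);

  // Return the runId so the client can attach to the stream later via getRun(runId);
  // alternatively, pipe run.readable back through your streaming response helper.
  return Response.json({ runId: run.runId });
}
```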

### Resuming a Workflow Stream

Use `getRun` to reconnect to an in-progress or completed workflow:

```typescript
import { getRun } from "workflow/api";

const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });
```
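
One way to wire this up is to look up the runId stored on the assistant message and replay the stream from where the client left off. A sketch; `getAssistantMessage` is a hypothetical query helper, and `startIndex` is assumed to be the number of chunks the client already received:

```typescript
import { getRun } from "workflow/api";
import { getAssistantMessage } from "@/lib/chat/queries"; // hypothetical helper

export async function resumeMessageStream(messageId: string, startIndex: number) {
  const message = await getAssistantMessage(messageId);
  // If the run finished, removeRunId already cleared the reference - nothing to resume.
  if (!message?.runId) return null;

  const run = await getRun(message.runId);
  return run.getReadable({ startIndex });
}
```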

### Using Steps

Steps are durable checkpoints that persist their results:

```typescript
// e.g. src/workflows/chat/steps/history.ts - helper import paths are assumed
import { getChatMessages } from "@/lib/chat/queries";
import { convertDbMessagesToUIMessages } from "@/lib/chat/messages";

export async function getMessageHistory(chatId: string) {
  "use step";

  const dbMessages = await getChatMessages(chatId);
  return convertDbMessagesToUIMessages(dbMessages);
}
```
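
The other steps referenced from the workflow above follow the same pattern. A hedged sketch of `createAssistantMessage` and `removeRunId`; the `insertMessage` and `clearMessageRunId` query helpers are assumptions for illustration:

```typescript
// src/workflows/chat/steps/history.ts (sketch) - query helpers are hypothetical
import { insertMessage, clearMessageRunId } from "@/lib/chat/queries";

export async function createAssistantMessage({
  chatId,
  runId,
}: {
  chatId: string;
  runId: string;
}): Promise<string> {
  "use step";

  // Placeholder row the client can render immediately; runId enables resumption
  const message = await insertMessage({ chatId, role: "assistant", runId });
  return message.id;
}

export async function removeRunId(messageId: string): Promise<void> {
  "use step";

  // The run is complete, so the message no longer needs to be resumable
  await clearMessageRunId(messageId);
}
```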

---

### Streaming UIMessageChunks

When streaming `UIMessageChunk` responses to clients (e.g., chat messages), you must signal the start and end of the stream. This is required for proper stream framing with `WorkflowChatTransport`.

**Always call `startStream()` before `agent.run()` and `finishStream()` after:**

```typescript
import { getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
// getMessageHistory and persistMessageParts are "use step" functions (import path assumed)
import { getMessageHistory, persistMessageParts } from "./steps/history";

export async function chatWorkflow({ chatId, messageId }) {
  "use workflow";

  const history = await getMessageHistory(chatId);

  // Signal stream start with the message ID
  await startStream(messageId);

  // Run agent - streams UIMessageChunks to the client
  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });

  // Signal stream end and close the writable
  await finishStream();
}
```

The stream step functions write `UIMessageChunk` messages:

- `startStream(messageId)` - Writes `{ type: "start", messageId }` to signal a new message
- `finishStream()` - Writes `{ type: "finish", finishReason: "stop" }` and closes the stream

Without these signals, the client's `WorkflowChatTransport` cannot properly parse the streamed response.
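
For reference, the shared stream steps can be as small as writing those two chunks to the workflow's writable. A minimal sketch, assuming `getWritable()` is also available inside step functions and that the chunk shapes match the list above:

```typescript
// src/workflows/steps/stream.ts (sketch) - chunk shapes follow the list above
import { getWritable } from "workflow";

export async function startStream(messageId: string): Promise<void> {
  "use step";

  const writer = getWritable().getWriter();
  await writer.write({ type: "start", messageId });
  writer.releaseLock(); // leave the stream open for the agent's chunks
}

export async function finishStream(): Promise<void> {
  "use step";

  const writer = getWritable().getWriter();
  await writer.write({ type: "finish", finishReason: "stop" });
  await writer.close();
}
```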

---

### Getting Workflow Metadata

Access the current run's metadata:

```typescript
import { getWorkflowMetadata } from "workflow";

export async function chatWorkflow({ chatId }) {
  "use workflow";

  const { workflowRunId } = getWorkflowMetadata();

  // Store runId for resumption
  await createAssistantMessage({ chatId, runId: workflowRunId });
}
```

### Workflow-Safe Logging

The workflow runtime doesn't support Node.js modules. Wrap logger calls in steps:

```typescript
// src/workflows/chat/steps/logger.ts
import { logger } from "@/lib/logging/logger";

export async function log(
  level: "info" | "warn" | "error" | "debug",
  message: string,
  data?: Record<string, unknown>,
): Promise<void> {
  "use step";

  if (data) {
    logger[level](data, message);
  } else {
    logger[level](message);
  }
}
```
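
Inside the workflow, call the wrapped logger like any other step:

```typescript
import { log } from "./steps/logger";

export async function chatWorkflow({ chatId }) {
  "use workflow";

  await log("info", "chat workflow started", { chatId });
  // ... orchestrate steps as shown above
}
```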

### Running Agents in Workflows

Use the custom `Agent` class for full streaming control:

```typescript
import { getWorkflowMetadata, getWritable } from "workflow";
import { startStream, finishStream } from "../steps/stream";
import { chatAgent } from "@/lib/ai/chat-agent";
// Workflow-specific "use step" functions (grouping them in ./steps/history.ts is assumed)
import {
  createAssistantMessage,
  getMessageHistory,
  persistMessageParts,
} from "./steps/history";

export async function chatWorkflow({ chatId, userMessage }) {
  "use workflow";

  // Derive the runId for resumption from the workflow metadata
  const { workflowRunId } = getWorkflowMetadata();

  const messageId = await createAssistantMessage({ chatId, runId: workflowRunId });
  const history = await getMessageHistory(chatId);

  await startStream(messageId);

  const { parts } = await chatAgent.run(history, {
    maxSteps: 10,
    writable: getWritable(),
  });

  await persistMessageParts({ chatId, messageId, parts });
  await finishStream();
}
```

### Persisting Workflow Results

Save agent output using step functions. The `assertChatAgentParts` function validates that generic `UIMessage["parts"]` (returned by agents) match your application's specific tool and data types:

```typescript
// src/workflows/chat/steps/history.ts
import type { UIMessage } from "ai";
import { eq } from "drizzle-orm";
import { insertMessageParts } from "@/lib/chat/queries";
import { assertChatAgentParts, type ChatAgentUIMessage } from "../types";
// Drizzle db client and chats table used below (import paths assumed)
import { db } from "@/lib/db";
import { chats } from "@/lib/db/schema";

export async function persistMessageParts({
  chatId,
  messageId,
  parts,
}: {
  chatId: string;
  messageId: string;
  parts: UIMessage["parts"];
}): Promise<void> {
  "use step";

  assertChatAgentParts(parts);

  await insertMessageParts(chatId, messageId, parts);

  // Update chat timestamp
  await db
    .update(chats)
    .set({ updatedAt: new Date() })
    .where(eq(chats.id, chatId));
}
```
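
What `types.ts` contains depends on your app's tools and data parts. A minimal sketch, assuming the chat agent's messages can be described by specializing `UIMessage`; the plain alias and the validation logic are placeholders, not the recipe's actual implementation:

```typescript
// src/workflows/chat/types.ts (sketch) - replace the alias with your app's
// UIMessage specialization (metadata, data parts, tools)
import type { UIMessage } from "ai";

export type ChatAgentUIMessage = UIMessage;

// Narrow generic parts to the workflow's message type, throwing on anything unexpected
export function assertChatAgentParts(
  parts: UIMessage["parts"],
): asserts parts is ChatAgentUIMessage["parts"] {
  for (const part of parts) {
    if (typeof part.type !== "string") {
      throw new Error(`Unexpected message part: ${JSON.stringify(part)}`);
    }
  }
}
```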

---

## References

- [Workflow Development Kit](https://useworkflow.dev/docs)
- [Workflow API Reference](https://useworkflow.dev/docs/api-reference)

## Overview

This skill shows how to create and run durable TypeScript workflows with step-level checkpoints, streaming UI message chunks, and agent execution. It explains folder structure, starting and resuming runs, persisting results, and safe logging patterns for production-ready full-stack AI apps. The content focuses on practical patterns for streaming chat, durable steps, and reconnectable runs.

## How this skill works

Workflows are defined with a `"use workflow"` directive and orchestrate step functions (`"use step"`) that act as durable checkpoints. Streams are framed by explicit `startStream` and `finishStream` steps so clients can parse `UIMessageChunk` sequences. Agents run inside workflows with a writable stream from `getWritable()`, and results are validated and persisted via step functions for safe resumption.

## When to use it

- Building chat apps that stream assistant responses to clients in real time.
- Running long-running or resumable multi-step AI processes that need durability.
- Coordinating agent-driven decision flows with persistent checkpoints.
- Needing reconnectable runs so clients can resume in-progress streams.
- Persisting structured agent outputs to your application database reliably.

## Best practices

- Organize each workflow in `src/workflows/<name>/` with shared steps and types for clarity.
- Wrap any external I/O or logging in `"use step"` functions so Node.js-dependent code stays out of the constrained workflow runtime.
- Always call `startStream(messageId)` before `agent.run()` and `finishStream()` after to frame the stream.
- Use `getWorkflowMetadata()` to store the runId with created records so runs can be resumed.
- Validate agent output (e.g., `assertChatAgentParts`) before persisting to prevent schema drift.

## Example use cases

- A realtime chat assistant that streams partial messages to clients and stores the final parts in the DB.
- A multi-step form assistant that checkpoints progress so users can resume later.
- An agent orchestration that runs tools sequentially and persists each tool result as a step.
- Reconnect logic for mobile clients that lost the connection: fetch the run via `getRun(runId)` and replay the stream.

## FAQ

**How do I reconnect to an in-progress workflow stream?**

Call `getRun(runId)` and then `run.getReadable({ startIndex })` to resume reading `UIMessageChunk`s from the stored run.

**What happens if I forget to call `startStream` or `finishStream`?**

The client transport cannot parse the streamed response correctly. Always emit a start and a finish chunk to frame the stream.