home / skills / catlog22 / claude-code-workflow / flow-coordinator

flow-coordinator skill

/.claude/skills/flow-coordinator

This skill coordinates JSON-driven workflows by executing legacy templates or unified DAG-based flows with dynamic status tracking and parallel task execution.

npx playbooks add skill catlog22/claude-code-workflow --skill flow-coordinator

Review the files below or copy the command above to add this skill to your agents.

Files (18)
SKILL.md
19.6 KB
---
name: flow-coordinator
description: Template-driven workflow coordinator with minimal state tracking. Executes command chains from workflow templates OR unified PromptTemplate workflows. Supports slash-command and DAG-based execution. Triggers on "flow-coordinator", "workflow template", "orchestrate".
allowed-tools: Task, AskUserQuestion, Read, Write, Bash, Glob, Grep
---

# Flow Coordinator

Lightweight workflow coordinator supporting two workflow formats:
1. **Legacy Templates**: Command chains with slash-command execution
2. **Unified Workflows**: DAG-based PromptTemplate nodes (spec: `spec/unified-workflow-spec.md`)

## Specification Reference

- **Unified Workflow Spec**: @spec/unified-workflow-spec.md
- **Demo Workflow**: `ccw/data/flows/demo-unified-workflow.json`

## Architecture

```
User Task → Detect Format → Select Workflow → Init Status → Execute → Complete
                │                                              │
                ├─ Legacy Template                             │
                │   └─ Sequential cmd execution                │
                │                                              │
                └─ Unified Workflow                            │
                    └─ DAG traversal with contextRefs          │
                                                               │
     └──────────────── Resume (from status.json) ──────────────┘

Execution Modes:
  ├─ analysis     → Read-only, CLI --mode analysis
  ├─ write        → File changes, CLI --mode write
  ├─ mainprocess  → Blocking, synchronous
  └─ async        → Background, ccw cli
```

## Core Concepts

**Dual Format Support**:
- Legacy: `templates/*.json` with `cmd`, `args`, `execution`
- Unified: `ccw/data/flows/*.json` with `nodes`, `edges`, `contextRefs`

**Unified PromptTemplate Model**: All workflow steps are natural language instructions with:
- `instruction`: What to execute (natural language)
- `slashCommand`: Optional slash command name (e.g., "workflow:plan")
- `slashArgs`: Optional arguments for slash command (supports {{variable}})
- `outputName`: Name for output reference
- `contextRefs`: References to previous step outputs
- `tool`: Optional CLI tool (gemini/qwen/codex/claude)
- `mode`: Execution mode (analysis/write/mainprocess/async)

**DAG Execution**: Unified workflows execute as directed acyclic graphs with parallel branches and conditional edges.

**Dynamic Discovery**: Both formats discovered at runtime via Glob.

---

## Execution Flow

```javascript
async function execute(task) {
  // 1. Discover and select template
  const templates = await discoverTemplates();
  const template = await selectTemplate(templates);

  // 2. Init status
  const sessionId = `fc-${timestamp()}`;
  const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
  const status = initStatus(template, task);
  write(statusPath, JSON.stringify(status, null, 2));

  // 3. Execute steps based on execution config
  await executeSteps(status, statusPath);
}

async function executeSteps(status, statusPath) {
  for (let i = status.current; i < status.steps.length; i++) {
    const step = status.steps[i];
    status.current = i;

    // Execute based on step mode (all steps use slash-command type)
    const execConfig = step.execution || { type: 'slash-command', mode: 'mainprocess' };

    if (execConfig.mode === 'async') {
      // Async execution - stop and wait for hook callback
      await executeSlashCommandAsync(step, status, statusPath);
      break;
    } else {
      // Mainprocess execution - continue immediately
      await executeSlashCommandSync(step, status);
      step.status = 'done';
      write(statusPath, JSON.stringify(status, null, 2));
    }
  }

  // All steps complete
  if (status.current >= status.steps.length) {
    status.complete = true;
    write(statusPath, JSON.stringify(status, null, 2));
  }
}
```

---

## Unified Workflow Execution

For workflows using the unified PromptTemplate format (`ccw/data/flows/*.json`):

```javascript
async function executeUnifiedWorkflow(workflow, task) {
  // 1. Initialize execution state
  const sessionId = `ufc-${timestamp()}`;
  const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
  const state = {
    id: sessionId,
    workflow: workflow.id,
    goal: task,
    nodeStates: {},  // nodeId -> { status, result, error }
    outputs: {},     // outputName -> result
    complete: false
  };

  // 2. Topological sort for execution order
  const executionOrder = topologicalSort(workflow.nodes, workflow.edges);

  // 3. Execute nodes respecting DAG dependencies
  await executeDAG(workflow, executionOrder, state, statusPath);
}

async function executeDAG(workflow, order, state, statusPath) {
  for (const nodeId of order) {
    const node = workflow.nodes.find(n => n.id === nodeId);
    const data = node.data;

    // Check if all dependencies are satisfied
    if (!areDependenciesSatisfied(nodeId, workflow.edges, state)) {
      continue; // Will be executed when dependencies complete
    }

    // Build instruction from slashCommand or raw instruction
    let instruction = buildNodeInstruction(data, state.outputs);

    // Execute based on mode
    state.nodeStates[nodeId] = { status: 'running' };
    write(statusPath, JSON.stringify(state, null, 2));

    const result = await executeNode(instruction, data.tool, data.mode);

    // Store output for downstream nodes
    state.nodeStates[nodeId] = { status: 'completed', result };
    if (data.outputName) {
      state.outputs[data.outputName] = result;
    }
    write(statusPath, JSON.stringify(state, null, 2));
  }

  state.complete = true;
  write(statusPath, JSON.stringify(state, null, 2));
}

/**
 * Build node instruction from slashCommand or raw instruction
 * Handles slashCommand/slashArgs fields from frontend orchestrator
 */
function buildNodeInstruction(data, outputs) {
  const refs = data.contextRefs || [];

  // If slashCommand is set, construct instruction from it
  if (data.slashCommand) {
    // Resolve variables in slashArgs
    const args = data.slashArgs
      ? resolveContextRefs(data.slashArgs, refs, outputs)
      : '';

    // Build slash command instruction
    let instruction = `/${data.slashCommand}${args ? ' ' + args : ''}`;

    // Append additional instruction if provided
    if (data.instruction) {
      const additionalInstruction = resolveContextRefs(data.instruction, refs, outputs);
      instruction = `${instruction}\n\n${additionalInstruction}`;
    }

    return instruction;
  }

  // Fallback: use raw instruction with context refs resolved
  return resolveContextRefs(data.instruction || '', refs, outputs);
}

function resolveContextRefs(instruction, refs, outputs) {
  let resolved = instruction;
  for (const ref of refs) {
    const value = outputs[ref];
    const placeholder = `{{${ref}}}`;
    resolved = resolved.replace(new RegExp(placeholder, 'g'),
      typeof value === 'object' ? JSON.stringify(value) : String(value));
  }
  return resolved;
}

async function executeNode(instruction, tool, mode) {
  // Build CLI command based on tool and mode
  const cliTool = tool || 'gemini';
  const cliMode = mode === 'write' ? 'write' : 'analysis';

  if (mode === 'async') {
    // Background execution
    return Bash(
      `ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}`,
      { run_in_background: true }
    );
  } else {
    // Synchronous execution
    return Bash(
      `ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}`
    );
  }
}
```

### Unified Workflow Discovery

```javascript
async function discoverUnifiedWorkflows() {
  const files = Glob('*.json', { path: 'ccw/data/flows/' });

  const workflows = [];
  for (const file of files) {
    const content = JSON.parse(Read(file));
    // Detect unified format by checking for 'nodes' array
    if (content.nodes && Array.isArray(content.nodes)) {
      workflows.push({
        id: content.id,
        name: content.name,
        description: content.description,
        nodeCount: content.nodes.length,
        format: 'unified',
        file: file
      });
    }
  }
  return workflows;
}
```

### Format Detection

```javascript
function detectWorkflowFormat(content) {
  if (content.nodes && content.edges) {
    return 'unified';  // PromptTemplate DAG format
  }
  if (content.steps && content.steps[0]?.cmd) {
    return 'legacy';   // Command chain format
  }
  throw new Error('Unknown workflow format');
}
```

---

## Legacy Template Discovery

**Dynamic query** - never hardcode template list:

```javascript
async function discoverTemplates() {
  // Discover all JSON templates
  const files = Glob('*.json', { path: 'templates/' });

  // Parse each template
  const templates = [];
  for (const file of files) {
    const content = JSON.parse(Read(file));
    templates.push({
      name: content.name,
      description: content.description,
      steps: content.steps.map(s => s.cmd).join(' → '),
      file: file
    });
  }

  return templates;
}
```

---

## Template Selection

User chooses from discovered templates:

```javascript
async function selectTemplate(templates) {
  // Build options from discovered templates
  const options = templates.slice(0, 4).map(t => ({
    label: t.name,
    description: t.steps
  }));

  const response = await AskUserQuestion({
    questions: [{
      question: 'Select workflow template:',
      header: 'Template',
      options: options,
      multiSelect: false
    }]
  });

  // Handle "Other" - show remaining templates or custom input
  if (response.template === 'Other') {
    return await selectFromRemainingTemplates(templates.slice(4));
  }

  return templates.find(t => t.name === response.template);
}
```

---

## Status Schema

**Creation**: Copy template JSON → Update `id`, `template`, `goal`, set all steps `status: "pending"`

**Location**: `.workflow/.flow-coordinator/{session-id}/status.json`

**Core Fields**:
- `id`: Session ID (fc-YYYYMMDD-HHMMSS)
- `template`: Template name
- `goal`: User task description
- `current`: Current step index
- `steps[]`: Step array from template (with runtime `status`, `session`, `taskId`)
- `complete`: All steps done?

**Step Status**: `pending` → `running` → `done` | `failed` | `skipped`

---

## Extended Template Schema

**Templates stored in**: `templates/*.json` (discovered at runtime via Glob)

**TemplateStep Fields**:
- `cmd`: Skill name or command path (e.g., `workflow-lite-plan`, `workflow:debug-with-file`, `issue:discover`)
- `route?`: Sub-mode for multi-mode Skills (e.g., `lite-execute`, `plan-verify`, `test-cycle-execute`)
- `args?`: Arguments with `{{goal}}` and `{{prev}}` placeholders
- `unit?`: Minimum execution unit name (groups related commands)
- `optional?`: Can be skipped by user
- `execution`: Type and mode configuration
  - `type`: Always `'slash-command'` (invoked via Skill tool)
  - `mode`: `'mainprocess'` (blocking) or `'async'` (background)
- `contextHint?`: Natural language guidance for context assembly

**cmd 命名规则**:
- **Skills(已迁移)**: 使用连字符格式 Skill 名称,如 `workflow-lite-plan`、`review-cycle`
- **Commands(仍存在)**: 使用冒号格式命令路径,如 `workflow:brainstorm-with-file`、`issue:discover`

**route 字段**:
多模式 Skill 通过 `route` 区分子模式。同一 Skill 的不同步骤共享 `cmd`,通过 `route` 路由:
| Skill | 默认模式 (无 route) | route 值 |
|-------|-------------------|----------|
| `workflow-lite-plan` | lite-plan | `lite-execute` |
| `workflow-plan` | plan | `plan-verify`, `replan` |
| `workflow-test-fix` | test-fix-gen | `test-cycle-execute` |
| `workflow-tdd` | tdd-plan | `tdd-verify` |
| `review-cycle` | - | `session`, `module`, `fix` |

**Template Example**:
```json
{
  "name": "rapid",
  "steps": [
    {
      "cmd": "workflow-lite-plan",
      "args": "\"{{goal}}\"",
      "unit": "quick-implementation",
      "execution": { "type": "slash-command", "mode": "mainprocess" },
      "contextHint": "Create lightweight implementation plan"
    },
    {
      "cmd": "workflow-lite-plan",
      "route": "lite-execute",
      "args": "--in-memory",
      "unit": "quick-implementation",
      "execution": { "type": "slash-command", "mode": "async" },
      "contextHint": "Execute plan from previous step"
    }
  ]
}
```

---

## Execution Implementation

### Mainprocess Mode (Blocking)

```javascript
async function executeSlashCommandSync(step, status) {
  // Build Skill invocation args
  const args = buildSkillArgs(step, status);

  // Invoke via Skill tool: step.cmd is skill name or command path
  const result = await Skill({ skill: step.cmd, args: args });

  step.session = result.session_id;
  step.status = 'done';
  return result;
}
```

### Async Mode (Background)

```javascript
async function executeSlashCommandAsync(step, status, statusPath) {
  // Build prompt for ccw cli: /<cmd> [--route <route>] args + context
  const prompt = buildCommandPrompt(step, status);

  step.status = 'running';
  write(statusPath, JSON.stringify(status, null, 2));

  // Execute via ccw cli in background
  const taskId = Bash(
    `ccw cli -p "${escapePrompt(prompt)}" --tool claude --mode write`,
    { run_in_background: true }
  ).task_id;

  step.taskId = taskId;
  write(statusPath, JSON.stringify(status, null, 2));

  console.log(`Executing: ${step.cmd}${step.route ? ' --route ' + step.route : ''} (async)`);
  console.log(`Resume: /flow-coordinator --resume ${status.id}`);
}
```

---

## Prompt Building

Prompts are built in format: `/<cmd> [--route <route>] -y args` + context

```javascript
function buildCommandPrompt(step, status) {
  // step.cmd is skill name or command path
  let prompt = `/${step.cmd}`;

  // Add route for multi-mode Skills
  if (step.route) {
    prompt += ` --route ${step.route}`;
  }

  prompt += ' -y';

  // Add arguments (with placeholder replacement)
  if (step.args) {
    const args = step.args
      .replace('{{goal}}', status.goal)
      .replace('{{prev}}', getPreviousSessionId(status));
    prompt += ` ${args}`;
  }

  // Add context based on contextHint
  if (step.contextHint) {
    const context = buildContextFromHint(step.contextHint, status);
    prompt += `\n\nContext:\n${context}`;
  } else {
    // Default context: previous session IDs
    const previousContext = collectPreviousResults(status);
    if (previousContext) {
      prompt += `\n\nPrevious results:\n${previousContext}`;
    }
  }

  return prompt;
}

/**
 * Build args for Skill() invocation (mainprocess mode)
 */
function buildSkillArgs(step, status) {
  let args = '';

  // Add route for multi-mode Skills
  if (step.route) {
    args += `--route ${step.route} `;
  }

  args += '-y';

  // Add step arguments
  if (step.args) {
    const resolvedArgs = step.args
      .replace('{{goal}}', status.goal)
      .replace('{{prev}}', getPreviousSessionId(status));
    args += ` ${resolvedArgs}`;
  }

  return args;
}

function buildContextFromHint(hint, status) {
  // Parse contextHint instruction and build context accordingly
  return parseAndBuildContext(hint, status);
}
```

### Example Prompt Output

```
/workflow-lite-plan -y "Implement user registration"

Context:
Task: Implement user registration
Previous results:
- None (first step)
```

```
/workflow-lite-plan --route lite-execute -y --in-memory

Context:
Task: Implement user registration
Previous results:
- lite-plan: WFS-plan-20250130 (planning-context.md)
```

---

## User Interaction

### Step 1: Select Template

```
Select workflow template:

○ rapid      lite-plan → lite-execute → test-cycle-execute
○ coupled    plan → plan-verify → execute → review → test
○ bugfix     lite-plan --bugfix → lite-execute → test-cycle-execute
○ tdd        tdd-plan → execute → tdd-verify
○ Other      (more templates or custom)
```

### Step 2: Review Execution Plan

```
Template: coupled
Steps:
  1. workflow-plan (mainprocess)
  2. workflow-plan --route plan-verify (mainprocess)
  3. workflow-execute (async)
  4. review-cycle --route session (mainprocess)
  5. review-cycle --route fix (mainprocess)
  6. workflow-test-fix (mainprocess)
  7. workflow-test-fix --route test-cycle-execute (async)

Proceed? [Confirm / Cancel]
```

---

## Resume Capability

```javascript
async function resume(sessionId) {
  const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
  const status = JSON.parse(Read(statusPath));

  // Find first incomplete step
  status.current = status.steps.findIndex(s => s.status !== 'done');
  if (status.current === -1) {
    console.log('All steps complete');
    return;
  }

  // Continue executing steps
  await executeSteps(status, statusPath);
}
```

---

## Available Templates

Templates discovered from `templates/*.json`:

| Template | Use Case | Steps |
|----------|----------|-------|
| rapid | Simple feature | workflow-lite-plan → workflow-lite-plan[lite-execute] → workflow-test-fix → workflow-test-fix[test-cycle-execute] |
| coupled | Complex feature | workflow-plan → workflow-plan[plan-verify] → workflow-execute → review-cycle[session] → review-cycle[fix] → workflow-test-fix → workflow-test-fix[test-cycle-execute] |
| bugfix | Bug fix | workflow-lite-plan --bugfix → workflow-lite-plan[lite-execute] → workflow-test-fix → workflow-test-fix[test-cycle-execute] |
| bugfix-hotfix | Urgent hotfix | workflow-lite-plan --hotfix |
| tdd | Test-driven | workflow-tdd → workflow-execute → workflow-tdd[tdd-verify] |
| test-fix | Fix failing tests | workflow-test-fix → workflow-test-fix[test-cycle-execute] |
| review | Code review | review-cycle[session] → review-cycle[fix] → workflow-test-fix → workflow-test-fix[test-cycle-execute] |
| multi-cli-plan | Multi-perspective planning | workflow-multi-cli-plan → workflow-lite-plan[lite-execute] → workflow-test-fix → workflow-test-fix[test-cycle-execute] |
| full | Complete workflow | brainstorm → workflow-plan → workflow-plan[plan-verify] → workflow-execute → workflow-test-fix → workflow-test-fix[test-cycle-execute] |
| docs | Documentation | workflow-lite-plan → workflow-lite-plan[lite-execute] |
| brainstorm | Exploration | workflow:brainstorm-with-file |
| debug | Debug with docs | workflow:debug-with-file |
| analyze | Collaborative analysis | workflow:analyze-with-file |
| issue | Issue workflow | issue:discover → issue:plan → issue:queue → issue:execute |
| rapid-to-issue | Plan to issue bridge | workflow-lite-plan → issue:convert-to-plan → issue:queue → issue:execute |
| brainstorm-to-issue | Brainstorm to issue | issue:from-brainstorm → issue:queue → issue:execute |

**注**: `[route]` 表示该步骤使用 `route` 字段路由到多模式 Skill 的特定子模式。

---

## Design Principles

1. **Minimal fields**: Only essential tracking data
2. **Flat structure**: No nested objects beyond steps array
3. **Step-level execution**: Each step defines how it's executed
4. **Resumable**: Any step can be resumed from status
5. **Human readable**: Clear JSON format

---

## Reference Documents

| Document | Purpose |
|----------|---------|
| spec/unified-workflow-spec.md | Unified PromptTemplate workflow specification |
| ccw/data/flows/*.json | Unified workflows (DAG format, dynamic discovery) |
| templates/*.json | Legacy workflow templates (command chain format) |

### Demo Workflows (Unified Format)

| File | Description | Nodes |
|------|-------------|-------|
| `demo-unified-workflow.json` | Auth implementation | 7 nodes: Analyze → Plan → Implement → Review → Tests → Report |
| `parallel-ci-workflow.json` | CI/CD pipeline | 8 nodes: Parallel checks → Merge → Conditional notify |
| `simple-analysis-workflow.json` | Analysis pipeline | 3 nodes: Explore → Analyze → Report |

Overview

This skill is a lightweight, template-driven workflow coordinator that executes command chains or unified PromptTemplate DAG workflows with minimal state tracking. It discovers workflows at runtime, initializes session status files, and runs steps either synchronously or in background async mode. It supports slash-command invocation, multi-tool CLI execution, and resumeable runs via persistent status.json files.

How this skill works

At runtime the coordinator discovers templates (legacy command chains) and unified workflows (PromptTemplate DAGs) via globbing. It initializes a session status file, topologically orders unified nodes when needed, and executes steps according to each step's execution mode (analysis, write, mainprocess, async). Outputs and node results are stored in the session state and used to resolve contextRefs and slashArgs for downstream steps.

When to use it

  • Orchestrating multi-step developer workflows that combine AI skills and CLI tools.
  • Running DAG-style PromptTemplate workflows that need context propagation between nodes.
  • Executing legacy sequential templates with slash-command driven skills.
  • Resuming long-running or background tasks where intermediate state must persist.
  • Switching between read-only analysis and write-enabled execution modes.

Best practices

  • Author unified workflows with explicit outputName and contextRefs to make data flows clear.
  • Prefer topologically valid DAGs to enable parallel branches and conditional edges.
  • Use execution.mode='async' for long-running steps and provide clear resume instructions.
  • Keep template args and contextHint concise to improve prompt assembly and reproducibility.
  • Store templates in designated folders (templates/ and ccw/data/flows/) for dynamic discovery.

Example use cases

  • Run a ‘rapid’ template that plans, executes, and resumes an implementation workflow with one CLI command.
  • Orchestrate a multi-agent code generation DAG where earlier nodes produce JSON outputs consumed by later nodes.
  • Kick off an async test-run step that runs in background and resumes the coordinator when complete.
  • Convert legacy command-chain templates into unified PromptTemplate workflows to enable parallelism and contextRefs.
  • Use mode=analysis to preview a workflow run without making file changes, then rerun in write mode to apply changes.

FAQ

How does resume work for async steps?

Async steps write task IDs and set step.status='running' in the session status.json; the coordinator can be resumed with the session id to check task completion and continue execution.

How are variables passed between nodes?

Unified workflows use contextRefs and outputName fields; placeholders like {{ref}} are resolved against state.outputs and injected into slashArgs or instructions.