temporal skill

/plugins/atelier-python/skills/temporal

This skill helps you design and orchestrate durable Python workflows with Temporal, enabling reliable activities, retries, and state management.

This is most likely a fork of the atelier-python-temporal skill from martinffx
npx playbooks add skill martinffx/claude-code-atelier --skill temporal

SKILL.md
---
name: python:temporal
description: Temporal workflow orchestration in Python. Use when designing workflows, implementing activities, handling retries, managing workflow state, or building durable distributed systems.
user-invocable: false
---

# Temporal Workflow Orchestration

Temporal SDK patterns for building durable, distributed workflows in Python.

## Worker Setup

```python
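import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

# MyWorkflow and my_activity are defined in the sections below;
# import them from wherever they live in your project.


async def main():
    # Connect to the Temporal server (default local address)
    client = await Client.connect("localhost:7233")

    # The worker polls the task queue and runs the registered code
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )

    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())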
```

## Workflow Definition

```python
from temporalio import workflow
from datetime import timedelta

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        """Workflow run method"""
        # Execute activity
        result = await workflow.execute_activity(
            my_activity,
            name,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Hello {result}"
```

## Activity Implementation

```python
from temporalio import activity

@activity.defn
async def my_activity(name: str) -> str:
    """Activity - can fail and retry"""
    # Do work (database, API, etc.)
    return name.upper()
```

## Starting Workflows

```python
from temporalio.client import Client

async def start_workflow():
    client = await Client.connect("localhost:7233")

    handle = await client.start_workflow(
        MyWorkflow.run,
        "World",
        id="my-workflow-id",
        task_queue="my-task-queue",
    )

    result = await handle.result()
    print(result)  # "Hello WORLD"
```

## Error Handling

```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError

# risky_activity stands in for any function decorated with @activity.defn


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        try:
            result = await workflow.execute_activity(
                risky_activity,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
        except ActivityError:
            # Raised once all three attempts have failed
            return "Failed"

        return result
```

## Signals and Queries

```python
from temporalio import workflow


@workflow.defn
class OrderWorkflow:
    def __init__(self) -> None:
        self.status = "pending"

    @workflow.run
    async def run(self, order_id: str) -> str:
        # Block durably until the approve signal changes the status
        await workflow.wait_condition(lambda: self.status == "approved")
        return "Order processed"

    @workflow.signal
    def approve(self) -> None:
        """Signal to approve order"""
        self.status = "approved"

    @workflow.query
    def get_status(self) -> str:
        """Query current status (read-only, must not mutate state)"""
        return self.status
```

See references/ for testing patterns and common workflow patterns.

Overview

This skill provides patterns and examples for building durable, distributed workflows with Temporal in Python. It covers worker setup, workflow and activity definitions, starting workflows, error handling, and using signals and queries to manage state. It’s focused on practical, production-ready orchestration tasks.

How this skill works

The skill demonstrates how to connect a Temporal client, run a Worker that hosts workflows and activities, and start workflow executions whose results are awaited through workflow handles. It shows activity invocation with timeouts and retry policies, workflow-level error handling, and how to expose signals and queries to control and observe running workflows. Examples use the asyncio-based primitives of the Temporal Python SDK and common orchestration patterns.

When to use it

  • Designing long-running, stateful business processes that must survive crashes and restarts.
  • Coordinating distributed tasks with retries, timeouts, and durable state.
  • Implementing activities that call external services (databases, APIs) and require reliable execution.
  • Exposing live control paths with signals and observability via queries.
  • Managing workflow restarts, failure handling, and retry strategies in production.

Best practices

  • Keep activities idempotent and side-effect-safe to tolerate retries.
  • Use workflow state only for small, deterministic data; persist large payloads externally.
  • Set sensible timeouts and retry policies per activity based on external system SLAs.
  • Use signals for external triggers and queries for read-only visibility into workflow state.
  • Run workers with appropriate task queues and autoscaling to match load and throughput.

Example use cases

  • Order processing pipeline: approve via signal, wait for inventory, then charge payment.
  • ETL pipelines that orchestrate extract, transform, and load activities with retries.
  • Human-in-the-loop approvals where workflows pause and resume on external signals.
  • Scheduled jobs with durable retries and backoff for flaky third-party APIs.
  • Microservice orchestration: coordinate multiple services with durable workflow state.

FAQ

How do I handle flaky external services?

Use activity retry policies with backoff and make activities idempotent so retries are safe. Consider circuit-breaker logic inside activities and external persistent storage for large results.
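
To get a feel for what "retry with backoff" means in practice, the sketch below computes the wait before each retry from the four `RetryPolicy` fields `initial_interval`, `backoff_coefficient`, `maximum_interval`, and `maximum_attempts`. The helper `retry_delays` is hypothetical and only models the documented exponential schedule (each wait is the previous one multiplied by the coefficient, capped at the maximum interval); it does not call the SDK.

```python
from datetime import timedelta


def retry_delays(
    initial_interval: timedelta,
    backoff_coefficient: float,
    maximum_interval: timedelta,
    maximum_attempts: int,
) -> list[timedelta]:
    """Return the wait before each retry (hypothetical helper, not an SDK call)."""
    delays = []
    # maximum_attempts counts the first try, so there are attempts - 1 retries.
    for retry_index in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry_index)
        delays.append(min(delay, maximum_interval))  # cap at the maximum
    return delays


delays = retry_delays(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(seconds=5),
    maximum_attempts=5,
)
print([d.total_seconds() for d in delays])  # [1.0, 2.0, 4.0, 5.0]
```

The same four values can be passed to `RetryPolicy(...)` in `workflow.execute_activity`. Tuning them per activity, rather than sharing one policy, lets a slow third-party API back off longer than a fast internal call.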

When should I use signals vs. workflow inputs?

Use inputs for initial parameters at start time. Use signals to change workflow state or trigger mid-flight behavior without restarting the workflow.
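
A client-side sketch of the mid-flight path, assuming the `OrderWorkflow` from the Signals and Queries section is already running: it looks up the execution by ID, sends the `approve` signal, then reads the status back with the `get_status` query. The function name `approve_and_check` is hypothetical. Signals and queries are addressed here by their string names, which default to the handler method names; passing the method references (`OrderWorkflow.approve`, `OrderWorkflow.get_status`) is the typed alternative.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported for type hints only, so the sketch loads without the SDK.
    from temporalio.client import Client


async def approve_and_check(client: Client, workflow_id: str) -> str:
    """Signal a running order workflow, then query its status."""
    # get_workflow_handle is synchronous; it does not contact the server.
    handle = client.get_workflow_handle(workflow_id)

    # Signals change workflow state without restarting the workflow.
    await handle.signal("approve")

    # Queries are read-only and return the handler's result.
    return await handle.query("get_status")
```

With a connected client this would be called as `await approve_and_check(client, "my-workflow-id")`. The order ID itself, by contrast, belongs in the workflow input, since it is known at start time and never changes.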