This skill helps you design and manage durable Temporal workflows, activities, and retries in Python, using practical patterns.

npx playbooks add skill martinffx/claude-code-atelier --skill atelier-python-temporal

---
name: python:temporal
description: Temporal workflow orchestration in Python. Use when designing workflows, implementing activities, handling retries, managing workflow state, or building durable distributed systems.
user-invocable: false
---

# Temporal Workflow Orchestration

Temporal SDK patterns for building durable, distributed workflows in Python.

## Worker Setup

```python
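import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

# Assumed module layout; import from wherever your definitions live.
from workflows import MyWorkflow
from activities import my_activity


async def main():
    # Connect to a Temporal server (default local dev address)
    client = await Client.connect("localhost:7233")

    # Register the workflows and activities this worker polls for
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )

    # Runs until cancelled or shut down
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())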
```

## Workflow Definition

```python
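from datetime import timedelta

from temporalio import workflow

# Pass activity imports through the workflow sandbox (assumed module name)
with workflow.unsafe.imports_passed_through():
    from activities import my_activity


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        """Workflow entry point; must stay deterministic."""
        # Run the activity; each attempt may take at most 30 seconds
        result = await workflow.execute_activity(
            my_activity,
            name,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Hello {result}"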
```

## Activity Implementation

```python
from temporalio import activity

@activity.defn
async def my_activity(name: str) -> str:
    """Activity - can fail and retry"""
    # Do work (database, API, etc.)
    return name.upper()
```

## Starting Workflows

```python
from temporalio.client import Client

from workflows import MyWorkflow  # assumed module name

async def start_workflow():
    client = await Client.connect("localhost:7233")

    handle = await client.start_workflow(
        MyWorkflow.run,
        "World",
        id="my-workflow-id",
        task_queue="my-task-queue",
    )

    result = await handle.result()
    print(result)  # "Hello WORLD"
```

## Error Handling

```python
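from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError


@activity.defn
async def risky_activity() -> str:
    """Placeholder for work that may fail (hypothetical)."""
    return "ok"


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> str:
        try:
            result = await workflow.execute_activity(
                risky_activity,
                start_to_close_timeout=timedelta(seconds=30),
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
        except ActivityError as e:
            # Raised once all 3 attempts have failed; e.cause is the underlying error
            workflow.logger.error("risky_activity failed: %s", e.cause)
            return "Failed"

        return result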
```

## Signals and Queries

```python
from temporalio import workflow


@workflow.defn
class OrderWorkflow:
    def __init__(self) -> None:
        # Workflow state; changed only by the signal handler below
        self.status = "pending"

    @workflow.run
    async def run(self, order_id: str) -> str:
        await workflow.wait_condition(lambda: self.status == "approved")
        return "Order processed"

    @workflow.signal
    def approve(self):
        """Signal to approve order"""
        self.status = "approved"

    @workflow.query
    def get_status(self) -> str:
        """Query current status"""
        return self.status
```

See references/ for testing patterns and common workflow patterns.

Overview

This skill provides practical patterns and examples for building durable Temporal workflows in Python. It covers worker setup, workflow and activity definitions, starting workflows, error handling, signals, and queries to help design resilient distributed systems. Use it to implement long-running business logic with retries, state management, and external interaction.

How this skill works

The skill demonstrates how to connect a client to a Temporal server, register workflows and activities on a worker, and run the worker's polling loop. It shows idiomatic Python workflow and activity definitions using decorators, how to invoke activities with timeouts and retry policies, and how to start workflows and await their results. It also covers signals and queries for external control and inspection of running workflows.

When to use it

  • Designing long-running or stateful business processes that must survive crashes and restarts
  • Implementing activities that call external services, databases, or perform IO with retry behavior
  • Coordinating multiple distributed tasks with durable state and event-driven transitions
  • Exposing workflow control paths via signals and runtime inspection via queries
  • Testing and debugging workflow logic locally before running workers against a production Temporal Server

Best practices

  • Keep workflow code deterministic: avoid wall-clock time, random numbers, threads, network IO, and other side effects inside workflows
  • Push blocking IO and external calls into activities; workflows should orchestrate, activities should perform work
  • Use explicit timeouts and RetryPolicy for activities to manage transient failures
  • Model state clearly and use signals to mutate workflow state safely from external systems
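  • Derive workflow IDs from a business key (for example, an order ID) so that a repeated start request cannot launch a duplicate while the first run is still open, and give each group of workers a clearly named task queue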

Example use cases

  • Order processing pipeline where approval comes from an external user via a signal, and queries report status
  • ETL job orchestration with retries for transient network or database failures handled by activities
  • Human-in-the-loop processes that wait for signals before advancing workflow steps
  • Microservice saga coordination with durable compensation steps implemented as activities
  • Scheduled or periodic workflows that perform recurring maintenance or reporting tasks

FAQ

How do I test workflows locally?

Run a local Temporal server or the SDK's test environment (`temporalio.testing.WorkflowEnvironment`), start workers in a test fixture, and use the client to execute workflows and assert on the results. Keep activities small and mock external dependencies where possible.

Where should I put retry logic?

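Put retry policies on activities, where the external interactions happen. Activities are retried by default, so an explicit RetryPolicy mainly bounds the attempts and backoff for transient errors. Workflows are not retried unless a policy is set when they are started, and that is best reserved for orchestration-level failures.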
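
To get a feel for what a policy means in wall-clock terms, the plain-Python sketch below computes the waits between attempts. It assumes the usual exponential rule (initial interval multiplied by the backoff coefficient per retry, capped at the maximum interval) and a default cap of 100 times the initial interval. `retry_delays` is a hypothetical helper, not an SDK function, and it does not model unlimited attempts.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
    maximum_attempts: int = 5,
) -> List[timedelta]:
    """Waits before each retry, given a finite number of attempts (>= 1)."""
    # Assumption: with no explicit cap, the cap is 100x the initial interval
    cap = maximum_interval or initial_interval * 100
    delays = []
    # N attempts means at most N - 1 retries
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, cap))
    return delays


schedule = retry_delays(maximum_interval=timedelta(seconds=5))
print([d.total_seconds() for d in schedule])  # [1.0, 2.0, 4.0, 5.0]
```

Summing the schedule gives a rough lower bound on how long a workflow waits before it sees the final failure, which helps when choosing `maximum_attempts`.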

Can workflows call blocking code?

No. Workflows must remain deterministic and non-blocking. Place blocking or IO work inside activities and call them from the workflow using execute_activity.

How do I signal a running workflow?

Use the client to send a signal to the workflow by ID or handle. Define @workflow.signal methods in the workflow class to mutate workflow state in a controlled, deterministic way.