home / skills / aaronontheweb / dotnet-skills / akka-best-practices

akka-best-practices skill

/skills/akka-best-practices

This skill helps .NET developers implement robust Akka.NET patterns for event streams, supervision, error handling, DI, and testable cluster abstractions.

npx playbooks add skill aaronontheweb/dotnet-skills --skill akka-best-practices

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
33.1 KB
---
name: akka-net-best-practices
description: Critical Akka.NET best practices including EventStream vs DistributedPubSub, supervision strategies, error handling, Props vs DependencyResolver, work distribution patterns, and cluster/local mode abstractions for testability.
invocable: false
---

# Akka.NET Best Practices

## When to Use This Skill

Use this skill when:
- Designing actor communication patterns
- Deciding between EventStream and DistributedPubSub
- Implementing error handling in actors
- Understanding supervision strategies
- Choosing between Props patterns and DependencyResolver
- Designing work distribution across nodes
- Creating testable actor systems that can run with or without cluster infrastructure
- Abstracting over Cluster Sharding for local testing scenarios

---

## 1. EventStream vs DistributedPubSub

### Critical: EventStream is LOCAL ONLY

`Context.System.EventStream` is **local to a single ActorSystem process**. It does NOT work across cluster nodes.

```csharp
// BAD: This only works on a single server
// When you add a second server, subscribers on server 2 won't receive events from server 1
Context.System.EventStream.Subscribe(Self, typeof(PostCreated));
Context.System.EventStream.Publish(new PostCreated(postId, authorId));
```

**When EventStream is appropriate:**
- Logging and diagnostics within a single process
- Local event bus for truly single-process applications
- Development/testing scenarios

### Use DistributedPubSub for Multi-Node

For events that must reach actors across multiple cluster nodes, use `Akka.Cluster.Tools.PublishSubscribe`:

```csharp
using Akka.Cluster.Tools.PublishSubscribe;

public class TimelineUpdatePublisher : ReceiveActor
{
    private readonly IActorRef _mediator;

    public TimelineUpdatePublisher()
    {
        // Get the DistributedPubSub mediator
        _mediator = DistributedPubSub.Get(Context.System).Mediator;

        Receive<PublishTimelineUpdate>(msg =>
        {
            // Publish to a topic - reaches all subscribers across all nodes
            _mediator.Tell(new Publish($"timeline:{msg.UserId}", msg.Update));
        });
    }
}

public class TimelineSubscriber : ReceiveActor
{
    public TimelineSubscriber(UserId userId)
    {
        var mediator = DistributedPubSub.Get(Context.System).Mediator;

        // Subscribe to user-specific topic
        mediator.Tell(new Subscribe($"timeline:{userId}", Self));

        Receive<TimelineUpdate>(update =>
        {
            // Handle the update - this works across cluster nodes
        });

        Receive<SubscribeAck>(ack =>
        {
            // Subscription confirmed
        });
    }
}
```

### Akka.Hosting Configuration for DistributedPubSub

```csharp
builder.WithDistributedPubSub(role: null); // Available on all roles, or specify a role
```

### Topic Design Patterns

| Pattern | Topic Format | Use Case |
|---------|--------------|----------|
| Per-user | `timeline:{userId}` | Timeline updates, notifications |
| Per-entity | `post:{postId}` | Post engagement updates |
| Broadcast | `system:announcements` | System-wide notifications |
| Role-based | `workers:rss-poller` | Work distribution |

---

## 2. Supervision Strategies

### Key Clarification: Supervision is for CHILDREN

A supervision strategy defined on an actor dictates **how that actor supervises its children**, NOT how the actor itself is supervised.

```csharp
public class ParentActor : ReceiveActor
{
    // This strategy applies to children of ParentActor, NOT to ParentActor itself
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: 10,
            withinTimeRange: TimeSpan.FromSeconds(30),
            decider: ex => ex switch
            {
                ArithmeticException => Directive.Resume,
                NullReferenceException => Directive.Restart,
                ArgumentException => Directive.Stop,
                _ => Directive.Escalate
            });
    }
}
```

### Default Supervision Strategy

The default `OneForOneStrategy` already includes rate limiting:
- **10 restarts within 1 second** = actor is permanently stopped
- This prevents infinite restart loops

**You rarely need a custom strategy** unless you have specific requirements.

### When to Define Custom Supervision

**Good reasons:**
- Actor throws exceptions indicating irrecoverable state corruption → Restart
- Actor throws exceptions that should NOT cause restart (expected failures) → Resume
- Child failures should affect siblings → Use `AllForOneStrategy`
- Need different retry limits than the default

**Bad reasons:**
- "Just to be safe" - the default is already safe
- Don't understand what the actor does - understand it first

### Example: When Custom Supervision Makes Sense

```csharp
public class RssFeedCoordinator : ReceiveActor
{
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1, // Unlimited retries
            withinTimeRange: TimeSpan.FromMinutes(1),
            decider: ex => ex switch
            {
                // HTTP timeout - transient, resume and let the actor retry via its own timer
                HttpRequestException => Directive.Resume,

                // Feed URL permanently invalid - stop this child, don't restart forever
                InvalidFeedUrlException => Directive.Stop,

                // Unknown error - restart to clear potentially corrupt state
                _ => Directive.Restart
            });
    }
}
```

---

## 3. Error Handling: Supervision vs Try-Catch

### When to Use Try-Catch (Most Cases)

**Use try-catch when:**
- The failure is **expected** (network timeout, invalid input, external service down)
- You know **exactly why** the exception occurred
- You can handle it **gracefully** (retry, return error response, log and continue)
- Restarting would **not help** (same error would occur again)

```csharp
public class RssFeedPollerActor : ReceiveActor
{
    public RssFeedPollerActor()
    {
        ReceiveAsync<PollFeed>(async msg =>
        {
            try
            {
                var feed = await _httpClient.GetStringAsync(msg.FeedUrl);
                var items = ParseFeed(feed);
                // Process items...
            }
            catch (HttpRequestException ex)
            {
                // Expected failure - log and schedule retry
                _log.Warning("Feed {Url} unavailable: {Error}", msg.FeedUrl, ex.Message);
                Context.System.Scheduler.ScheduleTellOnce(
                    TimeSpan.FromMinutes(5),
                    Self,
                    msg,
                    Self);
            }
            catch (XmlException ex)
            {
                // Invalid feed format - log and mark as bad
                _log.Error("Feed {Url} has invalid format: {Error}", msg.FeedUrl, ex.Message);
                Sender.Tell(new FeedPollResult.InvalidFormat(msg.FeedUrl));
            }
        });
    }
}
```

### When to Let Supervision Handle It

**Let exceptions propagate (trigger supervision) when:**
- You have **no idea** why the exception occurred
- The actor's **state might be corrupt**
- A **restart would help** (fresh state, reconnect resources)
- It's a **programming error** (NullReferenceException, InvalidOperationException from bad logic)

```csharp
public class OrderActor : ReceiveActor
{
    private OrderState _state;

    public OrderActor()
    {
        Receive<ProcessPayment>(msg =>
        {
            // If this throws, we have no idea why - let supervision restart us
            // A restart will reload state from persistence and might fix the issue
            var result = _state.ApplyPayment(msg.Amount);
            Persist(new PaymentApplied(msg.Amount), evt =>
            {
                _state = _state.With(evt);
            });
        });
    }
}
```

### Anti-Pattern: Swallowing Unknown Exceptions

```csharp
// BAD: Swallowing exceptions hides problems
public class BadActor : ReceiveActor
{
    public BadActor()
    {
        ReceiveAsync<DoWork>(async msg =>
        {
            try
            {
                await ProcessWork(msg);
            }
            catch (Exception ex)
            {
                // This hides all errors - you'll never know something is broken
                _log.Error(ex, "Error processing work");
                // Actor continues with potentially corrupt state
            }
        });
    }
}

// GOOD: Handle known exceptions, let unknown ones propagate
public class GoodActor : ReceiveActor
{
    public GoodActor()
    {
        ReceiveAsync<DoWork>(async msg =>
        {
            try
            {
                await ProcessWork(msg);
            }
            catch (HttpRequestException ex)
            {
                // Known, expected failure - handle gracefully
                _log.Warning("HTTP request failed: {Error}", ex.Message);
                Sender.Tell(new WorkResult.TransientFailure());
            }
            // Unknown exceptions propagate to supervision
        });
    }
}
```

---

## 4. Props vs DependencyResolver

### When to Use Plain Props

**Use `Props.Create()` when:**
- Actor doesn't need `IServiceProvider` or `IRequiredActor<T>`
- All dependencies can be passed via constructor
- Actor is simple and self-contained

```csharp
// Simple actor with no DI needs
public static Props Props(PostId postId, IPostWriteStore store)
    => Akka.Actor.Props.Create(() => new PostEngagementActor(postId, store));

// Usage
var actor = Context.ActorOf(PostEngagementActor.Props(postId, store), postId.ToString());
```

### When to Use DependencyResolver

**Use `resolver.Props<T>()` when:**
- Actor needs `IServiceProvider` to create scoped services
- Actor uses `IRequiredActor<T>` to get references to other actors
- Actor has many dependencies that are already in DI container

```csharp
// Actor that needs scoped database connections
public class OrderProcessorActor : ReceiveActor
{
    public OrderProcessorActor(IServiceProvider serviceProvider)
    {
        ReceiveAsync<ProcessOrder>(async msg =>
        {
            // Create a scope for this operation
            using var scope = serviceProvider.CreateScope();
            var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
            // Process order...
        });
    }
}

// Registration with DI
builder.WithActors((system, registry, resolver) =>
{
    var actor = system.ActorOf(resolver.Props<OrderProcessorActor>(), "order-processor");
    registry.Register<OrderProcessorActor>(actor);
});
```

### Remote Deployment Considerations

**You almost never need remote deployment.** Remote deployment means deploying an actor to run on a different node than the one creating it.

If you're not doing remote deployment (and you probably aren't):
- `Props.Create(() => new Actor(...))` with closures is fine
- The "serialization issue" warning doesn't apply

**When you would use remote deployment:**
- Distributing compute-intensive work to specific nodes
- Running actors on nodes with specific hardware (GPU, etc.)

For most applications, use **cluster sharding** instead of remote deployment - it handles distribution automatically.

---

## 5. Work Distribution Patterns

### Problem: Thundering Herd

When you have many background jobs (RSS feeds, email sending, etc.), don't process them all at once:

```csharp
// BAD: Polls all feeds simultaneously on startup
public class BadRssCoordinator : ReceiveActor
{
    public BadRssCoordinator(IRssFeedRepository repo)
    {
        ReceiveAsync<StartPolling>(async _ =>
        {
            var feeds = await repo.GetAllFeedsAsync();
            foreach (var feed in feeds) // 2000 feeds = 2000 simultaneous HTTP requests
            {
                Context.ActorOf(RssFeedPollerActor.Props(feed.Url));
            }
        });
    }
}
```

### Pattern 1: Database-Driven Work Queue

Use the database as a work queue with `FOR UPDATE SKIP LOCKED`:

```csharp
public class RssPollerWorker : ReceiveActor
{
    public RssPollerWorker(IRssFeedRepository repo)
    {
        ReceiveAsync<PollBatch>(async _ =>
        {
            // Each worker claims a batch - naturally distributes across nodes
            var feeds = await repo.ClaimFeedsForPollingAsync(
                batchSize: 10,
                staleAfter: TimeSpan.FromMinutes(10));

            foreach (var feed in feeds)
            {
                try
                {
                    await PollFeed(feed);
                    await repo.MarkPolledAsync(feed.Id, success: true);
                }
                catch (Exception ex)
                {
                    await repo.MarkPolledAsync(feed.Id, success: false, error: ex.Message);
                }
            }

            // Schedule next batch
            Context.System.Scheduler.ScheduleTellOnce(
                TimeSpan.FromSeconds(5),
                Self,
                PollBatch.Instance,
                Self);
        });
    }
}
```

```sql
-- ClaimFeedsForPollingAsync implementation
UPDATE rss_feeds
SET status = 'processing',
    processing_started_at = NOW()
WHERE id IN (
    SELECT id FROM rss_feeds
    WHERE status = 'pending'
      AND (next_poll_at IS NULL OR next_poll_at <= NOW())
    ORDER BY next_poll_at NULLS FIRST
    LIMIT @batchSize
    FOR UPDATE SKIP LOCKED
)
RETURNING *;
```

**Benefits:**
- Naturally distributes work across multiple server nodes
- No coordination needed - database handles locking
- Easy to monitor (query the table)
- Survives server restarts

### Pattern 2: Akka.Streams for Rate Limiting

Use Akka.Streams to throttle processing within a single node:

```csharp
public class ThrottledRssProcessor : ReceiveActor
{
    public ThrottledRssProcessor(IRssFeedRepository repo)
    {
        var materializer = Context.System.Materializer();

        ReceiveAsync<StartProcessing>(async _ =>
        {
            await Source.From(await repo.GetPendingFeedsAsync())
                .Throttle(10, TimeSpan.FromSeconds(1)) // Max 10 per second
                .SelectAsync(4, async feed => // Max 4 concurrent
                {
                    await PollFeed(feed);
                    return feed;
                })
                .RunWith(Sink.Ignore<RssFeed>(), materializer);
        });
    }
}
```

### Pattern 3: Durable Queue (Email Outbox Pattern)

For work that must be reliably processed, use a database-backed outbox:

```csharp
// Enqueue work transactionally with business operation
public async Task CreatePostAsync(Post post)
{
    await using var transaction = await _db.BeginTransactionAsync();

    await _postStore.CreateAsync(post);

    // Enqueue notification emails in same transaction
    foreach (var follower in await _followStore.GetFollowersAsync(post.AuthorId))
    {
        await _emailOutbox.EnqueueAsync(new EmailJob
        {
            To = follower.Email,
            Template = "new-post",
            Data = JsonSerializer.Serialize(new { PostId = post.Id })
        });
    }

    await transaction.CommitAsync();
}

// Worker processes outbox
public class EmailOutboxWorker : ReceiveActor
{
    public EmailOutboxWorker(IEmailOutboxStore outbox, IEmailSender sender)
    {
        ReceiveAsync<ProcessBatch>(async _ =>
        {
            var batch = await outbox.ClaimBatchAsync(10);
            foreach (var job in batch)
            {
                try
                {
                    await sender.SendAsync(job);
                    await outbox.MarkSentAsync(job.Id);
                }
                catch (Exception ex)
                {
                    await outbox.MarkFailedAsync(job.Id, ex.Message);
                }
            }
        });
    }
}
```

---

## 6. Common Mistakes Summary

| Mistake | Why It's Wrong | Fix |
|---------|----------------|-----|
| Using EventStream for cross-node pub/sub | EventStream is local only | Use DistributedPubSub |
| Defining supervision to "protect" an actor | Supervision protects children | Understand the hierarchy |
| Catching all exceptions | Hides bugs, corrupts state | Only catch expected errors |
| Always using DependencyResolver | Adds unnecessary complexity | Use plain Props when possible |
| Processing all background jobs at once | Thundering herd, resource exhaustion | Use database queue + rate limiting |
| Throwing exceptions for expected failures | Triggers unnecessary restarts | Return result types, use messaging |

---

## 7. Quick Reference

### Communication Pattern Decision Tree

```
Need to communicate between actors?
├── Same process only? → EventStream is fine
├── Across cluster nodes?
│   ├── Point-to-point? → Use ActorSelection or known IActorRef
│   └── Pub/sub? → Use DistributedPubSub
└── Fire-and-forget to external system? → Consider outbox pattern
```

### Error Handling Decision Tree

```
Exception occurred in actor?
├── Expected failure (HTTP timeout, invalid input)?
│   └── Try-catch, handle gracefully, continue
├── State might be corrupt?
│   └── Let supervision restart
├── Unknown cause?
│   └── Let supervision restart
└── Programming error (null ref, bad logic)?
    └── Let supervision restart, fix the bug
```

### Props Decision Tree

```
Creating actor Props?
├── Actor needs IServiceProvider?
│   └── Use resolver.Props<T>()
├── Actor needs IRequiredActor<T>?
│   └── Use resolver.Props<T>()
├── Simple actor with constructor params?
│   └── Use Props.Create(() => new Actor(...))
└── Remote deployment needed?
    └── Probably not - use cluster sharding instead
```

---

## 8. Cluster/Local Mode Abstractions

For applications that need to run both in clustered production environments and local/test environments without cluster infrastructure, use abstraction patterns to toggle between implementations.

### AkkaExecutionMode Enum

Define an execution mode that controls which implementations are used:

```csharp
/// <summary>
/// Determines how Akka.NET infrastructure features are configured.
/// </summary>
public enum AkkaExecutionMode
{
    /// <summary>
    /// Local test mode - no cluster infrastructure.
    /// Uses in-memory implementations for pub/sub and local parent actors
    /// instead of cluster sharding.
    /// </summary>
    LocalTest,

    /// <summary>
    /// Full cluster mode with sharding, singletons, and distributed pub/sub.
    /// </summary>
    Clustered
}
```

### GenericChildPerEntityParent - Local Sharding Alternative

When testing locally, you can't use Cluster Sharding. This actor mimics sharding behavior by creating child actors per entity ID using the same `IMessageExtractor` interface:

```csharp
/// <summary>
/// A local parent actor that mimics Cluster Sharding behavior.
/// Creates and manages child actors per entity ID using the same IMessageExtractor
/// that would be used with real sharding, enabling seamless switching between modes.
/// </summary>
public sealed class GenericChildPerEntityParent : ReceiveActor
{
    private readonly IMessageExtractor _extractor;
    private readonly Func<string, Props> _propsFactory;
    private readonly Dictionary<string, IActorRef> _children = new();
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public GenericChildPerEntityParent(
        IMessageExtractor extractor,
        Func<string, Props> propsFactory)
    {
        _extractor = extractor;
        _propsFactory = propsFactory;

        ReceiveAny(msg =>
        {
            var entityId = _extractor.EntityId(msg);
            if (string.IsNullOrEmpty(entityId))
            {
                _log.Warning("Could not extract entity ID from message {0}", msg.GetType().Name);
                Unhandled(msg);
                return;
            }

            var child = GetOrCreateChild(entityId);

            // Unwrap the message if it's a ShardingEnvelope
            var unwrapped = _extractor.EntityMessage(msg);
            child.Forward(unwrapped);
        });
    }

    private IActorRef GetOrCreateChild(string entityId)
    {
        if (_children.TryGetValue(entityId, out var existing))
            return existing;

        var props = _propsFactory(entityId);
        var child = Context.ActorOf(props, entityId);
        Context.Watch(child);
        _children[entityId] = child;

        _log.Debug("Created child actor for entity {0}", entityId);
        return child;
    }

    protected override void PreRestart(Exception reason, object message)
    {
        // Don't stop children on restart
    }

    public static Props CreateProps(
        IMessageExtractor extractor,
        Func<string, Props> propsFactory)
    {
        return Props.Create(() => new GenericChildPerEntityParent(extractor, propsFactory));
    }
}
```

### IPubSubMediator - Abstracting DistributedPubSub

Create an interface to abstract over pub/sub so tests can use a local implementation:

```csharp
/// <summary>
/// Abstraction over pub/sub messaging that allows swapping between
/// DistributedPubSub (clustered) and local implementations (testing).
/// </summary>
public interface IPubSubMediator
{
    /// <summary>
    /// Subscribe an actor to a topic.
    /// </summary>
    void Subscribe(string topic, IActorRef subscriber);

    /// <summary>
    /// Unsubscribe an actor from a topic.
    /// </summary>
    void Unsubscribe(string topic, IActorRef subscriber);

    /// <summary>
    /// Publish a message to all subscribers of a topic.
    /// </summary>
    void Publish(string topic, object message);

    /// <summary>
    /// Send a message to one subscriber of a topic (load balanced).
    /// </summary>
    void Send(string topic, object message);
}
```

### LocalPubSubMediator - In-Memory Implementation

```csharp
/// <summary>
/// In-memory pub/sub implementation for local testing without cluster.
/// Uses the EventStream internally for simplicity.
/// </summary>
public sealed class LocalPubSubMediator : IPubSubMediator
{
    private readonly ActorSystem _system;
    private readonly ConcurrentDictionary<string, HashSet<IActorRef>> _subscriptions = new();
    private readonly object _lock = new();

    public LocalPubSubMediator(ActorSystem system)
    {
        _system = system;
    }

    public void Subscribe(string topic, IActorRef subscriber)
    {
        lock (_lock)
        {
            var subs = _subscriptions.GetOrAdd(topic, _ => new HashSet<IActorRef>());
            subs.Add(subscriber);
        }

        // Send acknowledgement like real DistributedPubSub does
        subscriber.Tell(new SubscribeAck(new Subscribe(topic, subscriber)));
    }

    public void Unsubscribe(string topic, IActorRef subscriber)
    {
        lock (_lock)
        {
            if (_subscriptions.TryGetValue(topic, out var subs))
            {
                subs.Remove(subscriber);
            }
        }

        subscriber.Tell(new UnsubscribeAck(new Unsubscribe(topic, subscriber)));
    }

    public void Publish(string topic, object message)
    {
        HashSet<IActorRef> subscribers;
        lock (_lock)
        {
            if (!_subscriptions.TryGetValue(topic, out var subs))
                return;
            subscribers = new HashSet<IActorRef>(subs);
        }

        foreach (var subscriber in subscribers)
        {
            subscriber.Tell(message);
        }
    }

    public void Send(string topic, object message)
    {
        IActorRef? target = null;
        lock (_lock)
        {
            if (_subscriptions.TryGetValue(topic, out var subs) && subs.Count > 0)
            {
                // Simple round-robin - pick first available
                target = subs.FirstOrDefault();
            }
        }

        target?.Tell(message);
    }
}
```

### ClusterPubSubMediator - Production Implementation

```csharp
/// <summary>
/// Production implementation wrapping Akka.Cluster.Tools.PublishSubscribe.
/// </summary>
public sealed class ClusterPubSubMediator : IPubSubMediator
{
    private readonly IActorRef _mediator;

    public ClusterPubSubMediator(ActorSystem system)
    {
        _mediator = DistributedPubSub.Get(system).Mediator;
    }

    public void Subscribe(string topic, IActorRef subscriber)
    {
        _mediator.Tell(new Subscribe(topic, subscriber));
    }

    public void Unsubscribe(string topic, IActorRef subscriber)
    {
        _mediator.Tell(new Unsubscribe(topic, subscriber));
    }

    public void Publish(string topic, object message)
    {
        _mediator.Tell(new Publish(topic, message));
    }

    public void Send(string topic, object message)
    {
        _mediator.Tell(new Send(topic, message, localAffinity: true));
    }
}
```

### Wiring It All Together

Configure your ActorSystem based on execution mode:

```csharp
public static class AkkaHostingExtensions
{
    public static AkkaConfigurationBuilder ConfigureActorSystem(
        this AkkaConfigurationBuilder builder,
        AkkaExecutionMode mode,
        IServiceCollection services)
    {
        if (mode == AkkaExecutionMode.Clustered)
        {
            builder
                .WithClustering()
                .WithShardRegion<MyEntity>(
                    "my-entity",
                    (system, registry, resolver) => entityId =>
                        resolver.Props<MyEntityActor>(entityId),
                    new MyEntityMessageExtractor(),
                    new ShardOptions())
                .WithDistributedPubSub();

            // Register cluster pub/sub mediator
            services.AddSingleton<IPubSubMediator>(sp =>
            {
                var system = sp.GetRequiredService<ActorSystem>();
                return new ClusterPubSubMediator(system);
            });
        }
        else // LocalTest mode
        {
            // Register local pub/sub mediator
            services.AddSingleton<IPubSubMediator>(sp =>
            {
                var system = sp.GetRequiredService<ActorSystem>();
                return new LocalPubSubMediator(system);
            });

            // Use GenericChildPerEntityParent instead of sharding
            builder.WithActors((system, registry, resolver) =>
            {
                var parent = system.ActorOf(
                    GenericChildPerEntityParent.CreateProps(
                        new MyEntityMessageExtractor(),
                        entityId => resolver.Props<MyEntityActor>(entityId)),
                    "my-entity");

                registry.Register<MyEntityParent>(parent);
            });
        }

        return builder;
    }
}
```

### Usage in Application Code

Application code uses the abstractions and doesn't need to know which mode is active:

```csharp
public class MyService
{
    private readonly IPubSubMediator _pubSub;
    private readonly IRequiredActor<MyEntityParent> _entityParent;

    public MyService(
        IPubSubMediator pubSub,
        IRequiredActor<MyEntityParent> entityParent)
    {
        _pubSub = pubSub;
        _entityParent = entityParent;
    }

    public async Task ProcessAsync(string entityId, MyCommand command)
    {
        // Works identically in both modes
        var parent = await _entityParent.GetAsync();
        parent.Tell(new ShardingEnvelope(entityId, command));

        // Publish event - works with both local and distributed pub/sub
        _pubSub.Publish($"entity:{entityId}", new EntityUpdated(entityId));
    }
}
```

### Benefits of This Pattern

| Benefit | Description |
|---------|-------------|
| **Fast unit tests** | No cluster startup overhead, tests run in milliseconds |
| **Identical message flow** | Same `IMessageExtractor`, same message types |
| **Easy debugging** | Local mode is simpler to step through |
| **Integration test flexibility** | Choose mode per test scenario |
| **Production confidence** | Abstractions are thin wrappers over real implementations |

### When to Use Each Mode

| Scenario | Recommended Mode |
|----------|------------------|
| Unit tests | LocalTest |
| Integration tests (single node) | LocalTest |
| Integration tests (multi-node) | Clustered |
| Local development | LocalTest or Clustered (your choice) |
| Production | Clustered |

---

## 9. Actor Logging

### Use ILoggingAdapter, Not ILogger<T>

In actors, use `ILoggingAdapter` from `Context.GetLogger()` instead of DI-injected `ILogger<T>`:

```csharp
public class MyActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    public MyActor()
    {
        Receive<MyMessage>(msg =>
        {
            // ✅ Akka.NET ILoggingAdapter with semantic logging (v1.5.57+)
            _log.Info("Processing message for user {UserId}", msg.UserId);
            _log.Error(ex, "Failed to process {MessageType}", msg.GetType().Name);
        });
    }
}
```

**Why ILoggingAdapter:**
- Integrates with Akka's logging pipeline and supervision
- Supports semantic/structured logging as of v1.5.57
- Method names: `Info()`, `Debug()`, `Warning()`, `Error()` (not `Log*` variants)
- No DI required - obtained directly from actor context

**Don't inject ILogger<T>:**

```csharp
// ❌ Don't inject ILogger<T> into actors
public class MyActor : ReceiveActor
{
    private readonly ILogger<MyActor> _logger; // Wrong!

    public MyActor(ILogger<MyActor> logger)
    {
        _logger = logger;
    }
}
```

### Semantic Logging (v1.5.57+)

As of Akka.NET v1.5.57, `ILoggingAdapter` supports semantic/structured logging with named placeholders:

```csharp
// Named placeholders for better log aggregation and querying
_log.Info("Order {OrderId} processed for customer {CustomerId}", order.Id, order.CustomerId);

// Prefer named placeholders over positional
// ✅ Good: {OrderId}, {CustomerId}
// ❌ Avoid: {0}, {1}
```

---

## 10. Managing Async Operations with CancellationToken

When actors launch async operations via `PipeTo`, those operations can outlive the actor if not properly managed. Use `CancellationToken` tied to the actor lifecycle.

### Actor-Scoped CancellationTokenSource

Cancel in-flight async work when the actor stops:

```csharp
public class DataSyncActor : ReceiveActor
{
    private CancellationTokenSource? _operationCts;

    public DataSyncActor()
    {
        ReceiveAsync<StartSync>(HandleStartSyncAsync);
    }

    protected override void PostStop()
    {
        // Cancel any in-flight async work when actor stops
        _operationCts?.Cancel();
        _operationCts?.Dispose();
        _operationCts = null;
        base.PostStop();
    }

    private Task HandleStartSyncAsync(StartSync cmd)
    {
        // Cancel any previous operation, create new CTS
        _operationCts?.Cancel();
        _operationCts?.Dispose();
        _operationCts = new CancellationTokenSource();
        var ct = _operationCts.Token;

        async Task<SyncResult> PerformSyncAsync()
        {
            try
            {
                ct.ThrowIfCancellationRequested();

                // Pass token to all async operations
                var data = await _repository.GetDataAsync(ct);
                await _service.ProcessAsync(data, ct);

                return new SyncResult(Success: true);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Actor is stopping - graceful exit
                return new SyncResult(Success: false, "Cancelled");
            }
        }

        PerformSyncAsync().PipeTo(Self);
        return Task.CompletedTask;
    }
}
```

### Linked CTS for Per-Operation Timeouts

For external API calls that might hang, use linked CTS with short timeouts:

```csharp
private static readonly TimeSpan ApiTimeout = TimeSpan.FromSeconds(30);

async Task<SyncResult> PerformSyncAsync()
{
    // Check actor-level cancellation
    ct.ThrowIfCancellationRequested();

    // Per-operation timeout linked to actor's CTS
    SomeResult result;
    using (var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
    {
        opCts.CancelAfter(ApiTimeout);
        result = await _externalApi.FetchDataAsync(opCts.Token);
    }

    // Process result...
}
```

**How linked CTS works:**
- Inherits cancellation from parent (actor stop → cancels immediately)
- Adds its own timeout via `CancelAfter` (hung API → cancels after timeout)
- Whichever fires first wins
- Disposed after each operation (short-lived)

### Graceful Timeout vs Shutdown Handling

Distinguish between actor shutdown and operation timeout:

```csharp
try
{
    using var opCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    opCts.CancelAfter(ApiTimeout);
    await _api.CallAsync(opCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
    // Timeout (not actor death) - can retry or handle gracefully
    _log.Warning("API call timed out, skipping item");
}
// If ct.IsCancellationRequested is true, let it propagate up
```

### Key Points

| Practice | Description |
|----------|-------------|
| **Actor CTS in PostStop** | Always cancel and dispose in `PostStop()` |
| **New CTS per operation** | Cancel previous before starting new work |
| **Pass token everywhere** | EF Core queries, HTTP calls, etc. all accept `CancellationToken` |
| **Linked CTS for timeouts** | External calls get short timeouts to prevent hanging |
| **Check in loops** | Call `ct.ThrowIfCancellationRequested()` between iterations |
| **Graceful handling** | Distinguish timeout vs shutdown in catch blocks |

### When to Use

- Any actor that launches async work via `PipeTo`
- Long-running operations (sync jobs, batch processing)
- External API calls that might hang
- Database operations in loops

Overview

This skill captures critical Akka.NET best practices for building reliable, testable actor systems. It focuses on choosing the right pub/sub mechanism, supervision and error-handling patterns, Props vs DependencyResolver trade-offs, robust work-distribution approaches, and techniques to make cluster features testable locally. The guidance is pragmatic and oriented toward real production scenarios.

How this skill works

The content explains when to use Context.System.EventStream versus DistributedPubSub, how supervision strategies apply to children, and when to handle exceptions inside actors versus letting supervision restart them. It outlines Props usage and when to inject IServiceProvider via DependencyResolver. It also presents scalable work distribution patterns (database-driven queues, Akka.Streams throttling) and approaches to abstract cluster sharding for local tests.

When to use it

  • Use EventStream only for single-process logging, diagnostics, or dev/testing scenarios.
  • Use DistributedPubSub when events must be delivered across multiple cluster nodes.
  • Handle expected, recoverable errors with try-catch inside actors; let unknown/state-corrupt errors bubble for supervision.
  • Prefer Props.Create for simple actors; use DependencyResolver when you need scoped DI or IRequiredActor<T>.
  • Use database-driven queues or Akka.Streams to avoid thundering-herd problems when polling many jobs.
  • Abstract cluster features so the same logic runs in both cluster and local test mode.

Best practices

  • Remember supervision directives apply to children of the actor declaring the strategy, not to the actor itself.
  • Don’t swallow unknown exceptions—handle known, expected failures and let unexpected ones trigger supervision.
  • Prefer cluster sharding over remote-deployment; remote deployment is rarely needed.
  • Use FOR UPDATE SKIP LOCKED in the DB to claim batches and naturally distribute work across nodes.
  • Throttle and limit concurrency with Akka.Streams to control local resource usage and avoid spikes.

Example use cases

  • Publish per-user timeline updates with DistributedPubSub topic format like `timeline:{userId}` across all nodes.
  • Make an RSS-polling fleet safe by having workers claim batches from a DB and mark progress, avoiding simultaneous requests.
  • Write an OrderProcessor actor that uses DependencyResolver to create a scoped DbContext for each message.
  • Let an actor restart on unknown state-corrupt exceptions while using try-catch for transient HttpRequestException retries.
  • Run the same sharding logic locally by replacing Cluster Sharding with a lightweight in-process abstraction for unit tests.

FAQ

Can I use EventStream across cluster nodes?

No. EventStream is local to an ActorSystem process. Use DistributedPubSub for cross-node publish/subscribe.

When should I define a custom supervision strategy?

Only when you have concrete reasons—different restart/stop/resume behavior for child failure modes or sibling impact; the default strategy is safe for most cases.

Is Props.Create unsafe if I later add clustering?

Props.Create with closures is fine unless you rely on remote deployment; for clustering, prefer cluster sharding rather than remote-deploying actors.