Skip to content

Commit

Permalink
Added support for UnrestrictedStash (akkadotnet#6325)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Jan 21, 2023
1 parent 8634a6e commit 35aaa62
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 45 deletions.
20 changes: 15 additions & 5 deletions docs/articles/actors/receive-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -790,12 +790,15 @@ static void Main(string[] args)

## Stash

The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox.
The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally.

Here is an example of the `IWithUnboundedStash` interface in action:
> [!NOTE]
> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes).

Here is an example of the `IWithStash` interface in action:

```csharp
public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash
public class ActorWithProtocol : ReceiveActor, IWithStash
{
public IStash Stash { get; set; }

Expand Down Expand Up @@ -827,11 +830,18 @@ public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash
}
```

Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration.
Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration.

Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`.

Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.
Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property.

However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actors mailbox.

The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior.

> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.

## Killing an Actor

Expand Down
20 changes: 15 additions & 5 deletions docs/articles/actors/untyped-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -708,12 +708,15 @@ static void Main(string[] args)

## Stash

The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox.
The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally.

Here is an example of the `IWithUnboundedStash` interface in action:
> [!NOTE]
> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes).
Here is an example of the `IWithStash` interface in action:

```csharp
public class ActorWithProtocol : UntypedActor, IWithUnboundedStash
public class ActorWithProtocol : UntypedActor, IWithStash
{
public IStash Stash { get; set; }

Expand Down Expand Up @@ -748,11 +751,18 @@ public class ActorWithProtocol : UntypedActor, IWithUnboundedStash
}
```

Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration.
Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration.

Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`.

Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.
Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property.

However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actor’s mailbox.

The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior.

> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.
## Killing an Actor

Expand Down
1 change: 1 addition & 0 deletions docs/cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"CRDT",
"datacenter",
"denormalization",
"deque",
"deserialization",
"downstream",
"downstreams",
Expand Down
9 changes: 5 additions & 4 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1175,14 +1175,15 @@ namespace Akka.Actor
void Become(Akka.Actor.UntypedReceive receive);
void BecomeStacked(Akka.Actor.UntypedReceive receive);
}
[System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " +
"[0.7.0]")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
[System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics> { }
public interface IWithTimers
{
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { }
public interface IWrappedMessage
{
object Message { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ namespace Akka.Persistence
{
public static Akka.Persistence.DiscardToDeadLetterStrategy Instance { get; }
}
public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity
public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity
{
public static readonly System.Func<Akka.Actor.Envelope, bool> UnstashFilterPredicate;
protected Eventsourced() { }
Expand Down Expand Up @@ -270,7 +270,7 @@ namespace Akka.Persistence
{
Akka.Persistence.Recovery Recovery { get; }
}
public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
Akka.Persistence.IStashOverflowStrategy InternalStashOverflowStrategy { get; }
}
Expand Down Expand Up @@ -844,7 +844,7 @@ namespace Akka.Persistence.Journal
protected System.Exception TryUnwrapException(System.Exception e) { }
protected abstract System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages);
}
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
protected AsyncWriteProxy() { }
public Akka.Actor.IStash Stash { get; set; }
Expand Down Expand Up @@ -980,7 +980,7 @@ namespace Akka.Persistence.Journal
public System.Collections.Generic.IDictionary<string, System.Collections.Generic.LinkedList<Akka.Persistence.IPersistentRepresentation>> Update(string pid, long seqNr, System.Func<Akka.Persistence.IPersistentRepresentation, Akka.Persistence.IPersistentRepresentation> updater) { }
protected override System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages) { }
}
public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
public PersistencePluginProxy(Akka.Configuration.Config config) { }
public Akka.Actor.IStash Stash { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ namespace Akka.Persistence.Sql.Common.Journal
public long Offset { get; }
public Akka.Actor.IActorRef ReplyTo { get; }
}
public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public abstract class SqlJournal : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
protected SqlJournal(Akka.Configuration.Config journalConfig) { }
protected bool HasNewEventSubscribers { get; }
Expand Down Expand Up @@ -424,7 +424,7 @@ namespace Akka.Persistence.Sql.Common.Snapshot
public readonly System.DateTime Timestamp;
public SnapshotEntry(string persistenceId, long sequenceNr, System.DateTime timestamp, string manifest, object payload) { }
}
public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public abstract class SqlSnapshotStore : Akka.Persistence.Snapshot.SnapshotStore, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
protected SqlSnapshotStore(Akka.Configuration.Config config) { }
protected Akka.Event.ILoggingAdapter Log { get; }
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka/Actor/Stash/IWithBoundedStash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ namespace Akka.Actor
/// <code>public IStash Stash { get; set; }</code>
/// </summary>
// ReSharper disable once InconsistentNaming
[Obsolete("Bounded stashing is not yet implemented. Unbounded stashing will be used instead [0.7.0]")]
public interface IWithBoundedStash : IActorStash, IRequiresMessageQueue<IBoundedDequeBasedMessageQueueSemantics>
[Obsolete("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : IWithUnrestrictedStash, IRequiresMessageQueue<IBoundedDequeBasedMessageQueueSemantics>
{ }
}

37 changes: 37 additions & 0 deletions src/core/Akka/Actor/Stash/IWithStash.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//-----------------------------------------------------------------------
// <copyright file="IWithUnboundedStash.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Dispatch;

namespace Akka.Actor
{
/// <summary>
/// The `IWithStash` interface enables an actor to temporarily stash away messages that can not or
/// should not be handled using the actor's current behavior.
/// <para>
/// Note that the `IWithStash` interface can only be used together with actors that have a deque-based
/// mailbox. By default Stash based actors request a Deque based mailbox since the stash
/// interface extends <see cref="IRequiresMessageQueue{T}"/>.
/// </para>
/// You can override the default mailbox provided when `IDequeBasedMessageQueueSemantics` are requested via config:
/// <code>
/// akka.actor.mailbox.requirements {
/// "Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics" = your-custom-mailbox
/// }
/// </code>
/// Alternatively, you can add your own requirement marker to the actor and configure a mailbox type to be used
/// for your marker.
/// <para>
/// For a `Stash` that also enforces unboundedness of the deque see <see cref="IWithUnboundedStash"/>. For a `Stash`
/// that does not enforce any mailbox type see <see cref="IWithUnrestrictedStash"/>.
/// </para>
/// </summary>
public interface IWithStash : IWithUnrestrictedStash, IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>
{
}
}

7 changes: 2 additions & 5 deletions src/core/Akka/Actor/Stash/IWithUnboundedStash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@
namespace Akka.Actor
{
/// <summary>
/// Lets the <see cref="StashFactory"/> know that this Actor needs stash support
/// with unrestricted storage capacity.
/// You need to add the property:
/// <code>public IStash Stash { get; set; }</code>
/// The `IWithUnboundedStash` interface is a version of <see cref="IActorStash"/> that enforces an unbounded stash for you actor.
/// </summary>
// ReSharper disable once InconsistentNaming
public interface IWithUnboundedStash : IActorStash, IRequiresMessageQueue<IUnboundedDequeBasedMessageQueueSemantics>
public interface IWithUnboundedStash : IWithUnrestrictedStash, IRequiresMessageQueue<IUnboundedDequeBasedMessageQueueSemantics>
{
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/core/Akka/Actor/Stash/IWithUnrestrictedStash.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//-----------------------------------------------------------------------
// <copyright file="IWithUnrestrictedStash.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Dispatch;

namespace Akka.Actor
{
/// <summary>
/// A version of <see cref="IActorStash"/> that does not enforce any mailbox type. The proper mailbox has to be configured
/// manually, and the mailbox should extend the <see cref="IDequeBasedMessageQueueSemantics"/> marker interface.
/// </summary>
public interface IWithUnrestrictedStash : IActorStash
{
}
}
Loading

0 comments on commit 35aaa62

Please sign in to comment.