Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for UnrestrictedStash #6325

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions docs/articles/actors/receive-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -790,12 +790,15 @@ static void Main(string[] args)

## Stash

The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox.
The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally.

Here is an example of the `IWithUnboundedStash` interface in action:
> [!NOTE]
> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


Here is an example of the `IWithStash` interface in action:

```csharp
public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash
public class ActorWithProtocol : ReceiveActor, IWithStash
{
public IStash Stash { get; set; }

Expand Down Expand Up @@ -827,11 +830,18 @@ public class ActorWithProtocol : ReceiveActor, IWithUnboundedStash
}
```

Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration.
Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration.

Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`.

Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.
Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property.

However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actor’s mailbox.

The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior.

> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


## Killing an Actor

Expand Down
20 changes: 15 additions & 5 deletions docs/articles/actors/untyped-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -708,12 +708,15 @@ static void Main(string[] args)

## Stash

The `IWithUnboundedStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.BecomeStacked()` or `Context.UnbecomeStacked()`;, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally. An actor that implements `IWithUnboundedStash` will automatically get a dequeue-based mailbox.
The `IWithStash` interface enables an actor to temporarily stash away messages that can not or should not be handled using the actor's current behavior. Upon changing the actor's message handler, i.e., right before invoking `Context.Become()` or `Context.Unbecome()`, all stashed messages can be "un-stashed", thereby prepending them to the actor's mailbox. This way, the stashed messages can be processed in the same order as they have been received originally.

Here is an example of the `IWithUnboundedStash` interface in action:
> [!NOTE]
> The interface `IWithStash` implements the marker interface `IRequiresMessageQueue<IDequeBasedMessageQueueSemantics>` which requests the system to automatically choose a deque-based mailbox implementation for the actor (defaults to an unbounded deque mailbox). If you want more control over the mailbox, see the documentation on mailboxes: [Mailboxes](xref:mailboxes).

Here is an example of the `IWithStash` interface in action:

```csharp
public class ActorWithProtocol : UntypedActor, IWithUnboundedStash
public class ActorWithProtocol : UntypedActor, IWithStash
{
public IStash Stash { get; set; }

Expand Down Expand Up @@ -748,11 +751,18 @@ public class ActorWithProtocol : UntypedActor, IWithUnboundedStash
}
```

Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `Int`) of the mailbox's configuration.
Invoking `Stash()` adds the current message (the message that the actor received last) to the actor's stash. It is typically invoked when handling the default case in the actor's message handler to stash messages that aren't handled by the other cases. It is illegal to stash the same message twice; to do so results in an `IllegalStateException` being thrown. The stash may also be bounded in which case invoking `Stash()` may lead to a capacity violation, which results in a `StashOverflowException`. The capacity of the stash can be configured using the stash-capacity setting (an `int`) of the mailbox's configuration.

Invoking `UnstashAll()` enqueues messages from the stash to the actor's mailbox until the capacity of the mailbox (if any) has been reached (note that messages from the stash are prepended to the mailbox). In case a bounded mailbox overflows, a `MessageQueueAppendFailedException` is thrown. The stash is guaranteed to be empty after calling `UnstashAll()`.

Note that the `stash` is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property. The `IWithUnboundedStash` interface implementation of `PreRestart` will call `UnstashAll()`, which is usually the desired behavior.
Note that the stash is part of the ephemeral actor state, unlike the mailbox. Therefore, it should be managed like other parts of the actor's state which have the same property.

However, the `IWithStash` interface implementation of `PreRestart` will call `UnstashAll()`. This means that before the actor restarts, it will transfer all stashed messages back to the actor’s mailbox.

The result of this is that when an actor is restarted, any stashed messages will be delivered to the new incarnation of the actor. This is usually the desired behavior.

> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.

## Killing an Actor

Expand Down
1 change: 1 addition & 0 deletions docs/cSpell.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"CRDT",
"datacenter",
"denormalization",
"deque",
"deserialization",
"downstream",
"downstreams",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1178,14 +1178,15 @@ namespace Akka.Actor
void Become(Akka.Actor.UntypedReceive receive);
void BecomeStacked(Akka.Actor.UntypedReceive receive);
}
[System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " +
"[0.7.0]")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
[System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics> { }
public interface IWithTimers
{
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is extend-only, so it's safe.

public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { }
public interface IWrappedMessage
{
object Message { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1180,14 +1180,15 @@ namespace Akka.Actor
void Become(Akka.Actor.UntypedReceive receive);
void BecomeStacked(Akka.Actor.UntypedReceive receive);
}
[System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " +
"[0.7.0]")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
[System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics> { }
public interface IWithTimers
{
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { }
public interface IWrappedMessage
{
object Message { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1178,14 +1178,15 @@ namespace Akka.Actor
void Become(Akka.Actor.UntypedReceive receive);
void BecomeStacked(Akka.Actor.UntypedReceive receive);
}
[System.ObsoleteAttribute("Bounded stashing is not yet implemented. Unbounded stashing will be used instead " +
"[0.7.0]")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
[System.ObsoleteAttribute("Use `IWithStash` with a configured BoundedDeque-based mailbox instead.")]
public interface IWithBoundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IBoundedDequeBasedMessageQueueSemantics> { }
public interface IWithStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IDequeBasedMessageQueueSemantics> { }
public interface IWithTimers
{
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWithUnrestrictedStash : Akka.Actor.IActorStash { }
public interface IWrappedMessage
{
object Message { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ namespace Akka.Persistence
{
public static Akka.Persistence.DiscardToDeadLetterStrategy Instance { get; }
}
public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity
public abstract class Eventsourced : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>, Akka.Persistence.IPersistenceRecovery, Akka.Persistence.IPersistenceStash, Akka.Persistence.IPersistentIdentity
{
public static readonly System.Func<Akka.Actor.Envelope, bool> UnstashFilterPredicate;
protected Eventsourced() { }
Expand Down Expand Up @@ -270,7 +270,7 @@ namespace Akka.Persistence
{
Akka.Persistence.Recovery Recovery { get; }
}
public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public interface IPersistenceStash : Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
Akka.Persistence.IStashOverflowStrategy InternalStashOverflowStrategy { get; }
}
Expand Down Expand Up @@ -844,7 +844,7 @@ namespace Akka.Persistence.Journal
protected System.Exception TryUnwrapException(System.Exception e) { }
protected abstract System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages);
}
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public abstract class AsyncWriteProxy : Akka.Persistence.Journal.AsyncWriteJournal, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
protected AsyncWriteProxy() { }
public Akka.Actor.IStash Stash { get; set; }
Expand Down Expand Up @@ -980,7 +980,7 @@ namespace Akka.Persistence.Journal
public System.Collections.Generic.IDictionary<string, System.Collections.Generic.LinkedList<Akka.Persistence.IPersistentRepresentation>> Update(string pid, long seqNr, System.Func<Akka.Persistence.IPersistentRepresentation, Akka.Persistence.IPersistentRepresentation> updater) { }
protected override System.Threading.Tasks.Task<System.Collections.Immutable.IImmutableList<System.Exception>> WriteMessagesAsync(System.Collections.Generic.IEnumerable<Akka.Persistence.AtomicWrite> messages) { }
}
public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
public class PersistencePluginProxy : Akka.Actor.ActorBase, Akka.Actor.IActorStash, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics>
{
public PersistencePluginProxy(Akka.Configuration.Config config) { }
public Akka.Actor.IStash Stash { get; set; }
Expand Down
Loading