Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Actor: bounded IStash programmatic configuration #6661

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2d2a1d3
close #6658
Aaronontheweb Apr 21, 2023
f92060e
added basic stashing test cases
Aaronontheweb Apr 21, 2023
3593fd0
updated API approvals
Aaronontheweb Apr 21, 2023
ae257d1
Added `StashSize` to `Deploy`
Aaronontheweb Apr 21, 2023
e52e9bc
adding support for bounded stash sizes
Aaronontheweb Apr 21, 2023
1a9dcec
API approval
Aaronontheweb Apr 21, 2023
65c1c37
adding `Props` and `ActorOf` support for configuring stash size
Aaronontheweb Apr 24, 2023
fd3ad96
Merge branch 'dev' into fix-6658-boundedstash-config
Aaronontheweb Apr 24, 2023
73b20f3
added API approvals
Aaronontheweb Apr 24, 2023
9fa962e
ensured that `IWithUnboundedStash` can't be accidentally made bounded…
Aaronontheweb Apr 24, 2023
02397fe
added `stashSize` to protobuf wire format
Aaronontheweb Apr 24, 2023
6c0bbbf
rename `StashSize` to `StashCapacity`
Aaronontheweb Apr 24, 2023
42b5c24
renamed wire format from stashSize to stashCapacity
Aaronontheweb Apr 24, 2023
72600cc
added serialization support for `StashCapacity
Aaronontheweb Apr 24, 2023
60ba6d8
API approvals
Aaronontheweb Apr 24, 2023
a4edb9a
added documentation
Aaronontheweb Apr 24, 2023
f6f7a42
added bounded stashing api docs to untyped actor page
Aaronontheweb Apr 24, 2023
e335924
added comment explaining backwards compat handling for StashCapacity
Aaronontheweb Apr 24, 2023
152715a
fixed `DeploySurrogate` API
Aaronontheweb Apr 24, 2023
09402f7
fix `Deployer`
Aaronontheweb Apr 24, 2023
58581c1
fix `AbstractStash`
Aaronontheweb Apr 24, 2023
0265f66
fixed docs
Aaronontheweb Apr 24, 2023
a835cb4
Merge branch 'dev' into fix-6658-boundedstash-config
Aaronontheweb Apr 24, 2023
dd87180
Merge branch 'dev' into fix-6658-boundedstash-config
Aaronontheweb Apr 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions docs/articles/actors/receive-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,38 @@ The result of this is that when an actor is restarted, any stashed messages will
> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.

### Bounded Stashes

In certain scenarios, it might be helpful to put a limit on the size of the `IStash` inside your actor. You can configure a bounded stash via the following actor definition:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented how to configure a bounded stash with both Props and akka.actor.deployment here.


```csharp
public class StashingActorWithOverflow : UntypedActor, IWithStash
```

The `IWithStash` interface will default to *unbounded* stash behavior, but the the `Props` class or via `akka.actor.deployment` we can easily configure this actor to impose a limit on its stash capacity:

```csharp
// create an actor with a stash size of 10
IActorRef stasher = Sys.ActorOf(Props.Create<StashingActorWithOverflow>().WithStashCapacity(10));
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
```

Or via HOCON:

```hocon
akka.actor.deployment{{
/configStashingActor {{
stash-capacity = 2
}}
}}
```

Either of these settings will configure the `IStash` to only have a maximum capacity of 2 items. If a third item is attempted to be stashed the `IStash` will throw a `StashOverflowException`.

> [!TIP]
> You can always check to see if your `IStash` is approaching its capacity by checking the `IStash.IsFull`, `IStash.Capacity`, or `IStash.Count` properties.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instructed users how to check whether or not the bounded stash is becoming full here.


If you attempt to apply a maximum stash capacity to an `IWithUnboundedStash` actor then the setting will be ignored.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caveat about applying this setting when using IWithUnboundedStash.


## Killing an Actor

You can kill an actor by sending a `Kill` message. This will cause the actor to throw a `ActorKilledException`, triggering a failure. The actor will suspend operation and its supervisor will be asked how to handle the failure, which may mean resuming the actor, restarting it or terminating it completely. See [What Supervision Means](xref:supervision#what-supervision-means) for more information.
Expand Down
32 changes: 32 additions & 0 deletions docs/articles/actors/untyped-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,38 @@ Note that the stash is part of the ephemeral actor state, unlike the mailbox. Th
> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `UntypedActorWithUnboundedStash` class instead.

### Bounded Stashes
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy and paste from the ReceiveActor page - we should probably have a separate page on behavior-switching and stashing.


In certain scenarios, it might be helpful to put a limit on the size of the `IStash` inside your actor. You can configure a bounded stash via the following actor definition:

```csharp
public class StashingActorWithOverflow : UntypedActor, IWithStash
```

The `IWithStash` interface will default to *unbounded* stash behavior, but the the `Props` class or via `akka.actor.deployment` we can easily configure this actor to impose a limit on its stash capacity:

```csharp
// create an actor with a stash size of 10
IActorRef stasher = Sys.ActorOf(Props.Create<StashingActorWithOverflow>().WithStashCapacity(10));
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
```

Or via HOCON:

```hocon
akka.actor.deployment{{
/configStashingActor {{
stash-capacity = 2
}}
}}
```

Either of these settings will configure the `IStash` to only have a maximum capacity of 2 items. If a third item is attempted to be stashed the `IStash` will throw a `StashOverflowException`.

> [!TIP]
> You can always check to see if your `IStash` is approaching its capacity by checking the `IStash.IsFull`, `IStash.Capacity`, or `IStash.Count` properties.

If you attempt to apply a maximum stash capacity to an `IWithUnboundedStash` actor then the setting will be ignored.

## Killing an Actor

You can kill an actor by sending a `Kill` message. This will cause the actor to throw a `ActorKilledException`, triggering a failure. The actor will suspend operation and its supervisor will be asked how to handle the failure, which may mean resuming the actor, restarting it or terminating it completely. See [What Supervision Means](xref:supervision#what-supervision-means) for more information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ namespace Akka.Actor
public static readonly string NoDispatcherGiven;
public static readonly string NoMailboxGiven;
public static readonly Akka.Actor.Scope NoScopeGiven;
public const int NoStashSize = -1;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This what IStash.Capacity has historically defaulted to when working with unbounded stashes - all I did was turn it into a constant.

public static readonly Akka.Actor.Deploy None;
public Deploy() { }
public Deploy(string path, Akka.Actor.Scope scope) { }
Expand All @@ -629,19 +630,22 @@ namespace Akka.Actor
public Deploy(Akka.Routing.RouterConfig routerConfig) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher, string mailbox) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher, string mailbox, int stashCapacity) { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New constructor overload - previous are unchanged.

public Akka.Configuration.Config Config { get; }
public string Dispatcher { get; }
public string Mailbox { get; }
public string Path { get; }
public Akka.Routing.RouterConfig RouterConfig { get; }
public Akka.Actor.Scope Scope { get; }
public int StashCapacity { get; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New read-only property for StashCapacity.

public bool Equals(Akka.Actor.Deploy other) { }
public Akka.Util.ISurrogate ToSurrogate(Akka.Actor.ActorSystem system) { }
public virtual Akka.Actor.Deploy WithDispatcher(string dispatcher) { }
public virtual Akka.Actor.Deploy WithFallback(Akka.Actor.Deploy other) { }
public virtual Akka.Actor.Deploy WithMailbox(string mailbox) { }
public virtual Akka.Actor.Deploy WithRouterConfig(Akka.Routing.RouterConfig routerConfig) { }
public virtual Akka.Actor.Deploy WithScope(Akka.Actor.Scope scope) { }
public virtual Akka.Actor.Deploy WithStashCapacity(int stashSize) { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New overrideable method for copying with the updated StashCapacity

public class DeploySurrogate : Akka.Util.ISurrogate
{
public DeploySurrogate() { }
Expand All @@ -651,6 +655,7 @@ namespace Akka.Actor
public string Path { get; set; }
public Akka.Routing.RouterConfig RouterConfig { get; set; }
public Akka.Actor.Scope Scope { get; set; }
public int StashSize { get; set; }
public Akka.Util.ISurrogated FromSurrogate(Akka.Actor.ActorSystem system) { }
}
}
Expand Down Expand Up @@ -1474,6 +1479,7 @@ namespace Akka.Actor
public Akka.Actor.Props WithDispatcher(string dispatcher) { }
public Akka.Actor.Props WithMailbox(string mailbox) { }
public Akka.Actor.Props WithRouter(Akka.Routing.RouterConfig routerConfig) { }
public Akka.Actor.Props WithStashCapacity(int stashCapacity) { }
public Akka.Actor.Props WithSupervisorStrategy(Akka.Actor.SupervisorStrategy supervisorStrategy) { }
public class PropsSurrogate : Akka.Util.ISurrogate
{
Expand Down Expand Up @@ -1938,7 +1944,7 @@ namespace Akka.Actor.Internal
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context) { }
public int Capacity { get; }
public virtual int Capacity { get; }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AbstractStash.Capacity is virtual so it can be overridden and made constant inside UnboundedStashImpl.

public int Count { get; }
public bool IsEmpty { get; }
public bool IsFull { get; }
Expand Down Expand Up @@ -2144,6 +2150,7 @@ namespace Akka.Actor.Internal
public class UnboundedStashImpl : Akka.Actor.Internal.AbstractStash
{
public UnboundedStashImpl(Akka.Actor.IActorContext context) { }
public override int Capacity { get; }
}
}
namespace Akka.Actor.Setup
Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same API changes as the previous file.

Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ namespace Akka.Actor
public static readonly string NoDispatcherGiven;
public static readonly string NoMailboxGiven;
public static readonly Akka.Actor.Scope NoScopeGiven;
public const int NoStashSize = -1;
public static readonly Akka.Actor.Deploy None;
public Deploy() { }
public Deploy(string path, Akka.Actor.Scope scope) { }
Expand All @@ -629,19 +630,22 @@ namespace Akka.Actor
public Deploy(Akka.Routing.RouterConfig routerConfig) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher, string mailbox) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher, string mailbox, int stashCapacity) { }
public Akka.Configuration.Config Config { get; }
public string Dispatcher { get; }
public string Mailbox { get; }
public string Path { get; }
public Akka.Routing.RouterConfig RouterConfig { get; }
public Akka.Actor.Scope Scope { get; }
public int StashCapacity { get; }
public bool Equals(Akka.Actor.Deploy other) { }
public Akka.Util.ISurrogate ToSurrogate(Akka.Actor.ActorSystem system) { }
public virtual Akka.Actor.Deploy WithDispatcher(string dispatcher) { }
public virtual Akka.Actor.Deploy WithFallback(Akka.Actor.Deploy other) { }
public virtual Akka.Actor.Deploy WithMailbox(string mailbox) { }
public virtual Akka.Actor.Deploy WithRouterConfig(Akka.Routing.RouterConfig routerConfig) { }
public virtual Akka.Actor.Deploy WithScope(Akka.Actor.Scope scope) { }
public virtual Akka.Actor.Deploy WithStashCapacity(int stashSize) { }
public class DeploySurrogate : Akka.Util.ISurrogate
{
public DeploySurrogate() { }
Expand All @@ -651,6 +655,7 @@ namespace Akka.Actor
public string Path { get; set; }
public Akka.Routing.RouterConfig RouterConfig { get; set; }
public Akka.Actor.Scope Scope { get; set; }
public int StashSize { get; set; }
public Akka.Util.ISurrogated FromSurrogate(Akka.Actor.ActorSystem system) { }
}
}
Expand Down Expand Up @@ -1472,6 +1477,7 @@ namespace Akka.Actor
public Akka.Actor.Props WithDispatcher(string dispatcher) { }
public Akka.Actor.Props WithMailbox(string mailbox) { }
public Akka.Actor.Props WithRouter(Akka.Routing.RouterConfig routerConfig) { }
public Akka.Actor.Props WithStashCapacity(int stashCapacity) { }
public Akka.Actor.Props WithSupervisorStrategy(Akka.Actor.SupervisorStrategy supervisorStrategy) { }
public class PropsSurrogate : Akka.Util.ISurrogate
{
Expand Down Expand Up @@ -1936,7 +1942,7 @@ namespace Akka.Actor.Internal
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context) { }
public int Capacity { get; }
public virtual int Capacity { get; }
public int Count { get; }
public bool IsEmpty { get; }
public bool IsFull { get; }
Expand Down Expand Up @@ -2142,6 +2148,7 @@ namespace Akka.Actor.Internal
public class UnboundedStashImpl : Akka.Actor.Internal.AbstractStash
{
public UnboundedStashImpl(Akka.Actor.IActorContext context) { }
public override int Capacity { get; }
}
}
namespace Akka.Actor.Setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void Serialization_must_serialize_and_deserialize_DaemonMsgCreate_with_De
ConfigurationFactory.ParseString("a=1"),
new RoundRobinPool(5, null, supervisorStrategy, null),
new RemoteScope(new Address("akka", "Test", "host1", 1921)),
"mydispatcher");
"mydispatcher").WithStashCapacity(10);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To reproduce whether or not StashCapacity was made serializable correctly for remote deployments - this test failed until I updated the DaemonMsgCreateSerializer.

var deploy2 = new Deploy("path2",
ConfigurationFactory.ParseString("a=2"),
FromConfig.Instance,
Expand Down
31 changes: 18 additions & 13 deletions src/core/Akka.Remote/Serialization/DaemonMsgCreateSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public override byte[] ToBinary(object obj)
return message.ToByteArray();
}

throw new ArgumentException($"Can't serialize a non-DaemonMsgCreate message using DaemonMsgCreateSerializer [{obj.GetType()}]");
throw new ArgumentException(
$"Can't serialize a non-DaemonMsgCreate message using DaemonMsgCreateSerializer [{obj.GetType()}]");
}

/// <inheritdoc />
Expand Down Expand Up @@ -105,15 +106,15 @@ private Proto.Msg.DeployData DeployToProto(Deploy deploy)
{
var deployBuilder = new Proto.Msg.DeployData();
deployBuilder.Path = deploy.Path;

deployBuilder.StashCapacity = deploy.StashCapacity;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass in StashCapacity into the wire format during remote deployments.

{
var tuple = Serialize(deploy.Config);
deployBuilder.ConfigSerializerId = tuple.Item1;
deployBuilder.ConfigManifest = tuple.Item3;
deployBuilder.Config = ByteString.CopyFrom(tuple.Item4);
}

if (deploy.RouterConfig != NoRouter.Instance)
if (!deploy.RouterConfig.Equals(NoRouter.Instance))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed a ReSharper complaint here.

{
var tuple = Serialize(deploy.RouterConfig);
deployBuilder.RouterConfigSerializerId = tuple.Item1;
Expand All @@ -140,7 +141,8 @@ private Proto.Msg.DeployData DeployToProto(Deploy deploy)
private Deploy DeployFromProto(Proto.Msg.DeployData protoDeploy)
{
Config config;
if (protoDeploy.ConfigSerializerId > 0) // TODO: should be protoDeploy.Config != null. But it always not null
if (protoDeploy.ConfigSerializerId >
0) // TODO: should be protoDeploy.Config != null. But it always not null
{
config = system.Serialization.Deserialize(
protoDeploy.Config.ToByteArray(),
Expand All @@ -152,9 +154,10 @@ private Deploy DeployFromProto(Proto.Msg.DeployData protoDeploy)
config = Config.Empty;
}


RouterConfig routerConfig;
if (protoDeploy.RouterConfigSerializerId > 0) // TODO: should be protoDeploy.RouterConfig != null. But it always not null
if (protoDeploy.RouterConfigSerializerId >
0) // TODO: should be protoDeploy.RouterConfig != null. But it always not null
{
routerConfig = system.Serialization.Deserialize(
protoDeploy.RouterConfig.ToByteArray(),
Expand Down Expand Up @@ -183,18 +186,20 @@ private Deploy DeployFromProto(Proto.Msg.DeployData protoDeploy)
? protoDeploy.Dispatcher
: Deploy.NoDispatcherGiven;

return new Deploy(protoDeploy.Path, config, routerConfig, scope, dispatcher);
var stashCapacity = protoDeploy.StashCapacity > 0 ? protoDeploy.StashCapacity : Deploy.NoStashSize;

return stashCapacity == Deploy.NoStashSize
? new Deploy(protoDeploy.Path, config, routerConfig, scope, dispatcher)
: new Deploy(protoDeploy.Path, config, routerConfig, scope, dispatcher)
.WithStashCapacity(stashCapacity);
}

//
// IActorRef
//
private Proto.Msg.ActorRefData SerializeActorRef(IActorRef actorRef)
private static Proto.Msg.ActorRefData SerializeActorRef(IActorRef actorRef)
{
return new Proto.Msg.ActorRefData
{
Path = Akka.Serialization.Serialization.SerializedActorPath(actorRef)
};
return new Proto.Msg.ActorRefData { Path = Akka.Serialization.Serialization.SerializedActorPath(actorRef) };
}

private IActorRef DeserializeActorRef(Proto.Msg.ActorRefData actorRefData)
Expand All @@ -212,4 +217,4 @@ private IActorRef DeserializeActorRef(Proto.Msg.ActorRefData actorRefData)
return (serializer.Identifier, hasManifest, manifest, serializer.ToBinary(obj));
}
}
}
}
Loading