Skip to content

Commit

Permalink
Akka.Actor: bounded IStash programmatic configuration (#6661)
Browse files Browse the repository at this point in the history
* close #6658

Add APIs to track content of Stash

* added basic stashing test cases

* updated API approvals

* Added `StashSize` to `Deploy`

* adding support for bounded stash sizes

* API approval

* adding `Props` and `ActorOf` support for configuring stash size

* added API approvals

* ensured that `IWithUnboundedStash` can't be accidentally made bounded via HOCON or config

* added `stashSize` to protobuf wire format

needed for remote deployments

* rename `StashSize` to `StashCapacity`

* renamed wire format from stashSize to stashCapacity

* added serialization support for `StashCapacity

* API approvals

* added documentation

* added bounded stashing api docs to untyped actor page

* added comment explaining backwards compat handling for StashCapacity

* fixed `DeploySurrogate` API

* fix `Deployer`

* fix `AbstractStash`

* fixed docs
  • Loading branch information
Aaronontheweb authored Apr 24, 2023
1 parent f79c26d commit 1877a0e
Show file tree
Hide file tree
Showing 18 changed files with 405 additions and 132 deletions.
32 changes: 32 additions & 0 deletions docs/articles/actors/receive-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,38 @@ The result of this is that when an actor is restarted, any stashed messages will
> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `IWithUnboundedStash` interface instead.

### Bounded Stashes

In certain scenarios, it might be helpful to put a limit on the size of the `IStash` inside your actor. You can configure a bounded stash via the following actor definition:

```csharp
public class StashingActorWithOverflow : UntypedActor, IWithStash
```

The `IWithStash` interface will default to *unbounded* stash behavior, but the the `Props` class or via `akka.actor.deployment` we can easily configure this actor to impose a limit on its stash capacity:

```csharp
// create an actor with a stash size of 2
IActorRef stasher = Sys.ActorOf(Props.Create<StashingActorWithOverflow>().WithStashCapacity(2));
```

Or via HOCON:

```hocon
akka.actor.deployment{{
/configStashingActor {{
stash-capacity = 2
}}
}}
```

Either of these settings will configure the `IStash` to only have a maximum capacity of 2 items. If a third item is attempted to be stashed the `IStash` will throw a `StashOverflowException`.

> [!TIP]
> You can always check to see if your `IStash` is approaching its capacity by checking the `IStash.IsFull`, `IStash.Capacity`, or `IStash.Count` properties.

If you attempt to apply a maximum stash capacity to an `IWithUnboundedStash` actor then the setting will be ignored.

## Killing an Actor

You can kill an actor by sending a `Kill` message. This will cause the actor to throw a `ActorKilledException`, triggering a failure. The actor will suspend operation and its supervisor will be asked how to handle the failure, which may mean resuming the actor, restarting it or terminating it completely. See [What Supervision Means](xref:supervision#what-supervision-means) for more information.
Expand Down
32 changes: 32 additions & 0 deletions docs/articles/actors/untyped-actor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,38 @@ Note that the stash is part of the ephemeral actor state, unlike the mailbox. Th
> [!NOTE]
> If you want to enforce that your actor can only work with an unbounded stash, then you should use the `UntypedActorWithUnboundedStash` class instead.
### Bounded Stashes

In certain scenarios, it might be helpful to put a limit on the size of the `IStash` inside your actor. You can configure a bounded stash via the following actor definition:

```csharp
public class StashingActorWithOverflow : UntypedActor, IWithStash
```

The `IWithStash` interface will default to *unbounded* stash behavior, but the the `Props` class or via `akka.actor.deployment` we can easily configure this actor to impose a limit on its stash capacity:

```csharp
// create an actor with a stash size of 2
IActorRef stasher = Sys.ActorOf(Props.Create<StashingActorWithOverflow>().WithStashCapacity(2));
```

Or via HOCON:

```hocon
akka.actor.deployment{{
/configStashingActor {{
stash-capacity = 2
}}
}}
```

Either of these settings will configure the `IStash` to only have a maximum capacity of 2 items. If a third item is attempted to be stashed the `IStash` will throw a `StashOverflowException`.

> [!TIP]
> You can always check to see if your `IStash` is approaching its capacity by checking the `IStash.IsFull`, `IStash.Capacity`, or `IStash.Count` properties.
If you attempt to apply a maximum stash capacity to an `IWithUnboundedStash` actor then the setting will be ignored.

## Killing an Actor

You can kill an actor by sending a `Kill` message. This will cause the actor to throw a `ActorKilledException`, triggering a failure. The actor will suspend operation and its supervisor will be asked how to handle the failure, which may mean resuming the actor, restarting it or terminating it completely. See [What Supervision Means](xref:supervision#what-supervision-means) for more information.
Expand Down
4 changes: 2 additions & 2 deletions src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2023 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>1.5.3</VersionPrefix>
<VersionPrefix>1.5.4</VersionPrefix>
<PackageIcon>akkalogo.png</PackageIcon>
<PackageProjectUrl>https://github.com/akkadotnet/akka.net</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/akka.net/blob/master/LICENSE</PackageLicenseUrl>
Expand Down Expand Up @@ -39,7 +39,7 @@
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>
<PropertyGroup>
<PackageReleaseNotes>Placeholder for nightly builds*</PackageReleaseNotes>
<PackageReleaseNotes>placeholder for nightlies*</PackageReleaseNotes>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ namespace Akka.Actor
public static readonly string NoDispatcherGiven;
public static readonly string NoMailboxGiven;
public static readonly Akka.Actor.Scope NoScopeGiven;
public const int NoStashSize = -1;
public static readonly Akka.Actor.Deploy None;
public Deploy() { }
public Deploy(string path, Akka.Actor.Scope scope) { }
Expand All @@ -629,19 +630,22 @@ namespace Akka.Actor
public Deploy(Akka.Routing.RouterConfig routerConfig) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher, string mailbox) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher, string mailbox, int stashCapacity) { }
public Akka.Configuration.Config Config { get; }
public string Dispatcher { get; }
public string Mailbox { get; }
public string Path { get; }
public Akka.Routing.RouterConfig RouterConfig { get; }
public Akka.Actor.Scope Scope { get; }
public int StashCapacity { get; }
public bool Equals(Akka.Actor.Deploy other) { }
public Akka.Util.ISurrogate ToSurrogate(Akka.Actor.ActorSystem system) { }
public virtual Akka.Actor.Deploy WithDispatcher(string dispatcher) { }
public virtual Akka.Actor.Deploy WithFallback(Akka.Actor.Deploy other) { }
public virtual Akka.Actor.Deploy WithMailbox(string mailbox) { }
public virtual Akka.Actor.Deploy WithRouterConfig(Akka.Routing.RouterConfig routerConfig) { }
public virtual Akka.Actor.Deploy WithScope(Akka.Actor.Scope scope) { }
public virtual Akka.Actor.Deploy WithStashCapacity(int stashSize) { }
public class DeploySurrogate : Akka.Util.ISurrogate
{
public DeploySurrogate() { }
Expand All @@ -651,6 +655,7 @@ namespace Akka.Actor
public string Path { get; set; }
public Akka.Routing.RouterConfig RouterConfig { get; set; }
public Akka.Actor.Scope Scope { get; set; }
public int StashCapacity { get; set; }
public Akka.Util.ISurrogated FromSurrogate(Akka.Actor.ActorSystem system) { }
}
}
Expand Down Expand Up @@ -1474,6 +1479,7 @@ namespace Akka.Actor
public Akka.Actor.Props WithDispatcher(string dispatcher) { }
public Akka.Actor.Props WithMailbox(string mailbox) { }
public Akka.Actor.Props WithRouter(Akka.Routing.RouterConfig routerConfig) { }
public Akka.Actor.Props WithStashCapacity(int stashCapacity) { }
public Akka.Actor.Props WithSupervisorStrategy(Akka.Actor.SupervisorStrategy supervisorStrategy) { }
public class PropsSurrogate : Akka.Util.ISurrogate
{
Expand Down Expand Up @@ -1938,7 +1944,7 @@ namespace Akka.Actor.Internal
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context) { }
public int Capacity { get; }
public virtual int Capacity { get; }
public int Count { get; }
public bool IsEmpty { get; }
public bool IsFull { get; }
Expand Down Expand Up @@ -2144,6 +2150,7 @@ namespace Akka.Actor.Internal
public class UnboundedStashImpl : Akka.Actor.Internal.AbstractStash
{
public UnboundedStashImpl(Akka.Actor.IActorContext context) { }
public override int Capacity { get; }
}
}
namespace Akka.Actor.Setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,7 @@ namespace Akka.Actor
public static readonly string NoDispatcherGiven;
public static readonly string NoMailboxGiven;
public static readonly Akka.Actor.Scope NoScopeGiven;
public const int NoStashSize = -1;
public static readonly Akka.Actor.Deploy None;
public Deploy() { }
public Deploy(string path, Akka.Actor.Scope scope) { }
Expand All @@ -629,19 +630,22 @@ namespace Akka.Actor
public Deploy(Akka.Routing.RouterConfig routerConfig) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher, string mailbox) { }
public Deploy(string path, Akka.Configuration.Config config, Akka.Routing.RouterConfig routerConfig, Akka.Actor.Scope scope, string dispatcher, string mailbox, int stashCapacity) { }
public Akka.Configuration.Config Config { get; }
public string Dispatcher { get; }
public string Mailbox { get; }
public string Path { get; }
public Akka.Routing.RouterConfig RouterConfig { get; }
public Akka.Actor.Scope Scope { get; }
public int StashCapacity { get; }
public bool Equals(Akka.Actor.Deploy other) { }
public Akka.Util.ISurrogate ToSurrogate(Akka.Actor.ActorSystem system) { }
public virtual Akka.Actor.Deploy WithDispatcher(string dispatcher) { }
public virtual Akka.Actor.Deploy WithFallback(Akka.Actor.Deploy other) { }
public virtual Akka.Actor.Deploy WithMailbox(string mailbox) { }
public virtual Akka.Actor.Deploy WithRouterConfig(Akka.Routing.RouterConfig routerConfig) { }
public virtual Akka.Actor.Deploy WithScope(Akka.Actor.Scope scope) { }
public virtual Akka.Actor.Deploy WithStashCapacity(int stashSize) { }
public class DeploySurrogate : Akka.Util.ISurrogate
{
public DeploySurrogate() { }
Expand All @@ -651,6 +655,7 @@ namespace Akka.Actor
public string Path { get; set; }
public Akka.Routing.RouterConfig RouterConfig { get; set; }
public Akka.Actor.Scope Scope { get; set; }
public int StashCapacity { get; set; }
public Akka.Util.ISurrogated FromSurrogate(Akka.Actor.ActorSystem system) { }
}
}
Expand Down Expand Up @@ -1472,6 +1477,7 @@ namespace Akka.Actor
public Akka.Actor.Props WithDispatcher(string dispatcher) { }
public Akka.Actor.Props WithMailbox(string mailbox) { }
public Akka.Actor.Props WithRouter(Akka.Routing.RouterConfig routerConfig) { }
public Akka.Actor.Props WithStashCapacity(int stashCapacity) { }
public Akka.Actor.Props WithSupervisorStrategy(Akka.Actor.SupervisorStrategy supervisorStrategy) { }
public class PropsSurrogate : Akka.Util.ISurrogate
{
Expand Down Expand Up @@ -1936,7 +1942,7 @@ namespace Akka.Actor.Internal
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context) { }
public int Capacity { get; }
public virtual int Capacity { get; }
public int Count { get; }
public bool IsEmpty { get; }
public bool IsFull { get; }
Expand Down Expand Up @@ -2142,6 +2148,7 @@ namespace Akka.Actor.Internal
public class UnboundedStashImpl : Akka.Actor.Internal.AbstractStash
{
public UnboundedStashImpl(Akka.Actor.IActorContext context) { }
public override int Capacity { get; }
}
}
namespace Akka.Actor.Setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void Serialization_must_serialize_and_deserialize_DaemonMsgCreate_with_De
ConfigurationFactory.ParseString("a=1"),
new RoundRobinPool(5, null, supervisorStrategy, null),
new RemoteScope(new Address("akka", "Test", "host1", 1921)),
"mydispatcher");
"mydispatcher").WithStashCapacity(10);
var deploy2 = new Deploy("path2",
ConfigurationFactory.ParseString("a=2"),
FromConfig.Instance,
Expand Down
33 changes: 20 additions & 13 deletions src/core/Akka.Remote/Serialization/DaemonMsgCreateSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public override byte[] ToBinary(object obj)
return message.ToByteArray();
}

throw new ArgumentException($"Can't serialize a non-DaemonMsgCreate message using DaemonMsgCreateSerializer [{obj.GetType()}]");
throw new ArgumentException(
$"Can't serialize a non-DaemonMsgCreate message using DaemonMsgCreateSerializer [{obj.GetType()}]");
}

/// <inheritdoc />
Expand Down Expand Up @@ -105,15 +106,15 @@ private Proto.Msg.DeployData DeployToProto(Deploy deploy)
{
var deployBuilder = new Proto.Msg.DeployData();
deployBuilder.Path = deploy.Path;

deployBuilder.StashCapacity = deploy.StashCapacity;
{
var tuple = Serialize(deploy.Config);
deployBuilder.ConfigSerializerId = tuple.Item1;
deployBuilder.ConfigManifest = tuple.Item3;
deployBuilder.Config = ByteString.CopyFrom(tuple.Item4);
}

if (deploy.RouterConfig != NoRouter.Instance)
if (!deploy.RouterConfig.Equals(NoRouter.Instance))
{
var tuple = Serialize(deploy.RouterConfig);
deployBuilder.RouterConfigSerializerId = tuple.Item1;
Expand All @@ -140,7 +141,8 @@ private Proto.Msg.DeployData DeployToProto(Deploy deploy)
private Deploy DeployFromProto(Proto.Msg.DeployData protoDeploy)
{
Config config;
if (protoDeploy.ConfigSerializerId > 0) // TODO: should be protoDeploy.Config != null. But it always not null
if (protoDeploy.ConfigSerializerId >
0) // TODO: should be protoDeploy.Config != null. But it always not null
{
config = system.Serialization.Deserialize(
protoDeploy.Config.ToByteArray(),
Expand All @@ -152,9 +154,10 @@ private Deploy DeployFromProto(Proto.Msg.DeployData protoDeploy)
config = Config.Empty;
}


RouterConfig routerConfig;
if (protoDeploy.RouterConfigSerializerId > 0) // TODO: should be protoDeploy.RouterConfig != null. But it always not null
if (protoDeploy.RouterConfigSerializerId >
0) // TODO: should be protoDeploy.RouterConfig != null. But it always not null
{
routerConfig = system.Serialization.Deserialize(
protoDeploy.RouterConfig.ToByteArray(),
Expand Down Expand Up @@ -183,18 +186,22 @@ private Deploy DeployFromProto(Proto.Msg.DeployData protoDeploy)
? protoDeploy.Dispatcher
: Deploy.NoDispatcherGiven;

return new Deploy(protoDeploy.Path, config, routerConfig, scope, dispatcher);
// automatically covers backwards compat scenarios too - if the stash capacity is 0 (Protobuf default)
// it'll be set to -1 here, which is consistent with how defaults prior to v1.5.4 were handled.
var stashCapacity = protoDeploy.StashCapacity > 0 ? protoDeploy.StashCapacity : Deploy.NoStashSize;

return stashCapacity == Deploy.NoStashSize
? new Deploy(protoDeploy.Path, config, routerConfig, scope, dispatcher)
: new Deploy(protoDeploy.Path, config, routerConfig, scope, dispatcher)
.WithStashCapacity(stashCapacity);
}

//
// IActorRef
//
private Proto.Msg.ActorRefData SerializeActorRef(IActorRef actorRef)
private static Proto.Msg.ActorRefData SerializeActorRef(IActorRef actorRef)
{
return new Proto.Msg.ActorRefData
{
Path = Akka.Serialization.Serialization.SerializedActorPath(actorRef)
};
return new Proto.Msg.ActorRefData { Path = Akka.Serialization.Serialization.SerializedActorPath(actorRef) };
}

private IActorRef DeserializeActorRef(Proto.Msg.ActorRefData actorRefData)
Expand All @@ -212,4 +219,4 @@ private IActorRef DeserializeActorRef(Proto.Msg.ActorRefData actorRefData)
return (serializer.Identifier, hasManifest, manifest, serializer.ToBinary(obj));
}
}
}
}
Loading

0 comments on commit 1877a0e

Please sign in to comment.