From 43a916865a749cd714165172f63252345b1435c0 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Fri, 30 Dec 2022 09:44:07 +0100 Subject: [PATCH] Read stash capacity from actor's mailbox or dispatcher configuration --- .../CoreAPISpec.ApproveCore.Core.verified.txt | 19 +- ...oreAPISpec.ApproveCore.DotNet.verified.txt | 19 +- .../CoreAPISpec.ApproveCore.Net.verified.txt | 19 +- .../Actor/Stash/ActorWithBoundedStashSpec.cs | 210 ++++++++++++++++++ .../Actor/Stash/Internal/AbstractStash.cs | 166 ++++++++------ .../Actor/Stash/Internal/BoundedStashImpl.cs | 5 +- .../Stash/Internal/UnboundedStashImpl.cs | 2 +- src/core/Akka/Dispatch/Dispatchers.cs | 30 +++ src/core/Akka/Dispatch/Mailbox.cs | 35 ++- src/core/Akka/Dispatch/Mailboxes.cs | 90 +++++--- 10 files changed, 465 insertions(+), 130 deletions(-) create mode 100644 src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt index 74f1fb4220f..82979c2d0d5 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt @@ -1933,13 +1933,13 @@ namespace Akka.Actor.Internal { public abstract class AbstractStash : Akka.Actor.IStash { - protected AbstractStash(Akka.Actor.IActorContext context, int capacity = 100) { } + protected AbstractStash(Akka.Actor.IActorContext context) { } public System.Collections.Generic.IEnumerable ClearStash() { } public void Prepend(System.Collections.Generic.IEnumerable envelopes) { } public void Stash() { } public void Unstash() { } public void UnstashAll() { } - public void UnstashAll(System.Func predicate) { } + public void UnstashAll(System.Func filterPredicate) { } } public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem { @@ -1988,7 +1988,7 @@ namespace Akka.Actor.Internal } public class BoundedStashImpl : Akka.Actor.Internal.AbstractStash { - public BoundedStashImpl(Akka.Actor.IActorContext context, int capacity = 100) { } + public BoundedStashImpl(Akka.Actor.IActorContext context) { } } public class ChildNameReserved : Akka.Actor.Internal.IChildStats { @@ -2486,14 +2486,14 @@ namespace Akka.Dispatch public static void RunTask(System.Func asyncAction) { } protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) { } } - public sealed class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + public class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox { public BoundedDequeBasedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } public int Capacity { get; } public System.TimeSpan PushTimeout { get; } public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { } } - public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox { public BoundedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } public int Capacity { get; } @@ -2549,6 +2549,8 @@ namespace Akka.Dispatch public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; } + [Akka.Annotations.InternalApiAttribute()] + public static Akka.Configuration.Config GetConfig(Akka.Configuration.Config config, string id, int depth = 0) { } public bool HasDispatcher(string id) { } public Akka.Dispatch.MessageDispatcher Lookup(string dispatcherName) { } public bool RegisterConfigurator(string id, Akka.Dispatch.MessageDispatcherConfigurator configurator) { } @@ -2602,6 +2604,10 @@ namespace Akka.Dispatch public interface IMultipleConsumerSemantics : Akka.Dispatch.ISemantics { } public interface IProducesMessageQueue where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { } + public interface IProducesPushTimeoutSemanticsMailbox + { + System.TimeSpan PushTimeout { get; } + } public interface IRequiresMessageQueue where T : Akka.Dispatch.ISemantics { } public interface IRunnable @@ -2635,13 +2641,14 @@ namespace Akka.Dispatch public static readonly string NoMailboxRequirement; public Mailboxes(Akka.Actor.ActorSystem system) { } public Akka.Actor.DeadLetterMailbox DeadLetterMailbox { get; } - public System.Type FromConfig(string path) { } public Akka.Dispatch.MailboxType GetMailboxType(Akka.Actor.Props props, Akka.Configuration.Config dispatcherConfig) { } public System.Type GetRequiredType(System.Type actorType) { } public bool HasRequiredType(System.Type actorType) { } public Akka.Dispatch.MailboxType Lookup(string id) { } public Akka.Dispatch.MailboxType LookupByQueueType(System.Type queueType) { } public bool ProducesMessageQueue(System.Type mailboxType) { } + [Akka.Annotations.InternalApiAttribute()] + public int StashCapacity(string dispatcher, string mailbox) { } } public abstract class MessageDispatcher { diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 312e7e27bf7..61cd0b8994d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -1935,13 +1935,13 @@ namespace Akka.Actor.Internal { public abstract class AbstractStash : Akka.Actor.IStash { - protected AbstractStash(Akka.Actor.IActorContext context, int capacity = 100) { } + protected AbstractStash(Akka.Actor.IActorContext context) { } public System.Collections.Generic.IEnumerable ClearStash() { } public void Prepend(System.Collections.Generic.IEnumerable envelopes) { } public void Stash() { } public void Unstash() { } public void UnstashAll() { } - public void UnstashAll(System.Func predicate) { } + public void UnstashAll(System.Func filterPredicate) { } } public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem { @@ -1990,7 +1990,7 @@ namespace Akka.Actor.Internal } public class BoundedStashImpl : Akka.Actor.Internal.AbstractStash { - public BoundedStashImpl(Akka.Actor.IActorContext context, int capacity = 100) { } + public BoundedStashImpl(Akka.Actor.IActorContext context) { } } public class ChildNameReserved : Akka.Actor.Internal.IChildStats { @@ -2490,14 +2490,14 @@ namespace Akka.Dispatch public static void RunTask(System.Func asyncAction) { } protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) { } } - public sealed class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + public class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox { public BoundedDequeBasedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } public int Capacity { get; } public System.TimeSpan PushTimeout { get; } public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { } } - public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox { public BoundedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } public int Capacity { get; } @@ -2553,6 +2553,8 @@ namespace Akka.Dispatch public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; } + [Akka.Annotations.InternalApiAttribute()] + public static Akka.Configuration.Config GetConfig(Akka.Configuration.Config config, string id, int depth = 0) { } public bool HasDispatcher(string id) { } public Akka.Dispatch.MessageDispatcher Lookup(string dispatcherName) { } public bool RegisterConfigurator(string id, Akka.Dispatch.MessageDispatcherConfigurator configurator) { } @@ -2606,6 +2608,10 @@ namespace Akka.Dispatch public interface IMultipleConsumerSemantics : Akka.Dispatch.ISemantics { } public interface IProducesMessageQueue where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { } + public interface IProducesPushTimeoutSemanticsMailbox + { + System.TimeSpan PushTimeout { get; } + } public interface IRequiresMessageQueue where T : Akka.Dispatch.ISemantics { } public interface IRunnable : System.Threading.IThreadPoolWorkItem @@ -2640,13 +2646,14 @@ namespace Akka.Dispatch public static readonly string NoMailboxRequirement; public Mailboxes(Akka.Actor.ActorSystem system) { } public Akka.Actor.DeadLetterMailbox DeadLetterMailbox { get; } - public System.Type FromConfig(string path) { } public Akka.Dispatch.MailboxType GetMailboxType(Akka.Actor.Props props, Akka.Configuration.Config dispatcherConfig) { } public System.Type GetRequiredType(System.Type actorType) { } public bool HasRequiredType(System.Type actorType) { } public Akka.Dispatch.MailboxType Lookup(string id) { } public Akka.Dispatch.MailboxType LookupByQueueType(System.Type queueType) { } public bool ProducesMessageQueue(System.Type mailboxType) { } + [Akka.Annotations.InternalApiAttribute()] + public int StashCapacity(string dispatcher, string mailbox) { } } public abstract class MessageDispatcher { diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 74f1fb4220f..82979c2d0d5 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -1933,13 +1933,13 @@ namespace Akka.Actor.Internal { public abstract class AbstractStash : Akka.Actor.IStash { - protected AbstractStash(Akka.Actor.IActorContext context, int capacity = 100) { } + protected AbstractStash(Akka.Actor.IActorContext context) { } public System.Collections.Generic.IEnumerable ClearStash() { } public void Prepend(System.Collections.Generic.IEnumerable envelopes) { } public void Stash() { } public void Unstash() { } public void UnstashAll() { } - public void UnstashAll(System.Func predicate) { } + public void UnstashAll(System.Func filterPredicate) { } } public class ActorSystemImpl : Akka.Actor.ExtendedActorSystem { @@ -1988,7 +1988,7 @@ namespace Akka.Actor.Internal } public class BoundedStashImpl : Akka.Actor.Internal.AbstractStash { - public BoundedStashImpl(Akka.Actor.IActorContext context, int capacity = 100) { } + public BoundedStashImpl(Akka.Actor.IActorContext context) { } } public class ChildNameReserved : Akka.Actor.Internal.IChildStats { @@ -2486,14 +2486,14 @@ namespace Akka.Dispatch public static void RunTask(System.Func asyncAction) { } protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task task, bool taskWasPreviouslyQueued) { } } - public sealed class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + public class BoundedDequeBasedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox { public BoundedDequeBasedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } public int Capacity { get; } public System.TimeSpan PushTimeout { get; } public override Akka.Dispatch.MessageQueues.IMessageQueue Create(Akka.Actor.IActorRef owner, Akka.Actor.ActorSystem system) { } } - public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue + public sealed class BoundedMailbox : Akka.Dispatch.MailboxType, Akka.Dispatch.IProducesMessageQueue, Akka.Dispatch.IProducesPushTimeoutSemanticsMailbox { public BoundedMailbox(Akka.Actor.Settings settings, Akka.Configuration.Config config) { } public int Capacity { get; } @@ -2549,6 +2549,8 @@ namespace Akka.Dispatch public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } public Akka.Dispatch.IDispatcherPrerequisites Prerequisites { get; } + [Akka.Annotations.InternalApiAttribute()] + public static Akka.Configuration.Config GetConfig(Akka.Configuration.Config config, string id, int depth = 0) { } public bool HasDispatcher(string id) { } public Akka.Dispatch.MessageDispatcher Lookup(string dispatcherName) { } public bool RegisterConfigurator(string id, Akka.Dispatch.MessageDispatcherConfigurator configurator) { } @@ -2602,6 +2604,10 @@ namespace Akka.Dispatch public interface IMultipleConsumerSemantics : Akka.Dispatch.ISemantics { } public interface IProducesMessageQueue where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { } + public interface IProducesPushTimeoutSemanticsMailbox + { + System.TimeSpan PushTimeout { get; } + } public interface IRequiresMessageQueue where T : Akka.Dispatch.ISemantics { } public interface IRunnable @@ -2635,13 +2641,14 @@ namespace Akka.Dispatch public static readonly string NoMailboxRequirement; public Mailboxes(Akka.Actor.ActorSystem system) { } public Akka.Actor.DeadLetterMailbox DeadLetterMailbox { get; } - public System.Type FromConfig(string path) { } public Akka.Dispatch.MailboxType GetMailboxType(Akka.Actor.Props props, Akka.Configuration.Config dispatcherConfig) { } public System.Type GetRequiredType(System.Type actorType) { } public bool HasRequiredType(System.Type actorType) { } public Akka.Dispatch.MailboxType Lookup(string id) { } public Akka.Dispatch.MailboxType LookupByQueueType(System.Type queueType) { } public bool ProducesMessageQueue(System.Type mailboxType) { } + [Akka.Annotations.InternalApiAttribute()] + public int StashCapacity(string dispatcher, string mailbox) { } } public abstract class MessageDispatcher { diff --git a/src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs b/src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs new file mode 100644 index 00000000000..acc678fda0a --- /dev/null +++ b/src/core/Akka.Tests/Actor/Stash/ActorWithBoundedStashSpec.cs @@ -0,0 +1,210 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Dispatch; +using Akka.Event; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Tests.Actor.Stash +{ + public class StashingActor : UntypedActor, IWithBoundedStash + { + public IStash Stash { get; set; } + + protected override void OnReceive(object message) + { + if (message is string s && s.StartsWith("hello")) + { + Stash.Stash(); + Sender.Tell("ok"); + } + else if (message.Equals("world")) + { + Context.Become(AfterWorldBehavior); + Stash.UnstashAll(); + } + } + + private void AfterWorldBehavior(object message) => Stash.Stash(); + } + + public class StashingActorWithOverflow : UntypedActor, IWithBoundedStash + { + private int numStashed = 0; + + public IStash Stash { get; set; } + + protected override void OnReceive(object message) + { + if (!(message is string s) || !s.StartsWith("hello")) + return; + + numStashed++; + try + { + Stash.Stash(); + Sender.Tell("ok"); + } + catch (Exception ex) when (ex is StashOverflowException) + { + if (numStashed == 21) + { + Sender.Tell("STASHOVERFLOW"); + Context.Stop(Self); + } + else + { + Sender.Tell("Unexpected StashOverflowException: " + numStashed); + } + } + } + } + + // bounded deque-based mailbox with capacity 10 + public class Bounded10 : BoundedDequeBasedMailbox + { + public Bounded10(Settings settings, Config config) + : base(settings, config) + { } + } + + public class Bounded100 : BoundedDequeBasedMailbox + { + public Bounded100(Settings settings, Config config) + : base(settings, config) + { } + } + + public class ActorWithBoundedStashSpec : AkkaSpec + { + private static Config SpecConfig => ConfigurationFactory.ParseString(@$" + akka.loggers = [""Akka.TestKit.TestEventListener, Akka.TestKit""] + my-dispatcher-1 {{ + mailbox-type = ""{typeof(Bounded10).AssemblyQualifiedName}"" + mailbox-capacity = 10 + mailbox-push-timeout-time = 500ms + stash-capacity = 20 + }} + my-dispatcher-2 {{ + mailbox-type = ""{typeof(Bounded100).AssemblyQualifiedName}"" + mailbox-capacity = 100 + mailbox-push-timeout-time = 500ms + stash-capacity = 20 + }} + my-aliased-dispatcher-1 = my-dispatcher-1 + my-aliased-dispatcher-2 = my-aliased-dispatcher-1 + my-mailbox-1 {{ + mailbox-type = ""{typeof(Bounded10).AssemblyQualifiedName}"" + mailbox-capacity = 10 + mailbox-push-timeout-time = 500ms + stash-capacity = 20 + }} + my-mailbox-2 {{ + mailbox-type = ""{typeof(Bounded100).AssemblyQualifiedName}"" + mailbox-capacity = 100 + mailbox-push-timeout-time = 500ms + stash-capacity = 20 + }}"); + + public ActorWithBoundedStashSpec(ITestOutputHelper outputHelper) + : base(outputHelper, SpecConfig) + { + Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter)); + } + + protected override void AtStartup() + { + base.AtStartup(); + Sys.EventStream.Publish(EventFilter.Warning(pattern: new Regex(".*Received dead letter from.*hello.*")).Mute()); + } + + protected override async Task AfterAllAsync() + { + await base.AfterAllAsync(); + Sys.EventStream.Unsubscribe(TestActor, typeof(DeadLetter)); + } + + private void TestDeadLetters(IActorRef stasher) + { + for (var n = 1; n <= 11; n++) + { + stasher.Tell("hello" + n); + ExpectMsg("ok"); + } + + // cause unstashAll with capacity violation + stasher.Tell("world"); + ExpectMsg().Equals(new DeadLetter("hello1", TestActor, stasher)); + + AwaitCondition(() => ExpectMsg() != null); + + stasher.Tell(PoisonPill.Instance); + // stashed messages are sent to deadletters when stasher is stopped + for (var n = 2; n <= 11; n++) + ExpectMsg().Equals(new DeadLetter("hello" + n, TestActor, stasher)); + } + + private void TestStashOverflowException(IActorRef stasher) + { + // fill up stash + for (var n = 1; n <= 20; n++) + { + stasher.Tell("hello" + n); + ExpectMsg("ok"); + } + + stasher.Tell("hello21"); + ExpectMsg("STASHOVERFLOW"); + + // stashed messages are sent to deadletters when stasher is stopped + for (var n = 1; n <= 20; n++) + ExpectMsg().Equals(new DeadLetter("hello" + n, TestActor, stasher)); + } + + [Fact(Skip = "DequeWrapperMessageQueue implementations are not actually double-ended queues, so this cannot currently work.")] + public void An_actor_with_stash_must_end_up_in_DeadLetters_in_case_of_a_capacity_violation_when_configure_via_dispatcher() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-dispatcher-1")); + TestDeadLetters(stasher); + } + + [Fact(Skip = "DequeWrapperMessageQueue implementations are not actually double-ended queues, so this cannot currently work.")] + public void An_actor_with_stash_must_end_up_in_DeadLetters_in_case_of_a_capacity_violation_when_configure_via_mailbox() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-mailbox-1")); + TestDeadLetters(stasher); + } + + [Fact] + public void An_actor_with_stash_must_throw_a_StashOverflowException_in_case_of_a_stash_capacity_violation_when_configured_via_dispatcher() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-dispatcher-2")); + TestStashOverflowException(stasher); + } + + [Fact] + public void An_actor_with_stash_must_throw_a_StashOverflowException_in_case_of_a_stash_capacity_violation_when_configured_via_mailbox() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-mailbox-2")); + TestStashOverflowException(stasher); + } + + [Fact] + public void An_actor_with_stash_must_get_stash_capacity_from_aliased_dispatchers() + { + var stasher = Sys.ActorOf(Props.Create().WithDispatcher("my-aliased-dispatcher-2")); + TestStashOverflowException(stasher); + } + } +} diff --git a/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs b/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs index 2cf63e87d10..c029120b0f6 100644 --- a/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs +++ b/src/core/Akka/Actor/Stash/Internal/AbstractStash.cs @@ -15,54 +15,63 @@ namespace Akka.Actor.Internal { - /// INTERNAL - /// Abstract base class for stash support - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. + /// + /// INTERNAL API + /// + /// Support class for implementing a stash for an actor instance. A default stash per actor (= user stash) + /// is maintained by [[UnrestrictedStash]] by extending this trait. Actors that explicitly need other stashes + /// (optionally in addition to and isolated from the user stash) can create new stashes via . + /// /// public abstract class AbstractStash : IStash { - private LinkedList _theStash; + /// + /// The private stash of the actor. It is only accessible using and + /// . + /// + private LinkedList _theStash = new(); + private readonly ActorCell _actorCell; + + /// + /// The capacity of the stash. Configured in the actor's mailbox or dispatcher config. + /// private readonly int _capacity; - /// INTERNAL - /// Abstract base class for stash support - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. + /// + /// The actor's deque-based message queue. + /// `mailbox.queue` is the underlying `Deque`. /// - /// TBD - /// TBD - /// This exception is thrown if the actor's mailbox isn't deque-based (e.g. ). - protected AbstractStash(IActorContext context, int capacity = 100) + private readonly IDequeBasedMessageQueueSemantics _mailbox; + + protected AbstractStash(IActorContext context) { var actorCell = (ActorCell)context; - Mailbox = actorCell.Mailbox.MessageQueue as IDequeBasedMessageQueueSemantics; - if(Mailbox == null) + _mailbox = actorCell.Mailbox.MessageQueue as IDequeBasedMessageQueueSemantics; + if (_mailbox == null) { - string message = $@"DequeBasedMailbox required, got: {actorCell.Mailbox.GetType().Name} -An (unbounded) deque-based mailbox can be configured as follows: - my-custom-mailbox {{ - mailbox-type = ""Akka.Dispatch.UnboundedDequeBasedMailbox"" - }}"; - throw new NotSupportedException(message); + var message = $"DequeBasedMailbox required, got: {actorCell.Mailbox.GetType().Name}\n" + + "An (unbounded) deque-based mailbox can be configured as follows:\n" + + "my-custom-mailbox {\n" + + " mailbox-type = \"Akka.Dispatch.UnboundedDequeBasedMailbox\"\n" + + "}\n"; + + throw new ActorInitializationException(actorCell.Self, message); } - _theStash = new LinkedList(); _actorCell = actorCell; - // TODO: capacity needs to come from dispatcher or mailbox config - // https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Stash.scala#L126 - _capacity = capacity; + // The capacity of the stash. Configured in the actor's mailbox or dispatcher config. + _capacity = context.System.Mailboxes.StashCapacity(context.Props.Dispatcher, context.Props.Mailbox); } - private IDequeBasedMessageQueueSemantics Mailbox { get; } - private int _currentEnvelopeId; /// - /// Stashes the current message in the actor's state. + /// Adds the current message (the message that the actor received last) to the actor's stash. /// /// This exception is thrown if we attempt to stash the same message more than once. /// - /// This exception is thrown in the event that we're using a for the and we've exceeded capacity. + /// This exception is thrown in the event that we're using a for the and we've exceeded capacity. /// public void Stash() { @@ -74,68 +83,90 @@ public void Stash() throw new IllegalActorStateException($"Can't stash the same message {currMsg} more than once"); } _currentEnvelopeId = _actorCell.CurrentEnvelopeId; - - if(_capacity <= 0 || _theStash.Count < _capacity) + + if (_capacity <= 0 || _theStash.Count < _capacity) _theStash.AddLast(new Envelope(currMsg, sender)); - else throw new StashOverflowException($"Couldn't enqueue message {currMsg} to stash of {_actorCell.Self}"); + else + throw new StashOverflowException($"Couldn't enqueue message {currMsg} from ${sender} to stash of {_actorCell.Self}"); } /// - /// Unstash the most recently stashed message (top of the message stack.) + /// Prepends the oldest message in the stash to the mailbox, and then removes that + /// message from the stash. + /// + /// Messages from the stash are enqueued to the mailbox until the capacity of the + /// mailbox (if any) has been reached. In case a bounded mailbox overflows, a + /// `MessageQueueAppendFailedException` is thrown. + /// + /// The unstashed message is guaranteed to be removed from the stash regardless + /// if the call successfully returns or throws an exception. /// public void Unstash() { - if(_theStash.Count > 0) + if (_theStash.Count <= 0) + return; + + try { - try - { - EnqueueFirst(_theStash.Head()); - } - finally - { - _theStash.RemoveFirst(); - } + EnqueueFirst(_theStash.Head()); + } + finally + { + _theStash.RemoveFirst(); } } /// - /// Unstash all of the s in the Stash. + /// Prepends all messages in the stash to the mailbox, and then clears the stash. + /// + /// Messages from the stash are enqueued to the mailbox until the capacity of the + /// mailbox(if any) has been reached. In case a bounded mailbox overflows, a + /// `MessageQueueAppendFailedException` is thrown. + /// + /// The stash is guaranteed to be empty after calling . /// - public void UnstashAll() - { - UnstashAll(envelope => true); - } + public void UnstashAll() => UnstashAll(_ => true); /// - /// Unstash all of the s in the Stash. + /// INTERNA API + /// + /// Prepends selected messages in the stash, applying `filterPredicate`, to the + /// mailbox, and then clears the stash. + /// + /// + /// Messages from the stash are enqueued to the mailbox until the capacity of the + /// mailbox(if any) has been reached. In case a bounded mailbox overflows, a + /// `MessageQueueAppendFailedException` is thrown. + /// + /// The stash is guaranteed to be empty after calling . /// - /// A predicate function to determine which messages to select. - public void UnstashAll(Func predicate) + /// Only stashed messages selected by this predicate are prepended to the mailbox. + public void UnstashAll(Func filterPredicate) { - if(_theStash.Count > 0) + if (_theStash.Count <= 0) + return; + + try { - try - { - foreach(var item in _theStash.Reverse().Where(predicate)) - { - EnqueueFirst(item); - } - } - finally - { - _theStash = new LinkedList(); - } + foreach (var item in _theStash.Reverse().Where(filterPredicate)) + EnqueueFirst(item); + } + finally + { + _theStash = new LinkedList(); } } - + /// - /// Eliminates the contents of the , and returns - /// the previous contents of the messages. + /// INTERNAL API. + /// + /// Clears the stash and and returns all envelopes that have not been unstashed. + /// /// /// Previously stashed messages. public IEnumerable ClearStash() { - if(_theStash.Count == 0) + if (_theStash.Count == 0) return Enumerable.Empty(); var stashed = _theStash; @@ -144,7 +175,8 @@ public IEnumerable ClearStash() } /// - /// TBD + /// Prepends `others` to this stash. This method is optimized for a large stash and + /// small `others`. /// /// TBD public void Prepend(IEnumerable envelopes) @@ -164,8 +196,8 @@ public void Prepend(IEnumerable envelopes) /// private void EnqueueFirst(Envelope msg) { - Mailbox.EnqueueFirst(msg); - if(msg.Message is Terminated terminatedMessage) + _mailbox.EnqueueFirst(msg); + if (msg.Message is Terminated terminatedMessage) { _actorCell.TerminatedQueuedFor(terminatedMessage.ActorRef, Option.None); } diff --git a/src/core/Akka/Actor/Stash/Internal/BoundedStashImpl.cs b/src/core/Akka/Actor/Stash/Internal/BoundedStashImpl.cs index 4bcfd0b613b..b28b8bfd3ef 100644 --- a/src/core/Akka/Actor/Stash/Internal/BoundedStashImpl.cs +++ b/src/core/Akka/Actor/Stash/Internal/BoundedStashImpl.cs @@ -18,9 +18,8 @@ public class BoundedStashImpl : AbstractStash /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. /// /// TBD - /// TBD - public BoundedStashImpl(IActorContext context, int capacity = 100) - : base(context, capacity) + public BoundedStashImpl(IActorContext context) + : base(context) { } } diff --git a/src/core/Akka/Actor/Stash/Internal/UnboundedStashImpl.cs b/src/core/Akka/Actor/Stash/Internal/UnboundedStashImpl.cs index f45e6f4668d..aa88507b5d1 100644 --- a/src/core/Akka/Actor/Stash/Internal/UnboundedStashImpl.cs +++ b/src/core/Akka/Actor/Stash/Internal/UnboundedStashImpl.cs @@ -19,7 +19,7 @@ public class UnboundedStashImpl : AbstractStash /// /// TBD public UnboundedStashImpl(IActorContext context) - : base(context, int.MaxValue) + : base(context) { } } diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index fd2bb557cc6..da173edb3e5 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -11,6 +11,7 @@ using System.Threading; using System.Threading.Tasks; using Akka.Actor; +using Akka.Annotations; using Akka.Configuration; using Akka.Event; using Helios.Concurrency; @@ -336,6 +337,35 @@ public sealed class Dispatchers private readonly ConcurrentDictionary _dispatcherConfigurators = new ConcurrentDictionary(); + /// + /// Get (possibly aliased) dispatcher config. Returns empty config if not found. + /// + [InternalApi] + public static Config GetConfig(Config config, string id, int depth = 0) + { + if (depth > MaxDispatcherAliasDepth) + { + // Didn't find dispatcher config after `MaxDispatcherAliasDepth` aliases + return ConfigurationFactory.Empty; + } + else if (config.HasPath(id)) + { + var hocon = config.GetValue(id); + if (hocon.IsObject()) + return config.GetConfig(id); + if (hocon.IsString()) + return GetConfig(config, config.GetString(id), depth + 1); + + // Expected either config or alias at `id` but found `unexpected` + return ConfigurationFactory.Empty; + } + else + { + // Dispatcher `id` not configured + return ConfigurationFactory.Empty; + } + } + /// Initializes a new instance of the class. /// The system. /// The prerequisites required for some instances. diff --git a/src/core/Akka/Dispatch/Mailbox.cs b/src/core/Akka/Dispatch/Mailbox.cs index 07412b0ef88..ba49666282f 100644 --- a/src/core/Akka/Dispatch/Mailbox.cs +++ b/src/core/Akka/Dispatch/Mailbox.cs @@ -631,6 +631,18 @@ public interface IProducesMessageQueue where TQueue : IMessageQueue { } + /// + /// INTERNAL API + /// + /// Used to determine mailbox factories which create + /// mailboxes, and thus should be validated that the `pushTimeOut` is greater than 0. + /// + /// + public interface IProducesPushTimeoutSemanticsMailbox + { + TimeSpan PushTimeout { get; } + } + /// /// UnboundedMailbox is the default used by Akka.NET Actors /// @@ -659,7 +671,9 @@ public UnboundedMailbox(Settings settings, Config config) : base(settings, confi /// /// The default bounded mailbox implementation /// - public sealed class BoundedMailbox : MailboxType, IProducesMessageQueue + public sealed class BoundedMailbox : MailboxType, + IProducesMessageQueue, + IProducesPushTimeoutSemanticsMailbox { /// /// The capacity of this mailbox. @@ -676,7 +690,8 @@ public sealed class BoundedMailbox : MailboxType, IProducesMessageQueue /// or the 'mailbox-push-timeout-time' in is negative. /// - public BoundedMailbox(Settings settings, Config config) : base(settings, config) + public BoundedMailbox(Settings settings, Config config) + : base(settings, config) { if (config.IsNullOrEmpty()) throw ConfigurationException.NullOrEmptyConfig(); @@ -687,15 +702,12 @@ public BoundedMailbox(Settings settings, Config config) : base(settings, config) if (Capacity < 0) throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); if (PushTimeout.TotalSeconds < 0) - throw new ArgumentException("The push time-out for BoundedMailbox cannot be be negative", - nameof(config)); + throw new ArgumentException("The push time-out for BoundedMailbox cannot be be negative", nameof(config)); } /// - public override IMessageQueue Create(IActorRef owner, ActorSystem system) - { - return new BoundedMessageQueue(Capacity, PushTimeout); - } + public override IMessageQueue Create(IActorRef owner, ActorSystem system) => + new BoundedMessageQueue(Capacity, PushTimeout); } /// @@ -797,7 +809,9 @@ public override IMessageQueue Create(IActorRef owner, ActorSystem system) /// /// BoundedDequeBasedMailbox is an bounded backed by a double-ended queue. Used for stashing. /// - public sealed class BoundedDequeBasedMailbox : MailboxType, IProducesMessageQueue + public class BoundedDequeBasedMailbox : MailboxType, + IProducesMessageQueue, + IProducesPushTimeoutSemanticsMailbox { /// /// The capacity of this mailbox. @@ -815,7 +829,8 @@ public sealed class BoundedDequeBasedMailbox : MailboxType, IProducesMessageQueu /// This exception is thrown if the 'mailbox-capacity' in /// or the 'mailbox-push-timeout-time' in is negative. /// - public BoundedDequeBasedMailbox(Settings settings, Config config) : base(settings, config) + public BoundedDequeBasedMailbox(Settings settings, Config config) + : base(settings, config) { if (config.IsNullOrEmpty()) throw ConfigurationException.NullOrEmptyConfig(); diff --git a/src/core/Akka/Dispatch/Mailboxes.cs b/src/core/Akka/Dispatch/Mailboxes.cs index 5fd2cd33b1d..3a047c07b9c 100644 --- a/src/core/Akka/Dispatch/Mailboxes.cs +++ b/src/core/Akka/Dispatch/Mailboxes.cs @@ -8,13 +8,16 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Reflection; using Akka.Actor; +using Akka.Annotations; using Akka.Configuration; -using Akka.Util.Reflection; using Akka.Dispatch.MessageQueues; using Akka.Event; +using Akka.Util; +using Akka.Util.Internal; namespace Akka.Dispatch { @@ -70,6 +73,7 @@ public Mailboxes(ActorSystem system) } _defaultMailboxConfig = Settings.Config.GetConfig(DefaultMailboxId); + _defaultStashCapacity = StashCapacityFromConfig(Dispatchers.DefaultDispatcherId, DefaultMailboxId); } /// @@ -169,21 +173,32 @@ private MailboxType LookupConfigurator(string id) var mailboxTypeName = conf.GetString("mailbox-type", null); if (string.IsNullOrEmpty(mailboxTypeName)) throw new ConfigurationException($"The setting mailbox-type defined in [{id}] is empty"); - var type = Type.GetType(mailboxTypeName); - if (type == null) - throw new ConfigurationException($"Found mailbox-type [{mailboxTypeName}] in configuration for [{id}], but could not find that type in any loaded assemblies."); - var args = new object[] {Settings, conf}; + var mailboxType = Type.GetType(mailboxTypeName) + ?? throw new ConfigurationException($"Found mailbox-type [{mailboxTypeName}] in configuration for [{id}], but could not find that type in any loaded assemblies."); + var args = new object[] { Settings, conf }; try { - configurator = (MailboxType) Activator.CreateInstance(type, args); + configurator = (MailboxType)Activator.CreateInstance(mailboxType, args); + + if (!_mailboxNonZeroPushTimeoutWarningIssued) + { + if (configurator is IProducesPushTimeoutSemanticsMailbox m && m.PushTimeout.Ticks > 0L) + { + Warn($"Configured potentially-blocking mailbox [{id}] configured with non-zero PushTimeOut ({m.PushTimeout}), " + + "which can lead to blocking behavior when sending messages to this mailbox. " + + $"Avoid this by setting `{id}.mailbox-push-timeout-time` to `0`."); + + _mailboxNonZeroPushTimeoutWarningIssued = true; + } + + // good; nothing to see here, move along, sir. + } } catch (Exception ex) { - throw new ArgumentException($"Cannot instantiate MailboxType {type}, defined in [{id}]. Make sure it has a public " + + throw new ArgumentException($"Cannot instantiate MailboxType {mailboxType}, defined in [{id}]. Make sure it has a public " + "constructor with [Akka.Actor.Settings, Akka.Configuration.Config] parameters", ex); } - - // TODO: check for blocking mailbox with a non-zero pushtimeout and issue a warning } // add the new configurator to the mapping, or keep the existing if it was already added @@ -311,39 +326,52 @@ MailboxType VerifyRequirements(MailboxType mailboxType) return VerifyRequirements(Lookup(DefaultMailboxId)); } + private void Warn(string msg) => + _system.EventStream.Publish(new Warning("mailboxes", GetType(), msg)); + + private readonly AtomicReference> _stashCapacityCache = + new(ImmutableDictionary.Empty); + + private readonly int _defaultStashCapacity; + /// - /// Creates a mailbox from a configuration path. + /// INTERNAL API + /// + /// The capacity of the stash. Configured in the actor's mailbox or dispatcher config. + /// /// - /// The path. - /// Mailbox. - public Type FromConfig(string path) + [InternalApi] + public int StashCapacity(string dispatcher, string mailbox) { - //TODO: this should not exist, its a temp hack because we are not serializing mailbox info when doing remote deploy.. - if (string.IsNullOrEmpty(path)) + bool UpdateCache(ImmutableDictionary cache, string key, int value) { - return typeof (UnboundedMailbox); + return _stashCapacityCache.CompareAndSet(cache, cache.SetItem(key, value)) || + UpdateCache(_stashCapacityCache.Value, key, value); // recursive, try again } - var config = _system.Settings.Config.GetConfig(path); - if (config.IsNullOrEmpty()) - throw new ConfigurationException($"Cannot retrieve mailbox type from config: {path} configuration node not found"); + if (dispatcher == Dispatchers.DefaultDispatcherId && mailbox == DefaultMailboxId) + return _defaultStashCapacity; - var type = config.GetString("mailbox-type", null); + var cache = _stashCapacityCache.Value; + var key = $"{dispatcher}-{mailbox}"; - var mailboxType = TypeCache.GetType(type); - return mailboxType; - /* -mailbox-capacity = 1000 -mailbox-push-timeout-time = 10s -stash-capacity = -1 - */ - } + if (!cache.TryGetValue(key, out var value)) + { + value = StashCapacityFromConfig(dispatcher, mailbox); + UpdateCache(cache, key, value); + } - //TODO: stash capacity + return value; + } - private void Warn(string msg) + private int StashCapacityFromConfig(string dispatcher, string mailbox) { - _system.EventStream.Publish(new Warning("mailboxes", GetType(), msg)); + var disp = Dispatchers.GetConfig(Settings.Config, dispatcher); + var fallback = disp.WithFallback(Settings.Config.GetConfig(DefaultMailboxId)); + var config = mailbox == DefaultMailboxId + ? fallback + : Settings.Config.GetConfig(mailbox).WithFallback(fallback); + return config.GetInt("stash-capacity"); } } }