From 48a704cfe1914d84b9e01eab86f0a746e76e769c Mon Sep 17 00:00:00 2001 From: Andreas Dirnberger Date: Tue, 7 Sep 2021 21:11:44 +0200 Subject: [PATCH] Ask should push unhandled answers into deadletter 2 (#5259) * Ask should push unhandled answers into deadletter * update future handler * fix unit test * remove not needed return * remove redundant sync lock * remove redudant code and seal class * update api spec * update api spec 2 * handle of Status.Failure * ask should fail on system messages --- .../CoreAPISpec.ApproveCore.approved.txt | 5 +- src/core/Akka.Tests/Actor/AskSpec.cs | 82 +++++++++++++ src/core/Akka/Actor/ActorRef.cs | 112 +++++++----------- src/core/Akka/Actor/Futures.cs | 9 +- 4 files changed, 134 insertions(+), 74 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index ef20cd29aba..58979a4589c 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -862,12 +862,11 @@ namespace Akka.Actor public Failures() { } public System.Collections.Generic.List Entries { get; } } - public class FutureActorRef : Akka.Actor.MinimalActorRef + public sealed class FutureActorRef : Akka.Actor.MinimalActorRef { - public FutureActorRef(System.Threading.Tasks.TaskCompletionSource result, System.Action unregister, Akka.Actor.ActorPath path) { } + public FutureActorRef(System.Threading.Tasks.TaskCompletionSource result, Akka.Actor.ActorPath path, Akka.Actor.IActorRefProvider provider) { } public override Akka.Actor.ActorPath Path { get; } public override Akka.Actor.IActorRefProvider Provider { get; } - public override void SendSystemMessage(Akka.Dispatch.SysMsg.ISystemMessage message) { } protected override void TellInternal(object message, Akka.Actor.IActorRef sender) { } } public class static Futures diff --git a/src/core/Akka.Tests/Actor/AskSpec.cs b/src/core/Akka.Tests/Actor/AskSpec.cs index c7ec057a2fb..0e0fca26f47 100644 --- a/src/core/Akka.Tests/Actor/AskSpec.cs +++ b/src/core/Akka.Tests/Actor/AskSpec.cs @@ -15,6 +15,7 @@ using Akka.Util.Internal; using FluentAssertions; using Nito.AsyncEx; +using Akka.Dispatch.SysMsg; namespace Akka.Tests.Actor { @@ -37,6 +38,29 @@ protected override void OnReceive(object message) { Sender.Tell("answer"); } + + if (message.Equals("delay")) + { + Thread.Sleep(3000); + Sender.Tell("answer"); + } + + if (message.Equals("many")) + { + Sender.Tell("answer1"); + Sender.Tell("answer2"); + Sender.Tell("answer2"); + } + + if (message.Equals("invalid")) + { + Sender.Tell(123); + } + + if (message.Equals("system")) + { + Sender.Tell(new DummySystemMessage()); + } } } @@ -83,6 +107,10 @@ protected override void OnReceive(object message) } } + public sealed class DummySystemMessage : ISystemMessage + { + } + [Fact] public async Task Can_Ask_Response_actor() { @@ -114,6 +142,60 @@ public async Task Can_get_timeout_when_asking_actor() await Assert.ThrowsAsync(async () => await actor.Ask("timeout", TimeSpan.FromSeconds(3))); } + [Fact] + public async Task Ask_should_put_timeout_answer_into_deadletter() + { + var actor = Sys.ActorOf(); + + await EventFilter.DeadLetter().ExpectOneAsync(TimeSpan.FromSeconds(5), async () => + { + await Assert.ThrowsAsync(async () => await actor.Ask("delay", TimeSpan.FromSeconds(1))); + }); + } + + [Fact] + public async Task Ask_should_put_too_many_answers_into_deadletter() + { + var actor = Sys.ActorOf(); + + await EventFilter.DeadLetter().ExpectAsync(2, async () => + { + var result = await actor.Ask("many", TimeSpan.FromSeconds(1)); + result.ShouldBe("answer1"); + }); + } + + [Fact] + public async Task Ask_should_not_put_canceled_answer_into_deadletter() + { + var actor = Sys.ActorOf(); + + await EventFilter.DeadLetter().ExpectAsync(0, async () => + { + using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1))) + await Assert.ThrowsAsync(async () => await actor.Ask("delay", Timeout.InfiniteTimeSpan, cts.Token)); + }); + } + + [Fact] + public async Task Ask_should_put_invalid_answer_into_deadletter() + { + var actor = Sys.ActorOf(); + + await EventFilter.DeadLetter().ExpectOne(async () => + { + await Assert.ThrowsAsync(async () => await actor.Ask("invalid", TimeSpan.FromSeconds(1))); + }); + } + + [Fact] + public async Task Ask_should_fail_on_system_message() + { + var actor = Sys.ActorOf(); + + await Assert.ThrowsAsync(async () => await actor.Ask("system", TimeSpan.FromSeconds(1))); + } + [Fact] public async Task Can_cancel_when_asking_actor() { diff --git a/src/core/Akka/Actor/ActorRef.cs b/src/core/Akka/Actor/ActorRef.cs index ca783f4ad5f..7208047ff44 100644 --- a/src/core/Akka/Actor/ActorRef.cs +++ b/src/core/Akka/Actor/ActorRef.cs @@ -69,51 +69,34 @@ public interface IRepointableRef : IActorRefScope /// /// ActorRef implementation used for one-off tasks. /// - public class FutureActorRef : MinimalActorRef + public sealed class FutureActorRef : MinimalActorRef { private readonly TaskCompletionSource _result; private readonly ActorPath _path; + private readonly IActorRefProvider _provider; /// /// INTERNAL API /// /// TBD - /// TBD /// TBD - public FutureActorRef(TaskCompletionSource result, Action unregister, ActorPath path) + /// TBD + public FutureActorRef(TaskCompletionSource result, ActorPath path, IActorRefProvider provider) { - if (ActorCell.Current != null) - { - _actorAwaitingResultSender = ActorCell.Current.Sender; - } _result = result; _path = path; - - _result.Task.ContinueWith(unregister); + _provider = provider; } /// /// TBD /// - public override ActorPath Path - { - get { return _path; } - } + public override ActorPath Path => _path; /// /// TBD /// - /// TBD - public override IActorRefProvider Provider - { - get { throw new NotImplementedException(); } - } - - - private const int INITIATED = 0; - private const int COMPLETED = 1; - private int status = INITIATED; - private readonly IActorRef _actorAwaitingResultSender; + public override IActorRefProvider Provider => _provider; /// /// TBD @@ -122,43 +105,36 @@ public override IActorRefProvider Provider /// TBD protected override void TellInternal(object message, IActorRef sender) { + var handled = false; - if (message is ISystemMessage sysM) //we have special handling for system messages - { - SendSystemMessage(sysM); - } - else + switch (message) { - if (Interlocked.Exchange(ref status, COMPLETED) == INITIATED) - { - if (message is T t) - { - _result.TrySetResult(t); - } - else if (message == null) //special case: https://github.com/akkadotnet/akka.net/issues/5204 - { - _result.TrySetResult(default); - } - else if (message is Failure f) - { - _result.TrySetException(f.Exception ?? new TaskCanceledException("Task cancelled by actor via Failure message.")); - } - else - { - _result.TrySetException(new ArgumentException( - $"Received message of type [{message.GetType()}] - Ask expected message of type [{typeof(T)}]")); - } - } + case ISystemMessage msg: + handled = _result.TrySetException(new InvalidOperationException($"system message of type '{msg.GetType().Name}' is invalid for {nameof(FutureActorRef)}")); + break; + case T t: + handled = _result.TrySetResult(t); + break; + case null: + handled = _result.TrySetResult(default); + break; + case Status.Failure f: + handled = _result.TrySetException(f.Cause + ?? new TaskCanceledException("Task cancelled by actor via Failure message.")); + break; + case Failure f: + handled = _result.TrySetException(f.Exception + ?? new TaskCanceledException("Task cancelled by actor via Failure message.")); + break; + default: + _ = _result.TrySetException(new ArgumentException( + $"Received message of type [{message.GetType()}] - Ask expected message of type [{typeof(T)}]")); + break; } - } - - /// - /// TBD - /// - /// TBD - public override void SendSystemMessage(ISystemMessage message) - { - base.SendSystemMessage(message); + + //ignore canceled ask and put unhandled answers into deadletter + if (!handled && !_result.Task.IsCanceled) + _provider.DeadLetters.Tell(message ?? default(T), this); } } @@ -727,16 +703,16 @@ public abstract class ActorRefWithCell : InternalActorRefBase private IEnumerable SelfAndChildren() { yield return this; - foreach(var child in Children.SelectMany(x => - { - switch(x) - { - case ActorRefWithCell cell: - return cell.SelfAndChildren(); - default: - return new[] { x }; - } - })) + foreach (var child in Children.SelectMany(x => + { + switch (x) + { + case ActorRefWithCell cell: + return cell.SelfAndChildren(); + default: + return new[] { x }; + } + })) { yield return child; } diff --git a/src/core/Akka/Actor/Futures.cs b/src/core/Akka/Actor/Futures.cs index 20dbcedd613..c49cc8b74e4 100644 --- a/src/core/Akka/Actor/Futures.cs +++ b/src/core/Akka/Actor/Futures.cs @@ -149,16 +149,19 @@ public static Task Ask(this ICanTell self, Func message } //create a new tempcontainer path - ActorPath path = provider.TempPath(); + var path = provider.TempPath(); - var future = new FutureActorRef(result, t => + var future = new FutureActorRef(result, path, provider); + + //The future actor needs to be unregistered in the temp container + _ = result.Task.ContinueWith(t => { provider.UnregisterTempActor(path); ctr1?.Dispose(); ctr2?.Dispose(); timeoutCancellation?.Dispose(); - }, path); + }, TaskContinuationOptions.ExecuteSynchronously); //The future actor needs to be registered in the temp container provider.RegisterTempActor(future, path);