From f809aea16bba0ba456c81b15b72e4107baae03f4 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 16 Sep 2022 17:09:42 -0500 Subject: [PATCH 1/3] prototyping `WatchAysnc` methods --- src/core/Akka/Actor/Futures.cs | 42 ++++++------- src/core/Akka/Actor/GracefulStopSupport.cs | 68 +++++++++++++--------- src/core/Akka/Actor/WatchAsyncSupport.cs | 30 ++++++++++ 3 files changed, 90 insertions(+), 50 deletions(-) create mode 100644 src/core/Akka/Actor/WatchAsyncSupport.cs diff --git a/src/core/Akka/Actor/Futures.cs b/src/core/Akka/Actor/Futures.cs index fdfe471babd..c35da15f8d8 100644 --- a/src/core/Akka/Actor/Futures.cs +++ b/src/core/Akka/Actor/Futures.cs @@ -438,9 +438,9 @@ private ActorPath GetPath() { while (true) { - if (State == null) + switch (State) { - if (UpdateState(null, Registering.Instance)) + case null when UpdateState(null, Registering.Instance): { ActorPath p = null; try @@ -454,21 +454,19 @@ private ActorPath GetPath() State = p; } } - continue; - } - - if (State is ActorPath) - return State as ActorPath; - if (State is StoppedWithPath) - return State.AsInstanceOf().Path; - if (State is Stopped) - { - //even if we are already stopped we still need to produce a proper path - UpdateState(Stopped.Instance, new StoppedWithPath(Provider.TempPath())); - continue; + case null: + continue; + case ActorPath _: + return State as ActorPath; + case StoppedWithPath stoppedWithPath: + return stoppedWithPath.Path; + case Stopped _: + //even if we are already stopped we still need to produce a proper path + UpdateState(Stopped.Instance, new StoppedWithPath(Provider.TempPath())); + continue; + case Registering _: + continue; } - if (State is Registering) - continue; } } @@ -494,14 +492,12 @@ protected override void TellInternal(object message, IActorRef sender) public override void SendSystemMessage(ISystemMessage message) { if (message is Terminate) Stop(); - else if (message is DeathWatchNotification) + else if (message is DeathWatchNotification dw) { - var dw = message as DeathWatchNotification; Tell(new Terminated(dw.Actor, dw.ExistenceConfirmed, dw.AddressTerminated), this); } - else if (message is Watch) + else if (message is Watch watch) { - var watch = message as Watch; if (Equals(watch.Watchee, this)) { if (!AddWatcher(watch.Watcher)) @@ -517,9 +513,8 @@ public override void SendSystemMessage(ISystemMessage message) } } } - else if (message is Unwatch) + else if (message is Unwatch unwatch) { - var unwatch = message as Unwatch; if (Equals(unwatch.Watchee, this) && !Equals(unwatch.Watcher, this)) RemoveWatcher(unwatch.Watcher); else Console.WriteLine("BUG: illegal Unwatch({0},{1}) for {2}", unwatch.Watchee, unwatch.Watcher, this); } @@ -539,9 +534,8 @@ public override void Stop() if (UpdateState(null, Stopped.Instance)) StopEnsureCompleted(); else continue; } - else if (state is ActorPath) + else if (state is ActorPath p) { - var p = state as ActorPath; if (UpdateState(p, new StoppedWithPath(p))) { try diff --git a/src/core/Akka/Actor/GracefulStopSupport.cs b/src/core/Akka/Actor/GracefulStopSupport.cs index d1f92e5c21e..a18071afb7d 100644 --- a/src/core/Akka/Actor/GracefulStopSupport.cs +++ b/src/core/Akka/Actor/GracefulStopSupport.cs @@ -15,47 +15,63 @@ namespace Akka.Actor { /// - /// Returns a that will be completed with success when existing messages - /// of the target actor have been processed and the actor has been terminated. - /// - /// Useful when you need to wait for termination or compose ordered termination of several actors, - /// which should only be done outside of the as blocking inside is discouraged. - /// - /// IMPORTANT: the actor being terminated and its supervisor being informed of the availability of the deceased actor's name - /// are two distinct operations, which do not obey any reliable ordering. - /// - /// If the target actor isn't terminated within the timeout the is completed with failure. - /// - /// If you want to invoke specialized stopping logic on your target actor instead of , you can pass your stop command as a parameter: - /// - /// GracefulStop(someChild, timeout, MyStopGracefullyMessage).ContinueWith(r => { - /// // Do something after someChild starts being stopped. - /// }); - /// + /// GracefulStop extensions. /// public static class GracefulStopSupport { /// - /// TBD + /// Returns a that will be completed with success when existing messages + /// of the target actor have been processed and the actor has been terminated. + /// + /// Useful when you need to wait for termination or compose ordered termination of several actors, + /// which should only be done outside of the as blocking inside is discouraged. + /// + /// IMPORTANT: the actor being terminated and its supervisor being informed of the availability of the deceased actor's name + /// are two distinct operations, which do not obey any reliable ordering. + /// + /// If the target actor isn't terminated within the timeout the is completed with failure. + /// + /// If you want to invoke specialized stopping logic on your target actor instead of , you can pass your stop command as a parameter: + /// + /// GracefulStop(someChild, timeout, MyStopGracefullyMessage).ContinueWith(r => { + /// // Do something after someChild starts being stopped. + /// }); + /// /// - /// TBD - /// TBD - /// TBD + /// The actor to be terminated. + /// The amount of time we're going to wait for the actor to terminate. + /// A that will return true if the shuts down within public static Task GracefulStop(this IActorRef target, TimeSpan timeout) { return GracefulStop(target, timeout, PoisonPill.Instance); } /// - /// TBD + /// Returns a that will be completed with success when existing messages + /// of the target actor have been processed and the actor has been terminated. + /// + /// Useful when you need to wait for termination or compose ordered termination of several actors, + /// which should only be done outside of the as blocking inside is discouraged. + /// + /// IMPORTANT: the actor being terminated and its supervisor being informed of the availability of the deceased actor's name + /// are two distinct operations, which do not obey any reliable ordering. + /// + /// If the target actor isn't terminated within the timeout the is completed with failure. + /// + /// If you want to invoke specialized stopping logic on your target actor instead of , you can pass your stop command as a parameter: + /// + /// GracefulStop(someChild, timeout, MyStopGracefullyMessage).ContinueWith(r => { + /// // Do something after someChild starts being stopped. + /// }); + /// /// - /// TBD - /// TBD - /// TBD + /// The actor to be terminated. + /// The amount of time we're going to wait for the actor to terminate. + /// A custom message to use to shutdown - by default the other overload uses . /// /// This exception is thrown if the underlying task is . /// - /// TBD + /// A that will return true if the shuts down within public static Task GracefulStop(this IActorRef target, TimeSpan timeout, object stopMessage) { var internalTarget = target.AsInstanceOf(); diff --git a/src/core/Akka/Actor/WatchAsyncSupport.cs b/src/core/Akka/Actor/WatchAsyncSupport.cs new file mode 100644 index 00000000000..ad29e6bfbf0 --- /dev/null +++ b/src/core/Akka/Actor/WatchAsyncSupport.cs @@ -0,0 +1,30 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Util.Internal; + +namespace Akka.Actor +{ + /// + /// WatchAsync extensions + /// + public static class WatchAsyncSupport + { + public static Task WatchAsync(this IActorRef target) + { + if (target is IInternalActorRef internalActorRef) + { + var promiseRef = PromiseActorRef.Apply(internalActorRef.Provider, ) + } + + throw new InvalidOperationException( + $"{target} is not an {typeof(IInternalActorRef)} and cannot be death-watched"); + } + } +} \ No newline at end of file From 56abdd2559af361d9c94ac22cc84a4121f8db2b2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Sat, 17 Sep 2022 11:19:48 -0500 Subject: [PATCH 2/3] cleaned up `GracefulStop` and `PromiseActorRef` internals --- src/core/Akka/Actor/Futures.cs | 119 ++++++++++++--------- src/core/Akka/Actor/GracefulStopSupport.cs | 32 +++--- src/core/Akka/Actor/WatchAsyncSupport.cs | 20 ++-- 3 files changed, 93 insertions(+), 78 deletions(-) diff --git a/src/core/Akka/Actor/Futures.cs b/src/core/Akka/Actor/Futures.cs index c35da15f8d8..ea10e2029ca 100644 --- a/src/core/Akka/Actor/Futures.cs +++ b/src/core/Akka/Actor/Futures.cs @@ -126,7 +126,7 @@ public static Task Ask(this ICanTell self, Func message var result = TaskEx.NonBlockingTaskCompletionSource(); CancellationTokenSource timeoutCancellation = null; - timeout = timeout ?? provider.Settings.AskTimeout; + timeout ??= provider.Settings.AskTimeout; CancellationTokenRegistration? ctr1 = null; CancellationTokenRegistration? ctr2 = null; @@ -176,16 +176,22 @@ public static Task Ask(this ICanTell self, Func message /// Provider used for Ask pattern implementation internal static IActorRefProvider ResolveProvider(ICanTell self) { - if (self is ActorSelection selection) - return ResolveProvider(selection.Anchor); - - if (self is IInternalActorRef actorRef) - return actorRef.Provider; + while (true) + { + switch (self) + { + case ActorSelection selection: + self = selection.Anchor; + continue; + case IInternalActorRef actorRef: + return actorRef.Provider; + } - if (ActorCell.Current is ActorCell cell) - return cell.SystemImpl.Provider; + if (ActorCell.Current is ActorCell cell) return cell.SystemImpl.Provider; - return null; + return null; + break; + } } } @@ -244,12 +250,11 @@ internal sealed class Registering { private Registering() { } // ReSharper disable once InconsistentNaming - private static readonly Registering _instance = new Registering(); /// /// TBD /// - public static Registering Instance { get { return _instance; } } + public static Registering Instance { get; } = new Registering(); } /// @@ -259,12 +264,11 @@ internal sealed class Stopped { private Stopped() { } // ReSharper disable once InconsistentNaming - private static readonly Stopped _instance = new Stopped(); /// - /// TBD + /// Singleton instance. /// - public static Stopped Instance { get { return _instance; } } + public static Stopped Instance { get; } = new Stopped(); } /// @@ -337,28 +341,37 @@ public static PromiseActorRef Apply(IActorRefProvider provider, TimeSpan timeout sender = sender ?? ActorRefs.NoSender; var result = new TaskCompletionSource(); var a = new PromiseActorRef(provider, result, messageClassName); - var cancellationSource = new CancellationTokenSource(); - cancellationSource.Token.Register(CancelAction, result); - cancellationSource.CancelAfter(timeout); - //var scheduler = provider.Guardian.Underlying.System.Scheduler.Advanced; - //var c = new Cancelable(scheduler, timeout); - //scheduler.ScheduleOnce(timeout, () => result.TrySetResult(new Status.Failure(new AskTimeoutException( - // string.Format("Ask timed out on [{0}] after [{1} ms]. Sender[{2}] sent message of type {3}.", targetName, timeout.TotalMilliseconds, sender, messageClassName)))), - // c); + if (timeout != TimeSpan.Zero) + { + // avoid CTS + delegate allocation if timeouts aren't needed + var cancellationSource = new CancellationTokenSource(); + cancellationSource.Token.Register(CancelAction, result); + cancellationSource.CancelAfter(timeout); + } - result.Task.ContinueWith(r => + + async Task ExecPromise() { - a.Stop(); - }, TaskContinuationOptions.ExecuteSynchronously); + try + { + await result.Task; + } + finally + { + a.Stop(); + } + } + +#pragma warning disable CS4014 + ExecPromise(); // need this to run as a detached task +#pragma warning restore CS4014 return a; } #endregion - - //TODO: ActorCell.emptyActorRefSet ? - // Aaronontheweb: using the ImmutableHashSet.Empty for now + private readonly AtomicReference> _watchedByDoNotCallMeDirectly = new AtomicReference>(ImmutableHashSet.Empty); private ImmutableHashSet WatchedBy @@ -399,7 +412,6 @@ private void RemoveWatcher(IActorRef watcher) private ImmutableHashSet ClearWatchers() { - //TODO: ActorCell.emptyActorRefSet ? if (WatchedBy == null || WatchedBy.IsEmpty) return ImmutableHashSet.Empty; if (!UpdateWatchedBy(WatchedBy, null)) return ClearWatchers(); else return WatchedBy; @@ -491,32 +503,39 @@ protected override void TellInternal(object message, IActorRef sender) /// public override void SendSystemMessage(ISystemMessage message) { - if (message is Terminate) Stop(); - else if (message is DeathWatchNotification dw) - { - Tell(new Terminated(dw.Actor, dw.ExistenceConfirmed, dw.AddressTerminated), this); - } - else if (message is Watch watch) + switch (message) { - if (Equals(watch.Watchee, this)) + case Terminate _: + Stop(); + break; + case DeathWatchNotification dw: + Tell(new Terminated(dw.Actor, dw.ExistenceConfirmed, dw.AddressTerminated), this); + break; + case Watch watch: { - if (!AddWatcher(watch.Watcher)) - { - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watch.Watcher.SendSystemMessage(new DeathWatchNotification(watch.Watchee, existenceConfirmed: true, - addressTerminated: false)); - } - else + if (Equals(watch.Watchee, this)) { - //TODO: find a way to get access to logger? - Console.WriteLine("BUG: illegal Watch({0},{1}) for {2}", watch.Watchee, watch.Watcher, this); + if (!AddWatcher(watch.Watcher)) + { + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watch.Watcher.SendSystemMessage(new DeathWatchNotification(watch.Watchee, existenceConfirmed: true, + addressTerminated: false)); + } + else + { + //TODO: find a way to get access to logger? + Console.WriteLine("BUG: illegal Watch({0},{1}) for {2}", watch.Watchee, watch.Watcher, this); + } } + + break; } - } - else if (message is Unwatch unwatch) - { - if (Equals(unwatch.Watchee, this) && !Equals(unwatch.Watcher, this)) RemoveWatcher(unwatch.Watcher); - else Console.WriteLine("BUG: illegal Unwatch({0},{1}) for {2}", unwatch.Watchee, unwatch.Watcher, this); + case Unwatch unwatch when Equals(unwatch.Watchee, this) && !Equals(unwatch.Watcher, this): + RemoveWatcher(unwatch.Watcher); + break; + case Unwatch unwatch: + Console.WriteLine("BUG: illegal Unwatch({0},{1}) for {2}", unwatch.Watchee, unwatch.Watcher, this); + break; } } diff --git a/src/core/Akka/Actor/GracefulStopSupport.cs b/src/core/Akka/Actor/GracefulStopSupport.cs index a18071afb7d..13d2b0d92b0 100644 --- a/src/core/Akka/Actor/GracefulStopSupport.cs +++ b/src/core/Akka/Actor/GracefulStopSupport.cs @@ -72,34 +72,30 @@ public static Task GracefulStop(this IActorRef target, TimeSpan timeout) /// This exception is thrown if the underlying task is . /// /// A that will return true if the shuts down within - public static Task GracefulStop(this IActorRef target, TimeSpan timeout, object stopMessage) + public static async Task GracefulStop(this IActorRef target, TimeSpan timeout, object stopMessage) { var internalTarget = target.AsInstanceOf(); var promiseRef = PromiseActorRef.Apply(internalTarget.Provider, timeout, target, stopMessage.GetType().Name); internalTarget.SendSystemMessage(new Watch(internalTarget, promiseRef)); target.Tell(stopMessage, ActorRefs.NoSender); - return promiseRef.Result.ContinueWith(t => + + try { - if (t.Status == TaskStatus.RanToCompletion) + var result = await promiseRef.Result; + switch (result) { - switch (t.Result) - { - case Terminated terminated: - return terminated.ActorRef.Path.Equals(target.Path); - default: - internalTarget.SendSystemMessage(new Unwatch(internalTarget, promiseRef)); - return false; - - } + case Terminated terminated: + return terminated.ActorRef.Path.Equals(target.Path); + default: + return false; } - + } + finally + { + // need to cleanup DeathWatch afterwards internalTarget.SendSystemMessage(new Unwatch(internalTarget, promiseRef)); - if (t.Status == TaskStatus.Canceled) - throw new TaskCanceledException(); - - throw t.Exception; - }, TaskContinuationOptions.ExecuteSynchronously); + } } } } diff --git a/src/core/Akka/Actor/WatchAsyncSupport.cs b/src/core/Akka/Actor/WatchAsyncSupport.cs index ad29e6bfbf0..a8636ec7301 100644 --- a/src/core/Akka/Actor/WatchAsyncSupport.cs +++ b/src/core/Akka/Actor/WatchAsyncSupport.cs @@ -16,15 +16,15 @@ namespace Akka.Actor /// public static class WatchAsyncSupport { - public static Task WatchAsync(this IActorRef target) - { - if (target is IInternalActorRef internalActorRef) - { - var promiseRef = PromiseActorRef.Apply(internalActorRef.Provider, ) - } - - throw new InvalidOperationException( - $"{target} is not an {typeof(IInternalActorRef)} and cannot be death-watched"); - } + // public static Task WatchAsync(this IActorRef target) + // { + // if (target is IInternalActorRef internalActorRef) + // { + // var promiseRef = PromiseActorRef.Apply(internalActorRef.Provider, ) + // } + // + // throw new InvalidOperationException( + // $"{target} is not an {typeof(IInternalActorRef)} and cannot be death-watched"); + // } } } \ No newline at end of file From f36fe83b2589526e7554ce1b54b92953d6d5674a Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 8 Feb 2023 21:41:01 -0600 Subject: [PATCH 3/3] implemented `WatchAsync` --- .../CoreAPISpec.ApproveCore.Core.verified.txt | 4 + ...oreAPISpec.ApproveCore.DotNet.verified.txt | 4 + .../CoreAPISpec.ApproveCore.Net.verified.txt | 4 + .../Akka.Tests/Actor/GracefulStopSpecs.cs | 2 +- src/core/Akka.Tests/Actor/WatchAsyncSpecs.cs | 71 +++++++++++++++ src/core/Akka/Actor/Futures.cs | 89 ++++++++++++++----- src/core/Akka/Actor/GracefulStopSupport.cs | 25 +++--- src/core/Akka/Actor/WatchAsyncSupport.cs | 48 +++++++--- 8 files changed, 205 insertions(+), 42 deletions(-) create mode 100644 src/core/Akka.Tests/Actor/WatchAsyncSpecs.cs diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt index e1ee2709dbb..341173bf205 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt @@ -1882,6 +1882,10 @@ namespace Akka.Actor public Akka.Actor.IStash Stash { get; set; } } public delegate void UntypedReceive(object message); + public class static WatchAsyncSupport + { + public static System.Threading.Tasks.Task WatchAsync(this Akka.Actor.IActorRef target, System.Threading.CancellationToken token = null) { } + } public class static WrappedMessage { public static object Unwrap(object message) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 5aaa234a5fe..98d81fa34b1 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -1884,6 +1884,10 @@ namespace Akka.Actor public Akka.Actor.IStash Stash { get; set; } } public delegate void UntypedReceive(object message); + public class static WatchAsyncSupport + { + public static System.Threading.Tasks.Task WatchAsync(this Akka.Actor.IActorRef target, System.Threading.CancellationToken token = null) { } + } public class static WrappedMessage { public static object Unwrap(object message) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index e1ee2709dbb..341173bf205 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -1882,6 +1882,10 @@ namespace Akka.Actor public Akka.Actor.IStash Stash { get; set; } } public delegate void UntypedReceive(object message); + public class static WatchAsyncSupport + { + public static System.Threading.Tasks.Task WatchAsync(this Akka.Actor.IActorRef target, System.Threading.CancellationToken token = null) { } + } public class static WrappedMessage { public static object Unwrap(object message) { } diff --git a/src/core/Akka.Tests/Actor/GracefulStopSpecs.cs b/src/core/Akka.Tests/Actor/GracefulStopSpecs.cs index 810eb4388fe..7b353320fc9 100644 --- a/src/core/Akka.Tests/Actor/GracefulStopSpecs.cs +++ b/src/core/Akka.Tests/Actor/GracefulStopSpecs.cs @@ -51,7 +51,7 @@ public async Task GracefulStopShouldReturnTrueForAlreadyDeadActor() private class CustomShutdown{} - [Fact(DisplayName = "GracefulStop should return false if shutdown goes overtime", Skip = "GracefulStop currently throws a TaskCancellationException, which seems wrong")] + [Fact(DisplayName = "GracefulStop should return false if shutdown goes overtime")] public async Task GracefulStopShouldThrowIfShutdownGoesOvertime() { // arrange diff --git a/src/core/Akka.Tests/Actor/WatchAsyncSpecs.cs b/src/core/Akka.Tests/Actor/WatchAsyncSpecs.cs new file mode 100644 index 00000000000..9d4854b620e --- /dev/null +++ b/src/core/Akka.Tests/Actor/WatchAsyncSpecs.cs @@ -0,0 +1,71 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.TestKit; +using Akka.TestKit.TestActors; +using FluentAssertions; +using Xunit; + +namespace Akka.Tests.Actor +{ + public class WatchAsyncSpecs : AkkaSpec + { + [Fact(DisplayName = "WatchAsync should return true when actor is terminated")] + public async Task WatchAsync_should_return_true_when_actor_is_terminated() + { + // arrange + var actor = Sys.ActorOf(BlackHoleActor.Props); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + var terminatedTask = actor.WatchAsync(cts.Token); + + // act + Sys.Stop(actor); + var terminated = await terminatedTask; + + // assert + terminated.Should().BeTrue(); + } + + [Fact(DisplayName = "WatchAsync should return true when called on actor that is already terminated")] + public async Task WatchAsync_should_return_true_when_actor_is_already_terminated() + { + // arrange + var actor = Sys.ActorOf(BlackHoleActor.Props); + Watch(actor); + Sys.Stop(actor); + await ExpectTerminatedAsync(actor); + + // act + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); + var terminated = await actor.WatchAsync(cts.Token); + + // assert + terminated.Should().BeTrue(); + } + + [Fact(DisplayName = "WatchAsync should return false when cancellation token is cancelled")] + public async Task WatchAsync_should_return_true_when_cancelled() + { + // arrange + var actor = Sys.ActorOf(BlackHoleActor.Props); + using var cts = new CancellationTokenSource(); + var terminatedTask = actor.WatchAsync(cts.Token); + + // act + cts.Cancel(); + var terminated = await terminatedTask; + + // assert + terminated.Should().BeFalse(); + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Actor/Futures.cs b/src/core/Akka/Actor/Futures.cs index 4a26a728555..c56f625038e 100644 --- a/src/core/Akka/Actor/Futures.cs +++ b/src/core/Akka/Actor/Futures.cs @@ -187,7 +187,7 @@ internal static IActorRefProvider ResolveProvider(ICanTell self) return actorRef.Provider; } - if (ActorCell.Current is ActorCell cell) return cell.SystemImpl.Provider; + if (ActorCell.Current is { } cell) return cell.SystemImpl.Provider; return null; break; @@ -288,7 +288,7 @@ public StoppedWithPath(ActorPath path) /// /// TBD /// - public ActorPath Path { get; private set; } + public ActorPath Path { get; } #region Equality @@ -305,7 +305,7 @@ public override bool Equals(object obj) { if (ReferenceEquals(null, obj)) return false; if (ReferenceEquals(this, obj)) return true; - return obj is StoppedWithPath && Equals((StoppedWithPath)obj); + return obj is StoppedWithPath path && Equals(path); } /// @@ -326,6 +326,46 @@ public override int GetHashCode() // use a static delegate to avoid allocations private static readonly Action CancelAction = o => ((TaskCompletionSource)o).TrySetCanceled(); + /// + /// Creates a new + /// + /// The current actor ref provider. + /// The name of the message class. + /// An external cancellation token. + /// A new + /// + /// API is used by WatchAsync. + /// + public static PromiseActorRef Apply(IActorRefProvider provider, + string messageClassName, CancellationToken cancellationToken = default) + { + var result = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var a = new PromiseActorRef(provider, result, messageClassName); + + if (cancellationToken != default) + { + cancellationToken.Register(CancelAction, result); + } + + async Task ExecPromise() + { + try + { + await result.Task; + } + finally + { + a.Stop(); + } + } + +#pragma warning disable CS4014 + ExecPromise(); // need this to run as a detached task +#pragma warning restore CS4014 + + return a; + } + /// /// Creates a new /// @@ -338,36 +378,35 @@ public override int GetHashCode() public static PromiseActorRef Apply(IActorRefProvider provider, TimeSpan timeout, object targetName, string messageClassName, IActorRef sender = null) { - sender = sender ?? ActorRefs.NoSender; - var result = new TaskCompletionSource(); - var a = new PromiseActorRef(provider, result, messageClassName); + CancellationTokenSource cancellationSource = default; if (timeout != TimeSpan.Zero) { // avoid CTS + delegate allocation if timeouts aren't needed - var cancellationSource = new CancellationTokenSource(); - cancellationSource.Token.Register(CancelAction, result); + cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(timeout); } - + var p = Apply(provider, messageClassName, cancellationSource?.Token ?? default); + + // need to dispose CTS afterwards async Task ExecPromise() { try { - await result.Task; + await p._promise.Task; } finally { - a.Stop(); + cancellationSource?.Dispose(); } } #pragma warning disable CS4014 - ExecPromise(); // need this to run as a detached task + ExecPromise(); #pragma warning restore CS4014 - return a; + return p; } #endregion @@ -403,18 +442,26 @@ private bool AddWatcher(IActorRef watcher) private void RemoveWatcher(IActorRef watcher) { - if (!WatchedBy.Contains(watcher)) + while (true) { - return; + if (!WatchedBy.Contains(watcher)) + { + return; + } + + if (!UpdateWatchedBy(WatchedBy, WatchedBy.Remove(watcher))) continue; + break; } - if (!UpdateWatchedBy(WatchedBy, WatchedBy.Remove(watcher))) RemoveWatcher(watcher); } private ImmutableHashSet ClearWatchers() { - if (WatchedBy == null || WatchedBy.IsEmpty) return ImmutableHashSet.Empty; - if (!UpdateWatchedBy(WatchedBy, null)) return ClearWatchers(); - else return WatchedBy; + while (true) + { + if (WatchedBy == null || WatchedBy.IsEmpty) return ImmutableHashSet.Empty; + if (!UpdateWatchedBy(WatchedBy, null)) continue; + return WatchedBy; + } } private object State @@ -485,7 +532,7 @@ private ActorPath GetPath() /// InternalActorRefBase.TellInternal protected override void TellInternal(object message, IActorRef sender) { - if (State is Stopped || State is StoppedWithPath) Provider.DeadLetters.Tell(message); + if (State is Stopped or StoppedWithPath) Provider.DeadLetters.Tell(message); else { if (message == null) throw new InvalidMessageException("Message is null"); @@ -571,7 +618,7 @@ public override void Stop() continue; } } - else if (state is Stopped || state is StoppedWithPath) + else if (state is Stopped or StoppedWithPath) { //already stopped } diff --git a/src/core/Akka/Actor/GracefulStopSupport.cs b/src/core/Akka/Actor/GracefulStopSupport.cs index 13d2b0d92b0..ac1fd1584a0 100644 --- a/src/core/Akka/Actor/GracefulStopSupport.cs +++ b/src/core/Akka/Actor/GracefulStopSupport.cs @@ -74,22 +74,28 @@ public static Task GracefulStop(this IActorRef target, TimeSpan timeout) /// A that will return true if the shuts down within public static async Task GracefulStop(this IActorRef target, TimeSpan timeout, object stopMessage) { - var internalTarget = target.AsInstanceOf(); + if (target is not IInternalActorRef internalTarget) + throw new InvalidOperationException( + $"{target} is not an {typeof(IInternalActorRef)} and cannot be death-watched"); - var promiseRef = PromiseActorRef.Apply(internalTarget.Provider, timeout, target, stopMessage.GetType().Name); + var promiseRef = + PromiseActorRef.Apply(internalTarget.Provider, timeout, target, stopMessage.GetType().Name); internalTarget.SendSystemMessage(new Watch(internalTarget, promiseRef)); target.Tell(stopMessage, ActorRefs.NoSender); try { var result = await promiseRef.Result; - switch (result) + return result switch { - case Terminated terminated: - return terminated.ActorRef.Path.Equals(target.Path); - default: - return false; - } + Terminated terminated => terminated.ActorRef.Path.Equals(target.Path), + _ => false + }; + } + catch + { + // no need to throw here - the returned `false` status does the job just fine + return false; } finally { @@ -98,5 +104,4 @@ public static async Task GracefulStop(this IActorRef target, TimeSpan time } } } -} - +} \ No newline at end of file diff --git a/src/core/Akka/Actor/WatchAsyncSupport.cs b/src/core/Akka/Actor/WatchAsyncSupport.cs index a8636ec7301..f01a3d11764 100644 --- a/src/core/Akka/Actor/WatchAsyncSupport.cs +++ b/src/core/Akka/Actor/WatchAsyncSupport.cs @@ -6,7 +6,9 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; +using Akka.Dispatch.SysMsg; using Akka.Util.Internal; namespace Akka.Actor @@ -16,15 +18,41 @@ namespace Akka.Actor /// public static class WatchAsyncSupport { - // public static Task WatchAsync(this IActorRef target) - // { - // if (target is IInternalActorRef internalActorRef) - // { - // var promiseRef = PromiseActorRef.Apply(internalActorRef.Provider, ) - // } - // - // throw new InvalidOperationException( - // $"{target} is not an {typeof(IInternalActorRef)} and cannot be death-watched"); - // } + /// + /// WatchAsync allows non-Akka.NET components to subscribe to DeathWatch notifications for a given + /// + /// The actor to watch. + /// Optional - a cancellation token. + /// true if the target was terminated, false otherwise. + /// Thrown if we attempt to watch a + public static async Task WatchAsync(this IActorRef target, CancellationToken token = default) + { + if (target is not IInternalActorRef internalActorRef) + throw new InvalidOperationException( + $"{target} is not an {typeof(IInternalActorRef)} and cannot be death-watched"); + + var promiseRef = PromiseActorRef.Apply(internalActorRef.Provider, nameof(Watch), token); + internalActorRef.SendSystemMessage(new Watch(internalActorRef, promiseRef)); + + try + { + var result = await promiseRef.Result; + return result switch + { + Terminated terminated => terminated.ActorRef.Path.Equals(target.Path), + _ => false + }; + } + catch + { + // no need to throw here - the returned `false` status does the job just fine + return false; + } + finally + { + // need to cleanup DeathWatch afterwards + internalActorRef.SendSystemMessage(new Unwatch(internalActorRef, promiseRef)); + } + } } } \ No newline at end of file