From 101f5a249d40e9549242e9e17572cbe85e460dc3 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 23 Apr 2021 13:40:34 -0500 Subject: [PATCH] fixed racy ActorModelSpec (#4976) fixed `A_dispatcher_must_handle_queuing_from_multiple_threads` - we were using the wrong message type the entire time, and the previous instance caused `Thread.Sleep` to be called repeatedly. --- .../Actor/Dispatch/ActorModelSpec.cs | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/src/core/Akka.Tests/Actor/Dispatch/ActorModelSpec.cs b/src/core/Akka.Tests/Actor/Dispatch/ActorModelSpec.cs index 69c7a8cf287..6388d94f569 100644 --- a/src/core/Akka.Tests/Actor/Dispatch/ActorModelSpec.cs +++ b/src/core/Akka.Tests/Actor/Dispatch/ActorModelSpec.cs @@ -21,12 +21,17 @@ using Akka.Util; using Akka.Util.Internal; using Xunit; +using Xunit.Abstractions; namespace Akka.Tests.Actor.Dispatch { public abstract class ActorModelSpec : AkkaSpec { - protected ActorModelSpec(Config hocon) : base(hocon) { } + private readonly ITestOutputHelper _testOutputHelper; + protected ActorModelSpec(Config hocon, ITestOutputHelper output = null) : base(hocon, output) + { + _testOutputHelper = output; + } interface IActorModelMessage : INoSerializationVerificationNeeded { } @@ -168,6 +173,13 @@ private DoubleStop() { } public static readonly DoubleStop Instance = new DoubleStop(); } + private class GetStats : IActorModelMessage + { + private GetStats(){} + + public static readonly GetStats Instance = new GetStats(); + } + sealed class ThrowException : IActorModelMessage { public ThrowException(Exception e) @@ -184,7 +196,7 @@ public ThrowException(Exception e) class DispatcherActor : ReceiveActor { private Switch _busy = new Switch(false); - + private readonly ILoggingAdapter _log = Context.GetLogger(); private MessageDispatcherInterceptor _interceptor = Context.Dispatcher.AsInstanceOf(); private void Ack() @@ -222,6 +234,11 @@ public DispatcherActor() Receive(interrupt => { Ack(); Sender.Tell(interrupt.Expect); _busy.SwitchOff(); }); Receive(throwEx => { Ack(); _busy.SwitchOff(); throw throwEx.E; }, throwEx => true); Receive(doubleStop => { Ack(); Context.Stop(Self); Context.Stop(Self); _busy.SwitchOff(); }); + Receive(stats => { + Ack(); + Sender.Tell(_interceptor.GetStats(Self)); + _busy.SwitchOff(); + }); } } @@ -474,7 +491,7 @@ public void A_dispatcher_must_process_messages_one_at_a_time() AssertRefDefaultZero(a, registers: 1, msgsReceived: 3, msgsProcessed: 3, unregisters: 1, dispatcher: dispatcher); } - [Fact(Skip = "Racy on Azure DevOps")] + [Fact] public void A_dispatcher_must_handle_queuing_from_multiple_threads() { var dispatcher = InterceptedDispatcher(); @@ -487,14 +504,26 @@ public void A_dispatcher_must_handle_queuing_from_multiple_threads() { foreach (var c in Enumerable.Range(1, 20)) { - a.Tell(new WaitAck(1, counter)); + a.Tell(new CountDown(counter)); } }); } - AssertCountdown(counter, (int)Dilated(TimeSpan.FromSeconds(3.0)).TotalMilliseconds, "Should process 200 messages"); - AssertRefDefaultZero(a, dispatcher, registers: 1, msgsReceived: 200, msgsProcessed: 200); - Sys.Stop(a); + try + { + AssertCountdown(counter, (int)Dilated(TimeSpan.FromSeconds(3.0)).TotalMilliseconds, + "Should process 200 messages"); + AssertRefDefaultZero(a, dispatcher, registers: 1, msgsReceived: 200, msgsProcessed: 200); + } + finally + { + var stats = a.Ask(GetStats.Instance).Result; + _testOutputHelper.WriteLine("Observed stats: {0}", stats); + + Sys.Stop(a); + } + + } [Fact] @@ -643,7 +672,7 @@ public class DispatcherModelSpec : ActorModelSpec "; - public DispatcherModelSpec() : base(DispatcherHocon) { } + public DispatcherModelSpec(ITestOutputHelper output) : base(DispatcherHocon, output) { } protected override MessageDispatcherInterceptor InterceptedDispatcher() {