Skip to content

Commit

Permalink
fixed racy ActorModelSpec (#4976)
Browse files Browse the repository at this point in the history
fixed `A_dispatcher_must_handle_queuing_from_multiple_threads` - we were using the wrong message type the entire time, and the previous instance caused `Thread.Sleep` to be called repeatedly.
  • Loading branch information
Aaronontheweb authored Apr 23, 2021
1 parent d178b63 commit 101f5a2
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions src/core/Akka.Tests/Actor/Dispatch/ActorModelSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@
using Akka.Util;
using Akka.Util.Internal;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Tests.Actor.Dispatch
{
public abstract class ActorModelSpec : AkkaSpec
{
protected ActorModelSpec(Config hocon) : base(hocon) { }
private readonly ITestOutputHelper _testOutputHelper;
protected ActorModelSpec(Config hocon, ITestOutputHelper output = null) : base(hocon, output)
{
_testOutputHelper = output;
}

interface IActorModelMessage : INoSerializationVerificationNeeded { }

Expand Down Expand Up @@ -168,6 +173,13 @@ private DoubleStop() { }
public static readonly DoubleStop Instance = new DoubleStop();
}

private class GetStats : IActorModelMessage
{
private GetStats(){}

public static readonly GetStats Instance = new GetStats();
}

sealed class ThrowException : IActorModelMessage
{
public ThrowException(Exception e)
Expand All @@ -184,7 +196,7 @@ public ThrowException(Exception e)
class DispatcherActor : ReceiveActor
{
private Switch _busy = new Switch(false);

private readonly ILoggingAdapter _log = Context.GetLogger();
private MessageDispatcherInterceptor _interceptor = Context.Dispatcher.AsInstanceOf<MessageDispatcherInterceptor>();

private void Ack()
Expand Down Expand Up @@ -222,6 +234,11 @@ public DispatcherActor()
Receive<InterruptNicely>(interrupt => { Ack(); Sender.Tell(interrupt.Expect); _busy.SwitchOff(); });
Receive<ThrowException>(throwEx => { Ack(); _busy.SwitchOff(); throw throwEx.E; }, throwEx => true);
Receive<DoubleStop>(doubleStop => { Ack(); Context.Stop(Self); Context.Stop(Self); _busy.SwitchOff(); });
Receive<GetStats>(stats => {
Ack();
Sender.Tell(_interceptor.GetStats(Self));
_busy.SwitchOff();
});
}
}

Expand Down Expand Up @@ -474,7 +491,7 @@ public void A_dispatcher_must_process_messages_one_at_a_time()
AssertRefDefaultZero(a, registers: 1, msgsReceived: 3, msgsProcessed: 3, unregisters: 1, dispatcher: dispatcher);
}

[Fact(Skip = "Racy on Azure DevOps")]
[Fact]
public void A_dispatcher_must_handle_queuing_from_multiple_threads()
{
var dispatcher = InterceptedDispatcher();
Expand All @@ -487,14 +504,26 @@ public void A_dispatcher_must_handle_queuing_from_multiple_threads()
{
foreach (var c in Enumerable.Range(1, 20))
{
a.Tell(new WaitAck(1, counter));
a.Tell(new CountDown(counter));
}
});
}

AssertCountdown(counter, (int)Dilated(TimeSpan.FromSeconds(3.0)).TotalMilliseconds, "Should process 200 messages");
AssertRefDefaultZero(a, dispatcher, registers: 1, msgsReceived: 200, msgsProcessed: 200);
Sys.Stop(a);
try
{
AssertCountdown(counter, (int)Dilated(TimeSpan.FromSeconds(3.0)).TotalMilliseconds,
"Should process 200 messages");
AssertRefDefaultZero(a, dispatcher, registers: 1, msgsReceived: 200, msgsProcessed: 200);
}
finally
{
var stats = a.Ask<InterceptorStats>(GetStats.Instance).Result;
_testOutputHelper.WriteLine("Observed stats: {0}", stats);

Sys.Stop(a);
}


}

[Fact]
Expand Down Expand Up @@ -643,7 +672,7 @@ public class DispatcherModelSpec : ActorModelSpec
";

public DispatcherModelSpec() : base(DispatcherHocon) { }
public DispatcherModelSpec(ITestOutputHelper output) : base(DispatcherHocon, output) { }

protected override MessageDispatcherInterceptor InterceptedDispatcher()
{
Expand Down

0 comments on commit 101f5a2

Please sign in to comment.