Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async/await support to TestKit and migrate tests to Async api to reduce racy failures #4072

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ protected override void BeforeTermination()
/// <summary>
/// Using region 2 as it is not shutdown in either test.
/// </summary>
private void PingEntities()
private async Task PingEntities()
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
_region2.Tell(1, _probe2.Ref);
_probe2.ExpectMsg<int>(1.Seconds()).Should().Be(1);
Expand All @@ -117,22 +117,22 @@ private void PingEntities()
}

[Fact]
public void Sharding_and_CoordinatedShutdown_must_run_successfully()
public async Task Sharding_and_CoordinatedShutdown_must_run_successfully()
{
InitCluster();
RunCoordinatedShutdownWhenLeaving();
RunCoordinatedShutdownWhenDowning();
await InitCluster();
await RunCoordinatedShutdownWhenLeaving();
await RunCoordinatedShutdownWhenDowning();
}

private void InitCluster()
private async Task InitCluster()
{
Cluster.Get(_sys1).Join(Cluster.Get(_sys1).SelfAddress); // coordinator will initially run on sys1
AwaitAssert(() => Cluster.Get(_sys1).SelfMember.Status.Should().Be(MemberStatus.Up));
await AwaitAssertAsync(() => Cluster.Get(_sys1).SelfMember.Status.Should().Be(MemberStatus.Up));

Cluster.Get(_sys2).Join(Cluster.Get(_sys1).SelfAddress);
Within(10.Seconds(), () =>
await WithinAsync(10.Seconds(), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_sys1).State.Members.Count.Should().Be(2);
Cluster.Get(_sys1).State.Members.All(x => x.Status == MemberStatus.Up).Should().BeTrue();
Expand All @@ -142,9 +142,9 @@ private void InitCluster()
});

Cluster.Get(_sys3).Join(Cluster.Get(_sys1).SelfAddress);
Within(10.Seconds(), () =>
await WithinAsync(10.Seconds(), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_sys1).State.Members.Count.Should().Be(3);
Cluster.Get(_sys1).State.Members.All(x => x.Status == MemberStatus.Up).Should().BeTrue();
Expand All @@ -155,59 +155,59 @@ private void InitCluster()
});
});

PingEntities();
await PingEntities();
}

private void RunCoordinatedShutdownWhenLeaving()
private async Task RunCoordinatedShutdownWhenLeaving()
{
Cluster.Get(_sys3).Leave(Cluster.Get(_sys1).SelfAddress);
_probe1.ExpectMsg("CS-unbind-1");

Within(20.Seconds(), () =>
await WithinAsync(20.Seconds(), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_sys2).State.Members.Count.Should().Be(2);
Cluster.Get(_sys3).State.Members.Count.Should().Be(2);
});
});

Within(10.Seconds(), () =>
await WithinAsync(10.Seconds(), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_sys1).IsTerminated.Should().BeTrue();
_sys1.WhenTerminated.IsCompleted.Should().BeTrue();
});
});

PingEntities();
await PingEntities();
}

private void RunCoordinatedShutdownWhenDowning()
private async Task RunCoordinatedShutdownWhenDowning()
{
// coordinator is on Sys2
Cluster.Get(_sys2).Down(Cluster.Get(_sys3).SelfAddress);
_probe3.ExpectMsg("CS-unbind-3");

Within(20.Seconds(), () =>
await WithinAsync(20.Seconds(), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_sys2).State.Members.Count.Should().Be(1);
});
});

Within(10.Seconds(), () =>
await WithinAsync(10.Seconds(), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_sys3).IsTerminated.Should().BeTrue();
_sys3.WhenTerminated.IsCompleted.Should().BeTrue();
});
});

PingEntities();
await PingEntities();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public PersistentShardSpec(ITestOutputHelper helper) : base(SpecConfig, helper)
}

[Fact]
public void Persistent_Shard_must_remember_entities_started_with_StartEntity()
public async Task Persistent_Shard_must_remember_entities_started_with_StartEntity()
{
Func<string, Props> ep = id => Props.Create(() => new EntityActor(id));

Expand Down Expand Up @@ -82,7 +82,7 @@ public void Persistent_Shard_must_remember_entities_started_with_StartEntity()
var secondIncarnation = Sys.ActorOf(props);

secondIncarnation.Tell(Shard.GetShardStats.Instance);
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
ExpectMsgAllOf(new Shard.ShardStats("shard-1", 1));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
Expand Down Expand Up @@ -72,7 +73,7 @@ public ClusterSingletonLeavingSpeedSpec() : base(@"
_probes = _systems.Select(i => CreateTestProbe()).ToArray();
}

public void Join(ActorSystem from, ActorSystem to, IActorRef probe)
public async Task JoinAsync(ActorSystem from, ActorSystem to, IActorRef probe)
{
from.ActorOf(ClusterSingletonManager.Props(
TheSingleton.props(probe),
Expand All @@ -81,9 +82,9 @@ public void Join(ActorSystem from, ActorSystem to, IActorRef probe)

Cluster.Get(from).Join(Cluster.Get(to).SelfAddress);

Within(TimeSpan.FromSeconds(15), () =>
await WithinAsync(TimeSpan.FromSeconds(15), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should().Contain(Cluster.Get(from).SelfUniqueAddress);
Cluster.Get(from)
Expand All @@ -96,24 +97,24 @@ public void Join(ActorSystem from, ActorSystem to, IActorRef probe)
}

[Fact]
public void ClusterSingleton_that_is_leaving_must()
public async Task ClusterSingleton_that_is_leaving_must()
{
ClusterSingleton_that_is_leaving_must_join_cluster();
ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest();
await ClusterSingleton_that_is_leaving_must_join_cluster();
await ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest();
}

private void ClusterSingleton_that_is_leaving_must_join_cluster()
private async Task ClusterSingleton_that_is_leaving_must_join_cluster()
{
for (int i = 0; i < _systems.Length; i++)
Join(_systems[i], _systems[0], _probes[i]);
await JoinAsync(_systems[i], _systems[0], _probes[i]);

// leader is most likely on system, lowest port
Join(Sys, _systems[0], TestActor);
await JoinAsync(Sys, _systems[0], TestActor);

_probes[0].ExpectMsg("started");
}

private void ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest()
private async Task ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest()
{
List<(TimeSpan, TimeSpan)> durations = new List<(TimeSpan, TimeSpan)>();
Stopwatch sw = new Stopwatch();
Expand All @@ -132,9 +133,9 @@ private void ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_old

var startedDuration = sw.Elapsed;

Within(TimeSpan.FromSeconds(15), () =>
await WithinAsync(TimeSpan.FromSeconds(15), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_systems[i]).IsTerminated.Should().BeTrue();
Cluster.Get(Sys).State.Members.Select(m => m.Address).Should().NotContain(leaveAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public ClusterSingletonRestart2Spec() : base(@"
.WithFallback(Sys.Settings.Config));
}

public void Join(ActorSystem from, ActorSystem to)
public async Task JoinAsync(ActorSystem from, ActorSystem to)
{
if (Cluster.Get(from).SelfRoles.Contains("singleton"))
{
Expand All @@ -54,9 +54,9 @@ public void Join(ActorSystem from, ActorSystem to)
}


Within(TimeSpan.FromSeconds(45), () =>
await WithinAsync(TimeSpan.FromSeconds(45), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(from).Join(Cluster.Get(to).SelfAddress);
Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should().Contain(Cluster.Get(from).SelfUniqueAddress);
Expand All @@ -70,18 +70,18 @@ public void Join(ActorSystem from, ActorSystem to)
}

[Fact]
public void Restarting_cluster_node_during_hand_over_must_restart_singletons_in_restarted_node()
public async Task Restarting_cluster_node_during_hand_over_must_restart_singletons_in_restarted_node()
{
Join(_sys1, _sys1);
Join(_sys2, _sys1);
Join(_sys3, _sys1);
await JoinAsync(_sys1, _sys1);
await JoinAsync(_sys2, _sys1);
await JoinAsync(_sys3, _sys1);

var proxy3 = _sys3.ActorOf(
ClusterSingletonProxy.Props("user/echo", ClusterSingletonProxySettings.Create(_sys3).WithRole("singleton")), "proxy3");

Within(TimeSpan.FromSeconds(5), () =>
await WithinAsync(TimeSpan.FromSeconds(5), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
var probe = CreateTestProbe(_sys3);
proxy3.Tell("hello", probe.Ref);
Expand All @@ -104,14 +104,14 @@ public void Restarting_cluster_node_during_hand_over_must_restart_singletons_in_
.WithFallback(_sys1.Settings.Config);
_sys4 = ActorSystem.Create(_sys1.Name, sys4Config);

Join(_sys4, _sys3);
await JoinAsync(_sys4, _sys3);

// let it stabilize
Task.Delay(TimeSpan.FromSeconds(5)).Wait();

Within(TimeSpan.FromSeconds(10), () =>
await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
var probe = CreateTestProbe(_sys3);
proxy3.Tell("hello2", probe.Ref);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
Expand Down Expand Up @@ -39,15 +40,15 @@ public ClusterSingletonRestartSpec() : base(@"
_sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
}

public void Join(ActorSystem from, ActorSystem to)
public async Task JoinAsync(ActorSystem from, ActorSystem to)
{
from.ActorOf(ClusterSingletonManager.Props(Echo.Props,
PoisonPill.Instance,
ClusterSingletonManagerSettings.Create(from)), "echo");

Within(TimeSpan.FromSeconds(10), () =>
await WithinAsync(TimeSpan.FromSeconds(10), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(from).Join(Cluster.Get(to).SelfAddress);
Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should().Contain(Cluster.Get(from).SelfUniqueAddress);
Expand All @@ -61,17 +62,17 @@ public void Join(ActorSystem from, ActorSystem to)
}

[Fact]
public void Restarting_cluster_node_with_same_hostname_and_port_must_handover_to_next_oldest()
public async Task Restarting_cluster_node_with_same_hostname_and_port_must_handover_to_next_oldest()
{
Join(_sys1, _sys1);
Join(_sys2, _sys1);
await JoinAsync(_sys1, _sys1);
await JoinAsync(_sys2, _sys1);

var proxy2 = _sys2.ActorOf(
ClusterSingletonProxy.Props("user/echo", ClusterSingletonProxySettings.Create(_sys2)), "proxy2");

Within(TimeSpan.FromSeconds(5), () =>
await WithinAsync(TimeSpan.FromSeconds(5), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
var probe = CreateTestProbe(_sys2);
proxy2.Tell("hello", probe.Ref);
Expand All @@ -88,11 +89,11 @@ public void Restarting_cluster_node_with_same_hostname_and_port_must_handover_to
.WithFallback(_sys1.Settings.Config);
_sys3 = ActorSystem.Create(_sys1.Name, sys3Config);

Join(_sys3, _sys2);
await JoinAsync(_sys3, _sys2);

Within(TimeSpan.FromSeconds(5), () =>
await WithinAsync(TimeSpan.FromSeconds(5), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
var probe = CreateTestProbe(_sys2);
proxy2.Tell("hello2", probe.Ref);
Expand All @@ -102,9 +103,9 @@ public void Restarting_cluster_node_with_same_hostname_and_port_must_handover_to

Cluster.Get(_sys2).Leave(Cluster.Get(_sys2).SelfAddress);

Within(TimeSpan.FromSeconds(15), () =>
await WithinAsync(TimeSpan.FromSeconds(15), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
Cluster.Get(_sys3)
.State.Members.Select(x => x.UniqueAddress)
Expand All @@ -117,9 +118,9 @@ public void Restarting_cluster_node_with_same_hostname_and_port_must_handover_to
_sys3.ActorOf(ClusterSingletonProxy.Props("user/echo", ClusterSingletonProxySettings.Create(_sys3)),
"proxy3");

Within(TimeSpan.FromSeconds(5), () =>
await WithinAsync(TimeSpan.FromSeconds(5), async () =>
{
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
var probe = CreateTestProbe(_sys3);
proxy3.Tell("hello3", probe.Ref);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Xunit;
Expand Down Expand Up @@ -46,7 +47,7 @@ public LocalConcurrencySpec(ITestOutputHelper output)
}

[Fact]
public void Updates_from_same_node_should_be_possible_to_do_from_two_actors()
public async Task Updates_from_same_node_should_be_possible_to_do_from_two_actors()
{
var updater1 = ActorOf(Props.Create<Updater>(), "updater1");
var updater2 = ActorOf(Props.Create<Updater>(), "updater2");
Expand All @@ -64,7 +65,7 @@ public void Updates_from_same_node_should_be_possible_to_do_from_two_actors()
}

var expected = b.ToImmutable();
AwaitAssert(() =>
await AwaitAssertAsync(() =>
{
_replicator.Tell(Dsl.Get(Updater.Key, ReadLocal.Instance));
var msg = ExpectMsg<GetSuccess>();
Expand Down
Loading