diff --git a/src/core/Akka.Cluster.Tests/ClusterSpec.cs b/src/core/Akka.Cluster.Tests/ClusterSpec.cs index f2cd2e427a1..a11e14faae5 100644 --- a/src/core/Akka.Cluster.Tests/ClusterSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterSpec.cs @@ -364,8 +364,9 @@ public async Task A_cluster_JoinAsync_must_fail_if_could_not_connect_to_cluster( { await Awaiting(async () => { + using var cts = new CancellationTokenSource(10.Seconds()); var nonExisting = Address.Parse($"akka.tcp://{selfAddress.System}@127.0.0.1:9999/"); - var task = cluster.JoinAsync(nonExisting); + var task = cluster.JoinAsync(nonExisting, cts.Token); LeaderActions(); await task; }) @@ -424,8 +425,9 @@ public async Task A_cluster_JoinSeedNodesAsync_must_fail_if_could_not_connect_to { await Awaiting(async () => { + using var cts = new CancellationTokenSource(10.Seconds()); var nonExisting = Address.Parse($"akka.tcp://{selfAddress.System}@127.0.0.1:9999/"); - var task = cluster.JoinSeedNodesAsync(new[] { nonExisting }); + var task = cluster.JoinSeedNodesAsync(new[] { nonExisting }, cts.Token); LeaderActions(); await task; }) @@ -536,7 +538,7 @@ public async Task A_cluster_must_terminate_ActorSystem_via_leave_CoordinatedShut var probe = CreateTestProbe(sys2); Cluster.Get(sys2).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent)); await probe.ExpectMsgAsync(); - await Cluster.Get(sys2).JoinAsync(Cluster.Get(sys2).SelfAddress); + await Cluster.Get(sys2).JoinAsync(Cluster.Get(sys2).SelfAddress).ShouldCompleteWithin(10.Seconds()); await probe.ExpectMsgAsync(); Cluster.Get(sys2).Leave(Cluster.Get(sys2).SelfAddress); @@ -571,7 +573,7 @@ public async Task A_cluster_must_terminate_ActorSystem_via_Down_CoordinatedShutd var probe = CreateTestProbe(sys3); Cluster.Get(sys3).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent)); await probe.ExpectMsgAsync(); - await Cluster.Get(sys3).JoinAsync(Cluster.Get(sys3).SelfAddress); + await Cluster.Get(sys3).JoinAsync(Cluster.Get(sys3).SelfAddress).ShouldCompleteWithin(10.Seconds()); await probe.ExpectMsgAsync(); Cluster.Get(sys3).Down(Cluster.Get(sys3).SelfAddress); diff --git a/src/core/Akka.Cluster/Cluster.cs b/src/core/Akka.Cluster/Cluster.cs index 28016b980b2..9c7661a5fee 100644 --- a/src/core/Akka.Cluster/Cluster.cs +++ b/src/core/Akka.Cluster/Cluster.cs @@ -243,16 +243,6 @@ public void Join(Address address) ClusterCore.Tell(new ClusterUserAction.JoinTo(FillLocal(address))); } - /// - /// Want at least 10-20 seconds of leeway here. - /// - private TimeSpan ComputeJoinTimeLimit() - { - return TimeSpan.FromMilliseconds(Math.Max( - (Settings.RetryUnsuccessfulJoinAfter ?? TimeSpan.FromSeconds(10)).TotalMilliseconds, - TimeSpan.FromSeconds(10).TotalMilliseconds)); - } - /// /// Try to asynchronously join this cluster node specified by . /// A command is sent to the node to join. Returned task will be completed @@ -282,18 +272,17 @@ public Task JoinAsync(Address address, CancellationToken token = default) var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token); - timeoutCts.CancelAfter(ComputeJoinTimeLimit()); - timeoutCts.Token.Register(() => + if (token != default) { - timeoutCts.Dispose(); - completion.TrySetException(new ClusterJoinFailedException( - $"Node has not managed to join the cluster using provided address: {address}")); - }); - + token.Register(() => + { + completion.TrySetException(new ClusterJoinFailedException( + $"Node has not managed to join the cluster using provided address: {address}")); + }); + } + RegisterOnMemberUp(() => { - timeoutCts.Dispose(); completion.TrySetResult(NotUsed.Instance); }); @@ -358,19 +347,18 @@ public Task JoinSeedNodesAsync(IEnumerable
seedNodes, CancellationToken var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var nodes = seedNodes.ToList(); - - var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token); - timeoutCts.CancelAfter(ComputeJoinTimeLimit()); - timeoutCts.Token.Register(() => + + if (token != default) { - timeoutCts.Dispose(); - completion.TrySetException(new ClusterJoinFailedException( - $"Node has not managed to join the cluster using provided addresses: [{string.Join(",", nodes)}]")); - }); + token.Register(() => + { + completion.TrySetException(new ClusterJoinFailedException( + $"Node has not managed to join the cluster using provided addresses: [{string.Join(",", nodes)}]")); + }); + } RegisterOnMemberUp(() => { - timeoutCts.Dispose(); completion.TrySetResult(NotUsed.Instance); });