Skip to content

Commit

Permalink
Remove JoinAsync and JoinSeedNodesAsync default timeout values (#6473)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Feb 28, 2023
1 parent 1a63cd9 commit 6d24919
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 32 deletions.
10 changes: 6 additions & 4 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,9 @@ public async Task A_cluster_JoinAsync_must_fail_if_could_not_connect_to_cluster(
{
await Awaiting(async () =>
{
using var cts = new CancellationTokenSource(10.Seconds());
var nonExisting = Address.Parse($"akka.tcp://{selfAddress.System}@127.0.0.1:9999/");
var task = cluster.JoinAsync(nonExisting);
var task = cluster.JoinAsync(nonExisting, cts.Token);
LeaderActions();
await task;
})
Expand Down Expand Up @@ -424,8 +425,9 @@ public async Task A_cluster_JoinSeedNodesAsync_must_fail_if_could_not_connect_to
{
await Awaiting(async () =>
{
using var cts = new CancellationTokenSource(10.Seconds());
var nonExisting = Address.Parse($"akka.tcp://{selfAddress.System}@127.0.0.1:9999/");
var task = cluster.JoinSeedNodesAsync(new[] { nonExisting });
var task = cluster.JoinSeedNodesAsync(new[] { nonExisting }, cts.Token);
LeaderActions();
await task;
})
Expand Down Expand Up @@ -536,7 +538,7 @@ public async Task A_cluster_must_terminate_ActorSystem_via_leave_CoordinatedShut
var probe = CreateTestProbe(sys2);
Cluster.Get(sys2).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent));
await probe.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
await Cluster.Get(sys2).JoinAsync(Cluster.Get(sys2).SelfAddress);
await Cluster.Get(sys2).JoinAsync(Cluster.Get(sys2).SelfAddress).ShouldCompleteWithin(10.Seconds());
await probe.ExpectMsgAsync<ClusterEvent.MemberUp>();

Cluster.Get(sys2).Leave(Cluster.Get(sys2).SelfAddress);
Expand Down Expand Up @@ -571,7 +573,7 @@ public async Task A_cluster_must_terminate_ActorSystem_via_Down_CoordinatedShutd
var probe = CreateTestProbe(sys3);
Cluster.Get(sys3).Subscribe(probe.Ref, typeof(ClusterEvent.IMemberEvent));
await probe.ExpectMsgAsync<ClusterEvent.CurrentClusterState>();
await Cluster.Get(sys3).JoinAsync(Cluster.Get(sys3).SelfAddress);
await Cluster.Get(sys3).JoinAsync(Cluster.Get(sys3).SelfAddress).ShouldCompleteWithin(10.Seconds());
await probe.ExpectMsgAsync<ClusterEvent.MemberUp>();

Cluster.Get(sys3).Down(Cluster.Get(sys3).SelfAddress);
Expand Down
44 changes: 16 additions & 28 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,6 @@ public void Join(Address address)
ClusterCore.Tell(new ClusterUserAction.JoinTo(FillLocal(address)));
}

/// <summary>
/// Want at least 10-20 seconds of leeway here.
/// </summary>
private TimeSpan ComputeJoinTimeLimit()
{
return TimeSpan.FromMilliseconds(Math.Max(
(Settings.RetryUnsuccessfulJoinAfter ?? TimeSpan.FromSeconds(10)).TotalMilliseconds,
TimeSpan.FromSeconds(10).TotalMilliseconds));
}

/// <summary>
/// Try to asynchronously join this cluster node specified by <paramref name="address"/>.
/// A <see cref="Join"/> command is sent to the node to join. Returned task will be completed
Expand Down Expand Up @@ -282,18 +272,17 @@ public Task JoinAsync(Address address, CancellationToken token = default)

var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);

var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
timeoutCts.CancelAfter(ComputeJoinTimeLimit());
timeoutCts.Token.Register(() =>
if (token != default)
{
timeoutCts.Dispose();
completion.TrySetException(new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided address: {address}"));
});

token.Register(() =>
{
completion.TrySetException(new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided address: {address}"));
});
}

RegisterOnMemberUp(() =>
{
timeoutCts.Dispose();
completion.TrySetResult(NotUsed.Instance);
});

Expand Down Expand Up @@ -358,19 +347,18 @@ public Task JoinSeedNodesAsync(IEnumerable<Address> seedNodes, CancellationToken

var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);
var nodes = seedNodes.ToList();

var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(token);
timeoutCts.CancelAfter(ComputeJoinTimeLimit());
timeoutCts.Token.Register(() =>

if (token != default)
{
timeoutCts.Dispose();
completion.TrySetException(new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided addresses: [{string.Join(",", nodes)}]"));
});
token.Register(() =>
{
completion.TrySetException(new ClusterJoinFailedException(
$"Node has not managed to join the cluster using provided addresses: [{string.Join(",", nodes)}]"));
});
}

RegisterOnMemberUp(() =>
{
timeoutCts.Dispose();
completion.TrySetResult(NotUsed.Instance);
});

Expand Down

0 comments on commit 6d24919

Please sign in to comment.