Skip to content

Commit

Permalink
Configure duration for applying MemberStatus.WeaklyUp to joining no…
Browse files Browse the repository at this point in the history
…des (#4946)

* Configure duration for applying `MemberStatus.WeaklyUp`  to joining nodes

port of akka/akka#29665

* fixed validation check for TimeSpan duration passed in via HOCON

* harden ClusterLogSpecs
  • Loading branch information
Aaronontheweb authored Apr 19, 2021
1 parent 65e5a22 commit 19f1580
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ namespace Akka.Cluster
public string UseDispatcher { get; }
public bool VerboseGossipReceivedLogging { get; }
public bool VerboseHeartbeatLogging { get; }
public System.TimeSpan WeaklyUpAfter { get; }
}
[Akka.Annotations.InternalApiAttribute()]
public interface IClusterActorRefProvider : Akka.Actor.IActorRefProvider, Akka.Remote.IRemoteActorRefProvider { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public MemberWeaklyUpConfig()
CommonConfig = DebugConfig(on: false)
.WithFallback(ConfigurationFactory.ParseString(@"
akka.remote.retry-gate-closed-for = 3s
akka.cluster.allow-weakly-up-members = on"))
akka.cluster.allow-weakly-up-members = 3s"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

TestTransport = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public MinMembersBeforeUpWithWeaklyUpSpecConfig()

CommonConfig = ConfigurationFactory.ParseString(@"
akka.cluster.min-nr-of-members = 3
akka.cluster.allow-weakly-up-members = on
akka.cluster.allow-weakly-up-members = 3s
").WithFallback(MultiNodeClusterSpec.ClusterConfigWithFailureDetectorPuppet());
}
}
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Cluster.Tests/ClusterConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void Clustering_must_be_able_to_parse_generic_cluster_config_elements()
settings.LeaderActionsInterval.Should().Be(1.Seconds());
settings.UnreachableNodesReaperInterval.Should().Be(1.Seconds());
settings.AllowWeaklyUpMembers.Should().BeTrue();
settings.WeaklyUpAfter.Should().Be(7.Seconds());
settings.PublishStatsInterval.Should().NotHaveValue();
settings.AutoDownUnreachableAfter.Should().NotHaveValue();
settings.DownRemovalMargin.Should().Be(TimeSpan.Zero);
Expand Down
16 changes: 11 additions & 5 deletions src/core/Akka.Cluster.Tests/ClusterLogSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ protected ClusterLogSpec(ITestOutputHelper output, Config config = null)

protected void AwaitUp()
{
AwaitCondition(() => ClusterView.IsSingletonCluster);
ClusterView.Self.Address.ShouldBe(_selfAddress);
ClusterView.Members.Select(m => m.Address).ShouldBe(new Address[] { _selfAddress });
AwaitAssert(() => ClusterView.Status.ShouldBe(MemberStatus.Up));
Within(TimeSpan.FromSeconds(10), () =>
{
AwaitCondition(() => ClusterView.IsSingletonCluster);
ClusterView.Self.Address.ShouldBe(_selfAddress);
ClusterView.Members.Select(m => m.Address).ShouldBe(new Address[] { _selfAddress });
AwaitAssert(() => ClusterView.Status.ShouldBe(MemberStatus.Up));
});
}
/// <summary>
/// The expected log info pattern to intercept after a <see cref="Cluster.Join(Address)"/>.
Expand All @@ -71,9 +74,12 @@ protected void Join(string expected)
/// <param name="expected"></param>
protected void Down(string expected)
{
EventFilter
Within(TimeSpan.FromSeconds(10), () =>
{
EventFilter
.Info(contains: expected)
.ExpectOne(() => _cluster.Down(_selfAddress));
});
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Cluster/ClusterDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2174,7 +2174,7 @@ public void LeaderActions()
{
_leaderActionCounter += 1;

if (_cluster.Settings.AllowWeaklyUpMembers && _leaderActionCounter >= 3)
if (_cluster.Settings.AllowWeaklyUpMembers && (_leaderActionCounter * _cluster.Settings.LeaderActionsInterval.TotalMilliseconds) >= _cluster.Settings.WeaklyUpAfter.TotalMilliseconds)
MoveJoiningToWeaklyUp();

if (_leaderActionCounter == firstNotice || _leaderActionCounter % periodicNotice == 0)
Expand Down
41 changes: 36 additions & 5 deletions src/core/Akka.Cluster/ClusterSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,29 @@ public ClusterSettings(Config config, string systemName)
DowningProviderType = typeof(NoDowning);

RunCoordinatedShutdownWhenDown = clusterConfig.GetBoolean("run-coordinated-shutdown-when-down", false);
AllowWeaklyUpMembers = clusterConfig.GetBoolean("allow-weakly-up-members", false);

// TODO: replace with a switch expression when we upgrade to C#8 or later
TimeSpan GetWeaklyUpDuration()
{
var cKey = "allow-weakly-up-members";
switch (clusterConfig.GetString(cKey, string.Empty)
.ToLowerInvariant())
{
case "off":
return TimeSpan.Zero;
case "on":

return TimeSpan.FromSeconds(7); // for backwards compatibility when it wasn't a duration
default:
var val = clusterConfig.GetTimeSpan(cKey, TimeSpan.FromSeconds(7));
if(!(val > TimeSpan.Zero))
throw new ConfigurationException($"Valid settings for [akka.cluster.{cKey}] are 'off', 'on', or a timespan greater than 0s. Received [{val}]");
return val;
}
}

WeaklyUpAfter = GetWeaklyUpDuration();

}

/// <summary>
Expand Down Expand Up @@ -263,12 +285,21 @@ public ClusterSettings(Config config, string systemName)
/// <summary>
/// If this is set to "off", the leader will not move <see cref="MemberStatus.Joining"/> members to <see cref="MemberStatus.Up"/> during a network
/// split. This feature allows the leader to accept <see cref="MemberStatus.Joining"/> members to be <see cref="MemberStatus.WeaklyUp"/>
/// so they become part of the cluster even during a network split. The leader will
/// move <see cref="MemberStatus.Joining"/> members to <see cref="MemberStatus.WeaklyUp"/> after 3 rounds of 'leader-actions-interval'
/// without convergence.
/// so they become part of the cluster even during a network split.
///
/// The leader will move <see cref="MemberStatus.WeaklyUp"/> members to <see cref="MemberStatus.Up"/> status once convergence has been reached.
/// </summary>
public bool AllowWeaklyUpMembers => WeaklyUpAfter != TimeSpan.Zero;

/// <summary>
/// The duration after which a member who is currently <see cref="MemberStatus.Joining"/> will be marked as
/// <see cref="MemberStatus.WeaklyUp"/> in the event that members of the cluster are currently unreachable.
///
/// This is designed to allow new cluster members to perform work even in the event of a cluster split.
///
/// The leader will move <see cref="MemberStatus.WeaklyUp"/> members to <see cref="MemberStatus.Up"/> status once convergence has been reached.
/// </summary>
public bool AllowWeaklyUpMembers { get; }
public TimeSpan WeaklyUpAfter { get; }
}
}

5 changes: 2 additions & 3 deletions src/core/Akka.Cluster/Configuration/Cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ akka {
# If this is set to "off", the leader will not move 'Joining' members to 'Up' during a network
# split. This feature allows the leader to accept 'Joining' members to be 'WeaklyUp'
# so they become part of the cluster even during a network split. The leader will
# move `Joining` members to 'WeaklyUp' after 3 rounds of 'leader-actions-interval'
# without convergence.
# move `Joining` members to 'WeaklyUp' after this configured duration without convergence.
# The leader will move 'WeaklyUp' members to 'Up' status once convergence has been reached.
allow-weakly-up-members = on
allow-weakly-up-members = 7s

# The roles of this member. List of strings, e.g. roles = ["A", "B"].
# The roles are part of the membership information and can be used by
Expand Down

0 comments on commit 19f1580

Please sign in to comment.