Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClusterStressSpec and Cluster Failure Detector Cleanup #4940

Merged
Prev Previous commit
Next Next commit
added assert invariants to build script
cleaned up gossip class to assert more invariants
  • Loading branch information
Aaronontheweb committed Apr 16, 2021
commit c71441eb056f4caa94038f5f926c76316890c6be
2 changes: 2 additions & 0 deletions build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ Target "MultiNodeTests" (fun _ ->

Target "MultiNodeTestsNetCore" (fun _ ->
if not skipBuild.Value then
setEnvironVar "akka.cluster.assert" "on" // needed to enable assert invariants for Akka.Cluster
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Causes MNTR runs to blow up if the Gossip class does something it shouldn't

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be picked up by Akka, or do we need to change it to AKKA_CLUSTER_ASSERT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I didn't realize it was picked up as-is inside the code, but maybe we should follow our own environment variable naming standard by making this AKKA_CLUSTER_ASSERT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we should - I'll update that in a subsequent PR. I need to tweak some of the timing settings for StressSpec since the build server is way underpowered compared to my development machine. I'll address that along with this.

let multiNodeTestPath = findToolInSubPath "Akka.MultiNodeTestRunner.dll" (currentDirectory @@ "src" @@ "core" @@ "Akka.MultiNodeTestRunner" @@ "bin" @@ "Release" @@ testNetCoreVersion @@ "win10-x64" @@ "publish")

let projects =
Expand Down Expand Up @@ -388,6 +389,7 @@ Target "MultiNodeTestsNetCore" (fun _ ->
)
Target "MultiNodeTestsNet" (fun _ ->
if not skipBuild.Value then
setEnvironVar "akka.cluster.assert" "on" // needed to enable assert invariants for Akka.Cluster
let multiNodeTestPath = findToolInSubPath "Akka.MultiNodeTestRunner.dll" (currentDirectory @@ "src" @@ "core" @@ "Akka.MultiNodeTestRunner" @@ "bin" @@ "Release" @@ testNetVersion @@ "win10-x64" @@ "publish")

let projects =
Expand Down
96 changes: 82 additions & 14 deletions src/core/Akka.Cluster.Tests.MultiNode/StressSpec.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
using System;
//-----------------------------------------------------------------------
// <copyright file="StressSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
Expand Down Expand Up @@ -68,8 +75,8 @@ public StressSpecConfig()
}
akka.actor.provider = cluster
akka.cluster {
failure-detector.acceptable-heartbeat-pause = 3s
downing-provider-class = ""Akka.Cluster.SBR.SplitBrainResolverProvider, Akka.Cluster""
failure-detector.acceptable-heartbeat-pause = 3s
downing-provider-class = ""Akka.Cluster.SplitBrainResolver, Akka.Cluster""
split-brain-resolver {
active-strategy = keep-majority #TODO: remove this once it's been made default
stable-after = 10s
Expand Down Expand Up @@ -266,8 +273,7 @@ public string FormatPhi()
if (_phiValuesObservedByNode.IsEmpty) return string.Empty;
else
{
//var lines = _phiValuesObservedByNode.Select(
// x => x.Value.SelectMany(y => FormatPhiLine(x.Key, y.Address, y)));

var lines = (from mon in _phiValuesObservedByNode from phi in mon.Value select FormatPhiLine(mon.Key, phi.Address, phi));
return FormatPhiHeader + Environment.NewLine + string.Join(Environment.NewLine, lines);
}
Expand Down Expand Up @@ -917,26 +923,36 @@ string FormatNodeLeave()
var currentRoles = Roles.Take(NbrUsedRoles - 1).ToArray();
var title = $"{FormatNodeLeave()} one from {NbrUsedRoles} nodes cluster";
CreateResultAggregator(title, expectedResults:currentRoles.Length, true);

var removeRole = Roles[NbrUsedRoles - 1];
var removeAddress = GetAddress(removeRole);

Console.WriteLine($"Preparing to {FormatNodeLeave()}[{removeAddress}] role [{removeRole.Name}] out of [{Roles.Count}]");
RunOn(() =>
{
var watchee = Sys.ActorOf(Props.Create(() => new Watchee()), "watchee");
if(!shutdown)
Cluster.Leave(GetAddress(Myself));
Console.WriteLine("Created watchee [{0}]", watchee);
}, removeRole);

EnterBarrier("watchee-created-" + Step);

RunOn(() =>
{
Sys.ActorSelection(Node(removeRole) / "user" / "watchee").Tell(new Identify("watchee"), IdentifyProbe.Ref);
var watchee = IdentifyProbe.ExpectMsg<ActorIdentity>().Subject;
Watch(watchee);
AwaitAssert(() =>
{
Sys.ActorSelection(new RootActorPath(removeAddress) / "user" / "watchee").Tell(new Identify("watchee"), IdentifyProbe.Ref);
var watchee = IdentifyProbe.ExpectMsg<ActorIdentity>(TimeSpan.FromSeconds(1)).Subject;
Watch(watchee);
}, interval:TimeSpan.FromSeconds(1.25d));

}, Roles.First());
EnterBarrier("watchee-established-" + Step);

RunOn(() =>
{
if (!shutdown)
Cluster.Leave(GetAddress(Myself));
}, removeRole);

RunOn(() =>
{
ReportResult(() =>
Expand All @@ -950,7 +966,7 @@ string FormatNodeLeave()
Log.Info("Shutting down [{0}]", removeAddress);
}

TestConductor.Exit(removeRole, 0).Wait(RemainingOrDefault);
TestConductor.Exit(removeRole, 0).Wait();
}
}, Roles.First());

Expand Down Expand Up @@ -1041,7 +1057,7 @@ public void PartitionSeveral(int numberOfNodes)
{
foreach (var y in removeRoles)
{
TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both);
TestConductor.Blackhole(x, y, ThrottleTransportAdapter.Direction.Both).Wait();
}
}
}, Roles.First());
Expand Down Expand Up @@ -1222,6 +1238,17 @@ public void Cluster_under_stress()
IncrementStep();
MustLeaveNodesOneByOneFromLargeCluster();
IncrementStep();
MustShutdownNodesOneByOneFromLargeCluster();
IncrementStep();
MustLeaveSeveralNodes();
IncrementStep();
MustShutdownSeveralNodes();
IncrementStep();
MustShutdownNodesOneByOneFromSmallCluster();
IncrementStep();
MustLeaveNodesOneByOneFromSmallCluster();
IncrementStep();
MustLogClrInfo();
}

public void MustLogSettings()
Expand Down Expand Up @@ -1307,7 +1334,7 @@ public void MustGossipWhenIdle()
public void MustDownPartitionedNodes()
{
PartitionSeveral(Settings.NumberOfNodesPartition);
NbrUsedRoles += Settings.NumberOfNodesPartition;
NbrUsedRoles -= Settings.NumberOfNodesPartition;
EnterBarrier("after-" + Step);
}

Expand All @@ -1316,5 +1343,46 @@ public void MustLeaveNodesOneByOneFromLargeCluster()
RemoveOneByOne(Settings.NumberOfNodesLeavingOneByOneLarge, shutdown:false);
EnterBarrier("after-" + Step);
}

/// <summary>
/// Runs <c>RemoveOneByOne</c> for <c>Settings.NumberOfNodesShutdownOneByOneLarge</c> nodes
/// with <c>shutdown</c> set to <c>true</c>, then waits on the "after-" barrier for the current step.
/// </summary>
public void MustShutdownNodesOneByOneFromLargeCluster()
{
    RemoveOneByOne(Settings.NumberOfNodesShutdownOneByOneLarge, shutdown: true);

    // Barrier name is built after the removal, exactly as the step counter stands at that point.
    var barrierName = "after-" + Step;
    EnterBarrier(barrierName);
}

/// <summary>
/// Runs <c>RemoveSeveral</c> for <c>Settings.NumberOfNodesLeaving</c> nodes with
/// <c>shutdown</c> set to <c>false</c>, reduces <c>NbrUsedRoles</c> by the same amount,
/// then waits on the "after-" barrier for the current step.
/// </summary>
public void MustLeaveSeveralNodes()
{
    RemoveSeveral(Settings.NumberOfNodesLeaving, shutdown: false);

    // Keep the used-role counter in step with the nodes that were just removed.
    NbrUsedRoles -= Settings.NumberOfNodesLeaving;

    var barrierName = "after-" + Step;
    EnterBarrier(barrierName);
}

/// <summary>
/// Runs <c>RemoveSeveral</c> for <c>Settings.NumberOfNodesShutdown</c> nodes with
/// <c>shutdown</c> set to <c>true</c>, reduces <c>NbrUsedRoles</c> by the same amount,
/// then waits on the "after-" barrier for the current step.
/// </summary>
public void MustShutdownSeveralNodes()
{
    RemoveSeveral(Settings.NumberOfNodesShutdown, shutdown: true);

    // Keep the used-role counter in step with the nodes that were just removed.
    NbrUsedRoles -= Settings.NumberOfNodesShutdown;

    var barrierName = "after-" + Step;
    EnterBarrier(barrierName);
}

/// <summary>
/// Runs <c>RemoveOneByOne</c> for <c>Settings.NumberOfNodesShutdownOneByOneSmall</c> nodes
/// with <c>shutdown</c> set to <c>true</c>, then waits on the "after-" barrier for the current step.
/// </summary>
public void MustShutdownNodesOneByOneFromSmallCluster()
{
    // Named argument mirrors the large-cluster variant; the value passed is unchanged.
    RemoveOneByOne(Settings.NumberOfNodesShutdownOneByOneSmall, shutdown: true);

    var barrierName = "after-" + Step;
    EnterBarrier(barrierName);
}

/// <summary>
/// Runs <c>RemoveOneByOne</c> for <c>Settings.NumberOfNodesLeavingOneByOneSmall</c> nodes
/// with <c>shutdown</c> set to <c>false</c>, then waits on the "after-" barrier for the current step.
/// </summary>
public void MustLeaveNodesOneByOneFromSmallCluster()
{
    // Named argument mirrors the large-cluster variant; the value passed is unchanged.
    RemoveOneByOne(Settings.NumberOfNodesLeavingOneByOneSmall, shutdown: false);

    var barrierName = "after-" + Step;
    EnterBarrier(barrierName);
}

/// <summary>
/// When <c>Settings.Infolog</c> is set, logs the result of <c>ClrInfo()</c> at info level
/// under a "StressSpec CLR: " heading; in every case waits on the "after-" barrier
/// for the current step afterwards.
/// </summary>
public void MustLogClrInfo()
{
    if (Settings.Infolog)
    {
        // Heading and payload are separated by a platform newline; {0} receives the CLR info.
        var template = "StressSpec CLR: " + Environment.NewLine + "{0}";
        Log.Info(template, ClrInfo());
    }

    var barrierName = "after-" + Step;
    EnterBarrier(barrierName);
}
}
}
38 changes: 23 additions & 15 deletions src/core/Akka.Cluster/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,25 +156,33 @@ public Gossip Copy(ImmutableSortedSet<Member> members = null, GossipOverview ove

private void AssertInvariants()
{
if (_members.Any(m => m.Status == MemberStatus.Removed))
void IfTrueThrow(bool func, string expected, string actual)
{
var members = string.Join(", ", _members.Where(m => m.Status == MemberStatus.Removed).Select(m => m.ToString()));
throw new ArgumentException($"Live members must not have status [Removed], got {members}", nameof(_members));
if (func) throw new ArgumentException($"{expected}, but found [{actual}]");
}


IfTrueThrow(_members.Any(m => m.Status == MemberStatus.Removed),
expected: "Live members must not have status [Removed]",
actual: string.Join(", ",
_members.Where(m => m.Status == MemberStatus.Removed).Select(m => m.ToString())));


var inReachabilityButNotMember = _overview.Reachability.AllObservers.Except(_members.Select(m => m.UniqueAddress));
if (!inReachabilityButNotMember.IsEmpty)
{
var inreachability = string.Join(", ", inReachabilityButNotMember.Select(a => a.ToString()));
throw new ArgumentException($"Nodes not part of cluster in reachability table, got {inreachability}", nameof(_overview));
}
IfTrueThrow(!inReachabilityButNotMember.IsEmpty,
expected: "Nodes not part of cluster in reachability table",
actual: string.Join(", ", inReachabilityButNotMember.Select(a => a.ToString())));

var inReachabilityVersionsButNotMember =
_overview.Reachability.Versions.Keys.Except(Members.Select(x => x.UniqueAddress)).ToImmutableHashSet();
IfTrueThrow(!inReachabilityVersionsButNotMember.IsEmpty,
expected: "Nodes not part of cluster in reachability versions table",
actual: string.Join(", ", inReachabilityVersionsButNotMember.Select(a => a.ToString())));

var seenButNotMember = _overview.Seen.Except(_members.Select(m => m.UniqueAddress));
if (!seenButNotMember.IsEmpty)
{
var seen = string.Join(", ", seenButNotMember.Select(a => a.ToString()));
throw new ArgumentException($"Nodes not part of cluster have marked the Gossip as seen, got {seen}", nameof(_overview));
}
IfTrueThrow(!seenButNotMember.IsEmpty,
expected: "Nodes not part of cluster have marked the Gossip as seen",
actual: string.Join(", ", seenButNotMember.Select(a => a.ToString())));
}

//TODO: Serializer should ignore
Expand Down Expand Up @@ -274,7 +282,7 @@ public Gossip Merge(Gossip that)
var mergedMembers = EmptyMembers.Union(Member.PickHighestPriority(this._members, that._members));

// 3. merge reachability table by picking records with highest version
var mergedReachability = this._overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress),
var mergedReachability = _overview.Reachability.Merge(mergedMembers.Select(m => m.UniqueAddress),
that._overview.Reachability);

// 4. Nobody can have seen this new gossip yet
Expand Down Expand Up @@ -448,7 +456,7 @@ public override string ToString()
/// <summary>
/// Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
/// </summary>
class GossipOverview
internal class GossipOverview
{
readonly ImmutableHashSet<UniqueAddress> _seen;
readonly Reachability _reachability;
Expand Down