diff --git a/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs b/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs index 052002a0666..02e61c2a11e 100644 --- a/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterDomainEventPublisherSpec.cs @@ -35,15 +35,25 @@ public class ClusterDomainEventPublisherSpec : AkkaSpec static readonly Member dUp = TestMember.Create(new Address("akka.tcp", "sys", "d", 2552), MemberStatus.Up, ImmutableHashSet.Create("GRP")); static readonly Gossip g0 = new Gossip(ImmutableSortedSet.Create(aUp)).Seen(aUp.UniqueAddress); + static readonly MembershipState state0 = new MembershipState(g0, aUp.UniqueAddress); static readonly Gossip g1 = new Gossip(ImmutableSortedSet.Create(aUp, cJoining)).Seen(aUp.UniqueAddress).Seen(cJoining.UniqueAddress); + static readonly MembershipState state1 = new MembershipState(g1, aUp.UniqueAddress); static readonly Gossip g2 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp)).Seen(aUp.UniqueAddress); + static readonly MembershipState state2 = new MembershipState(g2, aUp.UniqueAddress); static readonly Gossip g3 = g2.Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress); + static readonly MembershipState state3 = new MembershipState(g3, aUp.UniqueAddress); static readonly Gossip g4 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress); + static readonly MembershipState state4 = new MembershipState(g4, aUp.UniqueAddress); static readonly Gossip g5 = new Gossip(ImmutableSortedSet.Create(a51Up, aUp, bExiting, cUp)).Seen(aUp.UniqueAddress).Seen(bExiting.UniqueAddress).Seen(cUp.UniqueAddress); + static readonly MembershipState state5 = new MembershipState(g5, aUp.UniqueAddress); static readonly Gossip g6 = new Gossip(ImmutableSortedSet.Create(aLeaving, bExiting, cUp)).Seen(aUp.UniqueAddress); + static readonly MembershipState state6 = new MembershipState(g6, aUp.UniqueAddress); static readonly Gossip g7 = new Gossip(ImmutableSortedSet.Create(aExiting, bExiting, cUp)).Seen(aUp.UniqueAddress); + static readonly MembershipState state7 = new MembershipState(g7, aUp.UniqueAddress); static readonly Gossip g8 = new Gossip(ImmutableSortedSet.Create(aUp, bExiting, cUp, dUp), new GossipOverview(Reachability.Empty.Unreachable(aUp.UniqueAddress, dUp.UniqueAddress))).Seen(aUp.UniqueAddress); + static readonly MembershipState state8 = new MembershipState(g8, aUp.UniqueAddress); + static readonly MembershipState _emptyMembershipState = new MembershipState(Gossip.Empty, aUp.UniqueAddress); readonly TestProbe _memberSubscriber; public ClusterDomainEventPublisherSpec() : base(Config) @@ -54,7 +64,7 @@ public ClusterDomainEventPublisherSpec() : base(Config) Sys.EventStream.Subscribe(_memberSubscriber.Ref, typeof(ClusterEvent.ClusterShuttingDown)); _publisher = Sys.ActorOf(Props.Create()); - _publisher.Tell(new InternalClusterAction.PublishChanges(g0)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state0)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(aUp)); _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(aUp.Address)); } @@ -62,15 +72,15 @@ public ClusterDomainEventPublisherSpec() : base(Config) [Fact] public void ClusterDomainEventPublisher_must_publish_MemberJoined() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g1)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state1)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberJoined(cJoining)); } [Fact] public void ClusterDomainEventPublisher_must_publish_MemberUp() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g2)); - _publisher.Tell(new InternalClusterAction.PublishChanges(g3)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state2)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); } @@ -78,7 +88,7 @@ public void ClusterDomainEventPublisher_must_publish_MemberUp() [Fact] public void ClusterDomainEventPublisher_must_publish_leader_changed() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g4)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state4)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); @@ -89,17 +99,17 @@ public void ClusterDomainEventPublisher_must_publish_leader_changed() [Fact] public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_leader_leaves_and_is_removed() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g3)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); - _publisher.Tell(new InternalClusterAction.PublishChanges(g6)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state6)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberLeft(aLeaving)); - _publisher.Tell(new InternalClusterAction.PublishChanges(g7)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state7)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(aExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(cUp.Address)); _memberSubscriber.ExpectNoMsg(500.Milliseconds()); // at the removed member a an empty gossip is the last thing - _publisher.Tell(new InternalClusterAction.PublishChanges(Gossip.Empty)); + _publisher.Tell(new InternalClusterAction.PublishChanges(_emptyMembershipState)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Exiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(bRemoved, MemberStatus.Exiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberRemoved(cRemoved, MemberStatus.Up)); @@ -109,13 +119,13 @@ public void ClusterDomainEventPublisher_must_publish_leader_changed_when_old_lea [Fact] public void ClusterDomainEventPublisher_must_not_publish_leader_changed_when_same_leader() { - _publisher.Tell(new InternalClusterAction.PublishChanges(g4)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state4)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(a51Up)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); _memberSubscriber.ExpectMsg(new ClusterEvent.MemberUp(cUp)); _memberSubscriber.ExpectMsg(new ClusterEvent.LeaderChanged(a51Up.Address)); - _publisher.Tell(new InternalClusterAction.PublishChanges(g5)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state5)); _memberSubscriber.ExpectNoMsg(500.Milliseconds()); } @@ -125,9 +135,9 @@ public void ClusterDomainEventPublisher_must_publish_role_leader_changed() var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.RoleLeaderChanged)))); subscriber.ExpectMsg(); - _publisher.Tell(new InternalClusterAction.PublishChanges(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)))); + _publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cJoining, dUp)), dUp.UniqueAddress))); subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", dUp.Address)); - _publisher.Tell(new InternalClusterAction.PublishChanges(new Gossip(ImmutableSortedSet.Create(cUp, dUp)))); + _publisher.Tell(new InternalClusterAction.PublishChanges(new MembershipState(new Gossip(ImmutableSortedSet.Create(cUp, dUp)), dUp.UniqueAddress))); subscriber.ExpectMsg(new ClusterEvent.RoleLeaderChanged("GRP", cUp.Address)); } @@ -145,7 +155,7 @@ public void ClusterDomainEventPublisher_must_send_CurrentClusterState_when_subsc public void ClusterDomainEventPublisher_must_send_events_corresponding_to_current_state_when_subscribe() { var subscriber = CreateTestProbe(); - _publisher.Tell(new InternalClusterAction.PublishChanges(g8)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state8)); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.ReachabilityEvent)))); subscriber.ReceiveN(4).Should().BeEquivalentTo( @@ -165,7 +175,7 @@ public void ClusterDomainEventPublisher_should_support_unsubscribe() _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.IMemberEvent)))); subscriber.ExpectMsg(); _publisher.Tell(new InternalClusterAction.Unsubscribe(subscriber.Ref, typeof(ClusterEvent.IMemberEvent))); - _publisher.Tell(new InternalClusterAction.PublishChanges(g3)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); subscriber.ExpectNoMsg(500.Milliseconds()); // but memberSubscriber is still subscriber _memberSubscriber.ExpectMsg(new ClusterEvent.MemberExited(bExiting)); @@ -178,10 +188,10 @@ public void ClusterDomainEventPublisher_must_publish_seen_changed() var subscriber = CreateTestProbe(); _publisher.Tell(new InternalClusterAction.Subscribe(subscriber.Ref, ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot, ImmutableHashSet.Create(typeof(ClusterEvent.SeenChanged)))); subscriber.ExpectMsg(); - _publisher.Tell(new InternalClusterAction.PublishChanges(g2)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state2)); subscriber.ExpectMsg(); subscriber.ExpectNoMsg(500.Milliseconds()); - _publisher.Tell(new InternalClusterAction.PublishChanges(g3)); + _publisher.Tell(new InternalClusterAction.PublishChanges(state3)); subscriber.ExpectMsg(); subscriber.ExpectNoMsg(500.Milliseconds()); } diff --git a/src/core/Akka.Cluster.Tests/ClusterDomainEventSpec.cs b/src/core/Akka.Cluster.Tests/ClusterDomainEventSpec.cs index e51fcf945b0..b6aae634582 100644 --- a/src/core/Akka.Cluster.Tests/ClusterDomainEventSpec.cs +++ b/src/core/Akka.Cluster.Tests/ClusterDomainEventSpec.cs @@ -45,12 +45,17 @@ private static (Gossip, ImmutableHashSet) Converge(Gossip gossip) (t, m) => (t.Item1.Seen(m.UniqueAddress), t.Item2.Add(m.UniqueAddress))); } + private MembershipState State(Gossip g) + { + return new MembershipState(g, selfDummyAddress); + } + [Fact] public void DomainEvents_must_be_empty_for_the_same_gossip() { - var g1 = new Gossip(ImmutableSortedSet.Create(aUp)); + var g1 =new Gossip(ImmutableSortedSet.Create(aUp)); - ClusterEvent.DiffUnreachable(g1, g1, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g1)) .Should() .BeEquivalentTo(ImmutableList.Create()); } @@ -65,15 +70,15 @@ public void DomainEvents_must_be_produced_for_new_members() var g2 = t2.Item1; var s2 = t2.Item2; - ClusterEvent.DiffMemberEvents(g1, g2) + ClusterEvent.DiffMemberEvents(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.MemberUp(bUp), new ClusterEvent.MemberJoined(eJoining))); - ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffSeen(g1, g2, selfDummyAddress) + ClusterEvent.DiffSeen(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.SeenChanged(true, s2.Select(s => s.Address).ToImmutableHashSet()))); } @@ -88,15 +93,15 @@ public void DomainEvents_must_be_produced_for_changed_status_of_members() var g2 = t2.Item1; var s2 = t2.Item2; - ClusterEvent.DiffMemberEvents(g1, g2) + ClusterEvent.DiffMemberEvents(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.MemberUp(aUp), new ClusterEvent.MemberLeft(cLeaving), new ClusterEvent.MemberJoined(eJoining))); - ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffSeen(g1, g2, selfDummyAddress) + ClusterEvent.DiffSeen(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.SeenChanged(true, s2.Select(s => s.Address).ToImmutableHashSet()))); } @@ -112,16 +117,16 @@ public void DomainEvents_must_be_produced_for_members_in_unreachable() Unreachable(aUp.UniqueAddress, bDown.UniqueAddress); var g2 = new Gossip(ImmutableSortedSet.Create(aUp, cUp, bDown, eDown), new GossipOverview(reachability2)); - ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.UnreachableMember(bDown))); // never include self member in unreachable - ClusterEvent.DiffUnreachable(g1, g2, bDown.UniqueAddress) + ClusterEvent.DiffUnreachable(new MembershipState(g1, bDown.UniqueAddress), new MembershipState(g2, bDown.UniqueAddress)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffSeen(g1, g2, selfDummyAddress) + ClusterEvent.DiffSeen(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create()); } @@ -140,19 +145,19 @@ public void DomainEvents_must_be_produced_for_members_becoming_reachable_after_u Reachable(aUp.UniqueAddress, bUp.UniqueAddress); var g2 = new Gossip(ImmutableSortedSet.Create(aUp, cUp, bUp, eUp), new GossipOverview(reachability2)); - ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.UnreachableMember(cUp))); // never include self member in unreachable - ClusterEvent.DiffUnreachable(g1, g2, cUp.UniqueAddress) + ClusterEvent.DiffUnreachable(new MembershipState(g1, cUp.UniqueAddress), new MembershipState(g2, cUp.UniqueAddress)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffReachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffReachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.ReachableMember(bUp))); // never include self member in reachable - ClusterEvent.DiffReachable(g1, g2, bUp.UniqueAddress) + ClusterEvent.DiffReachable(new MembershipState(g1, bUp.UniqueAddress), new MembershipState(g2, bUp.UniqueAddress)) .Should() .BeEquivalentTo(ImmutableList.Create()); } @@ -166,11 +171,11 @@ public void DomainEvents_must_be_produced_for_downed_members() var g1 = t1.Item1; var g2 = t2.Item1; - ClusterEvent.DiffMemberEvents(g1, g2) + ClusterEvent.DiffMemberEvents(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.MemberDowned(eDown))); - ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create()); } @@ -185,15 +190,15 @@ public void DomainEvents_must_be_produced_for_removed_members() var g2 = t2.Item1; var s2 = t2.Item2; - ClusterEvent.DiffMemberEvents(g1, g2) + ClusterEvent.DiffMemberEvents(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.MemberRemoved(dRemoved, MemberStatus.Exiting))); - ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffSeen(g1, g2, selfDummyAddress) + ClusterEvent.DiffSeen(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.SeenChanged(true, s2.Select(s => s.Address).ToImmutableHashSet()))); } @@ -209,22 +214,22 @@ public void DomainEvents_must_be_produced_for_convergence_changes() .Seen(aUp.UniqueAddress) .Seen(bUp.UniqueAddress); - ClusterEvent.DiffMemberEvents(g1, g2) + ClusterEvent.DiffMemberEvents(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffSeen(g1, g2, selfDummyAddress) + ClusterEvent.DiffSeen(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.SeenChanged(true, ImmutableHashSet.Create(aUp.Address, bUp.Address)))); - ClusterEvent.DiffMemberEvents(g2, g1) + ClusterEvent.DiffMemberEvents(State(g2), State(g1)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffUnreachable(g1, g1, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g1)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffSeen(g2, g1, selfDummyAddress) + ClusterEvent.DiffSeen(State(g2), State(g1)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.SeenChanged(true, ImmutableHashSet.Create(aUp.Address, bUp.Address, eJoining.Address)))); } @@ -239,16 +244,16 @@ public void DomainEvents_must_be_produced_for_leader_changes() var g2 = t2.Item1; var s2 = t2.Item2; - ClusterEvent.DiffMemberEvents(g1, g2) + ClusterEvent.DiffMemberEvents(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.MemberRemoved(aRemoved, MemberStatus.Up))); - ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress) + ClusterEvent.DiffUnreachable(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create()); - ClusterEvent.DiffSeen(g1, g2, selfDummyAddress) + ClusterEvent.DiffSeen(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.SeenChanged(true, s2.Select(a => a.Address).ToImmutableHashSet()))); - ClusterEvent.DiffLeader(g1, g2, selfDummyAddress) + ClusterEvent.DiffLeader(State(g1), State(g2)) .Should() .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.LeaderChanged(bUp.Address))); } @@ -267,13 +272,13 @@ public void DomainEvents_must_be_produced_for_role_leader_changes() new ClusterEvent.RoleLeaderChanged("DD", dLeaving.Address), new ClusterEvent.RoleLeaderChanged("DE", dLeaving.Address), new ClusterEvent.RoleLeaderChanged("EE", eUp.Address)); - ClusterEvent.DiffRolesLeader(g0, g1, selfDummyAddress).Should().BeEquivalentTo(expected); + ClusterEvent.DiffRolesLeader(State(g0), State(g1)).Should().BeEquivalentTo(expected); var expected2 = ImmutableHashSet.Create( new ClusterEvent.RoleLeaderChanged("AA", null), new ClusterEvent.RoleLeaderChanged("AB", bUp.Address), new ClusterEvent.RoleLeaderChanged("DE", eJoining.Address)); - ClusterEvent.DiffRolesLeader(g1, g2, selfDummyAddress).Should().BeEquivalentTo(expected2); + ClusterEvent.DiffRolesLeader(State(g1), State(g2)).Should().BeEquivalentTo(expected2); } } } diff --git a/src/core/Akka.Cluster.Tests/GossipSpec.cs b/src/core/Akka.Cluster.Tests/GossipSpec.cs index c433becacac..d319aef2539 100644 --- a/src/core/Akka.Cluster.Tests/GossipSpec.cs +++ b/src/core/Akka.Cluster.Tests/GossipSpec.cs @@ -31,31 +31,37 @@ public class GossipSpec static readonly Member e2 = TestMember.Create(e1.Address, MemberStatus.Up); static readonly Member e3 = TestMember.Create(e1.Address, MemberStatus.Down); + private MembershipState State(Gossip g, Member selfMember = null) + { + selfMember = selfMember ?? a1; + return new MembershipState(g, selfMember.UniqueAddress); + } + [Fact] public void A_gossip_must_reach_convergence_when_its_empty() { - Gossip.Empty.Convergence(a1.UniqueAddress, new HashSet()).Should().BeTrue(); + State(Gossip.Empty).Convergence(ImmutableHashSet.Empty).Should().BeTrue(); } [Fact] public void A_gossip_must_reach_convergence_for_one_node() { var g1 = new Gossip(ImmutableSortedSet.Create(a1)).Seen(a1.UniqueAddress); - g1.Convergence(a1.UniqueAddress, new HashSet()).Should().BeTrue(); + State(g1).Convergence(ImmutableHashSet.Empty).Should().BeTrue(); } [Fact] public void A_gossip_must_not_reach_convergence_until_all_have_seen_version() { var g1 = new Gossip(ImmutableSortedSet.Create(a1, b1)).Seen(a1.UniqueAddress); - g1.Convergence(a1.UniqueAddress, new HashSet()).Should().BeFalse(); + State(g1).Convergence(ImmutableHashSet.Empty).Should().BeFalse(); } [Fact] public void A_gossip_must_reach_convergence_for_two_nodes() { var g1 = new Gossip(ImmutableSortedSet.Create(a1, b1)).Seen(a1.UniqueAddress).Seen(b1.UniqueAddress); - g1.Convergence(a1.UniqueAddress, new HashSet()).Should().BeTrue(); + State(g1).Convergence(ImmutableHashSet.Empty).Should().BeTrue(); } [Fact] @@ -63,7 +69,7 @@ public void A_gossip_must_reach_convergence_skipping_joining() { // e1 is joining var g1 = new Gossip(ImmutableSortedSet.Create(a1, b1, e1)).Seen(a1.UniqueAddress).Seen(b1.UniqueAddress); - g1.Convergence(a1.UniqueAddress, new HashSet()).Should().BeTrue(); + State(g1).Convergence(ImmutableHashSet.Empty).Should().BeTrue(); } [Fact] @@ -71,7 +77,7 @@ public void A_gossip_must_reach_convergence_skipping_down() { // e3 is down var g1 = new Gossip(ImmutableSortedSet.Create(a1, b1, e3)).Seen(a1.UniqueAddress).Seen(b1.UniqueAddress); - g1.Convergence(a1.UniqueAddress, new HashSet()).Should().BeTrue(); + State(g1).Convergence(ImmutableHashSet.Empty).Should().BeTrue(); } [Fact] @@ -79,7 +85,7 @@ public void A_gossip_must_reach_convergence_skipping_Leaving_with_ExitingConfirm { // c1 is leaving var g1 = new Gossip(ImmutableSortedSet.Create(a1, b1, c1)).Seen(a1.UniqueAddress).Seen(b1.UniqueAddress); - g1.Convergence(a1.UniqueAddress, new HashSet() { c1.UniqueAddress }).Should().BeTrue(); + State(g1).Convergence(ImmutableHashSet.Empty.Add(c1.UniqueAddress)).Should().BeTrue(); } [Fact] @@ -88,7 +94,7 @@ public void A_gossip_must_reach_convergence_skipping_Unreachable_Leaving_with_Ex // c1 is leaving var r1 = Reachability.Empty.Unreachable(b1.UniqueAddress, c1.UniqueAddress); var g1 = new Gossip(ImmutableSortedSet.Create(a1, b1, c1), new GossipOverview(r1)).Seen(a1.UniqueAddress).Seen(b1.UniqueAddress); - g1.Convergence(a1.UniqueAddress, new HashSet() { c1.UniqueAddress }).Should().BeTrue(); + State(g1).Convergence(ImmutableHashSet.Empty.Add(c1.UniqueAddress)).Should().BeTrue(); } [Fact] @@ -97,9 +103,9 @@ public void A_gossip_must_not_reach_convergence_when_unreachable() var r1 = Reachability.Empty.Unreachable(b1.UniqueAddress, a1.UniqueAddress); var g1 = new Gossip(ImmutableSortedSet.Create(a1, b1), new GossipOverview(r1)) .Seen(a1.UniqueAddress).Seen(b1.UniqueAddress); - g1.Convergence(b1.UniqueAddress, new HashSet()).Should().BeFalse(); + State(g1, b1).Convergence(ImmutableHashSet.Empty).Should().BeFalse(); // but from a1's point of view (it knows that itself is not unreachable) - g1.Convergence(a1.UniqueAddress, new HashSet()).Should().BeTrue(); + State(g1).Convergence(ImmutableHashSet.Empty).Should().BeTrue(); } [Fact] @@ -109,7 +115,7 @@ public void A_gossip_must_reach_convergence_when_downed_node_has_observed_unreac var r1 = Reachability.Empty.Unreachable(e3.UniqueAddress, a1.UniqueAddress); var g1 = new Gossip(ImmutableSortedSet.Create(a1, b1, e3), new GossipOverview(r1)) .Seen(a1.UniqueAddress).Seen(b1.UniqueAddress).Seen(e3.UniqueAddress); - g1.Convergence(b1.UniqueAddress, new HashSet()).Should().BeTrue(); + State(g1, b1).Convergence(ImmutableHashSet.Empty).Should().BeTrue(); } [Fact] @@ -171,15 +177,15 @@ public void A_gossip_must_merge_members_by_removing_removed_members() [Fact] public void A_gossip_must_have_leader_as_first_member_based_on_ordering_except_exiting_status() { - new Gossip(ImmutableSortedSet.Create(c2, e2)).Leader(c2.UniqueAddress).Should().Be(c2.UniqueAddress); - new Gossip(ImmutableSortedSet.Create(c3, e2)).Leader(c3.UniqueAddress).Should().Be(e2.UniqueAddress); - new Gossip(ImmutableSortedSet.Create(c3)).Leader(c3.UniqueAddress).Should().Be(c3.UniqueAddress); + State(new Gossip(ImmutableSortedSet.Create(c2, e2))).Leader.Should().Be(c2.UniqueAddress); + State(new Gossip(ImmutableSortedSet.Create(c3, e2))).Leader.Should().Be(e2.UniqueAddress); + State(new Gossip(ImmutableSortedSet.Create(c3))).Leader.Should().Be(c3.UniqueAddress); } [Fact] public void A_gossip_must_not_have_Down_member_as_leader() { - new Gossip(ImmutableSortedSet.Create(e3)).Leader(e3.UniqueAddress).Should().BeNull(); + State(new Gossip(ImmutableSortedSet.Create(e3))).Leader.Should().BeNull(); } [Fact] diff --git a/src/core/Akka.Cluster.Tests/MemberOrderingModelBasedTests.cs b/src/core/Akka.Cluster.Tests/MemberOrderingModelBasedTests.cs index 34e29660ed1..ea56ea2814a 100644 --- a/src/core/Akka.Cluster.Tests/MemberOrderingModelBasedTests.cs +++ b/src/core/Akka.Cluster.Tests/MemberOrderingModelBasedTests.cs @@ -59,7 +59,7 @@ public Property DistinctMemberAddressesMustCompareDifferently(Address[] addresse } } - public class MembershipMachine : Machine + public class MembershipMachine : Machine { public MembershipMachine() { @@ -67,19 +67,19 @@ public MembershipMachine() Arb.Register(); } - public override Gen> Next(MembershipModel model) + public override Gen> Next(MembershipModel model) { if (model.AllMembers.Count == 0) return AddNewMember.Generator(); return Gen.OneOf(ChangeMemberStatus.Generator(model.AllMembers.Keys), AddNewMember.Generator()); } - public override Arbitrary> Setup => Arb.From(Arb.Generate() - .Select(x => (Setup)x)); // compiler ceremony :( + public override Arbitrary> Setup => Arb.From(Arb.Generate() + .Select(x => (Setup)x)); // compiler ceremony :( #region Setup - public class MembershipSetup : Setup + public class MembershipSetup : Setup { private readonly Member[] _members; @@ -92,9 +92,9 @@ public MembershipSetup(UniqueAddress[] addresses) .ToArray(); } - public override MembershipState Actual() + public override MemberOrderingState Actual() { - return new MembershipState() { Members = ImmutableSortedSet.Empty.Union(_members) }; + return new MemberOrderingState() { Members = ImmutableSortedSet.Empty.Union(_members) }; } public override MembershipModel Model() @@ -112,12 +112,12 @@ public override string ToString() #region Operations - public class ChangeMemberStatus : Operation + public class ChangeMemberStatus : Operation { - public static Gen> Generator(IEnumerable
addresses) + public static Gen> Generator(IEnumerable
addresses) { var statusGen = ClusterGenerators.MemberStatusGenerator().Generator; - Func> generator = + Func> generator = (address, status) => new ChangeMemberStatus(address, status); var producer = FsharpDelegateHelper.Create(generator); @@ -143,7 +143,7 @@ public override bool Pre(MembershipModel model) return Member.AllowedTransitions[m.Status].Contains(NewStatus); } - public override Property Check(MembershipState actual, MembershipModel model) + public override Property Check(MemberOrderingState actual, MembershipModel model) { var members = actual.Members; @@ -169,11 +169,11 @@ public override string ToString() } } - public class AddNewMember : Operation + public class AddNewMember : Operation { - public static Gen> Generator() + public static Gen> Generator() { - return Arb.Generate().Select(x => (Operation)x); + return Arb.Generate().Select(x => (Operation)x); } private readonly UniqueAddress _address; @@ -193,7 +193,7 @@ public override bool Pre(MembershipModel model) return !member.UniqueAddress.Equals(_address); } - public override Property Check(MembershipState actual, MembershipModel model) + public override Property Check(MemberOrderingState actual, MembershipModel model) { var members = actual.Members; actual.Members = members.Add(new Member(_address, int.MaxValue, MemberStatus.Up, @@ -222,7 +222,7 @@ public override string ToString() #endregion } - public class MembershipState + public class MemberOrderingState { public ImmutableSortedSet Members { get; set; } } diff --git a/src/core/Akka.Cluster/AutoDown.cs b/src/core/Akka.Cluster/AutoDown.cs index 716e2502dc8..42167898622 100644 --- a/src/core/Akka.Cluster/AutoDown.cs +++ b/src/core/Akka.Cluster/AutoDown.cs @@ -10,6 +10,7 @@ using Akka.Actor; using Akka.Event; using Akka.Configuration; +using static Akka.Cluster.MembershipState; namespace Akka.Cluster { @@ -140,7 +141,7 @@ public override void Down(Address node) internal abstract class AutoDownBase : UntypedActor { private readonly ImmutableHashSet _skipMemberStatus = - Gossip.ConvergenceSkipUnreachableWithMemberStatus; + ConvergenceSkipUnreachableWithMemberStatus; private ImmutableDictionary _scheduledUnreachable = ImmutableDictionary.Create(); diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 536c362d9b8..3fa6e599aa2 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -17,6 +17,7 @@ using Akka.Util; using Akka.Util.Internal; using Akka.Util.Internal.Collections; +using static Akka.Cluster.MembershipState; namespace Akka.Cluster { @@ -775,18 +776,18 @@ private interface IPublishMessage : INoSerializationVerificationNeeded { } internal sealed class PublishChanges : IPublishMessage { /// - /// Creates a new message with updated gossip. + /// Creates a new message with updated membership state. /// - /// The gossip to publish internally. - internal PublishChanges(Gossip newGossip) + /// The membership state to publish internally. + internal PublishChanges(MembershipState newState) { - NewGossip = newGossip; + NewState = newState; } /// /// The gossip being published. /// - public Gossip NewGossip { get; } + public MembershipState NewState { get; } } /// @@ -1001,7 +1002,8 @@ internal static string VclockName(UniqueAddress node) // note that self is not initially member, // and the SendGossip is not versioned for this 'Node' yet - private Gossip _latestGossip = Gossip.Empty; + private MembershipState _membershipState; + private Gossip LatestGossip => _membershipState.LatestGossip; private readonly bool _statsEnabled; private GossipStats _gossipStats = new GossipStats(); @@ -1017,7 +1019,7 @@ internal static string VclockName(UniqueAddress node) private bool _exitingTasksInProgress = false; private readonly TaskCompletionSource _selfExiting = new TaskCompletionSource(); private readonly CoordinatedShutdown _coordShutdown = CoordinatedShutdown.Get(Context.System); - private HashSet _exitingConfirmed = new HashSet(); + private ImmutableHashSet _exitingConfirmed = ImmutableHashSet.Empty; /// @@ -1027,6 +1029,7 @@ internal static string VclockName(UniqueAddress node) public ClusterCoreDaemon(IActorRef publisher) { _cluster = Cluster.Get(Context.System); + _membershipState = new MembershipState(Gossip.Empty, _cluster.SelfUniqueAddress); _publisher = publisher; SelfUniqueAddress = _cluster.SelfUniqueAddress; _vclockNode = VectorClock.Node.Create(VclockName(SelfUniqueAddress)); @@ -1087,7 +1090,7 @@ private void AddCoordinatedLeave() var self = Self; _coordShutdown.AddTask(CoordinatedShutdown.PhaseClusterExiting, "wait-exiting", () => { - if (_latestGossip.Members.IsEmpty) + if (LatestGossip.Members.IsEmpty) return Task.FromResult(Done.Instance); // not joined yet else return _selfExiting.Task; @@ -1146,7 +1149,7 @@ protected override void PostStop() _gossipTaskCancellable.Cancel(); _failureDetectorReaperTaskCancellable.Cancel(); _leaderActionsTaskCancellable.Cancel(); - if (_publishStatsTaskTaskCancellable != null) _publishStatsTaskTaskCancellable.Cancel(); + _publishStatsTaskTaskCancellable?.Cancel(); _selfExiting.TrySetResult(Done.Instance); } @@ -1157,12 +1160,12 @@ private void ExitingCompleted() _exitingTasksInProgress = false; // status Removed also before joining - if (_latestGossip.GetMember(SelfUniqueAddress).Status != MemberStatus.Removed) + if (_membershipState.SelfMember.Status != MemberStatus.Removed) { // mark as seen - _latestGossip = _latestGossip.Seen(SelfUniqueAddress); + _membershipState = _membershipState.Seen(); AssertLatestGossip(); - Publish(_latestGossip); + PublishMembershipState(); // Let others know (best effort) before shutdown. Otherwise they will not see // convergence of the Exiting state until they have detected this node as @@ -1175,16 +1178,15 @@ private void ExitingCompleted() SendGossipRandom(NumberOfGossipsBeforeShutdownWhenLeaderExits); // send ExitingConfirmed to two potential leaders - var membersWithoutSelf = _latestGossip.Members.Where(m => !m.UniqueAddress.Equals(SelfUniqueAddress)) + var membersWithoutSelf = LatestGossip.Members.Where(m => !m.UniqueAddress.Equals(SelfUniqueAddress)) .ToImmutableSortedSet(); - var leader = _latestGossip.LeaderOf(membersWithoutSelf, SelfUniqueAddress); + var leader = _membershipState.LeaderOf(membersWithoutSelf); if (leader != null) { ClusterCore(leader.Address).Tell(new InternalClusterAction.ExitingConfirmed(SelfUniqueAddress)); var leader2 = - _latestGossip.LeaderOf( - membersWithoutSelf.Where(x => !x.UniqueAddress.Equals(leader)).ToImmutableSortedSet(), - SelfUniqueAddress); + _membershipState.LeaderOf( + membersWithoutSelf.Where(x => !x.UniqueAddress.Equals(leader)).ToImmutableSortedSet()); if (leader2 != null) { ClusterCore(leader2.Address).Tell(new InternalClusterAction.ExitingConfirmed(SelfUniqueAddress)); @@ -1198,7 +1200,7 @@ private void ExitingCompleted() private void ReceiveExitingConfirmed(UniqueAddress node) { _cluster.LogInfo("Exiting confirmed [{0}]", node.Address); - _exitingConfirmed.Add(node); + _exitingConfirmed = _exitingConfirmed.Add(node); } private void CleanupExitingConfirmed() @@ -1206,7 +1208,7 @@ private void CleanupExitingConfirmed() // in case the actual removal was performed by another leader node if (_exitingConfirmed.Any()) { - _exitingConfirmed = new HashSet(_exitingConfirmed.Where(n => _latestGossip.Members.Any(m => m.UniqueAddress.Equals(n)))); + _exitingConfirmed = _exitingConfirmed.Where(n => LatestGossip.Members.Any(m => m.UniqueAddress.Equals(n))).ToImmutableHashSet(); } } @@ -1447,8 +1449,8 @@ protected override void Unhandled(object message) /// public void InitJoin() { - var selfStatus = _latestGossip.GetMember(SelfUniqueAddress).Status; - if (Gossip.RemoveUnreachableWithMemberStatus.Contains(selfStatus)) + var selfStatus = LatestGossip.GetMember(SelfUniqueAddress).Status; + if (RemoveUnreachableWithMemberStatus.Contains(selfStatus)) { _cluster.LogInfo("Sending InitJoinNack message from node [{0}] to [{1}]", SelfUniqueAddress.Address, Sender); @@ -1518,7 +1520,7 @@ public void Join(Address address) else { //TODO: Akka exception? - if (!_latestGossip.Members.IsEmpty) throw new InvalidOperationException("Join can only be done from an empty state"); + if (!LatestGossip.Members.IsEmpty) throw new InvalidOperationException("Join can only be done from an empty state"); // to support manual join when joining to seed nodes is stuck (no seed nodes available) StopSeedNodeProcess(); @@ -1567,7 +1569,7 @@ public void StopSeedNodeProcess() /// TBD public void Joining(UniqueAddress node, ImmutableHashSet roles, AppVersion appVersion) { - var selfStatus = _latestGossip.GetMember(SelfUniqueAddress).Status; + var selfStatus = LatestGossip.GetMember(SelfUniqueAddress).Status; if (!node.Address.Protocol.Equals(_cluster.SelfAddress.Protocol)) { _log.Warning("Member with wrong protocol tried to join, but was ignored, expected [{0}] but was [{1}]", @@ -1578,14 +1580,14 @@ public void Joining(UniqueAddress node, ImmutableHashSet roles, AppVersi _log.Warning("Member with wrong ActorSystem name tried to join, but was ignored, expected [{0}] but was [{1}]", _cluster.SelfAddress.System, node.Address.System); } - else if (Gossip.RemoveUnreachableWithMemberStatus.Contains(selfStatus)) + else if (RemoveUnreachableWithMemberStatus.Contains(selfStatus)) { _cluster.LogInfo("Trying to join [{0}] to [{1}] member, ignoring. Use a member that is Up instead.", node, selfStatus); } else { - var localMembers = _latestGossip.Members; + var localMembers = LatestGossip.Members; // check by address without uid to make sure that node with same host:port is not allowed // to join until previous node with that host:port has been removed from the cluster @@ -1596,7 +1598,7 @@ public void Joining(UniqueAddress node, ImmutableHashSet roles, AppVersi _cluster.LogInfo("Existing member [{0}] is joining again.", node); if (!node.Equals(SelfUniqueAddress)) { - Sender.Tell(new InternalClusterAction.Welcome(SelfUniqueAddress, _latestGossip)); + Sender.Tell(new InternalClusterAction.Welcome(SelfUniqueAddress, LatestGossip)); } } else if (localMember != null) @@ -1609,10 +1611,10 @@ public void Joining(UniqueAddress node, ImmutableHashSet roles, AppVersi if (localMember.Status != MemberStatus.Down) { // we can confirm it as terminated/unreachable immediately - var newReachability = _latestGossip.Overview.Reachability.Terminated( + var newReachability = LatestGossip.Overview.Reachability.Terminated( _cluster.SelfUniqueAddress, localMember.UniqueAddress); - var newOverview = _latestGossip.Overview.Copy(reachability: newReachability); - var newGossip = _latestGossip.Copy(overview: newOverview); + var newOverview = LatestGossip.Overview.Copy(reachability: newReachability); + var newGossip = LatestGossip.Copy(overview: newOverview); UpdateLatestGossip(newGossip); Downing(localMember.Address); } @@ -1627,7 +1629,7 @@ public void Joining(UniqueAddress node, ImmutableHashSet roles, AppVersi var newMembers = localMembers .Add(Member.Create(node, roles, appVersion)) .Add(Member.Create(_cluster.SelfUniqueAddress, _cluster.SelfRoles, _cluster.Settings.AppVersion)); - var newGossip = _latestGossip.Copy(members: newMembers); + var newGossip = LatestGossip.Copy(members: newMembers); UpdateLatestGossip(newGossip); @@ -1646,10 +1648,10 @@ public void Joining(UniqueAddress node, ImmutableHashSet roles, AppVersi else { _cluster.LogInfo("Node [{0}] is JOINING, roles [{1}], version [{2}]", node.Address, string.Join(",", roles), appVersion); - Sender.Tell(new InternalClusterAction.Welcome(SelfUniqueAddress, _latestGossip)); + Sender.Tell(new InternalClusterAction.Welcome(SelfUniqueAddress, LatestGossip)); } - Publish(_latestGossip); + PublishMembershipState(); } } } @@ -1663,7 +1665,7 @@ public void Joining(UniqueAddress node, ImmutableHashSet roles, AppVersi /// Welcome can only be done from an empty state public void Welcome(Address joinWith, UniqueAddress from, Gossip gossip) { - if (!_latestGossip.Members.IsEmpty) throw new InvalidOperationException("Welcome can only be done from an empty state"); + if (!LatestGossip.Members.IsEmpty) throw new InvalidOperationException("Welcome can only be done from an empty state"); if (!joinWith.Equals(from.Address)) { _cluster.LogInfo("Ignoring welcome from [{0}] when trying to join with [{1}]", from.Address, joinWith); @@ -1671,9 +1673,9 @@ public void Welcome(Address joinWith, UniqueAddress from, Gossip gossip) else { _cluster.LogInfo("Welcome from [{0}]", from.Address); - _latestGossip = gossip.Seen(SelfUniqueAddress); + _membershipState = _membershipState.Copy(gossip).Seen(); AssertLatestGossip(); - Publish(_latestGossip); + PublishMembershipState(); if (!from.Equals(SelfUniqueAddress)) GossipTo(from, Sender); BecomeInitialized(); @@ -1689,20 +1691,20 @@ public void Welcome(Address joinWith, UniqueAddress from, Gossip gossip) public void Leaving(Address address) { // only try to update if the node is available (in the member ring) - if (_latestGossip.Members.Any(m => m.Address.Equals(address) && (m.Status == MemberStatus.Joining || m.Status == MemberStatus.WeaklyUp || m.Status == MemberStatus.Up))) + if (LatestGossip.Members.Any(m => m.Address.Equals(address) && (m.Status == MemberStatus.Joining || m.Status == MemberStatus.WeaklyUp || m.Status == MemberStatus.Up))) { // mark node as LEAVING - var newMembers = _latestGossip.Members.Select(m => + var newMembers = LatestGossip.Members.Select(m => { if (m.Address == address) return m.Copy(status: MemberStatus.Leaving); return m; }).ToImmutableSortedSet(); // mark node as LEAVING - var newGossip = _latestGossip.Copy(members: newMembers); + var newGossip = LatestGossip.Copy(members: newMembers); UpdateLatestGossip(newGossip); _cluster.LogInfo("Marked address [{0}] as [{1}]", address, MemberStatus.Leaving); - Publish(_latestGossip); + PublishMembershipState(); // immediate gossip to speed up the leaving process SendGossip(); } @@ -1725,11 +1727,11 @@ public void Shutdown() /// The address of the member that will be downed. public void Downing(Address address) { - var localGossip = _latestGossip; + var localGossip = LatestGossip; var localMembers = localGossip.Members; var localOverview = localGossip.Overview; var localSeen = localOverview.Seen; - var localReachability = localOverview.Reachability; + var localReachability = _membershipState.DcReachability; // check if the node to DOWN is in the 'members' set var member = localMembers.FirstOrDefault(m => m.Address == address); @@ -1740,17 +1742,11 @@ public void Downing(Address address) else _cluster.LogInfo("Marking unreachable node [{0}] as [{1}]", member.Address, MemberStatus.Down); - // replace member (changed status) - var newMembers = localMembers.Remove(member).Add(member.Copy(MemberStatus.Down)); - // remove nodes marked as DOWN from the 'seen' table - var newSeen = localSeen.Remove(member.UniqueAddress); - //update gossip overview - var newOverview = localOverview.Copy(seen: newSeen); - var newGossip = localGossip.Copy(members: newMembers, overview: newOverview); //update gossip + var newGossip = localGossip.MarkAsDown(member); //update gossip UpdateLatestGossip(newGossip); - Publish(_latestGossip); + PublishMembershipState(); if (address == _cluster.SelfAddress) { @@ -1779,7 +1775,7 @@ public void Downing(Address address) /// TBD public void Quarantined(UniqueAddress node) { - var localGossip = _latestGossip; + var localGossip = LatestGossip; if (localGossip.HasMember(node)) { var newReachability = localGossip.Overview.Reachability.Terminated(SelfUniqueAddress, node); @@ -1788,7 +1784,7 @@ public void Quarantined(UniqueAddress node) UpdateLatestGossip(newGossip); _log.Warning("Cluster Node [{0}] - Marking node as TERMINATED [{1}], due to quarantine. Node roles [{2}]. It must still be marked as down before it's removed.", Self, node.Address, string.Join(",", _cluster.SelfRoles)); - Publish(_latestGossip); + PublishMembershipState(); } } @@ -1799,14 +1795,14 @@ public void Quarantined(UniqueAddress node) public void ReceiveGossipStatus(GossipStatus status) { var from = status.From; - if (!_latestGossip.Overview.Reachability.IsReachable(SelfUniqueAddress, from)) + if (!LatestGossip.Overview.Reachability.IsReachable(SelfUniqueAddress, from)) _cluster.LogInfo("Ignoring received gossip status from unreachable [{0}]", from); - else if (_latestGossip.Members.All(m => !m.UniqueAddress.Equals(from))) + else if (LatestGossip.Members.All(m => !m.UniqueAddress.Equals(from))) _cluster.LogInfo("Cluster Node [{0}] - Ignoring received gossip status from unknown [{1}]", _cluster.SelfAddress, from); else { - var comparison = status.Version.CompareTo(_latestGossip.Version); + var comparison = status.Version.CompareTo(LatestGossip.Version); switch (comparison) { case VectorClock.Ordering.Same: @@ -1858,7 +1854,7 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) { var from = envelope.From; var remoteGossip = envelope.Gossip; - var localGossip = _latestGossip; + var localGossip = LatestGossip; if (remoteGossip.Equals(Gossip.Empty)) { @@ -1922,7 +1918,7 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) // Removal of member itself is handled in merge (pickHighestPriority) var prunedLocalGossip = localGossip.Members.Aggregate(localGossip, (g, m) => { - if (Gossip.RemoveUnreachableWithMemberStatus.Contains(m.Status) && !remoteGossip.Members.Contains(m)) + if (RemoveUnreachableWithMemberStatus.Contains(m.Status) && !remoteGossip.Members.Contains(m)) { _log.Debug("Cluster Node [{0}] - Pruned conflicting local gossip: {1}", _cluster.SelfAddress, m); return g.Prune(VectorClock.Node.Create(VclockName(m.UniqueAddress))); @@ -1932,7 +1928,7 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) var prunedRemoteGossip = remoteGossip.Members.Aggregate(remoteGossip, (g, m) => { - if (Gossip.RemoveUnreachableWithMemberStatus.Contains(m.Status) && !localGossip.Members.Contains(m)) + if (RemoveUnreachableWithMemberStatus.Contains(m.Status) && !localGossip.Members.Contains(m)) { _log.Debug("Cluster Node [{0}] - Pruned conflicting remote gossip: {1}", _cluster.SelfAddress, m); return g.Prune(VectorClock.Node.Create(VclockName(m.UniqueAddress))); @@ -1952,17 +1948,17 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) // the exiting tasks have been completed. if (_exitingTasksInProgress) { - _latestGossip = winningGossip; + _membershipState = _membershipState.Copy(winningGossip); } else { - _latestGossip = winningGossip.Seen(SelfUniqueAddress); + _membershipState = _membershipState.Copy(winningGossip.Seen(SelfUniqueAddress)); } AssertLatestGossip(); // for all new joining nodes we remove them from the failure detector - foreach (var node in _latestGossip.Members) + foreach (var node in LatestGossip.Members) { if (!localGossip.Members.Contains(node)) { @@ -1998,9 +1994,9 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) } } - Publish(_latestGossip); + PublishMembershipState(); - var selfStatus = _latestGossip.GetMember(SelfUniqueAddress).Status; + var selfStatus = LatestGossip.GetMember(SelfUniqueAddress).Status; if (selfStatus == MemberStatus.Exiting && !_exitingTasksInProgress) { // ExitingCompleted will be received via CoordinatedShutdown to continue @@ -2056,17 +2052,17 @@ public void GossipSpeedupTick() /// public bool IsGossipSpeedupNeeded() { - return _latestGossip.Members.Any(m => m.Status == MemberStatus.Down) || - _latestGossip.Overview.Seen.Count < _latestGossip.Members.Count / 2; + return LatestGossip.Members.Any(m => m.Status == MemberStatus.Down) || + LatestGossip.Overview.Seen.Count < LatestGossip.Members.Count / 2; } private void SendGossipRandom(int n) { if (!IsSingletonCluster && n > 0) { - var localGossip = _latestGossip; + var localGossip = LatestGossip; var possibleTargets = - localGossip.Members.Where(m => ValidNodeForGossip(m.UniqueAddress)) + localGossip.Members.Where(m => _membershipState.ValidNodeForGossip(m.UniqueAddress)) .Select(m => m.UniqueAddress) .ToList(); var randomTargets = possibleTargets.Count <= n ? possibleTargets : possibleTargets.Shuffle().Slice(0, n); @@ -2081,7 +2077,7 @@ public void SendGossip() { if (!IsSingletonCluster) { - var localGossip = _latestGossip; + var localGossip = LatestGossip; ImmutableList preferredGossipTarget; @@ -2089,8 +2085,8 @@ public void SendGossip() { // If it's time to try to gossip to some nodes with a different view // gossip to a random alive member with preference to a member with older gossip version - preferredGossipTarget = ImmutableList.CreateRange(localGossip.Members.Where(m => !localGossip.SeenByNode(m.UniqueAddress) && - ValidNodeForGossip(m.UniqueAddress)).Select(m => m.UniqueAddress)); + preferredGossipTarget = ImmutableList.CreateRange(localGossip.Members.Where(m => !localGossip.SeenByNode(m.UniqueAddress) + && _membershipState.ValidNodeForGossip(m.UniqueAddress)).Select(m => m.UniqueAddress)); } else { @@ -2109,7 +2105,7 @@ public void SendGossip() var peer = SelectRandomNode( ImmutableList.CreateRange( - localGossip.Members.Where(m => ValidNodeForGossip(m.UniqueAddress)) + localGossip.Members.Where(m => _membershipState.ValidNodeForGossip(m.UniqueAddress)) .Select(m => m.UniqueAddress))); if (peer != null) @@ -2129,7 +2125,7 @@ public double AdjustedGossipDifferentViewProbability { get { - var size = _latestGossip.Members.Count; + var size = LatestGossip.Members.Count; var low = _cluster.Settings.ReduceGossipDifferentViewProbability; var high = low * 3; // start reduction when cluster is larger than configured ReduceGossipDifferentViewProbability @@ -2153,7 +2149,7 @@ public double AdjustedGossipDifferentViewProbability /// public void LeaderActions() { - if (_latestGossip.IsLeader(SelfUniqueAddress, SelfUniqueAddress)) + if (_membershipState.IsLeader(SelfUniqueAddress)) { // only run the leader actions if we are the LEADER if (!_isCurrentlyLeader) @@ -2163,7 +2159,7 @@ public void LeaderActions() } const int firstNotice = 20; const int periodicNotice = 60; - if (_latestGossip.Convergence(SelfUniqueAddress, _exitingConfirmed)) + if (_membershipState.Convergence(_exitingConfirmed)) { if (_leaderActionCounter >= firstNotice) _cluster.LogInfo("Leader can perform its duties again"); @@ -2181,12 +2177,12 @@ public void LeaderActions() { _cluster.LogInfo( "Leader can currently not perform its duties, reachability status: [{0}], member status: [{1}]", - _latestGossip.ReachabilityExcludingDownedObservers, - string.Join(", ", _latestGossip.Members + _membershipState.DcReachabilityExcludingDownedObservers, + string.Join(", ", LatestGossip.Members .Select(m => string.Format("${0} ${1} seen=${2}", m.Address, m.Status, - _latestGossip.SeenByNode(m.UniqueAddress))))); + LatestGossip.SeenByNode(m.UniqueAddress))))); } } } @@ -2202,13 +2198,13 @@ public void LeaderActions() private void MoveJoiningToWeaklyUp() { - var localGossip = _latestGossip; + var localGossip = LatestGossip; var localMembers = localGossip.Members; var enoughMembers = IsMinNrOfMembersFulfilled(); bool IsJoiningToWeaklyUp(Member m) => m.Status == MemberStatus.Joining && enoughMembers - && _latestGossip.ReachabilityExcludingDownedObservers.Value.IsReachable(m.UniqueAddress); + && _membershipState.DcReachabilityExcludingDownedObservers.IsReachable(m.UniqueAddress); var changedMembers = localMembers .Where(IsJoiningToWeaklyUp) @@ -2228,21 +2224,21 @@ bool IsJoiningToWeaklyUp(Member m) => m.Status == MemberStatus.Joining _cluster.LogInfo("Leader is moving node [{0}] to [{1}]", m.Address, m.Status); } - Publish(newGossip); + PublishMembershipState(); if (_cluster.Settings.PublishStatsInterval == TimeSpan.Zero) PublishInternalStats(); } } private void ShutdownSelfWhenDown() { - if (_latestGossip.GetMember(SelfUniqueAddress).Status == MemberStatus.Down) + if (LatestGossip.GetMember(SelfUniqueAddress).Status == MemberStatus.Down) { // When all reachable have seen the state this member will shutdown itself when it has // status Down. The down commands should spread before we shutdown. - var unreachable = _latestGossip.Overview.Reachability.AllUnreachableOrTerminated; - var downed = _latestGossip.Members.Where(m => m.Status == MemberStatus.Down) + var unreachable = LatestGossip.Overview.Reachability.AllUnreachableOrTerminated; + var downed = LatestGossip.Members.Where(m => m.Status == MemberStatus.Down) .Select(m => m.UniqueAddress).ToList(); - if (_selfDownCounter >= MaxTicksBeforeShuttingDownMyself || downed.All(node => unreachable.Contains(node) || _latestGossip.SeenByNode(node))) + if (_selfDownCounter >= MaxTicksBeforeShuttingDownMyself || downed.All(node => unreachable.Contains(node) || LatestGossip.SeenByNode(node))) { // the reason for not shutting down immediately is to give the gossip a chance to spread // the downing information to other downed nodes, so that they can shutdown themselves @@ -2269,10 +2265,10 @@ private void ShutdownSelfWhenDown() /// public bool IsMinNrOfMembersFulfilled() { - return _latestGossip.Members.Count >= _cluster.Settings.MinNrOfMembers + return LatestGossip.Members.Count >= _cluster.Settings.MinNrOfMembers && _cluster.Settings .MinNrOfMembersOfRole - .All(x => _latestGossip.Members.Count(c => c.HasRole(x.Key)) >= x.Value); + .All(x => LatestGossip.Members.Count(c => c.HasRole(x.Key)) >= x.Value); } /// @@ -2290,7 +2286,7 @@ public bool IsMinNrOfMembersFulfilled() /// public void LeaderActionsOnConvergence() { - var localGossip = _latestGossip; + var localGossip = LatestGossip; var localMembers = localGossip.Members; var localOverview = localGossip.Overview; var localSeen = localOverview.Seen; @@ -2300,7 +2296,7 @@ public void LeaderActionsOnConvergence() var removedUnreachable = localOverview.Reachability.AllUnreachableOrTerminated.Select(localGossip.GetMember) - .Where(m => Gossip.RemoveUnreachableWithMemberStatus.Contains(m.Status)) + .Where(m => RemoveUnreachableWithMemberStatus.Contains(m.Status)) .ToImmutableHashSet(); var removedExitingConfirmed = @@ -2381,7 +2377,7 @@ public void LeaderActionsOnConvergence() } UpdateLatestGossip(newGossip); - _exitingConfirmed = new HashSet(_exitingConfirmed.Except(removedExitingConfirmed)); + _exitingConfirmed = _exitingConfirmed.Except(removedExitingConfirmed); // log status changes foreach (var m in changedMembers) @@ -2399,7 +2395,7 @@ public void LeaderActionsOnConvergence() _cluster.LogInfo("Leader is removing confirmed Exiting node [{0}]", m.Address); } - Publish(_latestGossip); + PublishMembershipState(); GossipExitingMembersToOldest(changedMembers.Where(i => i.Status == MemberStatus.Exiting)); } } @@ -2410,7 +2406,7 @@ public void LeaderActionsOnConvergence() /// private void GossipExitingMembersToOldest(IEnumerable exitingMembers) { - var targets = GossipTargetsForExitingMembers(_latestGossip, exitingMembers); + var targets = GossipTargetsForExitingMembers(LatestGossip, exitingMembers); if (targets != null && targets.Any()) { if (_log.IsDebugEnabled) @@ -2432,7 +2428,7 @@ public void ReapUnreachableMembers() { // only scrutinize if we are a non-singleton cluster - var localGossip = _latestGossip; + var localGossip = LatestGossip; var localOverview = localGossip.Overview; var localMembers = localGossip.Members; @@ -2457,7 +2453,7 @@ public void ReapUnreachableMembers() newReachability1, (reachability, m) => reachability.Reachable(SelfUniqueAddress, m.UniqueAddress)); - if (!newReachability2.Equals(localOverview.Reachability)) + if (!ReferenceEquals(newReachability2, localOverview.Reachability)) { var newOverview = localOverview.Copy(reachability: newReachability2); var newGossip = localGossip.Copy(overview: newOverview); @@ -2479,7 +2475,7 @@ public void ReapUnreachableMembers() if (!newlyDetectedReachableMembers.IsEmpty) _cluster.LogInfo("Marking node(s) as REACHABLE [{0}]. Node roles [{1}]", newlyDetectedReachableMembers.Select(m => m.ToString()).Aggregate((a, b) => a + ", " + b), string.Join(",", _cluster.SelfRoles)); - Publish(_latestGossip); + PublishMembershipState(); } } } @@ -2501,7 +2497,12 @@ public UniqueAddress SelectRandomNode(ImmutableList nodes) /// public bool IsSingletonCluster { - get { return _latestGossip.IsSingletonCluster; } + get { return LatestGossip.IsSingletonCluster; } + } + + public UniqueAddress SelfUniqueAddress1 + { + get { return SelfUniqueAddress; } } /// @@ -2510,7 +2511,7 @@ public bool IsSingletonCluster /// TBD public void SendGossipTo(Address address) { - foreach (var m in _latestGossip.Members) + foreach (var m in LatestGossip.Members) { if (m.Address.Equals(address)) GossipTo(m.UniqueAddress); @@ -2523,8 +2524,8 @@ public void SendGossipTo(Address address) /// The address of the node we want to send gossip to. public void GossipTo(UniqueAddress node) { - if (ValidNodeForGossip(node)) - ClusterCore(node.Address).Tell(new GossipEnvelope(SelfUniqueAddress, node, _latestGossip)); + if (_membershipState.ValidNodeForGossip(node)) + ClusterCore(node.Address).Tell(new GossipEnvelope(SelfUniqueAddress, node, LatestGossip)); } /// @@ -2534,8 +2535,8 @@ public void GossipTo(UniqueAddress node) /// TBD public void GossipTo(UniqueAddress node, IActorRef destination) { - if (ValidNodeForGossip(node)) - destination.Tell(new GossipEnvelope(SelfUniqueAddress, node, _latestGossip)); + if (_membershipState.ValidNodeForGossip(node)) + destination.Tell(new GossipEnvelope(SelfUniqueAddress, node, LatestGossip)); } /// @@ -2544,8 +2545,8 @@ public void GossipTo(UniqueAddress node, IActorRef destination) /// TBD public void GossipStatusTo(UniqueAddress node) { - if (ValidNodeForGossip(node)) - ClusterCore(node.Address).Tell(new GossipStatus(SelfUniqueAddress, _latestGossip.Version)); + if (_membershipState.ValidNodeForGossip(node)) + ClusterCore(node.Address).Tell(new GossipStatus(SelfUniqueAddress, LatestGossip.Version)); } /// @@ -2555,18 +2556,8 @@ public void GossipStatusTo(UniqueAddress node) /// TBD public void GossipStatusTo(UniqueAddress node, IActorRef destination) { - if (ValidNodeForGossip(node)) - destination.Tell(new GossipStatus(SelfUniqueAddress, _latestGossip.Version)); - } - - /// - /// TBD - /// - /// TBD - /// TBD - public bool ValidNodeForGossip(UniqueAddress node) - { - return !node.Equals(SelfUniqueAddress) && _latestGossip.Overview.Reachability.IsReachable(SelfUniqueAddress, node); + if (_membershipState.ValidNodeForGossip(node)) + destination.Tell(new GossipStatus(SelfUniqueAddress, LatestGossip.Version)); } /// @@ -2580,7 +2571,8 @@ public static IEnumerable GossipTargetsForExitingMembers(Gossip latestGo if (exitingMembers.Any()) { var roles = exitingMembers.SelectMany(m => m.Roles); - var membersSortedByAge = latestGossip.Members.OrderBy(m => m, Member.AgeOrdering); + var membersSortedByAge = latestGossip.Members + .OrderBy(m => m, Member.AgeOrdering).ToImmutableHashSet(); var targets = new HashSet(); var t = membersSortedByAge.Take(2).ToArray(); // 2 oldest of all nodes @@ -2601,25 +2593,31 @@ public static IEnumerable GossipTargetsForExitingMembers(Gossip latestGo /// /// Updates the local gossip with the latest received from over the network. /// - /// The new gossip to merge with our own. - public void UpdateLatestGossip(Gossip newGossip) + /// The new gossip to merge with our own. + public void UpdateLatestGossip(Gossip gossip) { // Updating the vclock version for the changes - var versionedGossip = newGossip.Increment(_vclockNode); + var versionedGossip = gossip.Increment(_vclockNode); - // Don't mark gossip state as seen while exiting is in progress, e.g. - // shutting down singleton actors. This delays removal of the member until - // the exiting tasks have been completed. - if (_exitingTasksInProgress) - _latestGossip = versionedGossip.ClearSeen(); - else + Gossip PickLatest() { - // Nobody else has seen this gossip but us - var seenVersionedGossip = versionedGossip.OnlySeen(SelfUniqueAddress); + if (_exitingTasksInProgress) + return versionedGossip.ClearSeen(); + else + { + // Nobody else has seen this gossip but us + var seenVersionedGossip = versionedGossip.OnlySeen(SelfUniqueAddress); - // Update the state with the new gossip - _latestGossip = seenVersionedGossip; + // Update the state with the new gossip + return seenVersionedGossip; + } } + + // Don't mark gossip state as seen while exiting is in progress, e.g. + // shutting down singleton actors. This delays removal of the member until + // the exiting tasks have been completed. + var newGossip = PickLatest(); + _membershipState = _membershipState.Copy(newGossip); AssertLatestGossip(); } @@ -2629,19 +2627,21 @@ public void UpdateLatestGossip(Gossip newGossip) /// Thrown if the VectorClock is corrupt and has not been pruned properly. public void AssertLatestGossip() { - if (Cluster.IsAssertInvariantsEnabled && _latestGossip.Version.Versions.Count > _latestGossip.Members.Count) + if (Cluster.IsAssertInvariantsEnabled && LatestGossip.Version.Versions.Count > LatestGossip.Members.Count) { - throw new InvalidOperationException($"Too many vector clock entries in gossip state {_latestGossip}"); + throw new InvalidOperationException($"Too many vector clock entries in gossip state {LatestGossip}"); } } /// /// Publishes gossip to other nodes in the cluster. /// - /// The new gossip to share. - public void Publish(Gossip newGossip) + public void PublishMembershipState() { - _publisher.Tell(new InternalClusterAction.PublishChanges(newGossip)); + if (_cluster.Settings.VerboseGossipReceivedLogging) + _log.Debug("Cluster Node [{0}] - New gossip published [{0}]", SelfUniqueAddress, _membershipState.LatestGossip); + + _publisher.Tell(new InternalClusterAction.PublishChanges(_membershipState)); if (_cluster.Settings.PublishStatsInterval == TimeSpan.Zero) { PublishInternalStats(); @@ -2653,8 +2653,8 @@ public void Publish(Gossip newGossip) /// public void PublishInternalStats() { - var vclockStats = new VectorClockStats(_latestGossip.Version.Versions.Count, - _latestGossip.Members.Count(m => _latestGossip.SeenByNode(m.UniqueAddress))); + var vclockStats = new VectorClockStats(LatestGossip.Version.Versions.Count, + LatestGossip.Members.Count(m => LatestGossip.SeenByNode(m.UniqueAddress))); _publisher.Tell(new ClusterEvent.CurrentInternalStats(_gossipStats, vclockStats)); } diff --git a/src/core/Akka.Cluster/ClusterEvent.cs b/src/core/Akka.Cluster/ClusterEvent.cs index 5f9c9175ff5..3697b22b764 100644 --- a/src/core/Akka.Cluster/ClusterEvent.cs +++ b/src/core/Akka.Cluster/ClusterEvent.cs @@ -829,44 +829,37 @@ public override bool Equals(object obj) } } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - internal static ImmutableList DiffUnreachable(Gossip oldGossip, Gossip newGossip, UniqueAddress selfUniqueAddress) - { - if (newGossip.Equals(oldGossip)) + /// + /// INTERNAL API + /// + internal static ImmutableList DiffUnreachable(MembershipState oldState, MembershipState newState) + { + if (ReferenceEquals(newState, oldState)) { return ImmutableList.Empty; } - var oldUnreachableNodes = oldGossip.Overview.Reachability.AllUnreachableOrTerminated; - return newGossip.Overview.Reachability.AllUnreachableOrTerminated - .Where(node => !oldUnreachableNodes.Contains(node) && !node.Equals(selfUniqueAddress)) - .Select(node => new UnreachableMember(newGossip.GetMember(node))) + var oldUnreachableNodes = oldState.Overview.Reachability.AllUnreachableOrTerminated; + return newState.Overview.Reachability.AllUnreachableOrTerminated + .Where(node => !oldUnreachableNodes.Contains(node) && !node.Equals(newState.SelfUniqueAddress)) + .Select(node => new UnreachableMember(newState.LatestGossip.GetMember(node))) .ToImmutableList(); } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - internal static ImmutableList DiffReachable(Gossip oldGossip, Gossip newGossip, UniqueAddress selfUniqueAddress) + /// + /// INTERNAL API + /// + internal static ImmutableList DiffReachable(MembershipState oldState, MembershipState newState) { - if (newGossip.Equals(oldGossip)) + if (ReferenceEquals(newState, oldState)) { return ImmutableList.Empty; } - return oldGossip.Overview.Reachability.AllUnreachable - .Where(node => newGossip.HasMember(node) && newGossip.Overview.Reachability.IsReachable(node) && !node.Equals(selfUniqueAddress)) - .Select(node => new ReachableMember(newGossip.GetMember(node))) + return oldState.Overview.Reachability.AllUnreachable + .Where(node => newState.LatestGossip.HasMember(node) && newState.Overview.Reachability.IsReachable(node) + && !node.Equals(newState.SelfUniqueAddress)) + .Select(node => new ReachableMember(newState.LatestGossip.GetMember(node))) .ToImmutableList(); } @@ -874,19 +867,20 @@ internal static ImmutableList DiffReachable(Gossip oldGossip, G /// Compares two instances and uses them to publish the appropriate /// for any given change to the membership of the current cluster. /// - /// The previous gossip instance. - /// The new gossip instance. + /// The previous gossip instance. + /// The new gossip instance. /// A possibly empty set of membership events to be published to all subscribers. - internal static ImmutableList DiffMemberEvents(Gossip oldGossip, Gossip newGossip) + internal static ImmutableList DiffMemberEvents(MembershipState oldState, MembershipState newState) { - if (newGossip.Equals(oldGossip)) + if (ReferenceEquals(newState, oldState)) { return ImmutableList.Empty; } - var newMembers = newGossip.Members.Except(oldGossip.Members); - var membersGroupedByAddress = newGossip.Members - .Concat(oldGossip.Members) + var newMembers = newState.Members.Except(oldState.Members); + + var membersGroupedByAddress = newState.Members + .Concat(oldState.Members) .GroupBy(m => m.UniqueAddress); var changedMembers = membersGroupedByAddress @@ -896,7 +890,7 @@ internal static ImmutableList DiffMemberEvents(Gossip oldGossip, G .Select(g => g.First()); var memberEvents = CollectMemberEvents(newMembers.Union(changedMembers)); - var removedMembers = oldGossip.Members.Except(newGossip.Members); + var removedMembers = oldState.Members.Except(newState.Members); var removedEvents = removedMembers.Select(m => new MemberRemoved(m.Copy(status: MemberStatus.Removed), m.Status)); return memberEvents.Concat(removedEvents).ToImmutableList(); @@ -931,70 +925,48 @@ private static IEnumerable CollectMemberEvents(IEnumerable } /// - /// TBD + /// INTERNAL API /// - /// TBD - /// TBD - /// TBD - /// TBD - internal static ImmutableList DiffLeader(Gossip oldGossip, Gossip newGossip, UniqueAddress selfUniqueAddress) + internal static ImmutableList DiffLeader(MembershipState oldState, MembershipState newState) { - var newLeader = newGossip.Leader(selfUniqueAddress); - if ((newLeader == null && oldGossip.Leader(selfUniqueAddress) == null) - || newLeader != null && newLeader.Equals(oldGossip.Leader(selfUniqueAddress))) + var newLeader = newState.Leader; + if (newLeader == oldState.Leader) return ImmutableList.Empty; - return ImmutableList.Create(newLeader == null - ? new LeaderChanged(null) - : new LeaderChanged(newLeader.Address)); + return ImmutableList.Create(new LeaderChanged(newLeader?.Address)); } /// - /// TBD + /// INTERNAL API /// - /// TBD - /// TBD - /// TBD - /// TBD - internal static ImmutableHashSet DiffRolesLeader(Gossip oldGossip, Gossip newGossip, UniqueAddress selfUniqueAddress) + internal static ImmutableHashSet DiffRolesLeader(MembershipState oldState, MembershipState newState) { - return InternalDiffRolesLeader(oldGossip, newGossip, selfUniqueAddress).ToImmutableHashSet(); + return InternalDiffRolesLeader(oldState, newState).ToImmutableHashSet(); } - private static IEnumerable InternalDiffRolesLeader(Gossip oldGossip, Gossip newGossip, UniqueAddress selfUniqueAddress) + private static IEnumerable InternalDiffRolesLeader(MembershipState oldState, MembershipState newState) { - foreach (var role in oldGossip.AllRoles.Union(newGossip.AllRoles)) + foreach (var role in oldState.LatestGossip.AllRoles.Union(newState.LatestGossip.AllRoles)) { - var newLeader = newGossip.RoleLeader(role, selfUniqueAddress); - if (newLeader == null && oldGossip.RoleLeader(role, selfUniqueAddress) != null) - yield return new RoleLeaderChanged(role, null); - if (newLeader != null && !newLeader.Equals(oldGossip.RoleLeader(role, selfUniqueAddress))) - yield return new RoleLeaderChanged(role, newLeader.Address); + var newLeader = newState.RoleLeader(role); + if (newLeader != oldState.RoleLeader(role)) + yield return new RoleLeaderChanged(role, newLeader?.Address); } } /// - /// Used for checking convergence when we don't have any information from the cluster daemon. - /// - private static readonly HashSet EmptySet = new HashSet(); - - /// - /// TBD + /// INTERNAL API /// - /// TBD - /// TBD - /// TBD - /// TBD - internal static ImmutableList DiffSeen(Gossip oldGossip, Gossip newGossip, UniqueAddress selfUniqueAddress) + internal static ImmutableList DiffSeen(MembershipState oldState, MembershipState newState) { - if (newGossip.Equals(oldGossip)) + if (ReferenceEquals(newState, oldState)) { return ImmutableList.Empty; } - var newConvergence = newGossip.Convergence(selfUniqueAddress, EmptySet); - var newSeenBy = newGossip.SeenBy; - if (!newConvergence.Equals(oldGossip.Convergence(selfUniqueAddress, EmptySet)) || !newSeenBy.SequenceEqual(oldGossip.SeenBy)) + var newConvergence = newState.Convergence(ImmutableHashSet.Empty); + var newSeenBy = newState.LatestGossip.SeenBy; + if (!newConvergence.Equals(oldState.Convergence(ImmutableHashSet.Empty)) || !newSeenBy.SequenceEqual(oldState.LatestGossip.SeenBy)) { return ImmutableList.Create(new SeenChanged(newConvergence, newSeenBy.Select(s => s.Address).ToImmutableHashSet())); } @@ -1003,17 +975,14 @@ internal static ImmutableList DiffSeen(Gossip oldGossip, Gossip new } /// - /// TBD + /// INTERNAL API /// - /// TBD - /// TBD - /// TBD - internal static ImmutableList DiffReachability(Gossip oldGossip, Gossip newGossip) + internal static ImmutableList DiffReachability(MembershipState oldState, MembershipState newState) { - if (newGossip.Overview.Reachability.Equals(oldGossip.Overview.Reachability)) + if (newState.Overview.Reachability.Equals(oldState.Overview.Reachability)) return ImmutableList.Empty; - return ImmutableList.Create(new ReachabilityChanged(newGossip.Overview.Reachability)); + return ImmutableList.Create(new ReachabilityChanged(newState.Overview.Reachability)); } } @@ -1024,18 +993,19 @@ internal static ImmutableList DiffReachability(Gossip oldGo /// internal sealed class ClusterDomainEventPublisher : ReceiveActor, IRequiresMessageQueue { - private Gossip _latestGossip; private readonly UniqueAddress _selfUniqueAddress = Cluster.Get(Context.System).SelfUniqueAddress; + private readonly MembershipState _emptyMembershipState = new MembershipState(Gossip.Empty, Cluster.Get(Context.System).SelfUniqueAddress); + private MembershipState _membershipState; /// /// Default constructor for ClusterDomainEventPublisher. /// public ClusterDomainEventPublisher() { - _latestGossip = Gossip.Empty; _eventStream = Context.System.EventStream; + _membershipState = _emptyMembershipState; - Receive(newGossip => PublishChanges(newGossip.NewGossip)); + Receive(p => PublishChanges(p.NewState)); Receive(currentStats => PublishInternalStats(currentStats)); Receive(receiver => SendCurrentClusterState(receiver.Receiver)); Receive(sub => Subscribe(sub.Subscriber, sub.InitialStateMode, sub.To)); @@ -1054,7 +1024,7 @@ protected override void PostStop() { // publish the final removed state before shutting down Publish(ClusterEvent.ClusterShuttingDown.Instance); - PublishChanges(Gossip.Empty); + PublishChanges(_emptyMembershipState); } private readonly EventStream _eventStream; @@ -1065,21 +1035,22 @@ protected override void PostStop() /// private void SendCurrentClusterState(IActorRef receiver) { - var unreachable = _latestGossip.Overview.Reachability.AllUnreachableOrTerminated - .Where(node => !node.Equals(_selfUniqueAddress)) - .Select(node => _latestGossip.GetMember(node)) + var unreachable = _membershipState.LatestGossip.Overview.Reachability + .AllUnreachableOrTerminated.Where(x => x != _selfUniqueAddress) + .Select(x => _membershipState.LatestGossip.GetMember(x)) .ToImmutableHashSet(); var state = new ClusterEvent.CurrentClusterState( - members: _latestGossip.Members, + members: _membershipState.Members, unreachable: unreachable, - seenBy: _latestGossip.SeenBy.Select(s => s.Address).ToImmutableHashSet(), - leader: _latestGossip.Leader(_selfUniqueAddress) == null ? null : _latestGossip.Leader(_selfUniqueAddress).Address, - roleLeaderMap: _latestGossip.AllRoles.ToImmutableDictionary(r => r, r => - { - var leader = _latestGossip.RoleLeader(r, _selfUniqueAddress); - return leader == null ? null : leader.Address; - })); + seenBy: _membershipState.LatestGossip.SeenBy.Select(s => s.Address).ToImmutableHashSet(), + leader: _membershipState.Leader?.Address, + roleLeaderMap: _membershipState.LatestGossip.AllRoles + .ToImmutableDictionary(r => r, r => + { + var leader = _membershipState.RoleLeader(r); + return leader?.Address; + })); receiver.Tell(state); } @@ -1093,7 +1064,7 @@ private void Subscribe(IActorRef subscriber, ClusterEvent.SubscriptionInitialSta if (to.Any(o => o.IsAssignableFrom(eventType))) subscriber.Tell(@event); }; - PublishDiff(Gossip.Empty, _latestGossip, pub); + PublishDiff(_emptyMembershipState, _membershipState, pub); } else if (initMode == ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot) { @@ -1109,24 +1080,24 @@ private void Unsubscribe(IActorRef subscriber, Type to) else _eventStream.Unsubscribe(subscriber, to); } - private void PublishChanges(Gossip newGossip) + private void PublishChanges(MembershipState newState) { - var oldGossip = _latestGossip; - // keep the _latestGossip to be sent to new subscribers - _latestGossip = newGossip; - PublishDiff(oldGossip, newGossip, Publish); + var oldState = _membershipState; + // keep the latest state to be sent to new subscribers + _membershipState = newState; + PublishDiff(oldState, newState, Publish); } - private void PublishDiff(Gossip oldGossip, Gossip newGossip, Action pub) + private void PublishDiff(MembershipState oldState, MembershipState newState, Action pub) { - foreach (var @event in ClusterEvent.DiffMemberEvents(oldGossip, newGossip)) pub(@event); - foreach (var @event in ClusterEvent.DiffUnreachable(oldGossip, newGossip, _selfUniqueAddress)) pub(@event); - foreach (var @event in ClusterEvent.DiffReachable(oldGossip, newGossip, _selfUniqueAddress)) pub(@event); - foreach (var @event in ClusterEvent.DiffLeader(oldGossip, newGossip, _selfUniqueAddress)) pub(@event); - foreach (var @event in ClusterEvent.DiffRolesLeader(oldGossip, newGossip, _selfUniqueAddress)) pub(@event); + foreach (var @event in ClusterEvent.DiffMemberEvents(oldState, newState)) pub(@event); + foreach (var @event in ClusterEvent.DiffUnreachable(oldState, newState)) pub(@event); + foreach (var @event in ClusterEvent.DiffReachable(oldState, newState)) pub(@event); + foreach (var @event in ClusterEvent.DiffLeader(oldState, newState)) pub(@event); + foreach (var @event in ClusterEvent.DiffRolesLeader(oldState, newState)) pub(@event); // publish internal SeenState for testing purposes - foreach (var @event in ClusterEvent.DiffSeen(oldGossip, newGossip, _selfUniqueAddress)) pub(@event); - foreach (var @event in ClusterEvent.DiffReachability(oldGossip, newGossip)) pub(@event); + foreach (var @event in ClusterEvent.DiffSeen(oldState, newState)) pub(@event); + foreach (var @event in ClusterEvent.DiffReachability(oldState, newState)) pub(@event); } private void PublishInternalStats(ClusterEvent.CurrentInternalStats currentStats) @@ -1141,7 +1112,7 @@ private void Publish(object @event) private void ClearState() { - _latestGossip = Gossip.Empty; + _membershipState = _emptyMembershipState; } } } diff --git a/src/core/Akka.Cluster/Gossip.cs b/src/core/Akka.Cluster/Gossip.cs index 0a680a4d939..09104aef329 100644 --- a/src/core/Akka.Cluster/Gossip.cs +++ b/src/core/Akka.Cluster/Gossip.cs @@ -68,24 +68,6 @@ public static Gossip Create(ImmutableSortedSet members) return Empty.Copy(members: members); } - private static readonly ImmutableHashSet LeaderMemberStatus = - ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving); - - private static readonly ImmutableHashSet ConvergenceMemberStatus = - ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving); - - /// - /// If there are unreachable members in the cluster with any of these statuses, they will be skipped during convergence checks. - /// - public static readonly ImmutableHashSet ConvergenceSkipUnreachableWithMemberStatus = - ImmutableHashSet.Create(MemberStatus.Down, MemberStatus.Exiting); - - /// - /// If there are unreachable members in the cluster with any of these statuses, they will be pruned from the local gossip - /// - public static readonly ImmutableHashSet RemoveUnreachableWithMemberStatus = - ImmutableHashSet.Create(MemberStatus.Down, MemberStatus.Exiting); - readonly ImmutableSortedSet _members; readonly GossipOverview _overview; readonly VectorClock _version; @@ -134,7 +116,7 @@ public Gossip(ImmutableSortedSet members, GossipOverview overview, Vecto ReachabilityExcludingDownedObservers = new Lazy(() => { - var downed = Members.Where(m => m.Status == MemberStatus.Down).ToList(); + var downed = Members.Where(m => m.Status == MemberStatus.Down); return Overview.Reachability.RemoveObservers(downed.Select(m => m.UniqueAddress).ToImmutableHashSet()); }); @@ -291,92 +273,11 @@ public Gossip Merge(Gossip that) return new Gossip(mergedMembers, new GossipOverview(mergedSeen, mergedReachability), mergedVClock); } - - /// - /// First check that: - /// 1. we don't have any members that are unreachable, or - /// 2. all unreachable members in the set have status DOWN or EXITING - /// Else we can't continue to check for convergence. When that is done - /// we check that all members with a convergence status is in the seen - /// table and has the latest vector clock version. - /// - /// The unique address of the node checking for convergence. - /// The set of nodes who have been confirmed to be exiting. - /// true if convergence has been achieved. false otherwise. - public bool Convergence(UniqueAddress selfUniqueAddress, HashSet exitingConfirmed) - { - var unreachable = ReachabilityExcludingDownedObservers.Value.AllUnreachableOrTerminated - .Where(node => node != selfUniqueAddress && !exitingConfirmed.Contains(node)) - .Select(GetMember); - - return unreachable.All(m => ConvergenceSkipUnreachableWithMemberStatus.Contains(m.Status)) - && !_members.Any(m => ConvergenceMemberStatus.Contains(m.Status) - && !(SeenByNode(m.UniqueAddress) || exitingConfirmed.Contains(m.UniqueAddress))); - } - /// /// TBD /// public Lazy ReachabilityExcludingDownedObservers { get; } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - public bool IsLeader(UniqueAddress node, UniqueAddress selfUniqueAddress) - { - return Leader(selfUniqueAddress) == node && node != null; - } - - /// - /// TBD - /// - /// TBD - /// TBD - public UniqueAddress Leader(UniqueAddress selfUniqueAddress) - { - return LeaderOf(_members, selfUniqueAddress); - } - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - public UniqueAddress RoleLeader(string role, UniqueAddress selfUniqueAddress) - { - var roleMembers = _members - .Where(m => m.HasRole(role)) - .ToImmutableSortedSet(); - - return LeaderOf(roleMembers, selfUniqueAddress); - } - - /// - /// Determine which node is the leader of the given range of members. - /// - /// All members in the cluster. - /// The address of the current node. - /// null if is empty. The of the leader otherwise. - public UniqueAddress LeaderOf(ImmutableSortedSet mbrs, UniqueAddress selfUniqueAddress) - { - var reachableMembers = (_overview.Reachability.IsAllReachable - ? mbrs.Where(m => m.Status != MemberStatus.Down) - : mbrs - .Where(m => m.Status != MemberStatus.Down && _overview.Reachability.IsReachable(m.UniqueAddress) || m.UniqueAddress == selfUniqueAddress)) - .ToImmutableSortedSet(); - - if (!reachableMembers.Any()) return null; - - var member = reachableMembers.FirstOrDefault(m => LeaderMemberStatus.Contains(m.Status)) ?? - reachableMembers.Min(Member.LeaderStatusOrdering); - - return member.UniqueAddress; - } - /// /// TBD /// @@ -445,6 +346,18 @@ public Gossip Prune(VectorClock.Node removedNode) return new Gossip(Members, Overview, newVersion); } + public Gossip MarkAsDown(Member member) + { + // replace member (changed status) + var newMembers = Members.Remove(member).Add(member.Copy(MemberStatus.Down)); + // remove nodes marked as DOWN from the 'seen' table + var newSeen = Overview.Seen.Remove(member.UniqueAddress); + + //update gossip overview + var newOverview = Overview.Copy(seen: newSeen); + return Copy(newMembers, overview: newOverview); + } + /// public override string ToString() { diff --git a/src/core/Akka.Cluster/Member.cs b/src/core/Akka.Cluster/Member.cs index 15971d7eb6f..047580c7ec2 100644 --- a/src/core/Akka.Cluster/Member.cs +++ b/src/core/Akka.Cluster/Member.cs @@ -370,7 +370,7 @@ public static ImmutableHashSet PickHighestPriority(IEnumerable a else { var m = g.First(); - if (!Gossip.RemoveUnreachableWithMemberStatus.Contains(m.Status)) acc.Add(m); + if (!MembershipState.RemoveUnreachableWithMemberStatus.Contains(m.Status)) acc.Add(m); } } return acc.ToImmutableHashSet(); diff --git a/src/core/Akka.Cluster/MembershipState.cs b/src/core/Akka.Cluster/MembershipState.cs new file mode 100644 index 00000000000..0b70aad5495 --- /dev/null +++ b/src/core/Akka.Cluster/MembershipState.cs @@ -0,0 +1,209 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2021 Lightbend Inc. +// Copyright (C) 2013-2021 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Immutable; +using System.Linq; +using Akka.Annotations; +using Akka.Util; + +namespace Akka.Cluster +{ + /// + /// INTERNAL API + /// + [InternalApi] + internal sealed class MembershipState : IEquatable + { + private static readonly ImmutableHashSet LeaderMemberStatus = + ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving); + + private static readonly ImmutableHashSet ConvergenceMemberStatus = + ImmutableHashSet.Create(MemberStatus.Up, MemberStatus.Leaving); + + /// + /// If there are unreachable members in the cluster with any of these statuses, they will be skipped during convergence checks. + /// + public static readonly ImmutableHashSet ConvergenceSkipUnreachableWithMemberStatus = + ImmutableHashSet.Create(MemberStatus.Down, MemberStatus.Exiting); + + /// + /// If there are unreachable members in the cluster with any of these statuses, they will be pruned from the local gossip + /// + public static readonly ImmutableHashSet RemoveUnreachableWithMemberStatus = + ImmutableHashSet.Create(MemberStatus.Down, MemberStatus.Exiting); + + public MembershipState(Gossip latestGossip, UniqueAddress selfUniqueAddress) + { + LatestGossip = latestGossip; + SelfUniqueAddress = selfUniqueAddress; + } + + private Member _selfMember = null; + + public Member SelfMember + { + get + { + if (_selfMember == null) + { + _selfMember = LatestGossip.GetMember(SelfUniqueAddress); + } + + return _selfMember; + } + } + + public Gossip LatestGossip { get; } + + public UniqueAddress SelfUniqueAddress { get; } + + public GossipOverview Overview => LatestGossip.Overview; + + public ImmutableSortedSet Members => LatestGossip.Members; + + /// + /// TODO: this will eventually need to be made DC-aware and tailored specifically to the current DC + /// + public Reachability DcReachability => Overview.Reachability; + + private Option _reachabilityExcludingDownedObservers = Option.None; + public Reachability DcReachabilityExcludingDownedObservers + { + get + { + if (_reachabilityExcludingDownedObservers.IsEmpty) + { + // TODO: adjust for data center + var membersToExclude = Members + .Where(x => x.Status == MemberStatus.Down) + .Select(x => x.UniqueAddress).ToImmutableHashSet(); + _reachabilityExcludingDownedObservers = Overview.Reachability.RemoveObservers(membersToExclude); + } + + return _reachabilityExcludingDownedObservers.Value; + } + } + + public bool IsReachableExcludingDownedObservers(UniqueAddress toAddress) + { + if (!LatestGossip.HasMember(toAddress)) return false; + + // TODO: check for multiple DCs + return LatestGossip.ReachabilityExcludingDownedObservers.Value.IsReachable(toAddress); + } + + public UniqueAddress Leader => LeaderOf(Members); + + public UniqueAddress LeaderOf(IImmutableSet mbrs) + { + var reachability = DcReachability; + var reachableMembers = (reachability.IsAllReachable + ? mbrs.Where(m => m.Status != MemberStatus.Down) + : mbrs + .Where(m => m.Status != MemberStatus.Down && reachability.IsReachable(m.UniqueAddress) || m.UniqueAddress == SelfUniqueAddress)) + .ToImmutableSortedSet(); + + if (!reachableMembers.Any()) return null; + + var member = reachableMembers.FirstOrDefault(m => LeaderMemberStatus.Contains(m.Status)) ?? + reachableMembers.Min(Member.LeaderStatusOrdering); + + return member.UniqueAddress; + } + + public bool IsLeader(UniqueAddress node) + { + return Leader != null && Leader.Equals(node); + } + + public UniqueAddress RoleLeader(string role) + { + return LeaderOf(Members.Where(x => x.HasRole(role)).ToImmutableHashSet()); + } + + /// + /// First check that: + /// 1. we don't have any members that are unreachable, or + /// 2. all unreachable members in the set have status DOWN or EXITING + /// Else we can't continue to check for convergence. When that is done + /// we check that all members with a convergence status is in the seen + /// table and has the latest vector clock version. + /// + /// The set of nodes who have been confirmed to be exiting. + /// true if convergence has been achieved. false otherwise. + public bool Convergence(IImmutableSet exitingConfirmed) + { + // If another member in the data center that is UP or LEAVING + // and has not seen this gossip or is exiting + // convergence cannot be reached + bool MemberHinderingConvergenceExists() + { + return Members.Any(x => ConvergenceMemberStatus.Contains(x.Status) + && !(LatestGossip.SeenByNode(x.UniqueAddress) || + exitingConfirmed.Contains(x.UniqueAddress))); + } + + // Find cluster members in the data center that are unreachable from other members of the data center + // excluding observations from members outside of the data center, that have status DOWN or is passed in as confirmed exiting. + var unreachable = DcReachabilityExcludingDownedObservers.AllUnreachableOrTerminated + .Where(node => node != SelfUniqueAddress && !exitingConfirmed.Contains(node)) + .Select(x => LatestGossip.GetMember(x)); + + // unreachables outside of the data center or with status DOWN or EXITING does not affect convergence + var allUnreachablesCanBeIgnored = + unreachable.All(m => ConvergenceSkipUnreachableWithMemberStatus.Contains(m.Status)); + + return allUnreachablesCanBeIgnored && !MemberHinderingConvergenceExists(); + } + + /// + /// Copies the current and marks the as Seen + /// by the . + /// + /// A new instance with the updated seen records. + public MembershipState Seen() => Copy(LatestGossip.Seen(SelfUniqueAddress)); + + public MembershipState Copy(Gossip gossip = null, UniqueAddress selfUniqueAddress = null) + { + return new MembershipState(gossip ?? LatestGossip, selfUniqueAddress ?? SelfUniqueAddress); + } + + public bool Equals(MembershipState other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return SelfUniqueAddress.Equals(other.SelfUniqueAddress) && LatestGossip.Equals(other.LatestGossip); + } + + public override bool Equals(object obj) + { + return ReferenceEquals(this, obj) || obj is MembershipState other && Equals(other); + } + + public override int GetHashCode() + { + unchecked + { + return (LatestGossip.GetHashCode() * 397) ^ SelfUniqueAddress.GetHashCode(); + } + } + + /// + /// Never gossip to self and not to node marked as unreachable by self (heartbeat + /// messages are not getting through so no point in trying to gossip). + /// + /// Nodes marked as unreachable by others are still valid targets for gossip. + /// + /// The node to check for gossip validity. + /// true if we can gossip to this node, false otherwise. + public bool ValidNodeForGossip(UniqueAddress node) + { + return !node.Equals(SelfUniqueAddress) && Overview.Reachability.IsReachable(SelfUniqueAddress, node); + } + } +}