diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 3fa6e599aa2..d2e27929c61 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -2691,6 +2691,7 @@ internal sealed class JoinSeedNodeProcess : UntypedActor private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly ImmutableList<Address> _seeds; + private readonly ImmutableList<Address> _otherSeeds; private readonly Address _selfAddress; private int _attempts = 0; @@ -2706,6 +2707,7 @@ public JoinSeedNodeProcess(ImmutableList<Address> seeds) { _selfAddress = Cluster.Get(Context.System).SelfAddress; _seeds = seeds; + _otherSeeds = _seeds.Remove(_selfAddress); if (seeds.IsEmpty || seeds.Head() == _selfAddress) throw new ArgumentException("Join seed node should not be done"); Context.SetReceiveTimeout(Cluster.Get(Context.System).Settings.SeedNodeTimeout); @@ -2725,46 +2727,53 @@ protected override void PreStart() /// <param name="message">TBD</param> protected override void OnReceive(object message) { - if (message is InternalClusterAction.JoinSeenNode) + switch (message) { - //send InitJoin to all seed nodes (except myself) - foreach (var path in _seeds.Where(x => x != _selfAddress) - .Select(y => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(y)))) + case InternalClusterAction.JoinSeenNode _: { - path.Tell(new InternalClusterAction.InitJoin()); + //send InitJoin to all seed nodes (except myself) + foreach (var path in _otherSeeds + .Select(y => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(y)))) + { + path.Tell(new InternalClusterAction.InitJoin()); + } + _attempts++; + break; } - _attempts++; - } - else if (message is InternalClusterAction.InitJoinAck) - { - //first InitJoinAck reply - var initJoinAck = (InternalClusterAction.InitJoinAck)message; - Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address)); - Context.Become(Done); - } - else if (message is InternalClusterAction.InitJoinNack) { } //that seed was uninitialized - else if (message is ReceiveTimeout) - { - if (_attempts >= 2) - _log.Warning( - "Couldn't join seed nodes after [{0}] attempts, will try again. seed-nodes=[{1}]", - _attempts, string.Join(",", _seeds.Where(x => !x.Equals(_selfAddress)))); - //no InitJoinAck received - try again - Self.Tell(new InternalClusterAction.JoinSeenNode()); - } - else - { - Unhandled(message); + case InternalClusterAction.InitJoinAck initJoinAck: + //first InitJoinAck reply + Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address)); + Context.Become(Done); + break; + case InternalClusterAction.InitJoinNack _: + break; //that seed was uninitialized + case ReceiveTimeout _: + { + if (_attempts >= 2) + _log.Warning( + "Couldn't join seed nodes after [{0}] attempts, will try again. seed-nodes=[{1}]", + _attempts, string.Join(",", _seeds.Where(x => !x.Equals(_selfAddress)))); + //no InitJoinAck received - try again + Self.Tell(new InternalClusterAction.JoinSeenNode()); + break; + } + default: + Unhandled(message); + break; } } private void Done(object message) { - if (message is InternalClusterAction.InitJoinAck) + switch (message) { - //already received one, skip the rest + case InternalClusterAction.InitJoinAck _: + //already received one, skip the rest + break; + case ReceiveTimeout _: + Context.Stop(Self); + break; } - else if (message is ReceiveTimeout) Context.Stop(Self); } } @@ -2784,6 +2793,7 @@ internal sealed class FirstSeedNodeProcess : UntypedActor { private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly ImmutableList<Address> _seeds; private ImmutableList<Address> _remainingSeeds; private readonly Address _selfAddress; private readonly Cluster _cluster; @@ -2791,9 +2801,9 @@ internal sealed class FirstSeedNodeProcess : UntypedActor private readonly ICancelable _retryTaskToken; /// <summary> - /// TBD + /// Launches a new instance of the "first seed node" joining process. /// </summary> - /// <param name="seeds">TBD</param> + /// <param name="seeds">The set of seed nodes to join.</param> /// <exception cref="ArgumentException"> /// This exception is thrown when either the number of specified <paramref name="seeds"/> is less than or equal to 1 /// or the first listed seed is a reference to the <see cref="IActorContext.System">IUntypedActorContext.System</see>'s address. @@ -2806,63 +2816,63 @@ public FirstSeedNodeProcess(ImmutableList<Address> seeds) if (seeds.Count <= 1 || seeds.Head() != _selfAddress) throw new ArgumentException("Join seed node should not be done"); + _seeds = seeds; _remainingSeeds = seeds.Remove(_selfAddress); _timeout = Deadline.Now + _cluster.Settings.SeedNodeTimeout; _retryTaskToken = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1), Self, new InternalClusterAction.JoinSeenNode(), Self); Self.Tell(new InternalClusterAction.JoinSeenNode()); } - /// <summary> - /// TBD - /// </summary> protected override void PostStop() { _retryTaskToken.Cancel(); } - /// <summary> - /// TBD - /// </summary> - /// <param name="message">TBD</param> protected override void OnReceive(object message) { - if (message is InternalClusterAction.JoinSeenNode) + switch (message) { - if (_timeout.HasTimeLeft) + case InternalClusterAction.JoinSeenNode _ when _timeout.HasTimeLeft: { // send InitJoin to remaining seed nodes (except myself) foreach (var seed in _remainingSeeds.Select( - x => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(x)))) + x => Context.ActorSelection(Context.Parent.Path.ToStringWithAddress(x)))) seed.Tell(new InternalClusterAction.InitJoin()); + break; } - else + case InternalClusterAction.JoinSeenNode _: { + if (_log.IsDebugEnabled) + { + _log.Debug("Couldn't join other seed nodes, will join myself. seed-nodes=[{0}]", string.Join(",", _seeds)); + } // no InitJoinAck received, initialize new cluster by joining myself Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress)); Context.Stop(Self); + break; } - } - else if (message is InternalClusterAction.InitJoinAck) - { - // first InitJoinAck reply, join existing cluster - var initJoinAck = (InternalClusterAction.InitJoinAck)message; - Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address)); - Context.Stop(Self); - } - else if (message is InternalClusterAction.InitJoinNack) - { - var initJoinNack = (InternalClusterAction.InitJoinNack)message; - _remainingSeeds = _remainingSeeds.Remove(initJoinNack.Address); - if (_remainingSeeds.IsEmpty) - { - // initialize new cluster by joining myself when nacks from all other seed nodes - Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress)); + case InternalClusterAction.InitJoinAck initJoinAck: + _log.Info("Received InitJoinAck message from [{0}] to [{1}]", initJoinAck.Address, _selfAddress); + // first InitJoinAck reply, join existing cluster + Context.Parent.Tell(new ClusterUserAction.JoinTo(initJoinAck.Address)); Context.Stop(Self); + break; + case InternalClusterAction.InitJoinNack initJoinNack: + { + _log.Info("Received InitJoinNack message from [{0}] to [{1}]", initJoinNack.Address, _selfAddress); + _remainingSeeds = _remainingSeeds.Remove(initJoinNack.Address); + if (_remainingSeeds.IsEmpty) + { + // initialize new cluster by joining myself when nacks from all other seed nodes + Context.Parent.Tell(new ClusterUserAction.JoinTo(_selfAddress)); + Context.Stop(Self); + } + + break; } - } - else - { - Unhandled(message); + default: + Unhandled(message); + break; } } }