From fb3eb71311c31db83739922dba7bf78f61148d86 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 27 May 2022 03:41:32 +0700 Subject: [PATCH] Backport of #5967 - Allow PersistentShardCoordinator to tolerate duplicate ShardHomeAllocated messages --- .../PersistentShardCoordinator.cs | 12 +++---- .../Akka.Cluster.Sharding/ShardCoordinator.cs | 34 +++++++++++++++---- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs index 3b3fc5cf843..46e59af24b4 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs @@ -88,14 +88,14 @@ protected override bool ReceiveRecover(object message) switch (evt) { case ShardRegionRegistered _: - State = State.Updated(evt); + State = State.Updated(evt, true); return true; case ShardRegionProxyRegistered _: - State = State.Updated(evt); + State = State.Updated(evt, true); return true; case ShardRegionTerminated regionTerminated: if (State.Regions.ContainsKey(regionTerminated.Region)) - State = State.Updated(evt); + State = State.Updated(evt, true); else //Log.Debug( // "{0}: ShardRegionTerminated, but region {1} was not registered. This inconsistency is due to that " + @@ -107,13 +107,13 @@ protected override bool ReceiveRecover(object message) return true; case ShardRegionProxyTerminated proxyTerminated: if (State.RegionProxies.Contains(proxyTerminated.RegionProxy)) - State = State.Updated(evt); + State = State.Updated(evt, true); return true; case ShardHomeAllocated _: - State = State.Updated(evt); + State = State.Updated(evt, true); return true; case ShardHomeDeallocated _: - State = State.Updated(evt); + State = State.Updated(evt, true); return true; case ShardCoordinatorInitialized _: // not used here diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs index 74643e3c647..e36e40496ff 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardCoordinator.cs @@ -1144,12 +1144,15 @@ public override string ToString() } /// - /// TBD + /// Feed an event into the ShardCoordinator state. /// - /// TBD - /// TBD - /// TBD - public CoordinatorState Updated(IDomainEvent e) + /// The event to process. + /// A flag to indicate if we're trying to recover state previously stored in the database. + /// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise + /// itself and go offline. + /// Thrown if an event is illegal in the current state. + /// An update copy of this state. + public CoordinatorState Updated(IDomainEvent e, bool isRecovering = false) { switch (e) { @@ -1188,7 +1191,26 @@ public CoordinatorState Updated(IDomainEvent e) if (!Regions.TryGetValue(message.Region, out var shardRegions)) throw new ArgumentException($"Region {message.Region} not registered: {this}", nameof(e)); if (Shards.ContainsKey(message.Shard)) - throw new ArgumentException($"Shard {message.Shard} already allocated: {this}", nameof(e)); + { + if(!isRecovering) + throw new ArgumentException($"Shard {message.Shard} already allocated: {this}", + nameof(e)); + + // per https://github.com/akkadotnet/akka.net/issues/5604 + // we're going to allow new value to overwrite previous + var newRegions = Regions; + var previousRegion = Shards[message.Shard]; + if (Regions.TryGetValue(previousRegion, out var previousShards)) + { + newRegions = newRegions.SetItem(previousRegion, + previousShards.Remove(message.Shard)); + } + var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; + return Copy( + shards: Shards.SetItem(message.Shard, message.Region), + regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)), + unallocatedShards: newUnallocatedShardsRecovery); + } var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards; return Copy(