Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sharding tolerant reader #5976

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,9 @@ public IImmutableSet<ShardId> AllShards
/// Feed an event into the ShardCoordinator state.
/// </summary>
/// <param name="e">The event to process.</param>
/// <param name="isRecovering">A flag to indicate if we're trying to recover state previously stored in the database.
/// We need to be more tolerant when this happens in the name of trying to accelerate recovery, so the system doesn't compromise
/// itself and go offline.</param>
/// <exception cref="ArgumentException">Thrown if an event is illegal in the current state.</exception>
/// <returns>An update copy of this state.</returns>
public State Updated(IDomainEvent e, bool isRecovering = false)
public State Updated(IDomainEvent e)
{
switch (e)
{
Expand Down Expand Up @@ -156,27 +153,7 @@ public State Updated(IDomainEvent e, bool isRecovering = false)
if (!Regions.TryGetValue(message.Region, out var shardRegions))
throw new ArgumentException($"Region {message.Region} not registered", nameof(e));
if (Shards.ContainsKey(message.Shard))
{
if (!isRecovering)
throw new ArgumentException($"Shard {message.Shard} is already allocated",
nameof(e));

// per https://github.com/akkadotnet/akka.net/issues/5604
// we're going to allow new value to overwrite previous
var newRegions = Regions;
var previousRegion = Shards[message.Shard];
if (Regions.TryGetValue(previousRegion, out var previousShards))
{
newRegions = newRegions.SetItem(previousRegion,
previousShards.Remove(message.Shard));
}
var newUnallocatedShardsRecovery = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
return Copy(
shards: Shards.SetItem(message.Shard, message.Region),
regions: newRegions.SetItem(message.Region, shardRegions.Add(message.Shard)),
unallocatedShards: newUnallocatedShardsRecovery);
}

throw new ArgumentException($"Shard {message.Shard} is already allocated", nameof(e));

var newUnallocatedShards = RememberEntities ? UnallocatedShards.Remove(message.Shard) : UnallocatedShards;
return Copy(
Expand Down Expand Up @@ -1369,26 +1346,31 @@ protected override bool ReceiveRecover(object message)
switch (evt)
{
case ShardRegionRegistered _:
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
return true;
case ShardRegionProxyRegistered _:
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
return true;
case ShardRegionTerminated regionTerminated:
if (CurrentState.Regions.ContainsKey(regionTerminated.Region))
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
else
Log.Debug("ShardRegionTerminated but region {0} was not registered", regionTerminated.Region);
return true;
case ShardRegionProxyTerminated proxyTerminated:
if (CurrentState.RegionProxies.Contains(proxyTerminated.RegionProxy))
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
return true;
case ShardHomeAllocated _:
CurrentState = CurrentState.Updated(evt, true);
case ShardHomeAllocated homeAllocated:
// if we already have identical ShardHomeAllocated data, skip processing it
// addresses https://github.com/akkadotnet/akka.net/issues/5604
if (CurrentState.Shards.TryGetValue(homeAllocated.Shard, out var currentShardRegion)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had an identical duplicate message. Ignore it and continue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be much safer - cc @ismaelhamed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This looks more like what we've done in the past (basically delete newest ShardHomeAllocated events).

I think this is due to having two active clusters at the same time, probably because of a network partition during shutdown of some of the nodes.

There's another take of this one in which you get shards that have not being allocated yet: akka/akka#17955

But it's been ages since I've seen any of those, to be honest.

&& Equals(homeAllocated.Region, currentShardRegion))
return true;
CurrentState = CurrentState.Updated(evt);
return true;
case ShardHomeDeallocated _:
CurrentState = CurrentState.Updated(evt, true);
CurrentState = CurrentState.Updated(evt);
return true;
}
return false;
Expand Down