-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
akka.cluster.sharding - storage api update #4629
Conversation
dafef2f
to
05a658c
Compare
765616c
to
1956620
Compare
This should be ready to review.
which can use different persistent mode configured via
There should be no breaking changes from user perspective. Only some internal messages/objects were moved. |
Log Dropped from DeadLetterListener
* make use of the existing logging of dead letter also for UnhandledMessage Supress ActorSelectionMessage with DeadLetterSuppression (migrated from akka/akka#28341) * for example the Cluster InitJoin message is marked with DeadLetterSuppression but was anyway logged because sent with actorSelection * for other WrappedMessage than ActorSelectionMessage we shouldn't unwrap and publish the inner in SuppressedDeadLetter because that might loose some information * therefore those are silenced in the DeadLetterListener instead Better deadLetter logging of wrapped messages (migrated from akka/akka#28253)
1956620
to
aee94d3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made it through 38/104 files - going to work on the rest tomorrow. Liking what I'm seeing so far but it's only been a review of the tests.
|
||
namespace Akka.Cluster.Sharding.Tests | ||
{ | ||
public class ClusterShardCoordinatorDowningSpecConfig : MultiNodeClusterShardingConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Liking the common base class for testing sharding. Makes a lot of sense that we add that.
} | ||
# don't leak ddata state across runs | ||
akka.cluster.sharding.distributed-data.durable.keys = [] | ||
akka.persistence.journal.leveldb-shared.store.native = off |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a JVM setting? Pretty sure we don't have any LevelDb support.
EnterBarrier("received failed stats from timed out shards vs empty"); | ||
} | ||
|
||
private void Querying_cluster_sharding_must_return_shard_state_of_sharding_regions_if_one_or_more_shards_timeout_versus_all_as_empty() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice - I've run into this issue before with pbm cluster-sharding region stats -r {regionName}
before; glad this has been fixed.
"""); | ||
} | ||
|
||
akka.persistence.snapshot-store.plugin = ""akka.persistence.memory-snapshot-store-shared"" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
} | ||
|
||
[Fact] | ||
public void ClusterShardingMessageSerializer_must_serialize_ShardRegionQuery() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - all of the new tests in this class look fine, but I haven't checked the underlying serializer impl itself which is where I know @Arkatufus raised some points a while back.
some of these specs are failing - we'll investigate. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
65/104
src/contrib/cluster/Akka.Cluster.Sharding.Tests/EntityTerminationSpec.cs
Show resolved
Hide resolved
src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardingMigrationSpec.cs
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
70/104
private readonly bool _rememberEntities; | ||
private sealed class RememberEntitiesStoreStopped | ||
{ | ||
public static RememberEntitiesStoreStopped Instance = new RememberEntitiesStoreStopped(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to make this readonly
(nitpick)
replicator, | ||
majorityMinCap, | ||
rememberEntitiesStoreProvider)) | ||
.WithDeploy(Deploy.Local); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That'll make sure that we don't end up trying to serialize this actor when serialize-creators=on
- good catch
private bool _terminating = false; | ||
private readonly UniqueAddress _selfUniqueAddress; | ||
private readonly LWWRegisterKey<ShardCoordinator.CoordinatorState> _coordinatorStateKey; | ||
private ImmutableHashSet<(IActorRef, GetShardHome)> _getShardHomeRequests = ImmutableHashSet<(IActorRef, GetShardHome)>.Empty; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does GetShardHome
contain a "by value" GetHashCode
implementation? Otherwise this might be problematic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we do
akka.net/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs
Lines 445 to 451 in 21762b9
public override int GetHashCode() | |
{ | |
unchecked | |
{ | |
return Shard?.GetHashCode() ?? 0; | |
} | |
} |
case GetFailure m when m.Key.Equals(_coordinatorStateKey): | ||
_initialStateRetries++; | ||
var template = | ||
"{0}: The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {1} millis (retrying). Has ClusterSharding been started on all nodes?"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compiler might do this on the fly anyway but might as well declare a const
here
|
||
namespace Akka.Cluster.Sharding.External | ||
{ | ||
public class ClientTimeoutException : Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to add some XML-DOC comments here to explain how / why this error is raised
/// <summary> | ||
/// API May Change | ||
/// </summary> | ||
public sealed class ExternalShardAllocation : IExtension |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public sealed class ExternalShardAllocation : IExtension | |
[ApiMayChange] | |
public sealed class ExternalShardAllocation : IExtension |
using System; | ||
using System.Collections.Concurrent; | ||
using Akka.Actor; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using Akka.Annotations; | |
} | ||
|
||
/// <summary> | ||
/// uses a string primitive types are optimized in ddata to not serialize every entity separately |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
return new LWWDictionaryKey<ShardId, string>($"external-sharding-{typeName}"); | ||
} | ||
|
||
private class DDataStateActor : ActorBase, IWithUnboundedStash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd appreciate an internal comment here explaining this actor's role in the design, as that may not be obvious to the next person who has to work on this
{ | ||
using ShardId = String; | ||
|
||
public class ExternalShardAllocationStrategy : IStartableAllocationStrategy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be sealed so we don't end up with multiple GetShardLocation
implementations?
Still going through this file by file, but I've left another review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed the Akka.Annotations
usage here
{ | ||
using ShardId = String; | ||
|
||
public sealed class ShardLocations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to double check the protobuf compatibility, but otherwise this looks fine - I'm on the fence about doing this in v1.4. Might need to pull it in as part of v1.5.
{ | ||
_system = system; | ||
_typeName = typeName; | ||
_log = Logging.GetLogger(system, GetType()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might not be a bad idea to enrich this logger name with the _typeName
using ShardId = String; | ||
|
||
/// <summary> | ||
/// Only intended for testing, not an extension point. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, thank you
_replicator.Tell(Dsl.Get(_allShardsKey, _readMajority)); | ||
} | ||
|
||
protected override bool Receive(object message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/// By splitting the elements over 5 keys we can support 10000 entities per shard. | ||
/// The Gossip message size of 5 ORSet with 2000 ids is around 200 KiB. | ||
/// This is by intention not configurable because it's important to have the same | ||
/// configuration on each node. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to benchmark our DData payload size to make sure that this is true in Akka.NET as well
WrittenMigrationMarker = writtenMigrationMarker; | ||
} | ||
|
||
public IImmutableSet<string> Shards { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably also be typed ShardId
{ | ||
return new ShardsQueryResult<T>( | ||
failed: ps.Where(i => i.task.IsCanceled || i.task.IsFaulted).Select(i => i.shard).ToImmutableHashSet(), | ||
responses: ps.Where(i => !i.task.IsCanceled && !i.task.IsFaulted).Select(i => i.task.Result).ToImmutableList(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering about the Task.Result
call here - I'll need to use the IDE for this, but I'd prefer it if we just passed the results of the task into this method rather than the Task
themselves. Difficult to tell from the PR review dialog.
@@ -21,6 +21,28 @@ namespace Akka.TestKit.Xunit2 | |||
/// </summary> | |||
public class TestKit : TestKitBase , IDisposable | |||
{ | |||
private class PrefixedOutput : ITestOutputHelper |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great idea.
# When 'remember-entities' is enabled and the state store mode is ddata this controls | ||
# how the remembered entities and shards are stored. Possible values are "eventsourced" and "ddata" | ||
# Default is ddata for backwards compatibility. | ||
remember-entities-store = "ddata" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense
} | ||
|
||
message ShardRegionWithAddress{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know enough about Protobuf here to say for certain if renaming a .proto
file is a breaking change or not. I'm going to need to do some manual testing with these changes anyway.
Merged via #5857 |
Another big one ^^
This MR is updating sharding to current scala version, including unit tests and multinode tests.
What's still missing compared to scala:
ExternalShardAllocation
RemoveInternalClusterShardingData
appThere were quite many changes related to the storage api change, so basically some parts were completely rewritten. Especially
Shard
andShardCoordinator
.Also includes some pre-requirements to make some part easier, especially
MessageBuffer
andDropped
deadletter. Not sure if you'll like it.Tests are now usingsqlite
persistence providerTests are now using shared inmem persistence providers for both journal & snapshots to avoid issues with files
In theory, everything "should" be backwards compatible
Additionally included in this pr: