Skip to content

Commit

Permalink
Merge branch 'dev' into retry-support
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb authored Apr 13, 2021
2 parents 54513d0 + fc5b043 commit 799b419
Show file tree
Hide file tree
Showing 23 changed files with 1,590 additions and 2,903 deletions.
1 change: 0 additions & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,6 @@ Target "Protobuf" <| fun _ ->
|> append (sprintf "-I=%s" (__SOURCE_DIRECTORY__ @@ "/src/protobuf/common") )
|> append (sprintf "--csharp_out=internal_access:%s" (__SOURCE_DIRECTORY__ @@ destinationPath))
|> append "--csharp_opt=file_extension=.g.cs"
|> append "--experimental_allow_proto3_optional"
|> append (__SOURCE_DIRECTORY__ @@ "/src/protobuf" @@ protoName)
|> toText

Expand Down
37 changes: 37 additions & 0 deletions docs/articles/clustering/cluster-sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,43 @@ public sealed class MessageExtractor : HashCodeMessageExtractor

Using `ShardRegion.StartEntity` implies, that you're able to infer a shard id given an entity id alone. For this reason, in example above we modified a cluster sharding routing logic to make use of `HashCodeMessageExtractor` - in this variant, shard id doesn't have to be provided explicitly, as it will be computed from the hash of entity id itself. Notice a `maxNumberOfShards`, which is the maximum available number of shards allowed for this type of an actor - this value must never change during a single lifetime of a cluster.

### Terminating remembered entities
One complication that `akka.cluster.sharding.remember-entities = true` introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. `Context.Stop(Self)`, `PoisonPill.Instance`, and the like. This is because as part of the `remember-entities` contract - the sharding system is going to insist on keeping all remembered entities alive until explictily told to stop.

To terminate a remembered entity, the sharded entity actor needs to send a [`Passivate` command](xref:Akka.Cluster.Sharding.Passivate) _to its parent actor_ in order to signal to the sharding system that we no longer need to remember this particular entity.

```csharp
protected override bool ReceiveCommand(object message)
{
switch (message)
{
case Increment _:
Persist(new CounterChanged(1), UpdateState);
return true;
case Decrement _:
Persist(new CounterChanged(-1), UpdateState);
return true;
case Get _:
Sender.Tell(_count);
return true;
case ReceiveTimeout _:
// send Passivate to parent (shard actor) to stop remembering this entity.
// shard actor will send us back a `Stop.Instance` message
// as our "shutdown" signal - at which point we can terminate normally.
Context.Parent.Tell(new Passivate(Stop.Instance));
return true;
case Stop _:
Context.Stop(Self);
return true;
}
return false;
}
```

It is common to simply use `Context.Parent.Tell(new Passivate(PoisonPill.Instance));` to passivate and shutdown remembered-entity actors.

To recreate a remembered entity actor after it has been passivated all you have to do is message the `ShardRegion` actor with a message containing the entity's `EntityId` again just like how you instantiated the actor the first time.

## Retrieving sharding state

You can inspect current sharding stats by using following messages:
Expand Down
2 changes: 1 addition & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<HyperionVersion>0.9.17</HyperionVersion>
<NewtonsoftJsonVersion>13.0.1</NewtonsoftJsonVersion>
<NBenchVersion>2.0.1</NBenchVersion>
<ProtobufVersion>3.15.7</ProtobufVersion>
<ProtobufVersion>3.15.8</ProtobufVersion>
<NetCoreTestVersion>netcoreapp3.1</NetCoreTestVersion>
<NetTestVersion>net5.0</NetTestVersion>
<NetFrameworkTestVersion>net471</NetFrameworkTestVersion>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterShardingMessages.g.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: ClusterShardingMessages.proto
#pragma warning disable 1591, 0612, 3021
Expand Down
4 changes: 2 additions & 2 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ private Terminate()
/// reduce memory consumption. This is done by the application specific implementation of
/// the entity actors for example by defining receive timeout (<see cref="IActorContext.SetReceiveTimeout"/>).
/// If a message is already enqueued to the entity when it stops itself the enqueued message
/// in the mailbox will be dropped. To support graceful passivation without loosing such
/// in the mailbox will be dropped. To support graceful passivation without losing such
/// messages the entity actor can send this <see cref="Passivate"/> message to its parent <see cref="ShardRegion"/>.
/// The specified wrapped <see cref="StopMessage"/> will be sent back to the entity, which is
/// then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion`
/// between reception of <see cref="Passivate"/> and termination of the entity. Such buffered messages
/// are thereafter delivered to a new incarnation of the entity.
///
/// <see cref="PoisonPill"/> is a perfectly fine <see cref="StopMessage"/>.
/// <see cref="PoisonPill.Instance"/> is a perfectly fine <see cref="StopMessage"/>.
/// </summary>
[Serializable]
public sealed class Passivate : IShardRegionCommand
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterClientMessages.g.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: ClusterClientMessages.proto
#pragma warning disable 1591, 0612, 3021
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//-----------------------------------------------------------------------
// <copyright file="DistributedPubSubMessages.g.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: DistributedPubSubMessages.proto
#pragma warning disable 1591, 0612, 3021
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.Remote.TestKit;
Expand Down Expand Up @@ -48,31 +50,41 @@ public class DurablePruningSpec : MultiNodeClusterSpec
private readonly GCounterKey keyA = new GCounterKey("A");
private readonly IActorRef replicator;

protected DurablePruningSpec() : this(new DurablePruningSpecConfig())
public DurablePruningSpec() : this(new DurablePruningSpecConfig())
{
}

protected DurablePruningSpec(DurablePruningSpecConfig config) : base(config, typeof(DurablePruningSpec))
{
InitialParticipantsValueFactory = Roles.Count;
cluster = Akka.Cluster.Cluster.Get(Sys);
replicator = StartReplicator(Sys);
timeout = Dilated(TimeSpan.FromSeconds(5));
}

protected override int InitialParticipantsValueFactory { get; }
protected override int InitialParticipantsValueFactory => Roles.Count;

[MultiNodeFact(Skip = "FIXME")]
[MultiNodeFact]
public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
{
Join(first, first);
Join(second, first);

var sys2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config);
var cluster2 = Akka.Cluster.Cluster.Get(sys2);
var distributedData2 = DistributedData.Get(sys2);
var replicator2 = StartReplicator(sys2);
var probe2 = new TestProbe(sys2, new XunitAssertions());
cluster2.Join(Node(first).Address);

AwaitAssert(() =>
{
cluster.State.Members.Count.ShouldBe(4);
cluster.State.Members.All(m => m.Status == MemberStatus.Up).ShouldBe(true);
cluster2.State.Members.Count.ShouldBe(4);
cluster2.State.Members.All(m => m.Status == MemberStatus.Up).ShouldBe(true);
}, TimeSpan.FromSeconds(10));
EnterBarrier("joined");

Within(TimeSpan.FromSeconds(5), () => AwaitAssert(() =>
{
replicator.Tell(Dsl.GetReplicaCount);
Expand All @@ -81,10 +93,10 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
probe2.ExpectMsg(new ReplicaCount(4));
}));

replicator.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster)));
replicator.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster, 3)));
ExpectMsg(new UpdateSuccess(keyA, null));

replicator2.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster2, 2)), probe2.Ref);
replicator2.Tell(Dsl.Update(keyA, GCounter.Empty, WriteLocal.Instance, c => c.Increment(cluster2.SelfUniqueAddress, 2)), probe2.Ref);
probe2.ExpectMsg(new UpdateSuccess(keyA, null));

EnterBarrier("updates-done");
Expand Down Expand Up @@ -135,8 +147,9 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()

RunOn(() =>
{
var addr = cluster2.SelfAddress;
var sys3 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString(@"
var address = cluster2.SelfAddress;
var sys3 = ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString($@"
akka.remote.dot-netty.tcp.port = {address.Port}
").WithFallback(Sys.Settings.Config));
var cluster3 = Akka.Cluster.Cluster.Get(sys3);
var replicator3 = StartReplicator(sys3);
Expand All @@ -151,20 +164,28 @@ public void Pruning_of_durable_CRDT_should_move_data_from_removed_node()
replicator3.Tell(Dsl.Get(keyA, ReadLocal.Instance), probe3.Ref);
var counter4 = probe3.ExpectMsg<GetSuccess>().Get(keyA);
var value = counter4.Value;
values.Add((int) value);
values = values.Add((int) value);
value.ShouldBe(10UL);
counter4.State.Count.ShouldBe(3);
});
values.ShouldBe(ImmutableHashSet.Create(10));
});

// all must at least have seen it as joining
AwaitAssert(() =>
{
cluster3.State.Members.Count.ShouldBe(4);
cluster3.State.Members.All(m => m.Status == MemberStatus.Up).ShouldBeTrue();
}, TimeSpan.FromSeconds(10));

// after merging with others
replicator3.Tell(Dsl.Get(keyA, new ReadAll(RemainingOrDefault)));
var counter5 = ExpectMsg<GetSuccess>().Get(keyA);
counter5.Value.ShouldBe(10UL);
counter5.State.Count.ShouldBe(3);

}, first);

EnterBarrier("sys3-started");

replicator.Tell(Dsl.Get(keyA, new ReadAll(RemainingOrDefault)));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//-----------------------------------------------------------------------
// <copyright file="ReplicatedDataMessages.g.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: ReplicatedDataMessages.proto
#pragma warning disable 1591, 0612, 3021
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
//-----------------------------------------------------------------------
// <copyright file="ReplicatorMessages.g.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: ReplicatorMessages.proto
#pragma warning disable 1591, 0612, 3021
Expand All @@ -16,7 +9,7 @@
using scg = global::System.Collections.Generic;
namespace Akka.DistributedData.Serialization.Proto.Msg {

/// <summary>Holder for reflection information generated from replicatormessages.proto</summary>
/// <summary>Holder for reflection information generated from ReplicatorMessages.proto</summary>
internal static partial class ReplicatorMessagesReflection {

#region Descriptor
Expand All @@ -29,7 +22,7 @@ internal static partial class ReplicatorMessagesReflection {
static ReplicatorMessagesReflection() {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
"ChhyZXBsaWNhdG9ybWVzc2FnZXMucHJvdG8SLEFra2EuRGlzdHJpYnV0ZWRE",
"ChhSZXBsaWNhdG9yTWVzc2FnZXMucHJvdG8SLEFra2EuRGlzdHJpYnV0ZWRE",
"YXRhLlNlcmlhbGl6YXRpb24uUHJvdG8uTXNnIp0CCgNHZXQSRwoDa2V5GAEg",
"ASgLMjouQWtrYS5EaXN0cmlidXRlZERhdGEuU2VyaWFsaXphdGlvbi5Qcm90",
"by5Nc2cuT3RoZXJNZXNzYWdlEhMKC2NvbnNpc3RlbmN5GAIgASgREg8KB3Rp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private static byte[] JoinToByteArray(InternalClusterAction.Join join)
private static InternalClusterAction.Join JoinFrom(byte[] bytes)
{
var join = Proto.Msg.Join.Parser.ParseFrom(bytes);
AppVersion ver = join.HasAppVersion ? AppVersion.Create(join.AppVersion) : AppVersion.Zero;
var ver = !string.IsNullOrEmpty(join.AppVersion) ? AppVersion.Create(join.AppVersion) : AppVersion.Zero;
return new InternalClusterAction.Join(UniqueAddressFrom(join.Node), join.Roles.ToImmutableHashSet(), ver);
}

Expand Down
Loading

0 comments on commit 799b419

Please sign in to comment.