Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Shard failed to unwrap buffered messages correctly #7452

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public async Task Persistent_Shard_must_recover_from_failing_entity(Props entity
settings,
new TestMessageExtractor(),
PoisonPill.Instance,
provider
provider,
null
));

Sys.EventStream.Subscribe<Error>(TestActor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public object Apply(object message, IActorContext context)
_counter.IncrementAndGet();
return message;
}

public object UnApply(object message, IActorContext context)
{
return message;
}
}

private const string ShardTypeName = "Caat";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// -----------------------------------------------------------------------
// <copyright file="WrappedShardBufferedMessageSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Sharding.Internal;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Sharding.Tests;

public class WrappedShardBufferedMessageSpec: AkkaSpec
{
#region Custom Classes

private sealed class MyEnvelope : IWrappedMessage
{
public MyEnvelope(object message)
{
Message = message;
}

public object Message { get; }
}

private sealed class BufferMessageAdapter: IShardingBufferMessageAdapter
{
public object Apply(object message, IActorContext context)
=> new MyEnvelope(message);

public object UnApply(object message, IActorContext context)
{
return message is MyEnvelope envelope ? envelope.Message : message;
}
}

private class EchoActor: UntypedActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
protected override void OnReceive(object message)
{
_log.Info($">>>> OnReceive {message.GetType()}: {message}");
if(message is string)
Sender.Tell(message);
else
Unhandled(message);
}
}

private sealed class FakeRememberEntitiesProvider: IRememberEntitiesProvider
{
private readonly IActorRef _probe;

public FakeRememberEntitiesProvider(IActorRef probe)
{
_probe = probe;
}

public Props CoordinatorStoreProps() => FakeCoordinatorStoreActor.Props();

public Props ShardStoreProps(string shardId) => FakeShardStoreActor.Props(shardId, _probe);
}

private class ShardStoreCreated
{
public ShardStoreCreated(IActorRef store, string shardId)
{
Store = store;
ShardId = shardId;
}

public IActorRef Store { get; }
public string ShardId { get; }
}

private class CoordinatorStoreCreated
{
public CoordinatorStoreCreated(IActorRef store)
{
Store = store;
}

public IActorRef Store { get; }
}

private class FakeShardStoreActor : ActorBase
{
public static Props Props(string shardId, IActorRef probe) => Actor.Props.Create(() => new FakeShardStoreActor(shardId, probe));

private readonly string _shardId;
private readonly IActorRef _probe;

public FakeShardStoreActor(string shardId, IActorRef probe)
{
_shardId = shardId;
_probe = probe;
Context.System.EventStream.Publish(new ShardStoreCreated(Self, shardId));
}

protected override bool Receive(object message)
{
switch (message)
{
case RememberEntitiesShardStore.GetEntities:
Sender.Tell(new RememberEntitiesShardStore.RememberedEntities(ImmutableHashSet<string>.Empty));
return true;
case RememberEntitiesShardStore.Update m:
_probe.Tell(new RememberEntitiesShardStore.UpdateDone(m.Started, m.Stopped));
return true;
}
return false;
}
}

private class FakeCoordinatorStoreActor : ActorBase
{
public static Props Props() => Actor.Props.Create(() => new FakeCoordinatorStoreActor());

public FakeCoordinatorStoreActor()
{
Context.System.EventStream.Publish(new CoordinatorStoreCreated(Context.Self));
}

protected override bool Receive(object message)
{
switch (message)
{
case RememberEntitiesCoordinatorStore.GetShards _:
Sender.Tell(new RememberEntitiesCoordinatorStore.RememberedShards(ImmutableHashSet<string>.Empty));
return true;
case RememberEntitiesCoordinatorStore.AddShard m:
Sender.Tell(new RememberEntitiesCoordinatorStore.UpdateDone(m.ShardId));
return true;
}
return false;
}
}

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.loglevel=DEBUG
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.cluster.sharding.state-store-mode = ddata
akka.cluster.sharding.remember-entities = on

# no leaks between test runs thank you
akka.cluster.sharding.distributed-data.durable.keys = []
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on")

.WithFallback(Sharding.ClusterSharding.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingleton.DefaultConfig());
}

#endregion

private const string Msg = "hit";
private readonly IActorRef _shard;
private IActorRef _store;

public WrappedShardBufferedMessageSpec(ITestOutputHelper output) : base(GetConfig(), output)
{
Sys.EventStream.Subscribe(TestActor, typeof(ShardStoreCreated));
Sys.EventStream.Subscribe(TestActor, typeof(CoordinatorStoreCreated));

_shard = ChildActorOf(Shard.Props(
typeName: "test",
shardId: "test",
entityProps: _ => Props.Create(() => new EchoActor()),
settings: ClusterShardingSettings.Create(Sys),
extractor: new ExtractorAdapter(HashCodeMessageExtractor.Create(10, m => m.ToString())),
handOffStopMessage: PoisonPill.Instance,
rememberEntitiesProvider: new FakeRememberEntitiesProvider(TestActor),
bufferMessageAdapter: new BufferMessageAdapter()));
}

private async Task<RememberEntitiesShardStore.UpdateDone> ExpectShardStartup()
{
var createdEvent = await ExpectMsgAsync<ShardStoreCreated>();
createdEvent.ShardId.Should().Be("test");

_store = createdEvent.Store;

await ExpectMsgAsync<ShardInitialized>();

_shard.Tell(new ShardRegion.StartEntity(Msg));

return await ExpectMsgAsync<RememberEntitiesShardStore.UpdateDone>();
}

[Fact(DisplayName = "Message wrapped in ShardingEnvelope, buffered by Shard, transformed by BufferMessageAdapter, must arrive in entity actor")]
public async Task WrappedMessageDelivery()
{
IgnoreMessages<ShardRegion.StartEntityAck>();

var continueMessage = await ExpectShardStartup();

// this message should be buffered
_shard.Tell(new ShardingEnvelope(Msg, Msg));
await Task.Yield();

// Tell shard to continue processing
_shard.Tell(continueMessage);

await ExpectMsgAsync(Msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Akka.Cluster.Sharding;
public interface IShardingBufferMessageAdapter
{
public object Apply(object message, IActorContext context);
public object UnApply(object message, IActorContext context);
}

[InternalApi]
Expand All @@ -24,6 +25,8 @@ internal class EmptyBufferMessageAdapter: IShardingBufferMessageAdapter
private EmptyBufferMessageAdapter()
{
}

public object Apply(object message, IActorContext context) => message;

public object UnApply(object message, IActorContext context) => message;
}
13 changes: 8 additions & 5 deletions src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ public static Props Props(
ClusterShardingSettings settings,
IMessageExtractor extractor,
object handOffStopMessage,
IRememberEntitiesProvider? rememberEntitiesProvider)
IRememberEntitiesProvider? rememberEntitiesProvider,
IShardingBufferMessageAdapter? bufferMessageAdapter)
{
return Actor.Props.Create(() => new Shard(
typeName,
Expand All @@ -353,7 +354,8 @@ public static Props Props(
settings,
extractor,
handOffStopMessage,
rememberEntitiesProvider)).WithDeploy(Deploy.Local);
rememberEntitiesProvider,
bufferMessageAdapter)).WithDeploy(Deploy.Local);
}

[Serializable]
Expand Down Expand Up @@ -976,7 +978,8 @@ public Shard(
ClusterShardingSettings settings,
IMessageExtractor extractor,
object handOffStopMessage,
IRememberEntitiesProvider? rememberEntitiesProvider)
IRememberEntitiesProvider? rememberEntitiesProvider,
IShardingBufferMessageAdapter? bufferMessageAdapter)
{
_typeName = typeName;
_shardId = shardId;
Expand Down Expand Up @@ -1020,7 +1023,7 @@ public Shard(
_leaseRetryInterval = settings.LeaseSettings.LeaseRetryInterval;
}

_bufferMessageAdapter = ClusterSharding.Get(Context.System).BufferMessageAdapter;
_bufferMessageAdapter = bufferMessageAdapter ?? EmptyBufferMessageAdapter.Instance;
}

protected override SupervisorStrategy SupervisorStrategy()
Expand Down Expand Up @@ -2001,7 +2004,7 @@ private void SendMsgBuffer(EntityId entityId)
if (WrappedMessage.Unwrap(message) is ShardRegion.StartEntity se)
StartEntity(se.EntityId, @ref);
else
DeliverMessage(entityId, message, @ref);
DeliverMessage(entityId, _bufferMessageAdapter.UnApply(message, Context), @ref);
}

TouchLastMessageTimestamp(entityId);
Expand Down
3 changes: 2 additions & 1 deletion src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,8 @@ private IActorRef GetShard(ShardId id)
_settings,
_messageExtractor,
_handOffStopMessage,
_rememberEntitiesProvider)
_rememberEntitiesProvider,
_bufferMessageAdapter)
.WithDispatcher(Context.Props.Dispatcher), name));

_shardsByRef = _shardsByRef.SetItem(shardRef, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ namespace Akka.Cluster.Sharding
public interface IShardingBufferMessageAdapter
{
object Apply(object message, Akka.Actor.IActorContext context);
object UnApply(object message, Akka.Actor.IActorContext context);
}
public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ namespace Akka.Cluster.Sharding
public interface IShardingBufferMessageAdapter
{
object Apply(object message, Akka.Actor.IActorContext context);
object UnApply(object message, Akka.Actor.IActorContext context);
}
public interface IStartableAllocationStrategy : Akka.Actor.INoSerializationVerificationNeeded, Akka.Cluster.Sharding.IShardAllocationStrategy
{
Expand Down
Loading