-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Shard
failed to unwrap buffered messages correctly
#7452
Merged
Aaronontheweb
merged 14 commits into
akkadotnet:dev
from
Arkatufus:Fix-Shard-failed-to-unwrap-buffered-messages
Jan 13, 2025
Merged
Changes from 9 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
68c5af8
Add reproduction test
Arkatufus f2cf56a
Merge branch 'dev' into Fix-Shard-failed-to-unwrap-buffered-messages
Arkatufus c5f3416
Merge branch 'dev' into Fix-Shard-failed-to-unwrap-buffered-messages
Arkatufus 5a1d320
Fix code error
Arkatufus 2682ecf
Merge branch 'Fix-Shard-failed-to-unwrap-buffered-messages' of github…
Arkatufus 13e30cc
Improve code
Arkatufus b121a16
Add IShardingBufferMessageAdapter.UnApply
Arkatufus aea5f1f
Fix broken unit test
Arkatufus 6c9d9d9
Update API approval list
Arkatufus 3915446
Trim dead codes
Arkatufus 91e4034
Merge branch 'dev' into Fix-Shard-failed-to-unwrap-buffered-messages
Aaronontheweb b810103
Merge branch 'dev' into Fix-Shard-failed-to-unwrap-buffered-messages
Arkatufus 3370550
Merge branch 'dev' into Fix-Shard-failed-to-unwrap-buffered-messages
Aaronontheweb b797621
Merge branch 'dev' into Fix-Shard-failed-to-unwrap-buffered-messages
Arkatufus File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
250 changes: 250 additions & 0 deletions
250
src/contrib/cluster/Akka.Cluster.Sharding.Tests/WrappedShardBufferedMessageSpec.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,250 @@ | ||
// ----------------------------------------------------------------------- | ||
// <copyright file="WrappedShardBufferedMessageSpec.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2025 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
// ----------------------------------------------------------------------- | ||
|
||
using System.Collections.Immutable; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Cluster.Sharding.Internal; | ||
using Akka.Cluster.Tools.Singleton; | ||
using Akka.Configuration; | ||
using Akka.Event; | ||
using Akka.TestKit; | ||
using FluentAssertions; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
|
||
namespace Akka.Cluster.Sharding.Tests; | ||
|
||
public class WrappedShardBufferedMessageSpec: AkkaSpec | ||
{ | ||
#region Custom Classes | ||
|
||
private sealed class Message | ||
{ | ||
public static readonly Message Instance = new(); | ||
|
||
public string Payload { get; } = Msg; | ||
|
||
private Message() | ||
{ } | ||
|
||
public override string ToString() | ||
=> $"[Message: {Payload}]"; | ||
} | ||
|
||
private sealed class MyEnvelope : IWrappedMessage | ||
{ | ||
public MyEnvelope(object message) | ||
{ | ||
Message = message; | ||
} | ||
|
||
public object Message { get; } | ||
} | ||
|
||
private sealed class BufferMessageAdapter: IShardingBufferMessageAdapter | ||
{ | ||
public object Apply(object message, IActorContext context) | ||
=> new MyEnvelope(message); | ||
|
||
public object UnApply(object message, IActorContext context) | ||
{ | ||
return message is MyEnvelope envelope ? envelope.Message : message; | ||
} | ||
} | ||
|
||
private class EchoActor: UntypedActor | ||
{ | ||
private readonly ILoggingAdapter _log = Context.GetLogger(); | ||
protected override void OnReceive(object message) | ||
{ | ||
_log.Info($">>>> OnReceive {message}"); | ||
if(message is string) | ||
Sender.Tell(message); | ||
else | ||
Unhandled(message); | ||
} | ||
} | ||
|
||
private sealed class FakeRememberEntitiesProvider: IRememberEntitiesProvider | ||
{ | ||
private readonly IActorRef _probe; | ||
|
||
public FakeRememberEntitiesProvider(IActorRef probe) | ||
{ | ||
_probe = probe; | ||
} | ||
|
||
public Props CoordinatorStoreProps() => FakeCoordinatorStoreActor.Props(); | ||
|
||
public Props ShardStoreProps(string shardId) => FakeShardStoreActor.Props(shardId, _probe); | ||
} | ||
|
||
private class ShardStoreCreated | ||
{ | ||
public ShardStoreCreated(IActorRef store, string shardId) | ||
{ | ||
Store = store; | ||
ShardId = shardId; | ||
} | ||
|
||
public IActorRef Store { get; } | ||
public string ShardId { get; } | ||
} | ||
|
||
private class CoordinatorStoreCreated | ||
{ | ||
public CoordinatorStoreCreated(IActorRef store) | ||
{ | ||
Store = store; | ||
} | ||
|
||
public IActorRef Store { get; } | ||
} | ||
|
||
private class FakeShardStoreActor : ActorBase | ||
{ | ||
public static Props Props(string shardId, IActorRef probe) => Actor.Props.Create(() => new FakeShardStoreActor(shardId, probe)); | ||
|
||
private readonly string _shardId; | ||
private readonly IActorRef _probe; | ||
|
||
public FakeShardStoreActor(string shardId, IActorRef probe) | ||
{ | ||
_shardId = shardId; | ||
_probe = probe; | ||
Context.System.EventStream.Publish(new ShardStoreCreated(Self, shardId)); | ||
} | ||
|
||
protected override bool Receive(object message) | ||
{ | ||
switch (message) | ||
{ | ||
case RememberEntitiesShardStore.GetEntities: | ||
Sender.Tell(new RememberEntitiesShardStore.RememberedEntities(ImmutableHashSet<string>.Empty)); | ||
return true; | ||
case RememberEntitiesShardStore.Update m: | ||
_probe.Tell(new RememberEntitiesShardStore.UpdateDone(m.Started, m.Stopped)); | ||
return true; | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
private class FakeCoordinatorStoreActor : ActorBase | ||
{ | ||
public static Props Props() => Actor.Props.Create(() => new FakeCoordinatorStoreActor()); | ||
|
||
public FakeCoordinatorStoreActor() | ||
{ | ||
Context.System.EventStream.Publish(new CoordinatorStoreCreated(Context.Self)); | ||
} | ||
|
||
protected override bool Receive(object message) | ||
{ | ||
switch (message) | ||
{ | ||
case RememberEntitiesCoordinatorStore.GetShards _: | ||
Sender.Tell(new RememberEntitiesCoordinatorStore.RememberedShards(ImmutableHashSet<string>.Empty)); | ||
return true; | ||
case RememberEntitiesCoordinatorStore.AddShard m: | ||
Sender.Tell(new RememberEntitiesCoordinatorStore.UpdateDone(m.ShardId)); | ||
return true; | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
private static Config GetConfig() | ||
{ | ||
return ConfigurationFactory.ParseString(@" | ||
akka.loglevel=DEBUG | ||
akka.actor.provider = cluster | ||
akka.remote.dot-netty.tcp.port = 0 | ||
akka.cluster.sharding.state-store-mode = ddata | ||
akka.cluster.sharding.remember-entities = on | ||
|
||
# no leaks between test runs thank you | ||
akka.cluster.sharding.distributed-data.durable.keys = [] | ||
akka.cluster.sharding.verbose-debug-logging = on | ||
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on") | ||
|
||
.WithFallback(Sharding.ClusterSharding.DefaultConfig()) | ||
.WithFallback(DistributedData.DistributedData.DefaultConfig()) | ||
.WithFallback(ClusterSingleton.DefaultConfig()); | ||
} | ||
|
||
private class MessageExtractor: HashCodeMessageExtractor | ||
{ | ||
public MessageExtractor() : base(10) | ||
{ | ||
} | ||
|
||
public override string EntityId(object message) | ||
{ | ||
return message switch | ||
{ | ||
Message m => m.Payload, | ||
Passivate => Msg, | ||
_ => null | ||
}; | ||
} | ||
} | ||
|
||
#endregion | ||
|
||
private const string Msg = "hit"; | ||
private readonly IActorRef _shard; | ||
private IActorRef _store; | ||
|
||
public WrappedShardBufferedMessageSpec(ITestOutputHelper output) : base(GetConfig(), output) | ||
{ | ||
Sys.EventStream.Subscribe(TestActor, typeof(ShardStoreCreated)); | ||
Sys.EventStream.Subscribe(TestActor, typeof(CoordinatorStoreCreated)); | ||
|
||
_shard = ChildActorOf(Shard.Props( | ||
typeName: "test", | ||
shardId: "test", | ||
entityProps: _ => Props.Create(() => new EchoActor()), | ||
settings: ClusterShardingSettings.Create(Sys), | ||
extractor: new ExtractorAdapter(new MessageExtractor()), | ||
handOffStopMessage: PoisonPill.Instance, | ||
rememberEntitiesProvider: new FakeRememberEntitiesProvider(TestActor), | ||
bufferMessageAdapter: new BufferMessageAdapter())); | ||
} | ||
|
||
private async Task<RememberEntitiesShardStore.UpdateDone> ExpectShardStartup() | ||
{ | ||
var createdEvent = await ExpectMsgAsync<ShardStoreCreated>(); | ||
createdEvent.ShardId.Should().Be("test"); | ||
|
||
_store = createdEvent.Store; | ||
|
||
await ExpectMsgAsync<ShardInitialized>(); | ||
|
||
_shard.Tell(new ShardRegion.StartEntity(Msg)); | ||
|
||
return await ExpectMsgAsync<RememberEntitiesShardStore.UpdateDone>(); | ||
} | ||
|
||
[Fact(DisplayName = "Message wrapped in ShardingEnvelope, buffered by Shard, transformed by BufferMessageAdapter, must arrive in entity actor")] | ||
public async Task WrappedMessageDelivery() | ||
{ | ||
IgnoreMessages<ShardRegion.StartEntityAck>(); | ||
|
||
var continueMessage = await ExpectShardStartup(); | ||
|
||
// this message should be buffered | ||
_shard.Tell(new ShardingEnvelope(Msg, Msg)); | ||
await Task.Yield(); | ||
|
||
// Tell shard to continue processing | ||
_shard.Tell(continueMessage); | ||
|
||
await ExpectMsgAsync(Msg); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kind of testing the wrong thing here - the
Unapply
method should be handling this.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no its not, we're testing
MyEnvelope
, thatMessage
class is the actual message being sent, not astring
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, you're right, there's something wrong with the repro code