From 3920e600f4e677aea0d7605fab2e485e63082da7 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2024 10:58:51 -0600 Subject: [PATCH 1/6] repro for failed DeadLetter suppression with IWrappedMessage --- src/core/Akka.Tests/Actor/DeadLettersSpec.cs | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/core/Akka.Tests/Actor/DeadLettersSpec.cs b/src/core/Akka.Tests/Actor/DeadLettersSpec.cs index 4ffc52bc6cd..2cf2b7d9c7d 100644 --- a/src/core/Akka.Tests/Actor/DeadLettersSpec.cs +++ b/src/core/Akka.Tests/Actor/DeadLettersSpec.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using System; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; @@ -23,6 +24,29 @@ public async Task Can_send_messages_to_dead_letters() Sys.DeadLetters.Tell("foobar"); await ExpectMsgAsync(deadLetter=>deadLetter.Message.Equals("foobar")); } + + private sealed record WrappedClass(object Message) : IWrappedMessage; + + private sealed class SuppressedMessage : IDeadLetterSuppression + { + + } + + [Fact] + public async Task ShouldLogNormalWrappedMessages() + { + Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter)); + Sys.DeadLetters.Tell(new WrappedClass("chocolate-beans")); + await ExpectMsgAsync(deadLetter=>deadLetter.Message.ToString()!.Contains("chocolate-beans")); + } + + [Fact] + public async Task ShouldNotLogWrappedMessagesWithDeadLetterSuppression() + { + Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter)); + Sys.DeadLetters.Tell(new WrappedClass(new SuppressedMessage())); + await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(50)); + } } } From 5a23638d4ffad97f263120d56b8b8f8e1d1bc041 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2024 11:19:53 -0600 Subject: [PATCH 2/6] added reproduction specs for IWrappedMessage --- src/core/Akka.Tests/Actor/DeadLettersSpec.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/core/Akka.Tests/Actor/DeadLettersSpec.cs b/src/core/Akka.Tests/Actor/DeadLettersSpec.cs index 2cf2b7d9c7d..03ef6b67aac 100644 --- a/src/core/Akka.Tests/Actor/DeadLettersSpec.cs +++ b/src/core/Akka.Tests/Actor/DeadLettersSpec.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; @@ -37,15 +38,20 @@ public async Task ShouldLogNormalWrappedMessages() { Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter)); Sys.DeadLetters.Tell(new WrappedClass("chocolate-beans")); - await ExpectMsgAsync(deadLetter=>deadLetter.Message.ToString()!.Contains("chocolate-beans")); + + // this is just to make the test deterministic + await ExpectMsgAsync(); } [Fact] public async Task ShouldNotLogWrappedMessagesWithDeadLetterSuppression() { - Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter)); + Sys.EventStream.Subscribe(TestActor, typeof(AllDeadLetters)); Sys.DeadLetters.Tell(new WrappedClass(new SuppressedMessage())); - await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(50)); + + // this is just to make the test deterministic + var msg = await ExpectMsgAsync(); + msg.Message.ToString()!.Contains("SuppressedMessage").ShouldBeTrue(); } } } From a262e184882101f0b092ea339dc724f51f6f789e Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2024 11:20:05 -0600 Subject: [PATCH 3/6] minor cleanup --- src/core/Akka/Actor/BuiltInActors.cs | 23 ++++++++++++----------- src/core/Akka/Event/UnhandledMessage.cs | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/core/Akka/Actor/BuiltInActors.cs b/src/core/Akka/Actor/BuiltInActors.cs index 7ef187eb314..c2d67503da2 100644 --- a/src/core/Akka/Actor/BuiltInActors.cs +++ b/src/core/Akka/Actor/BuiltInActors.cs @@ -226,19 +226,20 @@ public DeadLetterActorRef(IActorRefProvider provider, ActorPath path, EventStrea /// This exception is thrown if the given is undefined. protected override void TellInternal(object message, IActorRef sender) { - if (message == null) throw new InvalidMessageException("Message is null"); - var i = message as Identify; - if (i != null) + switch (message) { - sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody)); - return; - } - var d = message as DeadLetter; - if (d != null) - { - if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); } - return; + case null: + throw new InvalidMessageException("Message is null"); + case Identify i: + sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody)); + return; + case DeadLetter d: + { + if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); } + return; + } } + if (!SpecialHandle(message, sender)) { _eventStream.Publish(new DeadLetter(message, sender.IsNobody() ? Provider.DeadLetters : sender, this)); } } diff --git a/src/core/Akka/Event/UnhandledMessage.cs b/src/core/Akka/Event/UnhandledMessage.cs index 8202f1dffd6..69cd68f2f99 100644 --- a/src/core/Akka/Event/UnhandledMessage.cs +++ b/src/core/Akka/Event/UnhandledMessage.cs @@ -12,7 +12,7 @@ namespace Akka.Event /// /// This message is published to the EventStream whenever an Actor receives a message it doesn't understand /// - public sealed class UnhandledMessage : AllDeadLetters, IWrappedMessage + public sealed class UnhandledMessage : AllDeadLetters { /// /// Initializes a new instance of the class. From d41798ad3dbff349c252a3922840d049912747ef Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2024 11:39:07 -0600 Subject: [PATCH 4/6] fixed: no longer log `IWrappedMessage`s which contain any `DeadLetter`s --- src/core/Akka.Tests/Actor/DeadLettersSpec.cs | 43 +++++++++++++++++++- src/core/Akka/Actor/BuiltInActors.cs | 21 ++++++++++ src/core/Akka/Actor/EmptyLocalActorRef.cs | 16 ++++---- 3 files changed, 71 insertions(+), 9 deletions(-) diff --git a/src/core/Akka.Tests/Actor/DeadLettersSpec.cs b/src/core/Akka.Tests/Actor/DeadLettersSpec.cs index 03ef6b67aac..aabee988c9c 100644 --- a/src/core/Akka.Tests/Actor/DeadLettersSpec.cs +++ b/src/core/Akka.Tests/Actor/DeadLettersSpec.cs @@ -5,8 +5,6 @@ // //----------------------------------------------------------------------- -using System; -using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Event; @@ -15,6 +13,47 @@ namespace Akka.Tests { + + public class WrappedMessagesSpec + { + private sealed record WrappedClass(object Message) : IWrappedMessage; + + private sealed record WrappedSuppressedClass(object Message) : IWrappedMessage, IDeadLetterSuppression; + + private sealed class SuppressedMessage : IDeadLetterSuppression + { + + } + + + [Fact] + public void ShouldUnwrapWrappedMessage() + { + var message = new WrappedClass("chocolate-beans"); + var unwrapped = WrappedMessage.Unwrap(message); + unwrapped.ShouldBe("chocolate-beans"); + } + + public static readonly TheoryData SuppressedMessages = new() + { + {new SuppressedMessage(), true}, + {new WrappedClass(new SuppressedMessage()), true}, + {new WrappedClass(new WrappedClass(new SuppressedMessage())), true}, + {new WrappedClass(new WrappedClass("chocolate-beans")), false}, + {new WrappedSuppressedClass("foo"), true}, + {new WrappedClass(new WrappedSuppressedClass("chocolate-beans")), true}, + {new WrappedClass("chocolate-beans"), false}, + {"chocolate-beans", false} + }; + + [Theory] + [MemberData(nameof(SuppressedMessages))] + public void ShouldDetectIfWrappedMessageIsSuppressed(object message, bool expected) + { + var isSuppressed = WrappedMessage.IsDeadLetterSuppressedAnywhere(message, out _); + isSuppressed.ShouldBe(expected); + } + } public class DeadLettersSpec : AkkaSpec { diff --git a/src/core/Akka/Actor/BuiltInActors.cs b/src/core/Akka/Actor/BuiltInActors.cs index c2d67503da2..8adea6b41d3 100644 --- a/src/core/Akka/Actor/BuiltInActors.cs +++ b/src/core/Akka/Actor/BuiltInActors.cs @@ -197,6 +197,27 @@ public static object Unwrap(object message) } return message; } + + /// + /// Is this message marked as "suppress dead letters" anywhere in its "wrap stack" + /// + internal static bool IsDeadLetterSuppressedAnywhere(object message, out IDeadLetterSuppression? suppressed) + { + var loopMessage = message; + while(loopMessage is not IDeadLetterSuppression && loopMessage is IWrappedMessage wm) + { + loopMessage = wm.Message; + } + + if (loopMessage is IDeadLetterSuppression suppression) + { + suppressed = suppression; + return true; + } + + suppressed = null; + return false; + } } /// diff --git a/src/core/Akka/Actor/EmptyLocalActorRef.cs b/src/core/Akka/Actor/EmptyLocalActorRef.cs index be7ed417c11..22e117f7e19 100644 --- a/src/core/Akka/Actor/EmptyLocalActorRef.cs +++ b/src/core/Akka/Actor/EmptyLocalActorRef.cs @@ -78,13 +78,15 @@ public override void SendSystemMessage(ISystemMessage message) } /// - /// TBD + /// Performs filtering on special messages that need different DeadLetter treatment. /// - /// TBD - /// TBD - /// TBD - protected virtual bool SpecialHandle(object message, IActorRef sender) + /// The raw message payload. + /// The sender. + /// true if this method handled the message. + protected virtual bool SpecialHandle(object possiblyWrapped, IActorRef sender) { + var message = WrappedMessage.Unwrap(possiblyWrapped); + if (message is Watch watch) { if (watch.Watchee.Equals(this) && !watch.Watcher.Equals(this)) @@ -123,9 +125,9 @@ protected virtual bool SpecialHandle(object message, IActorRef sender) return true; } - if (message is IDeadLetterSuppression deadLetterSuppression) + if (WrappedMessage.IsDeadLetterSuppressedAnywhere(possiblyWrapped, out var suppressed)) { - PublishSupressedDeadLetter(deadLetterSuppression, sender); + PublishSupressedDeadLetter(suppressed, sender); return true; } From 4083e04d15df09d456fc3a275aec875631494f76 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2024 11:44:10 -0600 Subject: [PATCH 5/6] fix data loss --- src/core/Akka/Actor/EmptyLocalActorRef.cs | 4 ++-- src/core/Akka/Event/DeadLetter.cs | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/core/Akka/Actor/EmptyLocalActorRef.cs b/src/core/Akka/Actor/EmptyLocalActorRef.cs index 22e117f7e19..279f912a80a 100644 --- a/src/core/Akka/Actor/EmptyLocalActorRef.cs +++ b/src/core/Akka/Actor/EmptyLocalActorRef.cs @@ -127,14 +127,14 @@ protected virtual bool SpecialHandle(object possiblyWrapped, IActorRef sender) if (WrappedMessage.IsDeadLetterSuppressedAnywhere(possiblyWrapped, out var suppressed)) { - PublishSupressedDeadLetter(suppressed, sender); + PublishSupressedDeadLetter(possiblyWrapped, sender); return true; } return false; } - private void PublishSupressedDeadLetter(IDeadLetterSuppression msg, IActorRef sender) + private void PublishSupressedDeadLetter(object msg, IActorRef sender) { _eventStream.Publish(new SuppressedDeadLetter(msg, sender.IsNobody() ? _provider.DeadLetters : sender, this)); } diff --git a/src/core/Akka/Event/DeadLetter.cs b/src/core/Akka/Event/DeadLetter.cs index f2175264a06..2cffd681a38 100644 --- a/src/core/Akka/Event/DeadLetter.cs +++ b/src/core/Akka/Event/DeadLetter.cs @@ -109,6 +109,21 @@ public SuppressedDeadLetter(IDeadLetterSuppression message, IActorRef sender, IA if (sender == null) throw new ArgumentNullException(nameof(sender), "SuppressedDeadLetter sender may not be null"); if (recipient == null) throw new ArgumentNullException(nameof(recipient), "SuppressedDeadLetter recipient may not be null"); } + + /// + /// Initializes a new instance of the class. + /// + /// The original message that could not be delivered. + /// The actor that sent the message. + /// The actor that was to receive the message. + /// + /// This exception is thrown when either the sender or the recipient is undefined. + /// + public SuppressedDeadLetter(object message, IActorRef sender, IActorRef recipient) : base(message, sender, recipient) + { + if (sender == null) throw new ArgumentNullException(nameof(sender), "SuppressedDeadLetter sender may not be null"); + if (recipient == null) throw new ArgumentNullException(nameof(recipient), "SuppressedDeadLetter recipient may not be null"); + } } /// From 299d96e5fc77d67f67f4cb54b29fa7b9bcf87620 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 18 Dec 2024 11:47:03 -0600 Subject: [PATCH 6/6] Revert "fix data loss" This reverts commit 4083e04d15df09d456fc3a275aec875631494f76. --- src/core/Akka/Actor/EmptyLocalActorRef.cs | 4 ++-- src/core/Akka/Event/DeadLetter.cs | 15 --------------- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/src/core/Akka/Actor/EmptyLocalActorRef.cs b/src/core/Akka/Actor/EmptyLocalActorRef.cs index 279f912a80a..22e117f7e19 100644 --- a/src/core/Akka/Actor/EmptyLocalActorRef.cs +++ b/src/core/Akka/Actor/EmptyLocalActorRef.cs @@ -127,14 +127,14 @@ protected virtual bool SpecialHandle(object possiblyWrapped, IActorRef sender) if (WrappedMessage.IsDeadLetterSuppressedAnywhere(possiblyWrapped, out var suppressed)) { - PublishSupressedDeadLetter(possiblyWrapped, sender); + PublishSupressedDeadLetter(suppressed, sender); return true; } return false; } - private void PublishSupressedDeadLetter(object msg, IActorRef sender) + private void PublishSupressedDeadLetter(IDeadLetterSuppression msg, IActorRef sender) { _eventStream.Publish(new SuppressedDeadLetter(msg, sender.IsNobody() ? _provider.DeadLetters : sender, this)); } diff --git a/src/core/Akka/Event/DeadLetter.cs b/src/core/Akka/Event/DeadLetter.cs index 2cffd681a38..f2175264a06 100644 --- a/src/core/Akka/Event/DeadLetter.cs +++ b/src/core/Akka/Event/DeadLetter.cs @@ -109,21 +109,6 @@ public SuppressedDeadLetter(IDeadLetterSuppression message, IActorRef sender, IA if (sender == null) throw new ArgumentNullException(nameof(sender), "SuppressedDeadLetter sender may not be null"); if (recipient == null) throw new ArgumentNullException(nameof(recipient), "SuppressedDeadLetter recipient may not be null"); } - - /// - /// Initializes a new instance of the class. - /// - /// The original message that could not be delivered. - /// The actor that sent the message. - /// The actor that was to receive the message. - /// - /// This exception is thrown when either the sender or the recipient is undefined. - /// - public SuppressedDeadLetter(object message, IActorRef sender, IActorRef recipient) : base(message, sender, recipient) - { - if (sender == null) throw new ArgumentNullException(nameof(sender), "SuppressedDeadLetter sender may not be null"); - if (recipient == null) throw new ArgumentNullException(nameof(recipient), "SuppressedDeadLetter recipient may not be null"); - } } ///