Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed: IWrappedMessage + IDeadLetterSuppression handling #7413

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/core/Akka.Tests/Actor/DeadLettersSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,47 @@

namespace Akka.Tests
{

public class WrappedMessagesSpec
{
private sealed record WrappedClass(object Message) : IWrappedMessage;

private sealed record WrappedSuppressedClass(object Message) : IWrappedMessage, IDeadLetterSuppression;

private sealed class SuppressedMessage : IDeadLetterSuppression
{

}


[Fact]
public void ShouldUnwrapWrappedMessage()
{
var message = new WrappedClass("chocolate-beans");
var unwrapped = WrappedMessage.Unwrap(message);
unwrapped.ShouldBe("chocolate-beans");
}

public static readonly TheoryData<object, bool> SuppressedMessages = new()
{
{new SuppressedMessage(), true},
{new WrappedClass(new SuppressedMessage()), true},
{new WrappedClass(new WrappedClass(new SuppressedMessage())), true},
{new WrappedClass(new WrappedClass("chocolate-beans")), false},
{new WrappedSuppressedClass("foo"), true},
{new WrappedClass(new WrappedSuppressedClass("chocolate-beans")), true},
{new WrappedClass("chocolate-beans"), false},
{"chocolate-beans", false}
};

[Theory]
[MemberData(nameof(SuppressedMessages))]
public void ShouldDetectIfWrappedMessageIsSuppressed(object message, bool expected)
{
var isSuppressed = WrappedMessage.IsDeadLetterSuppressedAnywhere(message, out _);
isSuppressed.ShouldBe(expected);
}
}

public class DeadLettersSpec : AkkaSpec
{
Expand All @@ -23,6 +64,34 @@ public async Task Can_send_messages_to_dead_letters()
Sys.DeadLetters.Tell("foobar");
await ExpectMsgAsync<DeadLetter>(deadLetter=>deadLetter.Message.Equals("foobar"));
}

private sealed record WrappedClass(object Message) : IWrappedMessage;

private sealed class SuppressedMessage : IDeadLetterSuppression
{

}

[Fact]
public async Task ShouldLogNormalWrappedMessages()
{
Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
Sys.DeadLetters.Tell(new WrappedClass("chocolate-beans"));

// this is just to make the test deterministic
await ExpectMsgAsync<DeadLetter>();
}

[Fact]
public async Task ShouldNotLogWrappedMessagesWithDeadLetterSuppression()
{
Sys.EventStream.Subscribe(TestActor, typeof(AllDeadLetters));
Sys.DeadLetters.Tell(new WrappedClass(new SuppressedMessage()));

// this is just to make the test deterministic
var msg = await ExpectMsgAsync<SuppressedDeadLetter>();
msg.Message.ToString()!.Contains("SuppressedMessage").ShouldBeTrue();
}
}
}

44 changes: 33 additions & 11 deletions src/core/Akka/Actor/BuiltInActors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,27 @@ public static object Unwrap(object message)
}
return message;
}

/// <summary>
/// Is this message marked as "suppress dead letters" anywhere in its "wrap stack"
/// </summary>
internal static bool IsDeadLetterSuppressedAnywhere(object message, out IDeadLetterSuppression? suppressed)
{
var loopMessage = message;
while(loopMessage is not IDeadLetterSuppression && loopMessage is IWrappedMessage wm)
{
loopMessage = wm.Message;
}

if (loopMessage is IDeadLetterSuppression suppression)
{
suppressed = suppression;
return true;
}

suppressed = null;
return false;
}
}

/// <summary>
Expand Down Expand Up @@ -226,19 +247,20 @@ public DeadLetterActorRef(IActorRefProvider provider, ActorPath path, EventStrea
/// <exception cref="InvalidMessageException">This exception is thrown if the given <paramref name="message"/> is undefined.</exception>
protected override void TellInternal(object message, IActorRef sender)
{
if (message == null) throw new InvalidMessageException("Message is null");
var i = message as Identify;
if (i != null)
{
sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody));
return;
}
var d = message as DeadLetter;
if (d != null)
switch (message)
{
if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); }
return;
case null:
throw new InvalidMessageException("Message is null");
case Identify i:
sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody));
return;
case DeadLetter d:
{
if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); }
return;
}
}

if (!SpecialHandle(message, sender)) { _eventStream.Publish(new DeadLetter(message, sender.IsNobody() ? Provider.DeadLetters : sender, this)); }
}

Expand Down
16 changes: 9 additions & 7 deletions src/core/Akka/Actor/EmptyLocalActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ public override void SendSystemMessage(ISystemMessage message)
}

/// <summary>
/// TBD
/// Performs filtering on special messages that need different DeadLetter treatment.
/// </summary>
/// <param name="message">TBD</param>
/// <param name="sender">TBD</param>
/// <returns>TBD</returns>
protected virtual bool SpecialHandle(object message, IActorRef sender)
/// <param name="possiblyWrapped">The raw message payload.</param>
/// <param name="sender">The sender.</param>
/// <returns><c>true</c> if this method handled the message.</returns>
protected virtual bool SpecialHandle(object possiblyWrapped, IActorRef sender)
{
var message = WrappedMessage.Unwrap(possiblyWrapped);

if (message is Watch watch)
{
if (watch.Watchee.Equals(this) && !watch.Watcher.Equals(this))
Expand Down Expand Up @@ -123,9 +125,9 @@ protected virtual bool SpecialHandle(object message, IActorRef sender)
return true;
}

if (message is IDeadLetterSuppression deadLetterSuppression)
if (WrappedMessage.IsDeadLetterSuppressedAnywhere(possiblyWrapped, out var suppressed))
{
PublishSupressedDeadLetter(deadLetterSuppression, sender);
PublishSupressedDeadLetter(suppressed, sender);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Event/UnhandledMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Akka.Event
/// <summary>
/// This message is published to the EventStream whenever an Actor receives a message it doesn't understand
/// </summary>
public sealed class UnhandledMessage : AllDeadLetters, IWrappedMessage
public sealed class UnhandledMessage : AllDeadLetters
{
/// <summary>
/// Initializes a new instance of the <see cref="UnhandledMessage" /> class.
Expand Down
Loading