Skip to content

Commit

Permalink
Supress ActorSelectionMessage with DeadLetterSuppression (migrated from
Browse files Browse the repository at this point in the history
akka/akka#28341)

* for example the Cluster InitJoin message is marked with DeadLetterSuppression
  but was anyway logged because sent with actorSelection
* for other WrappedMessage than ActorSelectionMessage we shouldn't unwrap and publish
  the inner in SuppressedDeadLetter because that might loose some information
* therefore those are silenced in the DeadLetterListener instead

Better deadLetter logging of wrapped messages (migrated from akka/akka#28253)

Logging of UnhandledMessage (migrated from akka/akka#28414)
* make use of the existing logging of dead letter
  also for UnhandledMessage

Add Dropped to Akka.Actor (migrated partially from akka/akka#27160)
Log Dropped from DeadLetterListener
  • Loading branch information
zbynek001 committed Mar 31, 2021
1 parent 411a1e0 commit f65671b
Show file tree
Hide file tree
Showing 16 changed files with 406 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public MessageExtractor(int maxNumberOfShards)
/// and have the message types themselves carry identifiers.
/// </para>
/// </summary>
public sealed class ShardingEnvelope
public sealed class ShardingEnvelope: IWrappedMessage
{
public string EntityId { get; }
public object Message { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public override string ToString()
/// TBD
/// </summary>
[Serializable]
public sealed class Publish : IDistributedPubSubMessage, IEquatable<Publish>
public sealed class Publish : IDistributedPubSubMessage, IEquatable<Publish>, IWrappedMessage
{
/// <summary>
/// TBD
Expand Down Expand Up @@ -420,7 +420,7 @@ public override string ToString()
/// TBD
/// </summary>
[Serializable]
public sealed class Send : IDistributedPubSubMessage, IEquatable<Send>
public sealed class Send : IDistributedPubSubMessage, IEquatable<Send>, IWrappedMessage
{
/// <summary>
/// TBD
Expand Down Expand Up @@ -487,7 +487,7 @@ public override string ToString()
/// TBD
/// </summary>
[Serializable]
public sealed class SendToAll : IDistributedPubSubMessage, IEquatable<SendToAll>
public sealed class SendToAll : IDistributedPubSubMessage, IEquatable<SendToAll>, IWrappedMessage
{
/// <summary>
/// TBD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ namespace Akka.Cluster.Sharding
public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithRole(string role) { }
public Akka.Cluster.Sharding.ShardedDaemonProcessSettings WithShardingSettings(Akka.Cluster.Sharding.ClusterShardingSettings shardingSettings) { }
}
public sealed class ShardingEnvelope
public sealed class ShardingEnvelope : Akka.Actor.IWrappedMessage
{
public ShardingEnvelope(string entityId, object message) { }
public string EntityId { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe
public static Akka.Cluster.Tools.PublishSubscribe.GetTopics Instance { get; }
}
public interface IDistributedPubSubMessage { }
public sealed class Publish : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.Publish>
public sealed class Publish : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.Publish>
{
public Publish(string topic, object message, bool sendOneMessageToEachGroup = False) { }
public object Message { get; }
Expand Down Expand Up @@ -258,7 +258,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe
public override int GetHashCode() { }
public override string ToString() { }
}
public sealed class Send : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.Send>
public sealed class Send : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.Send>
{
public Send(string path, object message, bool localAffinity = False) { }
public bool LocalAffinity { get; }
Expand All @@ -269,7 +269,7 @@ namespace Akka.Cluster.Tools.PublishSubscribe
public override int GetHashCode() { }
public override string ToString() { }
}
public sealed class SendToAll : Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.SendToAll>
public sealed class SendToAll : Akka.Actor.IWrappedMessage, Akka.Cluster.Tools.PublishSubscribe.IDistributedPubSubMessage, System.IEquatable<Akka.Cluster.Tools.PublishSubscribe.SendToAll>
{
public SendToAll(string path, object message, bool excludeSelf = False) { }
public bool ExcludeSelf { get; }
Expand Down
25 changes: 18 additions & 7 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ namespace Akka.Actor
public void Tell(object message, Akka.Actor.IActorRef sender = null) { }
public override string ToString() { }
}
public class ActorSelectionMessage : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful
public class ActorSelectionMessage : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Actor.IWrappedMessage
{
public ActorSelectionMessage(object message, Akka.Actor.SelectionPathElement[] elements, bool wildCardFanOut = False) { }
public Akka.Actor.SelectionPathElement[] Elements { get; }
Expand Down Expand Up @@ -1145,6 +1145,10 @@ namespace Akka.Actor
Akka.Actor.ITimerScheduler Timers { get; set; }
}
public interface IWithUnboundedStash : Akka.Actor.IActorStash, Akka.Dispatch.IRequiresMessageQueue<Akka.Dispatch.IUnboundedDequeBasedMessageQueueSemantics> { }
public interface IWrappedMessage
{
object Message { get; }
}
public sealed class Identify : Akka.Actor.IAutoReceivedMessage, Akka.Actor.INotInfluenceReceiveTimeout
{
public Identify(object messageId) { }
Expand Down Expand Up @@ -1797,6 +1801,10 @@ namespace Akka.Actor
protected void RunTask(System.Func<System.Threading.Tasks.Task> action) { }
}
public delegate void UntypedReceive(object message);
public class static WrappedMessage
{
public static object Unwrap(object message) { }
}
}
namespace Akka.Actor.Dsl
{
Expand Down Expand Up @@ -2879,7 +2887,7 @@ namespace Akka.Event
{
protected ActorEventBus() { }
}
public abstract class AllDeadLetters
public abstract class AllDeadLetters : Akka.Actor.IWrappedMessage
{
protected AllDeadLetters(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public object Message { get; }
Expand Down Expand Up @@ -2933,6 +2941,12 @@ namespace Akka.Event
protected virtual void Print(Akka.Event.LogEvent logEvent) { }
protected override bool Receive(object message) { }
}
public sealed class Dropped : Akka.Event.AllDeadLetters
{
public Dropped(object message, string reason, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public Dropped(object message, string reason, Akka.Actor.IActorRef recipient) { }
public string Reason { get; }
}
public class DummyClassForStringSources
{
public DummyClassForStringSources() { }
Expand Down Expand Up @@ -3155,12 +3169,9 @@ namespace Akka.Event
public TraceLogger() { }
protected override void OnReceive(object message) { }
}
public sealed class UnhandledMessage
public sealed class UnhandledMessage : Akka.Event.AllDeadLetters, Akka.Actor.IWrappedMessage
{
public UnhandledMessage(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public object Message { get; }
public Akka.Actor.IActorRef Recipient { get; }
public Akka.Actor.IActorRef Sender { get; }
}
public class Warning : Akka.Event.LogEvent
{
Expand Down Expand Up @@ -4088,7 +4099,7 @@ namespace Akka.Routing
public Akka.Util.ISurrogated FromSurrogate(Akka.Actor.ActorSystem system) { }
}
}
public sealed class ConsistentHashableEnvelope : Akka.Routing.RouterEnvelope, Akka.Routing.IConsistentHashable
public sealed class ConsistentHashableEnvelope : Akka.Routing.RouterEnvelope, Akka.Actor.IWrappedMessage, Akka.Routing.IConsistentHashable
{
public ConsistentHashableEnvelope(object message, object hashKey) { }
public object ConsistentHashKey { get; }
Expand Down
29 changes: 29 additions & 0 deletions src/core/Akka.Tests/Actor/DeadLetterSupressionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead
allDeadLetter.Recipient.Should().Be(deadActor);

allListener.ExpectNoMsg(200.Milliseconds());

// unwrap for ActorSelection
Sys.ActorSelection(deadActor.Path).Tell(new SuppressedMessage());
Sys.ActorSelection(deadActor.Path).Tell(new NormalMessage());

// the recipient ref isn't the same as deadActor here so only checking the message
deadLetter = deadListener.ExpectMsg<DeadLetter>();//
deadLetter.Message.Should().BeOfType<NormalMessage>();
suppressedDeadLetter = suppressedListener.ExpectMsg<SuppressedDeadLetter>();
suppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();

deadListener.ExpectNoMsg(200.Milliseconds());
suppressedListener.ExpectNoMsg(200.Milliseconds());
}

[Fact]
Expand Down Expand Up @@ -123,6 +136,22 @@ public void Must_suppress_message_from_default_dead_letters_logging_sent_to_dead
deadListener.ExpectNoMsg(TimeSpan.Zero);
suppressedListener.ExpectNoMsg(TimeSpan.Zero);
allListener.ExpectNoMsg(TimeSpan.Zero);

// unwrap for ActorSelection
Sys.ActorSelection(Sys.DeadLetters.Path).Tell(new SuppressedMessage());
Sys.ActorSelection(Sys.DeadLetters.Path).Tell(new NormalMessage());

deadLetter = deadListener.ExpectMsg<DeadLetter>();
deadLetter.Message.Should().BeOfType<NormalMessage>();
deadLetter.Sender.Should().Be(TestActor);
deadLetter.Recipient.Should().Be(Sys.DeadLetters);
suppressedDeadLetter = suppressedListener.ExpectMsg<SuppressedDeadLetter>();
suppressedDeadLetter.Message.Should().BeOfType<SuppressedMessage>();
suppressedDeadLetter.Sender.Should().Be(TestActor);
suppressedDeadLetter.Recipient.Should().Be(Sys.DeadLetters);

deadListener.ExpectNoMsg(200.Milliseconds());
suppressedListener.ExpectNoMsg(200.Milliseconds());
}
}
}
68 changes: 58 additions & 10 deletions src/core/Akka.Tests/Actor/DeadLetterSuspensionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,54 @@
using System.Threading;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.TestKit;
using Xunit;

namespace Akka.Tests.Actor
{
public class DeadLetterSuspensionSpec : AkkaSpec
{
private class Dropping : ActorBase
{
public static Props Props() => Akka.Actor.Props.Create(() => new Dropping());

protected override bool Receive(object message)
{
switch (message)
{
case int n:
Context.System.EventStream.Publish(new Dropped(n, "Don't like numbers", Self));
return true;
}
return false;
}
}

private class Unandled : ActorBase
{
public static Props Props() => Akka.Actor.Props.Create(() => new Unandled());

protected override bool Receive(object message)
{
switch (message)
{
case int n:
Unhandled(n);
return true;
}
return false;
}
}

private static readonly Config Config = ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.log-dead-letters = 3
akka.log-dead-letters = 4
akka.log-dead-letters-suspend-duration = 2s");

private readonly IActorRef _deadActor;
private readonly IActorRef _droppingActor;
private readonly IActorRef _unhandledActor;

public DeadLetterSuspensionSpec()
: base(Config)
Expand All @@ -29,10 +64,20 @@ public DeadLetterSuspensionSpec()
Watch(_deadActor);
_deadActor.Tell(PoisonPill.Instance);
ExpectTerminated(_deadActor);

_droppingActor = Sys.ActorOf(Dropping.Props(), "droppingActor");
_unhandledActor = Sys.ActorOf(Unandled.Props(), "unhandledActor");
}

private string ExpectedDeadLettersLogMessage(int count) =>
$"Message [{count.GetType().Name}] from {TestActor.Path} to {_deadActor.Path} was not delivered. [{count}] dead letters encountered";
$"Message [{count.GetType().Name}] from {TestActor} to {_deadActor} was not delivered. [{count}] dead letters encountered";

private string ExpectedDroppedLogMessage(int count) =>
$"Message [{count.GetType().Name}] to {_droppingActor} was dropped. Don't like numbers. [{count}] dead letters encountered";

private string ExpectedUnhandledLogMessage(int count) =>
$"Message [{count.GetType().Name}] from {TestActor} to {_unhandledActor} was unhandled. [{count}] dead letters encountered";


[Fact]
public void Must_suspend_dead_letters_logging_when_reaching_akka_log_dead_letters_and_then_re_enable()
Expand All @@ -42,28 +87,31 @@ public void Must_suspend_dead_letters_logging_when_reaching_akka_log_dead_letter
.Expect(1, () => _deadActor.Tell(1));

EventFilter
.Info(start: ExpectedDeadLettersLogMessage(2))
.Expect(1, () => _deadActor.Tell(2));
.Info(start: ExpectedDroppedLogMessage(2))
.Expect(1, () => _droppingActor.Tell(2));

EventFilter
.Info(start: ExpectedDeadLettersLogMessage(3) + ", no more dead letters will be logged in next")
.Expect(1, () => _deadActor.Tell(3));
.Info(start: ExpectedUnhandledLogMessage(3))
.Expect(1, () => _unhandledActor.Tell(3));

_deadActor.Tell(4);
EventFilter
.Info(start: ExpectedDeadLettersLogMessage(4) + ", no more dead letters will be logged in next")
.Expect(1, () => _deadActor.Tell(4));
_deadActor.Tell(5);
_droppingActor.Tell(6);

// let suspend-duration elapse
Thread.Sleep(2050);

// re-enabled
EventFilter
.Info(start: ExpectedDeadLettersLogMessage(6) + ", of which 2 were not logged")
.Expect(1, () => _deadActor.Tell(6));
.Info(start: ExpectedDeadLettersLogMessage(7) + ", of which 2 were not logged")
.Expect(1, () => _deadActor.Tell(7));

// reset count
EventFilter
.Info(start: ExpectedDeadLettersLogMessage(1))
.Expect(1, () => _deadActor.Tell(7));
.Expect(1, () => _deadActor.Tell(8));
}
}
}
1 change: 1 addition & 0 deletions src/core/Akka.Tests/Event/EventStreamSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ private static string GetDebugUnhandledMessagesConfig()
akka {
actor.serialize-messages = off
actor.debug.unhandled = on
log-dead-letters = off
stdout-loglevel = DEBUG
loglevel = DEBUG
loggers = [""%logger%""]
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Actor/ActorSelection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public override string ToString()
/// <summary>
/// Used to deliver messages via <see cref="ActorSelection"/>.
/// </summary>
public class ActorSelectionMessage : IAutoReceivedMessage, IPossiblyHarmful
public class ActorSelectionMessage : IAutoReceivedMessage, IPossiblyHarmful, IWrappedMessage
{
/// <summary>
/// Initializes a new instance of the <see cref="ActorSelectionMessage" /> class.
Expand Down
Loading

0 comments on commit f65671b

Please sign in to comment.