Skip to content

Commit

Permalink
Akka.Actor: tuck all scheduled Tell messages into IScheduledMsg e…
Browse files Browse the repository at this point in the history
…nvelope (#6461)

* added `IScheduledTellMsg` interface to make it easier to filter out scheduled messages

* fixed compilation issue

* Make sure that IScheduledTellMsg gets unwrapped everywhere

* Move TellInternal changes to the base class MinimalActorRef instead

* Fix FlowSplitWhenSpec async code

* Remove TellInterceptor, move code to ScheduledTell

---------

Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
Aaronontheweb and Arkatufus authored Aug 25, 2023
1 parent 24cbbb3 commit a91bb5e
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Sharding.Internal;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.Xunit2.Attributes;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -140,7 +143,8 @@ public void RememberEntitiesStarter_must_inform_the_shard_when_entities_has_been
ExpectTerminated(rememberEntityStarter);
}

[LocalFact(SkipLocal = "Racy in Azure AzDo, strict timing does not work well on AzDo")]
// TODO: check the timing code to make sure that this actually works, it was flaky/racy even when run locally.
[LocalFact(SkipLocal = "Racy unit test, suspected bad code underneath")]
public void RememberEntitiesStarter_must_try_start_all_entities_in_a_throttled_way_with_entity_recovery_strategy_constant()
{
var regionProbe = CreateTestProbe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2134,6 +2134,11 @@ namespace Akka.Actor.Internal
public UnboundedStashImpl(Akka.Actor.IActorContext context) { }
}
}
namespace Akka.Actor.Scheduler
{
[Akka.Annotations.InternalApiAttribute()]
public interface IScheduledTellMsg : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.IWrappedMessage { }
}
namespace Akka.Actor.Setup
{
public sealed class ActorSystemSetup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,11 @@ namespace Akka.Actor.Internal
public override int Capacity { get; }
}
}
namespace Akka.Actor.Scheduler
{
[Akka.Annotations.InternalApiAttribute()]
public interface IScheduledTellMsg : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.IWrappedMessage { }
}
namespace Akka.Actor.Setup
{
public sealed class ActorSystemSetup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2159,6 +2159,11 @@ namespace Akka.Actor.Internal
public override int Capacity { get; }
}
}
namespace Akka.Actor.Scheduler
{
[Akka.Annotations.InternalApiAttribute()]
public interface IScheduledTellMsg : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.IWrappedMessage { }
}
namespace Akka.Actor.Setup
{
public sealed class ActorSystemSetup
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ public RemoteDeadLetterActorRef(IActorRefProvider provider, ActorPath actorPath,
protected override void TellInternal(object message, IActorRef sender)
{
var deadLetter = message as DeadLetter;

if (message is EndpointManager.Send send)
{
if (send.Seq == null)
Expand Down
229 changes: 112 additions & 117 deletions src/core/Akka.Streams.Tests/Dsl/FlowSplitWhenSpec.cs

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions src/core/Akka/Actor/ActorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using Akka.Actor.Internal;
using Akka.Actor.Scheduler;
using Akka.Event;

namespace Akka.Actor
Expand Down Expand Up @@ -178,9 +179,9 @@ protected static IActorContext Context
/// <returns>TBD</returns>
protected internal virtual bool AroundReceive(Receive receive, object message)
{
if (message is Scheduler.TimerScheduler.ITimerMsg tm)
if (message is TimerScheduler.ITimerMsg tm)
{
if (this is IWithTimers withTimers && withTimers.Timers is Scheduler.TimerScheduler timers)
if (this is IWithTimers { Timers: TimerScheduler timers })
{
switch (timers.InterceptTimerMsg(Context.System.Log, tm))
{
Expand All @@ -192,7 +193,7 @@ protected internal virtual bool AroundReceive(Receive receive, object message)
// discard
return true;

case object m:
case var m:
if (this is IActorStash)
{
var actorCell = (ActorCell)Context;
Expand Down Expand Up @@ -242,8 +243,7 @@ protected internal virtual bool AroundReceive(Receive receive, object message)
/// </exception>
protected virtual void Unhandled(object message)
{
var terminatedMessage = message as Terminated;
if (terminatedMessage != null)
if (message is Terminated terminatedMessage)
{
throw new DeathPactException(terminatedMessage.ActorRef);
}
Expand Down
42 changes: 17 additions & 25 deletions src/core/Akka/Actor/ActorCell.DefaultMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Event;
using Debug = Akka.Event.Debug;
using System.Globalization;
using Akka.Actor.Scheduler;

namespace Akka.Actor
{
Expand Down Expand Up @@ -55,14 +56,16 @@ public int CurrentEnvelopeId
/// </exception>>
public void Invoke(Envelope envelope)
{

var message = envelope.Message;
var influenceReceiveTimeout = !(message is INotInfluenceReceiveTimeout);
if (message is IScheduledTellMsg scheduled)
message = scheduled.Message;

var influenceReceiveTimeout = message is not INotInfluenceReceiveTimeout;

try
{
// Akka JVM doesn't have these lines
CurrentMessage = envelope.Message;
CurrentMessage = message;
_currentEnvelopeId++;
if (_currentEnvelopeId == int.MaxValue) _currentEnvelopeId = 0;

Expand All @@ -72,14 +75,16 @@ public void Invoke(Envelope envelope)
{
CancelReceiveTimeout();
}


if (message is IAutoReceivedMessage)
{
AutoReceiveMessage(envelope);
}
else
{
ReceiveMessage(message);
// Intentional, we want to preserve IScheduledMsg
ReceiveMessage(envelope.Message);
}
CurrentMessage = null;
}
Expand All @@ -105,24 +110,7 @@ private IActorRef MatchSender(Envelope envelope)
var sender = envelope.Sender;
return sender ?? System.DeadLetters;
}


/*
def autoReceiveMessage(msg: Envelope): Unit = {
if (system.settings.DebugAutoReceive)
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case t: Terminated ⇒ receivedTerminated(t)
case AddressTerminated(address) ⇒ addressTerminated(address)
case Kill ⇒ throw new ActorKilledException("Kill")
case PoisonPill ⇒ self.stop()
case sel: ActorSelectionMessage ⇒ receiveSelection(sel)
case Identify(messageId) ⇒ sender() ! ActorIdentity(messageId, Some(self))
}
}
*/


/// <summary>
/// TBD
/// </summary>
Expand All @@ -133,15 +121,16 @@ msg.message match {
protected internal virtual void AutoReceiveMessage(Envelope envelope)
{
var message = envelope.Message;
if (message is IScheduledTellMsg scheduled)
message = scheduled.Message;

var actor = _actor;
var actorType = actor != null ? actor.GetType() : null;
var actorType = actor?.GetType();

if (System.Settings.DebugAutoReceive)
Publish(new Debug(Self.Path.ToString(), actorType, "received AutoReceiveMessage " + message));

var m = envelope.Message;
switch (m)
switch (message)
{
case Terminated terminated:
ReceivedTerminated(terminated);
Expand Down Expand Up @@ -190,6 +179,9 @@ public void ReceiveMessageForTest(Envelope envelope)
/// <param name="message">The message that will be sent to the actor.</param>
protected virtual void ReceiveMessage(object message)
{
if (message is IScheduledTellMsg scheduled)
message = scheduled.Message;

var wasHandled = _actor.AroundReceive(_state.GetCurrentBehavior(), message);

if (System.Settings.AddLoggingReceive && _actor is ILogReceive)
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka/Actor/ActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor.Internal;
using Akka.Actor.Scheduler;
using Akka.Annotations;
using Akka.Dispatch.SysMsg;
using Akka.Event;
Expand Down Expand Up @@ -106,7 +107,7 @@ public FutureActorRef(TaskCompletionSource<T> result, ActorPath path, IActorRefP
protected override void TellInternal(object message, IActorRef sender)
{
var handled = false;

switch (message)
{
case ISystemMessage msg:
Expand Down
7 changes: 6 additions & 1 deletion src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor.Scheduler;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Event;
Expand Down Expand Up @@ -548,7 +549,11 @@ private sealed class ScheduledTell : IRunnable
public ScheduledTell(ICanTell receiver, object message, IActorRef sender)
{
_receiver = receiver;
_message = message;
_message = receiver is not ActorRefWithCell
? message
: message is INotInfluenceReceiveTimeout
? new ScheduledTellMsgNoInfluenceReceiveTimeout(message)
: new ScheduledTellMsg(message);
_sender = sender;
}

Expand Down
47 changes: 47 additions & 0 deletions src/core/Akka/Actor/Scheduler/IScheduledTellMsg.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//-----------------------------------------------------------------------
// <copyright file="IScheduledMsg.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Annotations;

namespace Akka.Actor.Scheduler;

/// <summary>
/// Marker interface used to indicate the presence of a scheduled message from the
/// classic scheduler API.
/// </summary>
/// <remarks>
/// Made public so these messages can be filtered for telemetry purposes
/// </remarks>
[InternalApi]
public interface IScheduledTellMsg : IWrappedMessage, INoSerializationVerificationNeeded
{
}

/// <summary>
/// INTERNAL API
/// </summary>
internal sealed class ScheduledTellMsg : IScheduledTellMsg
{
public ScheduledTellMsg(object message)
{
Message = message;
}
public object Message { get; }
}

/// <summary>
/// INTERNAL API
/// </summary>
internal sealed class ScheduledTellMsgNoInfluenceReceiveTimeout : IScheduledTellMsg, INotInfluenceReceiveTimeout
{
public ScheduledTellMsgNoInfluenceReceiveTimeout(object message)
{
Message = message;
}

public object Message { get; }
}

0 comments on commit a91bb5e

Please sign in to comment.