diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt index fd093f674f0..f3122132604 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt @@ -2555,9 +2555,9 @@ namespace Akka.Dispatch } public sealed class Dispatchers { - public static readonly string DefaultBlockingDispatcherId; - public static readonly string DefaultDispatcherId; - public static readonly string SynchronizedDispatcherId; + public const string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher"; + public const string DefaultDispatcherId = "akka.actor.default-dispatcher"; + public const string SynchronizedDispatcherId = "akka.actor.synchronized-dispatcher"; public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { } public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 9a8dffbc8f0..5c79e9b0ca6 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -2479,14 +2479,16 @@ namespace Akka.Configuration.Hocon } namespace Akka.Dispatch { - public sealed class ActionRunnable : Akka.Dispatch.IRunnable + public sealed class ActionRunnable : Akka.Dispatch.IRunnable, System.Threading.IThreadPoolWorkItem { public ActionRunnable(System.Action action) { } + public void Execute() { } public void Run() { } } - public sealed class ActionWithStateRunnable : Akka.Dispatch.IRunnable + public sealed class ActionWithStateRunnable : Akka.Dispatch.IRunnable, System.Threading.IThreadPoolWorkItem { public ActionWithStateRunnable(System.Action actionWithState, object state) { } + public void Execute() { } public void Run() { } } public class ActorTaskScheduler : System.Threading.Tasks.TaskScheduler @@ -2557,9 +2559,9 @@ namespace Akka.Dispatch } public sealed class Dispatchers { - public static readonly string DefaultBlockingDispatcherId; - public static readonly string DefaultDispatcherId; - public static readonly string SynchronizedDispatcherId; + public const string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher"; + public const string DefaultDispatcherId = "akka.actor.default-dispatcher"; + public const string SynchronizedDispatcherId = "akka.actor.synchronized-dispatcher"; public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { } public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } @@ -2619,14 +2621,14 @@ namespace Akka.Dispatch where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { } public interface IRequiresMessageQueue where T : Akka.Dispatch.ISemantics { } - public interface IRunnable + public interface IRunnable : System.Threading.IThreadPoolWorkItem { void Run(); } public interface ISemantics { } public interface IUnboundedDequeBasedMessageQueueSemantics : Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics { } public interface IUnboundedMessageQueueSemantics : Akka.Dispatch.ISemantics { } - public class Mailbox : Akka.Dispatch.IRunnable + public class Mailbox : Akka.Dispatch.IRunnable, System.Threading.IThreadPoolWorkItem { public Mailbox(Akka.Dispatch.MessageQueues.IMessageQueue messageQueue) { } public Akka.Dispatch.MessageDispatcher Dispatcher { get; } @@ -2634,6 +2636,7 @@ namespace Akka.Dispatch public virtual void CleanUp() { } [System.Diagnostics.ConditionalAttribute("MAILBOXDEBUG")] public static void DebugPrint(string message, params object[] args) { } + public void Execute() { } public void Run() { } public virtual void SetActor(Akka.Actor.ActorCell actorCell) { } } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index fd093f674f0..f3122132604 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -2555,9 +2555,9 @@ namespace Akka.Dispatch } public sealed class Dispatchers { - public static readonly string DefaultBlockingDispatcherId; - public static readonly string DefaultDispatcherId; - public static readonly string SynchronizedDispatcherId; + public const string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher"; + public const string DefaultDispatcherId = "akka.actor.default-dispatcher"; + public const string SynchronizedDispatcherId = "akka.actor.synchronized-dispatcher"; public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { } public Akka.Configuration.Config DefaultDispatcherConfig { get; } public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; } diff --git a/src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs b/src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs index 30bf4b31abb..2f72828f60d 100644 --- a/src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs +++ b/src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs @@ -20,7 +20,6 @@ namespace Akka.Streams.Implementation.IO { - /// /// INTERNAL API /// @@ -31,7 +30,9 @@ internal class OutputStreamSourceStage : GraphStageWithMaterializedValue /// TBD /// - internal interface IAdapterToStageMessage { } + internal interface IAdapterToStageMessage + { + } /// /// TBD @@ -45,7 +46,6 @@ internal class Flush : IAdapterToStageMessage private Flush() { - } } @@ -67,7 +67,9 @@ private Close() /// /// TBD /// - internal interface IDownstreamStatus { } + internal interface IDownstreamStatus + { + } /// /// TBD @@ -81,7 +83,6 @@ internal class Ok : IDownstreamStatus private Ok() { - } } @@ -97,7 +98,6 @@ internal class Canceled : IDownstreamStatus private Canceled() { - } } @@ -127,7 +127,8 @@ private sealed class Logic : OutGraphStageLogic, IStageWithCallback private TaskCompletionSource _close; private MessageDispatcher _dispatcher; - public Logic(OutputStreamSourceStage stage, BlockingCollection dataQueue, AtomicReference downstreamStatus, string dispatcherId) : base(stage.Shape) + public Logic(OutputStreamSourceStage stage, BlockingCollection dataQueue, + AtomicReference downstreamStatus, string dispatcherId) : base(stage.Shape) { _stage = stage; _dataQueue = dataQueue; @@ -141,7 +142,8 @@ public Logic(OutputStreamSourceStage stage, BlockingCollection dataQ else FailStage(result.Value as Exception); }); - _upstreamCallback = GetAsyncCallback<(IAdapterToStageMessage, TaskCompletionSource)>(OnAsyncMessage); + _upstreamCallback = + GetAsyncCallback<(IAdapterToStageMessage, TaskCompletionSource)>(OnAsyncMessage); _pullTask = new OnPullRunnable(downstreamCallback, dataQueue, _cancellation.Token); SetHandler(_stage._out, this); } @@ -163,14 +165,15 @@ public override void PostStop() _cancellation.Cancel(false); base.PostStop(); } - + private sealed class OnPullRunnable : IRunnable { private readonly Action> _callback; private readonly BlockingCollection _dataQueue; private readonly CancellationToken _cancellationToken; - public OnPullRunnable(Action> callback, BlockingCollection dataQueue, CancellationToken cancellationToken) + public OnPullRunnable(Action> callback, + BlockingCollection dataQueue, CancellationToken cancellationToken) { _callback = callback; _dataQueue = dataQueue; @@ -192,8 +195,15 @@ public void Run() _callback(new Right(ex)); } } + +#if !NETSTANDARD + public void Execute() + { + Run(); + } +#endif } - + public override void OnPull() => _dispatcher.Schedule(_pullTask); private void OnPush(ByteString data) @@ -282,7 +292,8 @@ public OutputStreamSourceStage(TimeSpan writeTimeout) /// TBD /// TBD /// TBD - public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + public override ILogicAndMaterializedValue CreateLogicAndMaterializedValue( + Attributes inheritedAttributes) { // has to be in this order as module depends on shape var maxBuffer = inheritedAttributes.GetAttribute(new Attributes.InputBuffer(16, 16)).Max; @@ -306,7 +317,7 @@ public override ILogicAndMaterializedValue CreateLogicAndMaterializedVal /// internal class OutputStreamAdapter : Stream { - #region not supported + #region not supported /// /// TBD @@ -315,7 +326,8 @@ internal class OutputStreamAdapter : Stream /// TBD /// TBD /// TBD - public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("This stream can only write"); + public override long Seek(long offset, SeekOrigin origin) => + throw new NotSupportedException("This stream can only write"); /// /// TBD @@ -332,7 +344,8 @@ internal class OutputStreamAdapter : Stream /// TBD /// TBD /// TBD - public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream can only write"); + public override int Read(byte[] buffer, int offset, int count) => + throw new NotSupportedException("This stream can only write"); /// /// TBD @@ -352,8 +365,9 @@ public override long Position #endregion - private static readonly Exception PublisherClosedException = new IOException("Reactive stream is terminated, no writes are possible"); - + private static readonly Exception PublisherClosedException = + new IOException("Reactive stream is terminated, no writes are possible"); + private readonly BlockingCollection _dataQueue; private readonly AtomicReference _downstreamStatus; private readonly IStageWithCallback _stageWithCallback; @@ -405,7 +419,6 @@ private void SendData(ByteString data) => Send(() => private void SendMessage(IAdapterToStageMessage msg, bool handleCancelled = true) => Send(() => { - _stageWithCallback.WakeUp(msg).Wait(_writeTimeout); if (_downstreamStatus.Value is Canceled && handleCancelled) { @@ -414,7 +427,7 @@ private void SendMessage(IAdapterToStageMessage msg, bool handleCancelled = true throw PublisherClosedException; } }); - + /// /// TBD @@ -445,13 +458,15 @@ protected override void Dispose(bool disposing) /// TBD /// public override bool CanRead => false; + /// /// TBD /// public override bool CanSeek => false; + /// /// TBD /// public override bool CanWrite => true; } -} +} \ No newline at end of file diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index 0e629a0246f..7e575323fa4 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -189,7 +189,7 @@ public void Init(bool sendSupervise, MailboxType mailboxType) * Create the mailbox and enqueue the Create() message to ensure that * this is processed before anything else. */ - var mailbox = Dispatcher.CreateMailbox(this, mailboxType); + var mailbox = MessageDispatcher.CreateMailbox(this, mailboxType); Create createMessage; /* diff --git a/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs b/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs index b08b4360842..51cfb59895f 100644 --- a/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs +++ b/src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs @@ -429,6 +429,13 @@ public override string ToString() { return $"[{_receiver}.Tell({_message}, {_sender})]"; } + + #if !NETSTANDARD + public void Execute() + { + Run(); + } + #endif } private class SchedulerRegistration diff --git a/src/core/Akka/Dispatch/AbstractDispatcher.cs b/src/core/Akka/Dispatch/AbstractDispatcher.cs index 45b0f0ebbb9..c91ef4a0025 100644 --- a/src/core/Akka/Dispatch/AbstractDispatcher.cs +++ b/src/core/Akka/Dispatch/AbstractDispatcher.cs @@ -59,9 +59,9 @@ public sealed class DefaultDispatcherPrerequisites : IDispatcherPrerequisites /// TBD /// TBD public DefaultDispatcherPrerequisites( - EventStream eventStream, - IScheduler scheduler, - Settings settings, + EventStream eventStream, + IScheduler scheduler, + Settings settings, Mailboxes mailboxes) { Mailboxes = mailboxes; @@ -74,14 +74,17 @@ public DefaultDispatcherPrerequisites( /// TBD /// public EventStream EventStream { get; private set; } + /// /// TBD /// public IScheduler Scheduler { get; private set; } + /// /// TBD /// public Settings Settings { get; private set; } + /// /// TBD /// @@ -121,11 +124,12 @@ internal sealed class ChannelExecutorConfigurator : ExecutorServiceConfigurator private static readonly Config PriorityDefault = ConfigurationFactory.ParseString(@" executor = channel-executor channel-executor.priority = normal"); - - public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites) + + public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, + prerequisites) { config = config == null ? PriorityDefault : config.WithFallback(PriorityDefault); - + var priority = config.GetString("channel-executor.priority", "normal"); Priority = (TaskSchedulerPriority)Enum.Parse(typeof(TaskSchedulerPriority), priority, true); } @@ -134,7 +138,8 @@ public ChannelExecutorConfigurator(Config config, IDispatcherPrerequisites prere public override ExecutorService Produce(string id) { - Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[{id}]", typeof(TaskSchedulerExecutor), $"Launched Dispatcher [{id}] with Priority[{Priority}]")); + Prerequisites.EventStream.Publish(new Debug($"ChannelExecutor-[{id}]", typeof(TaskSchedulerExecutor), + $"Launched Dispatcher [{id}] with Priority[{Priority}]")); var scheduler = ChannelTaskScheduler.Get(Prerequisites.Settings.System).GetScheduler(Priority); return new TaskSchedulerExecutor(id, scheduler); @@ -163,7 +168,7 @@ public override ExecutorService Produce(string id) /// /// TBD /// TBD - public DefaultTaskSchedulerExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) + public DefaultTaskSchedulerExecutorConfigurator(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites) { } @@ -214,8 +219,9 @@ private static DedicatedThreadPoolSettings ConfigureSettings(Config config) { var dtp = config.GetConfig("dedicated-thread-pool"); var fje = config.GetConfig("fork-join-executor"); - if (dtp.IsNullOrEmpty() && fje.IsNullOrEmpty()) throw new ConfigurationException( - $"must define section 'dedicated-thread-pool' OR 'fork-join-executor' for fork-join-executor {config.GetString("id", "unknown")}"); + if (dtp.IsNullOrEmpty() && fje.IsNullOrEmpty()) + throw new ConfigurationException( + $"must define section 'dedicated-thread-pool' OR 'fork-join-executor' for fork-join-executor {config.GetString("id", "unknown")}"); if (!dtp.IsNullOrEmpty()) { @@ -231,13 +237,12 @@ private static DedicatedThreadPoolSettings ConfigureSettings(Config config) { var settings = new DedicatedThreadPoolSettings( ThreadPoolConfig.ScaledPoolSize( - fje.GetInt("parallelism-min"), - 1.0, + fje.GetInt("parallelism-min"), + 1.0, fje.GetInt("parallelism-max")), - name:config.GetString("id")); + name: config.GetString("id")); return settings; } - } } @@ -267,7 +272,8 @@ public override ExecutorService Produce(string id) /// /// TBD /// TBD - public ThreadPoolExecutorServiceFactory(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites) + public ThreadPoolExecutorServiceFactory(Config config, IDispatcherPrerequisites prerequisites) : base(config, + prerequisites) { } } @@ -340,11 +346,11 @@ protected ExecutorServiceConfigurator ConfigureExecutor() throw new ConfigurationException( $"Could not resolve executor service configurator type {executor} for path {Config.GetString("id", "unknown")}"); } + var args = new object[] { Config, Prerequisites }; return (ExecutorServiceConfigurator)Activator.CreateInstance(executorConfiguratorType, args); } } - } /// @@ -358,11 +364,16 @@ public abstract class MessageDispatcher private const int Rescheduled = 2; /* dispatcher debugging helpers */ - private const bool DebugDispatcher = false; // IMPORTANT: make this a compile-time constant so compiler will elide debug code in production + private const bool + DebugDispatcher = + false; // IMPORTANT: make this a compile-time constant so compiler will elide debug code in production + /// /// TBD /// - internal static readonly Lazy> Actors = new Lazy>(() => new Index(), LazyThreadSafetyMode.PublicationOnly); + internal static readonly Lazy> Actors = + new Lazy>( + () => new Index(), LazyThreadSafetyMode.PublicationOnly); #pragma warning disable CS0162 // Disabled since the flag can be set while debugging /// @@ -464,10 +475,12 @@ private long AddInhabitants(long add) { // We haven't succeeded in decreasing the inhabitants yet but the simple fact that we're trying to // go below zero means that there is an imbalance and we might as well throw the exception - var e = new InvalidOperationException("ACTOR SYSTEM CORRUPTED!!! A dispatcher can't have less than 0 inhabitants!"); + var e = new InvalidOperationException( + "ACTOR SYSTEM CORRUPTED!!! A dispatcher can't have less than 0 inhabitants!"); ReportFailure(e); throw e; } + return ret; } @@ -508,6 +521,13 @@ public void Run() _runnable = null; } } + +#if !NETSTANDARD + public void Execute() + { + Run(); + } +#endif } /// @@ -558,6 +578,7 @@ protected void ReportFailure(Exception ex) protected abstract void Shutdown(); private readonly ShutdownAction _shutdownAction; + sealed class ShutdownAction : IRunnable { private readonly MessageDispatcher _dispatcher; @@ -578,15 +599,25 @@ public void Run() } finally { - while (!_dispatcher.UpdateShutdownSchedule(_dispatcher.ShutdownSchedule, Unscheduled)) { } + while (!_dispatcher.UpdateShutdownSchedule(_dispatcher.ShutdownSchedule, Unscheduled)) + { + } } } else if (sched == Rescheduled) { - if (_dispatcher.UpdateShutdownSchedule(Rescheduled, Scheduled)) _dispatcher.ScheduleShutdownAction(); + if (_dispatcher.UpdateShutdownSchedule(Rescheduled, Scheduled)) + _dispatcher.ScheduleShutdownAction(); else Run(); } } + +#if !NETSTANDARD + public void Execute() + { + Run(); + } +#endif } private void IfSensibleToDoSoThenScheduleShutdown() @@ -600,9 +631,12 @@ private void IfSensibleToDoSoThenScheduleShutdown() if (UpdateShutdownSchedule(Unscheduled, Scheduled)) ScheduleShutdownAction(); else IfSensibleToDoSoThenScheduleShutdown(); } + if (sched == Scheduled) { - if (UpdateShutdownSchedule(Scheduled, Rescheduled)) { } + if (UpdateShutdownSchedule(Scheduled, Rescheduled)) + { + } else IfSensibleToDoSoThenScheduleShutdown(); } @@ -637,7 +671,7 @@ private void ScheduleShutdownAction() /// Cell of the actor. /// The mailbox configurator. /// The configured for this actor. - internal Mailbox CreateMailbox(ActorCell cell, MailboxType mailboxType) + internal static Mailbox CreateMailbox(ActorCell cell, MailboxType mailboxType) { return new Mailbox(mailboxType.Create(cell.Self, cell.System)); } @@ -706,15 +740,18 @@ internal virtual void Register(ActorCell actor) /// true if the was scheduled for execution, otherwise false. internal bool RegisterForExecution(Mailbox mbox, bool hasMessageHint, bool hasSystemMessageHint) { - if (mbox.CanBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) //This needs to be here to ensure thread safety and no races + if (mbox.CanBeScheduledForExecution(hasMessageHint, + hasSystemMessageHint)) //This needs to be here to ensure thread safety and no races { if (mbox.SetAsScheduled()) { ExecuteTask(mbox); return true; } + return false; } + return false; } @@ -762,7 +799,8 @@ internal virtual void Unregister(ActorCell actor) internal virtual void Suspend(ActorCell actorCell) { var mbox = actorCell.Mailbox; - if (mbox.Actor == actorCell && mbox.Dispatcher == this) //make sure everything is referring to the same instance + if (mbox.Actor == actorCell && + mbox.Dispatcher == this) //make sure everything is referring to the same instance { mbox.Suspend(); } @@ -775,11 +813,11 @@ internal virtual void Suspend(ActorCell actorCell) internal virtual void Resume(ActorCell actorCell) { var mbox = actorCell.Mailbox; - if (mbox.Actor == actorCell && mbox.Dispatcher == this && mbox.Resume()) //make sure everything is referring to the same instance + if (mbox.Actor == actorCell && mbox.Dispatcher == this && + mbox.Resume()) //make sure everything is referring to the same instance { RegisterForExecution(mbox, false, false); // force the mailbox to re-run after resume } } } -} - +} \ No newline at end of file diff --git a/src/core/Akka/Dispatch/CurrentSynchronizationContextDispatcher.cs b/src/core/Akka/Dispatch/CurrentSynchronizationContextDispatcher.cs index f750ae023da..71f9766b984 100644 --- a/src/core/Akka/Dispatch/CurrentSynchronizationContextDispatcher.cs +++ b/src/core/Akka/Dispatch/CurrentSynchronizationContextDispatcher.cs @@ -38,7 +38,8 @@ public override ExecutorService Produce(string id) /// /// TBD /// TBD - public CurrentSynchronizationContextExecutorServiceFactory(Config config, IDispatcherPrerequisites prerequisites) : base(config, prerequisites) + public CurrentSynchronizationContextExecutorServiceFactory(Config config, + IDispatcherPrerequisites prerequisites) : base(config, prerequisites) { } } @@ -60,11 +61,12 @@ internal sealed class CurrentSynchronizationContextDispatcherConfigurator : Mess /// /// TBD /// TBD - public CurrentSynchronizationContextDispatcherConfigurator(Config config, IDispatcherPrerequisites prerequisites) + public CurrentSynchronizationContextDispatcherConfigurator(Config config, + IDispatcherPrerequisites prerequisites) : base(config, prerequisites) { - - _executorServiceConfigurator = new CurrentSynchronizationContextExecutorServiceFactory(config, prerequisites); + _executorServiceConfigurator = + new CurrentSynchronizationContextExecutorServiceFactory(config, prerequisites); // We don't bother trying to support any other type of executor here. PinnedDispatcher doesn't support them } @@ -99,7 +101,9 @@ public sealed class CurrentSynchronizationContextDispatcher : Dispatcher /// TBD /// TBD /// TBD - public CurrentSynchronizationContextDispatcher(MessageDispatcherConfigurator configurator, string id, int throughput, long? throughputDeadlineTime, ExecutorServiceFactory executorServiceFactory, TimeSpan shutdownTimeout) + public CurrentSynchronizationContextDispatcher(MessageDispatcherConfigurator configurator, string id, + int throughput, long? throughputDeadlineTime, ExecutorServiceFactory executorServiceFactory, + TimeSpan shutdownTimeout) : base(configurator, id, throughput, throughputDeadlineTime, executorServiceFactory, shutdownTimeout) { /* @@ -113,8 +117,14 @@ sealed class NoTask : IRunnable { public void Run() { + } + +#if !NETSTANDARD + public void Execute() + { } +#endif } private volatile ActorCell _owner; @@ -129,7 +139,8 @@ public void Run() internal override void Register(ActorCell actor) { var current = _owner; - if (current != null && actor != current) throw new InvalidOperationException($"Cannot register to anyone but {_owner}"); + if (current != null && actor != current) + throw new InvalidOperationException($"Cannot register to anyone but {_owner}"); _owner = actor; base.Register(actor); } @@ -144,5 +155,4 @@ internal override void Unregister(ActorCell actor) _owner = null; } } -} - +} \ No newline at end of file diff --git a/src/core/Akka/Dispatch/Dispatchers.cs b/src/core/Akka/Dispatch/Dispatchers.cs index 19ae34f9c10..bc3f580832f 100644 --- a/src/core/Akka/Dispatch/Dispatchers.cs +++ b/src/core/Akka/Dispatch/Dispatchers.cs @@ -57,7 +57,13 @@ internal sealed class FullThreadPoolExecutorServiceImpl : ThreadPoolExecutorServ /// TBD public override void Execute(IRunnable run) { +#if NETSTANDARD ThreadPool.UnsafeQueueUserWorkItem(Executor, run); +#else + // use native .NET 6 APIs here to reduce allocations + // preferLocal to help reduce context switching + ThreadPool.UnsafeQueueUserWorkItem(run, true); +#endif } /// @@ -99,9 +105,7 @@ public PartialTrustThreadPoolExecutorService(string id) : base(id) /// internal sealed class FixedConcurrencyTaskScheduler : TaskScheduler { - - [ThreadStatic] - private static bool _threadRunning = false; + [ThreadStatic] private static bool _threadRunning = false; private ConcurrentQueue _tasks = new ConcurrentQueue(); private int _readers = 0; @@ -298,26 +302,26 @@ public sealed class Dispatchers /// /// The default dispatcher identifier, also the full key of the configuration of the default dispatcher. /// - public static readonly string DefaultDispatcherId = "akka.actor.default-dispatcher"; + public const string DefaultDispatcherId = "akka.actor.default-dispatcher"; /// /// The id of a default dispatcher to use for operations known to be blocking. Note that /// for optimal performance you will want to isolate different blocking resources /// on different thread pools. /// - public static readonly string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher"; + public const string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher"; /// /// INTERNAL API /// - internal static readonly string InternalDispatcherId = "akka.actor.internal-dispatcher"; + internal const string InternalDispatcherId = "akka.actor.internal-dispatcher"; private const int MaxDispatcherAliasDepth = 20; /// /// The identifier for synchronized dispatchers. /// - public static readonly string SynchronizedDispatcherId = "akka.actor.synchronized-dispatcher"; + public const string SynchronizedDispatcherId = "akka.actor.synchronized-dispatcher"; private readonly ActorSystem _system; private Config _cachingConfig; @@ -329,7 +333,8 @@ public sealed class Dispatchers /// /// Has to be thread-safe, as this collection can be accessed concurrently by many actors. /// - private readonly ConcurrentDictionary _dispatcherConfigurators = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _dispatcherConfigurators = + new ConcurrentDictionary(); /// Initializes a new instance of the class. /// The system. @@ -443,13 +448,20 @@ private MessageDispatcherConfigurator LookupConfigurator(string id) if (valueAtPath.IsObject()) { var newConfigurator = ConfiguratorFrom(Config(id)); - return _dispatcherConfigurators.TryAdd(id, newConfigurator) ? newConfigurator : _dispatcherConfigurators[id]; + return _dispatcherConfigurators.TryAdd(id, newConfigurator) + ? newConfigurator + : _dispatcherConfigurators[id]; } - throw new ConfigurationException($"Expected either a dispatcher config or an alias at [{id}] but found [{valueAtPath}]"); + + throw new ConfigurationException( + $"Expected either a dispatcher config or an alias at [{id}] but found [{valueAtPath}]"); } + throw new ConfigurationException($"Dispatcher {id} not configured."); } - throw new ConfigurationException($"Could not find a concrete dispatcher config after following {MaxDispatcherAliasDepth} deep. Is there a circular reference in your config? Last followed Id was [{id}]"); + + throw new ConfigurationException( + $"Could not find a concrete dispatcher config after following {MaxDispatcherAliasDepth} deep. Is there a circular reference in your config? Last followed Id was [{id}]"); } /// @@ -517,12 +529,14 @@ private static Config IdConfig(string id) } - private static readonly Config ForkJoinExecutorConfig = ConfigurationFactory.ParseString("executor=fork-join-executor"); + private static readonly Config ForkJoinExecutorConfig = + ConfigurationFactory.ParseString("executor=fork-join-executor"); private static readonly Config CurrentSynchronizationContextExecutorConfig = ConfigurationFactory.ParseString(@"executor=current-context-executor"); private static readonly Config TaskExecutorConfig = ConfigurationFactory.ParseString(@"executor=task-executor"); + private MessageDispatcherConfigurator ConfiguratorFrom(Config cfg) { if (cfg.IsNullOrEmpty()) @@ -565,7 +579,9 @@ private MessageDispatcherConfigurator ConfiguratorFrom(Config cfg) { throw new ConfigurationException($"Could not resolve dispatcher type {type} for path {id}"); } - dispatcher = (MessageDispatcherConfigurator)Activator.CreateInstance(dispatcherType, cfg, Prerequisites); + + dispatcher = + (MessageDispatcherConfigurator)Activator.CreateInstance(dispatcherType, cfg, Prerequisites); break; } @@ -619,5 +635,4 @@ public override MessageDispatcher Dispatcher() return _instance; } } -} - +} \ No newline at end of file diff --git a/src/core/Akka/Dispatch/IRunnable.cs b/src/core/Akka/Dispatch/IRunnable.cs index 7ca80c7da70..1dbdb62f8a4 100644 --- a/src/core/Akka/Dispatch/IRunnable.cs +++ b/src/core/Akka/Dispatch/IRunnable.cs @@ -6,16 +6,21 @@ //----------------------------------------------------------------------- using System; +using System.Threading; namespace Akka.Dispatch { /// /// An asynchronous operation will be executed by a . /// +#if NETSTANDARD public interface IRunnable +#else + public interface IRunnable : IThreadPoolWorkItem +#endif { /// - /// TBD + /// Executes the task. /// void Run(); } @@ -28,21 +33,25 @@ public sealed class ActionRunnable : IRunnable private readonly Action _action; /// - /// TBD + /// Creates a new thread pool work item that executes a delegate. /// - /// TBD + /// The delegate to execute public ActionRunnable(Action action) { _action = action; } - /// - /// TBD - /// public void Run() { _action(); } + +#if !NETSTANDARD + public void Execute() + { + _action(); + } +#endif } /// @@ -54,23 +63,26 @@ public sealed class ActionWithStateRunnable : IRunnable private readonly object _state; /// - /// TBD + /// Creates a new thread pool work item that executes a delegate along with state. /// - /// TBD - /// TBD + /// The delegate to execute. + /// The state to execute with this delegate. public ActionWithStateRunnable(Action actionWithState, object state) { _actionWithState = actionWithState; _state = state; } - /// - /// TBD - /// public void Run() { _actionWithState(_state); } - } -} +#if !NETSTANDARD + public void Execute() + { + _actionWithState(_state); + } +#endif + } +} \ No newline at end of file diff --git a/src/core/Akka/Dispatch/Mailbox.cs b/src/core/Akka/Dispatch/Mailbox.cs index e253187c76c..f9ab60a5797 100644 --- a/src/core/Akka/Dispatch/Mailbox.cs +++ b/src/core/Akka/Dispatch/Mailbox.cs @@ -35,6 +35,7 @@ internal static class MailboxStatus /// TBD /// public const int Open = 0; // _status is not initialized in AbstractMailbox, so default must be zero! + /// /// TBD /// @@ -51,18 +52,22 @@ internal static class MailboxStatus /// TBD /// public const int ShouldScheduleMask = 3; + /// /// TBD /// public const int ShouldNotProcessMask = ~2; + /// /// TBD /// public const int SuspendMask = ~3; + /// /// TBD /// public const int SuspendUnit = 4; + /// /// TBD /// @@ -201,31 +206,46 @@ public virtual void SetActor(ActorCell actorCell) /// TBD /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal int CurrentStatus() { return Volatile.Read(ref _statusDotNotCallMeDirectly); } + internal int CurrentStatus() + { + return Volatile.Read(ref _statusDotNotCallMeDirectly); + } /// /// TBD /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal bool ShouldProcessMessage() { return (CurrentStatus() & MailboxStatus.ShouldNotProcessMask) == 0; } + internal bool ShouldProcessMessage() + { + return (CurrentStatus() & MailboxStatus.ShouldNotProcessMask) == 0; + } /// /// Returns the number of times this mailbox is currently suspended. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal int SuspendCount() { return CurrentStatus() / MailboxStatus.SuspendUnit; } + internal int SuspendCount() + { + return CurrentStatus() / MailboxStatus.SuspendUnit; + } /// /// Returns true if the mailbox is currently suspended from processing. false otherwise. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal bool IsSuspended() { return (CurrentStatus() & MailboxStatus.SuspendMask) != 0; } + internal bool IsSuspended() + { + return (CurrentStatus() & MailboxStatus.SuspendMask) != 0; + } /// /// Returns true if the mailbox is closed. false otherwise. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal bool IsClosed() { return (CurrentStatus() == MailboxStatus.Closed); } + internal bool IsClosed() + { + return (CurrentStatus() == MailboxStatus.Closed); + } /// /// Returns true if the mailbox is scheduled for execution on a . false otherwise. @@ -307,6 +327,7 @@ internal bool BecomeClosed() SetStatus(MailboxStatus.Closed); return false; } + return UpdateStatus(status, MailboxStatus.Closed) || BecomeClosed(); } @@ -362,7 +383,8 @@ public void Run() finally { SetAsIdle(); // Volatile write, needed here - Dispatcher.RegisterForExecution(this, false, false); // schedule to run again if there are more messages, possibly + Dispatcher.RegisterForExecution(this, false, + false); // schedule to run again if there are more messages, possibly } } @@ -382,11 +404,13 @@ private void ProcessMailbox(int left, long deadlineTicks) // not going to bother catching ThreadAbortExceptions here, since they'll get rethrown anyway Actor.Invoke(next); ProcessAllSystemMessages(); - if (left > 0 && (Dispatcher.ThroughputDeadlineTime.HasValue == false || (MonotonicClock.GetTicks() - deadlineTicks) < 0)) + if (left > 0 && (Dispatcher.ThroughputDeadlineTime.HasValue == false || + (MonotonicClock.GetTicks() - deadlineTicks) < 0)) { left = left - 1; continue; } + break; } } @@ -407,7 +431,8 @@ private void ProcessAllSystemMessages() var msg = messageList.Head; messageList = messageList.Tail; msg.Unlink(); - DebugPrint("{0} processing system message {1} with {2}", Actor.Self, msg, string.Join(",", Actor.GetChildren())); + DebugPrint("{0} processing system message {1} with {2}", Actor.Self, msg, + string.Join(",", Actor.GetChildren())); // we know here that SystemInvoke ensures that only "fatal" exceptions get rethrown Actor.SystemInvoke(msg); @@ -432,7 +457,8 @@ private void ProcessAllSystemMessages() } catch (Exception ex) { - Actor.System.EventStream.Publish(new Error(ex, GetType().FullName, GetType(), $"error while enqueuing {msg} to deadletters: {ex.Message}")); + Actor.System.EventStream.Publish(new Error(ex, GetType().FullName, GetType(), + $"error while enqueuing {msg} to deadletters: {ex.Message}")); } } @@ -532,9 +558,16 @@ internal virtual bool HasSystemMessages public static void DebugPrint(string message, params object[] args) { var formattedMessage = args.Length == 0 ? message : string.Format(message, args); - Console.WriteLine("[MAILBOX][{0}][Thread {1:0000}] {2}", DateTime.Now.ToString("o"), Thread.CurrentThread.ManagedThreadId, formattedMessage); + Console.WriteLine("[MAILBOX][{0}][Thread {1:0000}] {2}", DateTime.Now.ToString("o"), + Thread.CurrentThread.ManagedThreadId, formattedMessage); } +#if !NETSTANDARD + public void Execute() + { + Run(); + } +#endif } /// @@ -587,7 +620,9 @@ protected MailboxType(Settings settings, Config config) /// Compliment to /// /// The type of produced by this class. - public interface IProducesMessageQueue where TQueue : IMessageQueue { } + public interface IProducesMessageQueue where TQueue : IMessageQueue + { + } /// /// UnboundedMailbox is the default used by Akka.NET Actors @@ -642,8 +677,11 @@ public BoundedMailbox(Settings settings, Config config) : base(settings, config) Capacity = config.GetInt("mailbox-capacity", 0); PushTimeout = config.GetTimeSpan("mailbox-push-timeout-time", TimeSpan.FromSeconds(-1)); - if (Capacity < 0) throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); - if (PushTimeout.TotalSeconds < 0) throw new ArgumentException("The push time-out for BoundedMailbox cannot be be negative", nameof(config)); + if (Capacity < 0) + throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); + if (PushTimeout.TotalSeconds < 0) + throw new ArgumentException("The push time-out for BoundedMailbox cannot be be negative", + nameof(config)); } /// @@ -699,7 +737,8 @@ protected UnboundedPriorityMailbox(Settings settings, Config config) : base(sett /// The value returned by the method will be used to order the message in the mailbox. /// Lower values will be delivered first. Messages ordered by the same number will remain in delivery order. /// - public abstract class UnboundedStablePriorityMailbox : MailboxType, IProducesMessageQueue + public abstract class UnboundedStablePriorityMailbox : MailboxType, + IProducesMessageQueue { /// /// Function responsible for generating the priority value of a message based on its type and content. @@ -777,8 +816,10 @@ public BoundedDequeBasedMailbox(Settings settings, Config config) : base(setting Capacity = config.GetInt("mailbox-capacity", 0); PushTimeout = config.GetTimeSpan("mailbox-push-timeout-time", TimeSpan.FromSeconds(-1)); - if (Capacity < 0) throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); - if (PushTimeout.TotalSeconds < 0) throw new ArgumentException("The push time-out for BoundedMailbox cannot be null", nameof(config)); + if (Capacity < 0) + throw new ArgumentException("The capacity for BoundedMailbox cannot be negative", nameof(config)); + if (PushTimeout.TotalSeconds < 0) + throw new ArgumentException("The push time-out for BoundedMailbox cannot be null", nameof(config)); } /// @@ -787,6 +828,4 @@ public override IMessageQueue Create(IActorRef owner, ActorSystem system) return new BoundedDequeMessageQueue(Capacity, PushTimeout); } } - -} - +} \ No newline at end of file diff --git a/src/core/Akka/Util/SpanHacks.cs b/src/core/Akka/Util/SpanHacks.cs index 08464610126..a04d9ba47dc 100644 --- a/src/core/Akka/Util/SpanHacks.cs +++ b/src/core/Akka/Util/SpanHacks.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Globalization; using System.Text; namespace Akka.Util @@ -26,8 +27,13 @@ public static bool IsNumeric(char x) /// An . public static int Parse(ReadOnlySpan str) { +#if NETSTANDARD if (TryParse(str, out var i)) return i; +#else + if (int.TryParse(str, out var i)) + return i; +#endif throw new FormatException($"[{str.ToString()}] is now a valid numeric format"); } @@ -42,6 +48,7 @@ public static int Parse(ReadOnlySpan str) /// An . public static bool TryParse(ReadOnlySpan str, out int returnValue) { +#if NETSTANDARD var pos = 0; returnValue = 0; var sign = 1; @@ -61,6 +68,9 @@ public static bool TryParse(ReadOnlySpan str, out int returnValue) returnValue = sign * returnValue; return true; +#else + return int.TryParse(str, out returnValue); +#endif } /// @@ -76,7 +86,8 @@ public static string ToLowerInvariant(ReadOnlySpan input) { output[i] = char.ToLowerInvariant(input[i]); } + return output.ToString(); } } -} +} \ No newline at end of file