Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IThreadPoolWorkItem support to ThreadPoolDispatcher #5943

Merged
merged 5 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2553,9 +2553,9 @@ namespace Akka.Dispatch
}
public sealed class Dispatchers
{
public static readonly string DefaultBlockingDispatcherId;
public static readonly string DefaultDispatcherId;
public static readonly string SynchronizedDispatcherId;
public const string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher";
public const string DefaultDispatcherId = "akka.actor.default-dispatcher";
public const string SynchronizedDispatcherId = "akka.actor.synchronized-dispatcher";
public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { }
public Akka.Configuration.Config DefaultDispatcherConfig { get; }
public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2475,14 +2475,16 @@ namespace Akka.Configuration.Hocon
}
namespace Akka.Dispatch
{
public sealed class ActionRunnable : Akka.Dispatch.IRunnable
public sealed class ActionRunnable : Akka.Dispatch.IRunnable, System.Threading.IThreadPoolWorkItem
{
public ActionRunnable(System.Action action) { }
public void Execute() { }
public void Run() { }
}
public sealed class ActionWithStateRunnable : Akka.Dispatch.IRunnable
public sealed class ActionWithStateRunnable : Akka.Dispatch.IRunnable, System.Threading.IThreadPoolWorkItem
{
public ActionWithStateRunnable(System.Action<object> actionWithState, object state) { }
public void Execute() { }
public void Run() { }
}
public class ActorTaskScheduler : System.Threading.Tasks.TaskScheduler
Expand Down Expand Up @@ -2553,9 +2555,9 @@ namespace Akka.Dispatch
}
public sealed class Dispatchers
{
public static readonly string DefaultBlockingDispatcherId;
public static readonly string DefaultDispatcherId;
public static readonly string SynchronizedDispatcherId;
public const string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher";
public const string DefaultDispatcherId = "akka.actor.default-dispatcher";
public const string SynchronizedDispatcherId = "akka.actor.synchronized-dispatcher";
public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { }
public Akka.Configuration.Config DefaultDispatcherConfig { get; }
public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; }
Expand Down Expand Up @@ -2615,21 +2617,22 @@ namespace Akka.Dispatch
where TQueue : Akka.Dispatch.MessageQueues.IMessageQueue { }
public interface IRequiresMessageQueue<T>
where T : Akka.Dispatch.ISemantics { }
public interface IRunnable
public interface IRunnable : System.Threading.IThreadPoolWorkItem
{
void Run();
}
public interface ISemantics { }
public interface IUnboundedDequeBasedMessageQueueSemantics : Akka.Dispatch.IDequeBasedMessageQueueSemantics, Akka.Dispatch.ISemantics, Akka.Dispatch.IUnboundedMessageQueueSemantics { }
public interface IUnboundedMessageQueueSemantics : Akka.Dispatch.ISemantics { }
public class Mailbox : Akka.Dispatch.IRunnable
public class Mailbox : Akka.Dispatch.IRunnable, System.Threading.IThreadPoolWorkItem
{
public Mailbox(Akka.Dispatch.MessageQueues.IMessageQueue messageQueue) { }
public Akka.Dispatch.MessageDispatcher Dispatcher { get; }
public Akka.Dispatch.MessageQueues.IMessageQueue MessageQueue { get; }
public virtual void CleanUp() { }
[System.Diagnostics.ConditionalAttribute("MAILBOXDEBUG")]
public static void DebugPrint(string message, params object[] args) { }
public void Execute() { }
public void Run() { }
public virtual void SetActor(Akka.Actor.ActorCell actorCell) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2553,9 +2553,9 @@ namespace Akka.Dispatch
}
public sealed class Dispatchers
{
public static readonly string DefaultBlockingDispatcherId;
public static readonly string DefaultDispatcherId;
public static readonly string SynchronizedDispatcherId;
public const string DefaultBlockingDispatcherId = "akka.actor.default-blocking-io-dispatcher";
public const string DefaultDispatcherId = "akka.actor.default-dispatcher";
public const string SynchronizedDispatcherId = "akka.actor.synchronized-dispatcher";
public Dispatchers(Akka.Actor.ActorSystem system, Akka.Dispatch.IDispatcherPrerequisites prerequisites, Akka.Event.ILoggingAdapter logger) { }
public Akka.Configuration.Config DefaultDispatcherConfig { get; }
public Akka.Dispatch.MessageDispatcher DefaultGlobalDispatcher { get; }
Expand Down
55 changes: 35 additions & 20 deletions src/core/Akka.Streams/Implementation/IO/OutputStreamSourceStage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

namespace Akka.Streams.Implementation.IO
{

/// <summary>
/// INTERNAL API
/// </summary>
Expand All @@ -31,7 +30,9 @@ internal class OutputStreamSourceStage : GraphStageWithMaterializedValue<SourceS
/// <summary>
/// TBD
/// </summary>
internal interface IAdapterToStageMessage { }
internal interface IAdapterToStageMessage
{
}

/// <summary>
/// TBD
Expand All @@ -45,7 +46,6 @@ internal class Flush : IAdapterToStageMessage

private Flush()
{

}
}

Expand All @@ -67,7 +67,9 @@ private Close()
/// <summary>
/// TBD
/// </summary>
internal interface IDownstreamStatus { }
internal interface IDownstreamStatus
{
}

/// <summary>
/// TBD
Expand All @@ -81,7 +83,6 @@ internal class Ok : IDownstreamStatus

private Ok()
{

}
}

Expand All @@ -97,7 +98,6 @@ internal class Canceled : IDownstreamStatus

private Canceled()
{

}
}

Expand Down Expand Up @@ -127,7 +127,8 @@ private sealed class Logic : OutGraphStageLogic, IStageWithCallback
private TaskCompletionSource<NotUsed> _close;
private MessageDispatcher _dispatcher;

public Logic(OutputStreamSourceStage stage, BlockingCollection<ByteString> dataQueue, AtomicReference<IDownstreamStatus> downstreamStatus, string dispatcherId) : base(stage.Shape)
public Logic(OutputStreamSourceStage stage, BlockingCollection<ByteString> dataQueue,
AtomicReference<IDownstreamStatus> downstreamStatus, string dispatcherId) : base(stage.Shape)
{
_stage = stage;
_dataQueue = dataQueue;
Expand All @@ -141,7 +142,8 @@ public Logic(OutputStreamSourceStage stage, BlockingCollection<ByteString> dataQ
else
FailStage(result.Value as Exception);
});
_upstreamCallback = GetAsyncCallback<(IAdapterToStageMessage, TaskCompletionSource<NotUsed>)>(OnAsyncMessage);
_upstreamCallback =
GetAsyncCallback<(IAdapterToStageMessage, TaskCompletionSource<NotUsed>)>(OnAsyncMessage);
_pullTask = new OnPullRunnable(downstreamCallback, dataQueue, _cancellation.Token);
SetHandler(_stage._out, this);
}
Expand All @@ -163,14 +165,15 @@ public override void PostStop()
_cancellation.Cancel(false);
base.PostStop();
}

private sealed class OnPullRunnable : IRunnable
{
private readonly Action<Either<ByteString, Exception>> _callback;
private readonly BlockingCollection<ByteString> _dataQueue;
private readonly CancellationToken _cancellationToken;

public OnPullRunnable(Action<Either<ByteString, Exception>> callback, BlockingCollection<ByteString> dataQueue, CancellationToken cancellationToken)
public OnPullRunnable(Action<Either<ByteString, Exception>> callback,
BlockingCollection<ByteString> dataQueue, CancellationToken cancellationToken)
{
_callback = callback;
_dataQueue = dataQueue;
Expand All @@ -192,8 +195,15 @@ public void Run()
_callback(new Right<ByteString, Exception>(ex));
}
}

#if !NETSTANDARD
public void Execute()
{
Run();
}
#endif
}

public override void OnPull() => _dispatcher.Schedule(_pullTask);

private void OnPush(ByteString data)
Expand Down Expand Up @@ -282,7 +292,8 @@ public OutputStreamSourceStage(TimeSpan writeTimeout)
/// <param name="inheritedAttributes">TBD</param>
/// <exception cref="ArgumentException">TBD</exception>
/// <returns>TBD</returns>
public override ILogicAndMaterializedValue<Stream> CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
public override ILogicAndMaterializedValue<Stream> CreateLogicAndMaterializedValue(
Attributes inheritedAttributes)
{
// has to be in this order as module depends on shape
var maxBuffer = inheritedAttributes.GetAttribute(new Attributes.InputBuffer(16, 16)).Max;
Expand All @@ -306,7 +317,7 @@ public override ILogicAndMaterializedValue<Stream> CreateLogicAndMaterializedVal
/// </summary>
internal class OutputStreamAdapter : Stream
{
#region not supported
#region not supported

/// <summary>
/// TBD
Expand All @@ -315,7 +326,8 @@ internal class OutputStreamAdapter : Stream
/// <param name="origin">TBD</param>
/// <exception cref="NotSupportedException">TBD</exception>
/// <returns>TBD</returns>
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException("This stream can only write");
public override long Seek(long offset, SeekOrigin origin) =>
throw new NotSupportedException("This stream can only write");

/// <summary>
/// TBD
Expand All @@ -332,7 +344,8 @@ internal class OutputStreamAdapter : Stream
/// <param name="count">TBD</param>
/// <exception cref="NotSupportedException">TBD</exception>
/// <returns>TBD</returns>
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException("This stream can only write");
public override int Read(byte[] buffer, int offset, int count) =>
throw new NotSupportedException("This stream can only write");

/// <summary>
/// TBD
Expand All @@ -352,8 +365,9 @@ public override long Position

#endregion

private static readonly Exception PublisherClosedException = new IOException("Reactive stream is terminated, no writes are possible");

private static readonly Exception PublisherClosedException =
new IOException("Reactive stream is terminated, no writes are possible");

private readonly BlockingCollection<ByteString> _dataQueue;
private readonly AtomicReference<IDownstreamStatus> _downstreamStatus;
private readonly IStageWithCallback _stageWithCallback;
Expand Down Expand Up @@ -405,7 +419,6 @@ private void SendData(ByteString data) => Send(() =>

private void SendMessage(IAdapterToStageMessage msg, bool handleCancelled = true) => Send(() =>
{

_stageWithCallback.WakeUp(msg).Wait(_writeTimeout);
if (_downstreamStatus.Value is Canceled && handleCancelled)
{
Expand All @@ -414,7 +427,7 @@ private void SendMessage(IAdapterToStageMessage msg, bool handleCancelled = true
throw PublisherClosedException;
}
});


/// <summary>
/// TBD
Expand Down Expand Up @@ -445,13 +458,15 @@ protected override void Dispose(bool disposing)
/// TBD
/// </summary>
public override bool CanRead => false;

/// <summary>
/// TBD
/// </summary>
public override bool CanSeek => false;

/// <summary>
/// TBD
/// </summary>
public override bool CanWrite => true;
}
}
}
2 changes: 1 addition & 1 deletion src/core/Akka/Actor/ActorCell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void Init(bool sendSupervise, MailboxType mailboxType)
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
var mailbox = Dispatcher.CreateMailbox(this, mailboxType);
var mailbox = MessageDispatcher.CreateMailbox(this, mailboxType);

Create createMessage;
/*
Expand Down
7 changes: 7 additions & 0 deletions src/core/Akka/Actor/Scheduler/HashedWheelTimerScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,13 @@ public override string ToString()
{
return $"[{_receiver}.Tell({_message}, {_sender})]";
}

#if !NETSTANDARD
public void Execute()
{
Run();
}
#endif
}

private class SchedulerRegistration
Expand Down
Loading