Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ValueTask support to PipeTo #6025

Merged
merged 5 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1387,9 +1387,13 @@ namespace Akka.Actor
public class static PipeToSupport
{
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
}
public sealed class PoisonPill : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Event.IDeadLetterSuppression
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1389,9 +1389,13 @@ namespace Akka.Actor
public class static PipeToSupport
{
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
}
public sealed class PoisonPill : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Event.IDeadLetterSuppression
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1387,9 +1387,13 @@ namespace Akka.Actor
public class static PipeToSupport
{
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.Task<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo<T>(this System.Threading.Tasks.ValueTask<T> taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<T, object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.Task taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
public static System.Threading.Tasks.Task PipeTo(this System.Threading.Tasks.ValueTask taskToPipe, Akka.Actor.ICanTell recipient, bool useConfigureAwait, Akka.Actor.IActorRef sender = null, System.Func<object> success = null, System.Func<System.Exception, object> failure = null) { }
}
public sealed class PoisonPill : Akka.Actor.IAutoReceivedMessage, Akka.Actor.IPossiblyHarmful, Akka.Event.IDeadLetterSuppression
{
Expand Down
63 changes: 63 additions & 0 deletions src/core/Akka.Tests/Actor/PipeToSupportSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
Expand All @@ -22,11 +23,18 @@ public class PipeToSupportSpec : AkkaSpec
private readonly Task<string> _task;
private readonly Task _taskWithoutResult;

private readonly ValueTask<string> _valueTask;
private readonly ValueTask _valueTaskWithoutResult;

public PipeToSupportSpec()
{
_taskCompletionSource = new TaskCompletionSource<string>();
_task = _taskCompletionSource.Task;
_taskWithoutResult = _taskCompletionSource.Task;

_valueTask = new ValueTask<string>(_taskCompletionSource.Task);
_valueTaskWithoutResult = new ValueTask(_taskCompletionSource.Task);

Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
}

Expand All @@ -38,6 +46,14 @@ public async Task Should_immediately_PipeTo_completed_Task()
await ExpectMsgAsync("foo");
}

[Fact]
public async Task ValueTask_Should_immediately_PipeTo_completed_Task()
{
var task = new ValueTask<string>("foo");
task.PipeTo(TestActor);
await ExpectMsgAsync("foo");
}

[Fact]
public async Task Should_by_default_send_task_result_as_message()
{
Expand All @@ -46,6 +62,14 @@ public async Task Should_by_default_send_task_result_as_message()
await ExpectMsgAsync("Hello");
}

[Fact]
public async Task ValueTask_Should_by_default_send_task_result_as_message()
{
_valueTask.PipeTo(TestActor);
_taskCompletionSource.SetResult("Hello");
await ExpectMsgAsync("Hello");
}

[Fact]
public async Task Should_by_default_not_send_a_success_message_if_the_task_does_not_produce_a_result()
{
Expand All @@ -54,6 +78,14 @@ public async Task Should_by_default_not_send_a_success_message_if_the_task_does_
await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
}

[Fact]
public async Task ValueTask_Should_by_default_not_send_a_success_message_if_the_task_does_not_produce_a_result()
{
_valueTaskWithoutResult.PipeTo(TestActor);
_taskCompletionSource.SetResult("Hello");
await ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
}

[Fact]
public async Task Should_by_default_send_task_exception_as_status_failure_message()
{
Expand All @@ -64,6 +96,16 @@ public async Task Should_by_default_send_task_exception_as_status_failure_messag
await ExpectMsgAsync<Status.Failure>(x => x.Cause.Message == "Boom");
}

[Fact]
public async Task ValueTask_Should_by_default_send_task_exception_as_status_failure_message()
{
_valueTask.PipeTo(TestActor);
_valueTaskWithoutResult.PipeTo(TestActor);
_taskCompletionSource.SetException(new Exception("Boom"));
await ExpectMsgAsync<Status.Failure>(x => x.Cause.Message == "Boom");
await ExpectMsgAsync<Status.Failure>(x => x.Cause.Message == "Boom");
}

[Fact]
public async Task Should_use_success_handling_to_transform_task_result()
{
Expand All @@ -75,6 +117,17 @@ public async Task Should_use_success_handling_to_transform_task_result()
pipeTo.Should().Contain("Hello World");
}

[Fact]
public async Task ValueTask_Should_use_success_handling_to_transform_task_result()
{
_valueTask.PipeTo(TestActor, success: x => "Hello " + x);
_valueTaskWithoutResult.PipeTo(TestActor, success: () => "Hello");
_taskCompletionSource.SetResult("World");
var pipeTo = await ReceiveNAsync(2, default).Cast<string>().ToListAsync();
pipeTo.Should().Contain("Hello");
pipeTo.Should().Contain("Hello World");
}

[Fact]
public async Task Should_use_failure_handling_to_transform_task_exception()
{
Expand All @@ -84,5 +137,15 @@ public async Task Should_use_failure_handling_to_transform_task_exception()
await ExpectMsgAsync("Such a failure...");
await ExpectMsgAsync("Such a failure...");
}

[Fact]
public async Task ValueTask_Should_use_failure_handling_to_transform_task_exception()
{
_valueTask.PipeTo(TestActor, failure: e => "Such a " + e.Message);
_valueTaskWithoutResult.PipeTo(TestActor, failure: e => "Such a " + e.Message);
_taskCompletionSource.SetException(new Exception("failure..."));
await ExpectMsgAsync("Such a failure...");
await ExpectMsgAsync("Such a failure...");
}
}
}
3 changes: 2 additions & 1 deletion src/core/Akka.Tests/Akka.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

<PropertyGroup>
<AssemblyTitle>Akka.Tests</AssemblyTitle>
<TargetFrameworks>$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' == 'Windows_NT' ">$(NetFrameworkTestVersion);$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
<TargetFrameworks Condition=" '$(OS)' != 'Windows_NT' ">$(NetTestVersion);$(NetCoreTestVersion)</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
Expand Down
69 changes: 69 additions & 0 deletions src/core/Akka/Actor/PipeToSupport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ public static Task PipeTo<T>(
Func<Exception, object> failure = null)
=> PipeTo(taskToPipe, recipient, false, sender, success, failure);

public static Task PipeTo<T>(
this ValueTask<T> taskToPipe,
ICanTell recipient,
IActorRef sender = null,
Func<T, object> success = null,
Func<Exception, object> failure = null)
=> PipeTo(taskToPipe, recipient, false, sender, success, failure);

/// <summary>
/// Pipes the output of a Task directly to the <paramref name="recipient"/>'s mailbox once
/// the task completes
Expand Down Expand Up @@ -74,6 +82,32 @@ public static async Task PipeTo<T>(
}
}

public static async Task PipeTo<T>(
this ValueTask<T> taskToPipe,
ICanTell recipient,
bool useConfigureAwait,
IActorRef sender = null,
Func<T, object> success = null,
Func<Exception, object> failure = null)
{
sender ??= ActorRefs.NoSender;

try
{
var result = await taskToPipe.ConfigureAwait(useConfigureAwait);

recipient.Tell(success != null
? success(result)
: result, sender);
}
catch (Exception ex)
{
recipient.Tell(failure != null
? failure(ex)
: new Status.Failure(ex), sender);
}
}

/// <summary>
/// Pipes the output of a Task directly to the <paramref name="recipient"/>'s mailbox once
/// the task completes. As this task has no result, only exceptions will be piped to the <paramref name="recipient"/>
Expand All @@ -92,6 +126,14 @@ public static Task PipeTo(
Func<Exception, object> failure = null)
=> PipeTo(taskToPipe, recipient, false, sender, success, failure);

public static Task PipeTo(
this ValueTask taskToPipe,
ICanTell recipient,
IActorRef sender = null,
Func<object> success = null,
Func<Exception, object> failure = null)
=> PipeTo(taskToPipe, recipient, false, sender, success, failure);

/// <summary>
/// Pipes the output of a Task directly to the <paramref name="recipient"/>'s mailbox once
/// the task completes. As this task has no result, only exceptions will be piped to the <paramref name="recipient"/>
Expand Down Expand Up @@ -129,6 +171,33 @@ public static async Task PipeTo(
: new Status.Failure(ex), sender);
}
}

public static async Task PipeTo(
this ValueTask taskToPipe,
ICanTell recipient,
bool useConfigureAwait,
IActorRef sender = null,
Func<object> success = null,
Func<Exception, object> failure = null)
{
sender = sender ?? ActorRefs.NoSender;

try
{
await taskToPipe.ConfigureAwait(useConfigureAwait);

if (success != null)
{
recipient.Tell(success(), sender);
}
}
catch (Exception ex)
{
recipient.Tell(failure != null
? failure(ex)
: new Status.Failure(ex), sender);
}
}
}
}