Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source that flattens a task source and keeps the materialized value #5338

Merged
merged 1 commit into from
Oct 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ If the task fails the stream is failed with that exception.

**completes** after the task has completed

#### FromTaskSource

Streams the elements of the given ``Task`` source once it successfully completes.
If the task fails the stream is failed.

**emits** the next value from the ``Task`` source, once it has completed

**completes** after the ``Task`` source completes

#### Unfold

Stream the result of a function as long as it returns not ``null``, the value inside the option
Expand Down
14 changes: 13 additions & 1 deletion src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,8 @@ namespace Akka.Streams
}
public class StreamDetachedException : System.Exception
{
public static readonly Akka.Streams.StreamDetachedException Instance;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bit of a breaking change here - should probably keep this member but also allow the public constructor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb should I send a new PR and add this back?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be great

public StreamDetachedException() { }
public StreamDetachedException(string message) { }
}
public class StreamLimitReachedException : System.Exception
{
Expand Down Expand Up @@ -1946,6 +1947,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromObservable<T>(System.IObservable<T> observable, int maxBufferCapacity = 128, Akka.Streams.OverflowStrategy overflowStrategy = 2) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromPublisher<T>(Reactive.Streams.IPublisher<T> publisher) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> FromTask<T>(System.Threading.Tasks.Task<T> task) { }
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.Task<M>> FromTaskSource<T, M>(System.Threading.Tasks.Task<Akka.Streams.Dsl.Source<T, M>> task) { }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

public static Akka.Streams.Dsl.Source<TOut, System.Threading.Tasks.Task<TMat>> Lazily<TOut, TMat>(System.Func<Akka.Streams.Dsl.Source<TOut, TMat>> create) { }
public static Akka.Streams.Dsl.Source<T, System.Threading.Tasks.TaskCompletionSource<T>> Maybe<T>() { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> Never<T>() { }
Expand Down Expand Up @@ -4351,6 +4353,15 @@ namespace Akka.Streams.Implementation.Fusing
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
public sealed class TaskFlattenSource<T, M> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<T>, System.Threading.Tasks.Task<M>>
{
public readonly Akka.Streams.Outlet<T> Outlet;
public TaskFlattenSource(System.Threading.Tasks.Task<Akka.Streams.Dsl.Source<T, M>> taskSource) { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public override Akka.Streams.SourceShape<T> Shape { get; }
public override Akka.Streams.Stage.ILogicAndMaterializedValue<System.Threading.Tasks.Task<M>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
public sealed class TaskSource<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.SourceShape<T>>
{
public readonly Akka.Streams.Outlet<T> Outlet;
Expand Down Expand Up @@ -4486,6 +4497,7 @@ namespace Akka.Streams.Implementation.Stages
public static readonly Akka.Streams.Attributes Sum;
public static readonly Akka.Streams.Attributes Take;
public static readonly Akka.Streams.Attributes TakeWhile;
public static readonly Akka.Streams.Attributes TaskFlattenSource;
public static readonly Akka.Streams.Attributes TaskSource;
public static readonly Akka.Streams.Attributes TerminationWatcher;
public static readonly Akka.Streams.Attributes TickSource;
Expand Down
269 changes: 269 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
//-----------------------------------------------------------------------
// <copyright file="FutureFlattenSourceSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
using Akka.TestKit;
using FluentAssertions;
using Xunit;

namespace Akka.Streams.Tests.Dsl
{
public class FutureFlattenSourceSpec : AkkaSpec
{
private readonly IMaterializer _materializer;

private static readonly Source<int, string> underlying =
Source.From(new List<int>() { 1, 2, 3 }).MapMaterializedValue(_ => "foo");

public FutureFlattenSourceSpec() => _materializer = ActorMaterializer.Create(Sys);

[Fact]
public void TaskSource_must_emit_the_elements_of_the_already_successful_task_source()
{
this.AssertAllStagesStopped(() =>
{
var (sourceMatVal, sinkMatVal) = Source.FromTaskSource(Task.FromResult(underlying))
.ToMaterialized(Sink.Seq<int>(), Keep.Both)
.Run(_materializer);

sourceMatVal.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
sinkMatVal.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();

// should complete as soon as inner source has been materialized
sourceMatVal.Result.Should().Be("foo");
sinkMatVal.Result.Should().BeEquivalentTo(ImmutableList.CreateRange(new List<int>() { 1, 2, 3 }));
}, _materializer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

[Fact]
public void TaskSource_must_emit_no_elements_before_the_task_of_source_successful()
{
this.AssertAllStagesStopped(() =>
{
var c = this.CreateManualSubscriberProbe<int>();
var sourcePromise = new TaskCompletionSource<Source<int, string>>();

Source.FromTaskSource(sourcePromise.Task).RunWith(Sink.AsPublisher<int>(true), _materializer).Subscribe(c);
var sub = c.ExpectSubscription();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
sub.Request(3);
c.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
sourcePromise.SetResult(underlying);
c.ExpectNext(1);
c.ExpectNext(2);
c.ExpectNext(3);
c.ExpectComplete();
}, _materializer);
}

[Fact]
public void TaskSource_must_emit_the_elements_of_the_task_source()
{
this.AssertAllStagesStopped(() =>
{
var sourcePromise = new TaskCompletionSource<Source<int, string>>();

var (sourceMatVal, sinkMatVal) = Source.FromTaskSource(sourcePromise.Task)
.ToMaterialized(Sink.Seq<int>(), Keep.Both)
.Run(_materializer);

sourcePromise.SetResult(underlying);

// should complete as soon as inner source has been materialized
sourceMatVal.Result.Should().Be("foo");
sinkMatVal.Result.Should().BeEquivalentTo(ImmutableList.CreateRange(new List<int>() { 1, 2, 3 }));
}, _materializer);
}

[Fact]
public void TaskSource_must_handle_downstream_cancelling_before_the_underlying_task_completes()
{
this.AssertAllStagesStopped(() =>
{
var sourcePromise = new TaskCompletionSource<Source<int, string>>();
var probe = this.CreateSubscriberProbe<int>();

var sourceMatVal = Source.FromTaskSource(sourcePromise.Task)
.ToMaterialized(Sink.FromSubscriber(probe), Keep.Left)
.Run(_materializer);

// wait for cancellation to occur
probe.EnsureSubscription();
probe.Request(1);
probe.Cancel();

// try to avoid a race between probe cancel and completing the promise
Thread.Sleep(100);

// even though canceled the underlying matval should arrive
sourcePromise.SetResult(underlying);
var failure = sourceMatVal.Exception.Flatten().InnerException;
failure.Should().BeAssignableTo<StreamDetachedException>();
failure.Message.Should().Be("Stream cancelled before Source Task completed");
}, _materializer);
}

[Fact]
public void TaskSource_must_fail_if_the_underlying_task_is_failed()
{
this.AssertAllStagesStopped(() =>
{
var failure = new TestException("foo");
var underlying = Task.FromException<Source<int, string>>(failure);

var (sourceMatVal, sinkMatVal) = Source.FromTaskSource(underlying)
.ToMaterialized(Sink.Seq<int>(), Keep.Both)
.Run(_materializer);

// wait until the underlying task is completed
Thread.Sleep(100);

sourceMatVal.Exception.Flatten().InnerException.Should().Be(failure);
sinkMatVal.Exception.Flatten().InnerException.Should().Be(failure);
}, _materializer);
}

[Fact]
public void TaskSource_must_fail_as_the_underlying_task_fails_after_outer_source_materialization()
{
this.AssertAllStagesStopped(() =>
{
var failure = new TestException("foo");
var sourcePromise = new TaskCompletionSource<Source<int, string>>();
var materializationLatch = new TestLatch(1);

var (sourceMatVal, sinkMatVal) = Source.FromTaskSource(sourcePromise.Task)
.MapMaterializedValue(value =>
{
materializationLatch.CountDown();
return value;
})
.ToMaterialized(Sink.Seq<int>(), Keep.Both)
.Run(_materializer);

// we don't know that materialization completed yet (this is still a bit racy)
materializationLatch.Ready(RemainingOrDefault);
sourcePromise.SetException(failure);
Thread.Sleep(100);

sourceMatVal.Exception.Flatten().InnerException.Should().Be(failure);
sinkMatVal.Exception.Flatten().InnerException.Should().Be(failure);
}, _materializer);
}

[Fact]
public void TaskSource_must_fail_as_the_underlying_task_fails_after_outer_source_materialization_with_no_demand()
{
this.AssertAllStagesStopped(() =>
{
var failure = new TestException("foo");
var sourcePromise = new TaskCompletionSource<Source<int, string>>();
var testProbe = this.CreateSubscriberProbe<int>();

var sourceMatVal = Source.FromTaskSource(sourcePromise.Task)
.To(Sink.FromSubscriber(testProbe))
.Run(_materializer);

testProbe.ExpectSubscription();
sourcePromise.SetException(failure);
Thread.Sleep(100);

sourceMatVal.Exception.Flatten().InnerException.Should().Be(failure);
}, _materializer);
}

[Fact]
public void TaskSource_must_handle_backpressure_when_the_task_completes()
{
this.AssertAllStagesStopped(() =>
{
var subscriber = this.CreateSubscriberProbe<int>();
var publisher = this.CreatePublisherProbe<int>();

var sourcePromise = new TaskCompletionSource<Source<int, string>>();
var matVal = Source.FromTaskSource(sourcePromise.Task)
.To(Sink.FromSubscriber(subscriber))
.Run(_materializer);

subscriber.EnsureSubscription();
sourcePromise.SetResult(Source.FromPublisher(publisher).MapMaterializedValue(_ => "woho"));

// materialized value completes but still no demand
matVal.Result.Should().Be("woho");

// then demand and let an element through to see it works
subscriber.EnsureSubscription();
subscriber.Request(1);
publisher.ExpectRequest();
publisher.SendNext(1);
subscriber.ExpectNext(1);
publisher.SendComplete();
subscriber.ExpectComplete();
}, _materializer);
}

[Fact]
public void TaskSource_must_carry_through_cancellation_to_later_materialized_source()
{
this.AssertAllStagesStopped(() =>
{
var subscriber = this.CreateSubscriberProbe<int>();
var publisher = this.CreatePublisherProbe<int>();

var sourcePromise = new TaskCompletionSource<Source<int, string>>();
var matVal = Source.FromTaskSource(sourcePromise.Task)
.To(Sink.FromSubscriber(subscriber))
.Run(_materializer);

subscriber.EnsureSubscription();
sourcePromise.SetResult(Source.FromPublisher(publisher).MapMaterializedValue(_ => "woho"));

// materialized value completes but still no demand
matVal.Result.Should().Be("woho");

// then demand and let an element through to see it works
subscriber.EnsureSubscription();
subscriber.Cancel();
publisher.ExpectCancellation();
}, _materializer);
}

[Fact]
public void TaskSource_must_fail_when_the_task_source_materialization_fails()
{
this.AssertAllStagesStopped(() =>
{
var inner = Task.FromResult(Source.FromGraph(new FailingMatGraphStage()));
var (innerSourceMat, outerSinkMat) = Source.FromTaskSource(inner).ToMaterialized(Sink.Seq<int>(), Keep.Both).Run(_materializer);

// wait until the underlying tasks are completed
Thread.Sleep(100);

outerSinkMat.Exception.Flatten().InnerException.Should().Be(new TestException("INNER_FAILED"));
innerSourceMat.Exception.Flatten().InnerException.Should().Be(new TestException("INNER_FAILED"));
}, _materializer);
}

private class FailingMatGraphStage : GraphStageWithMaterializedValue<SourceShape<int>, string>
{
private readonly Outlet<int> _out = new Outlet<int>("whatever");

public override SourceShape<int> Shape => new SourceShape<int>(_out);

public override ILogicAndMaterializedValue<string> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) =>
throw new TestException("INNER_FAILED");
}
}
}
12 changes: 12 additions & 0 deletions src/core/Akka.Streams/Dsl/Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,18 @@ public static Source<T, TMat> FromGraph<T, TMat>(IGraph<SourceShape<T>, TMat> so
/// <returns>TBD</returns>
public static Source<T, NotUsed> Never<T>() => FromTask(new TaskCompletionSource<T>().Task).WithAttributes(DefaultAttributes.NeverSource);

/// <summary>
/// Streams the elements of the given future source once it successfully completes.
/// If the <see cref="Task{T}"/> fails the stream is failed with the exception from the future. If downstream cancels before the
/// stream completes the materialized <see cref="Task{M}"/> will be failed with a <see cref="StreamDetachedException"/>
/// </summary>
/// <typeparam name="T">TBD</typeparam>
/// <typeparam name="M">TBD</typeparam>
/// <param name="task">TBD</param>
/// <returns>TBD</returns>
public static Source<T, Task<M>> FromTaskSource<T, M>(Task<Source<T, M>> task) =>
FromGraph(new TaskFlattenSource<T, M>(task));

/// <summary>
/// Elements are emitted periodically with the specified interval.
/// The tick element will be delivered to downstream consumers that has requested any elements.
Expand Down
Loading