Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8-74]FlowConcatAllSpec #6549

Merged
merged 2 commits into from
Mar 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 32 additions & 31 deletions src/core/Akka.Streams.Tests/Dsl/FlowConcatAllSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Streams.Dsl.Internal;
using Akka.TestKit;
using Reactive.Streams;
using System.Threading.Tasks;

// ReSharper disable InvokeAsExtensionMethod

Expand All @@ -32,17 +33,16 @@ public FlowConcatAllSpec(ITestOutputHelper helper) : base(helper)
private static readonly TestException TestException = new TestException("test");

[Fact]
public void ConcatAll_must_work_in_the_happy_case()
public async Task ConcatAll_must_work_in_the_happy_case()
{
this.AssertAllStagesStopped(() =>
{
var s1 = Source.From(new[] {1, 2});
var s2 = Source.From(new int[] {});
var s3 = Source.From(new[] {3});
var s4 = Source.From(new[] {4, 5, 6});
var s5 = Source.From(new[] {7, 8, 9, 10});
await this.AssertAllStagesStoppedAsync(() => {
var s1 = Source.From(new[] { 1, 2 });
var s2 = Source.From(new int[] { });
var s3 = Source.From(new[] { 3 });
var s4 = Source.From(new[] { 4, 5, 6 });
var s5 = Source.From(new[] { 7, 8, 9, 10 });

var main = Source.From(new[] {s1, s2, s3, s4, s5});
var main = Source.From(new[] { s1, s2, s3, s4, s5 });

var subscriber = this.CreateManualSubscriberProbe<int>();
main.ConcatMany(s => s).To(Sink.FromSubscriber(subscriber)).Run(Materializer);
Expand All @@ -53,6 +53,7 @@ public void ConcatAll_must_work_in_the_happy_case()

subscription.Request(1);
subscriber.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

Expand All @@ -75,10 +76,9 @@ public void ConcatAll_must_work_together_with_SplitWhen()
subscriber.ExpectComplete();}

[Fact]
public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_substream_and_signal_error()
public async Task ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_substream_and_signal_error()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.FromPublisher(publisher)
Expand All @@ -99,14 +99,14 @@ public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_s
upstream.SendError(TestException);
subscriber.ExpectError().Should().Be(TestException);
subUpstream.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_opening_substream_and_signal_error()
public async Task ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_opening_substream_and_signal_error()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.FromPublisher(publisher)
Expand All @@ -130,18 +130,18 @@ public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_open

subscriber.ExpectError().Should().Be(TestException);
subUpstream.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_stream_and_signal_error()
public async Task ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_stream_and_signal_error()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.FromPublisher(publisher)
.ConcatMany<Source<int,NotUsed>,int,NotUsed>(x => { throw TestException; })
.ConcatMany<Source<int, NotUsed>, int, NotUsed>(x => { throw TestException; })
.To(Sink.FromSubscriber(subscriber))
.Run(Materializer);

Expand All @@ -155,14 +155,14 @@ public void ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_str
upstream.SendNext(substreamFlow);
subscriber.ExpectError().Should().Be(TestException);
upstream.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream_and_signal_error()
public async Task ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream_and_signal_error()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.FromPublisher(publisher)
Expand All @@ -183,14 +183,14 @@ public void ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream
subUpstream.SendError(TestException);
subscriber.ExpectError().Should().Be(TestException);
upstream.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and_the_master_stream()
public async Task ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and_the_master_stream()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.FromPublisher(publisher)
Expand All @@ -212,14 +212,14 @@ public void ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and

subUpstream.ExpectCancellation();
upstream.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ConcatAll_must_on_cancellation_cancel_the_currently_opening_substream_and_the_master_stream()
public async Task ConcatAll_must_on_cancellation_cancel_the_currently_opening_substream_and_the_master_stream()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var publisher = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.FromPublisher(publisher)
Expand All @@ -243,14 +243,14 @@ public void ConcatAll_must_on_cancellation_cancel_the_currently_opening_substrea

subUpstream.ExpectCancellation();
upstream.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ConcatAll_must_pass_along_early_cancellation()
public async Task ConcatAll_must_pass_along_early_cancellation()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var up = this.CreateManualPublisherProbe<Source<int, NotUsed>>();
var down = this.CreateManualSubscriberProbe<int>();

Expand All @@ -264,6 +264,7 @@ public void ConcatAll_must_pass_along_early_cancellation()
up.Subscribe(flowSubscriber);
var upSub = up.ExpectSubscription();
upSub.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}
}
Expand Down