From 6cab89d685e6d11ae12ca137d5530be8018a6790 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Thu, 23 Mar 2023 16:41:29 +0100 Subject: [PATCH] [8-74]`FlowConcatAllSpec` --- .../Dsl/FlowConcatAllSpec.cs | 63 ++++++++++--------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowConcatAllSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowConcatAllSpec.cs index 1514fe68658..a6e97766dee 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowConcatAllSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowConcatAllSpec.cs @@ -14,6 +14,7 @@ using Akka.Streams.Dsl.Internal; using Akka.TestKit; using Reactive.Streams; +using System.Threading.Tasks; // ReSharper disable InvokeAsExtensionMethod @@ -32,17 +33,16 @@ public FlowConcatAllSpec(ITestOutputHelper helper) : base(helper) private static readonly TestException TestException = new TestException("test"); [Fact] - public void ConcatAll_must_work_in_the_happy_case() + public async Task ConcatAll_must_work_in_the_happy_case() { - this.AssertAllStagesStopped(() => - { - var s1 = Source.From(new[] {1, 2}); - var s2 = Source.From(new int[] {}); - var s3 = Source.From(new[] {3}); - var s4 = Source.From(new[] {4, 5, 6}); - var s5 = Source.From(new[] {7, 8, 9, 10}); + await this.AssertAllStagesStoppedAsync(() => { + var s1 = Source.From(new[] { 1, 2 }); + var s2 = Source.From(new int[] { }); + var s3 = Source.From(new[] { 3 }); + var s4 = Source.From(new[] { 4, 5, 6 }); + var s5 = Source.From(new[] { 7, 8, 9, 10 }); - var main = Source.From(new[] {s1, s2, s3, s4, s5}); + var main = Source.From(new[] { s1, s2, s3, s4, s5 }); var subscriber = this.CreateManualSubscriberProbe(); main.ConcatMany(s => s).To(Sink.FromSubscriber(subscriber)).Run(Materializer); @@ -53,6 +53,7 @@ public void ConcatAll_must_work_in_the_happy_case() subscription.Request(1); subscriber.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } @@ -75,10 +76,9 @@ public void ConcatAll_must_work_together_with_SplitWhen() subscriber.ExpectComplete();} [Fact] - public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_substream_and_signal_error() + public async Task ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_substream_and_signal_error() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var publisher = this.CreateManualPublisherProbe>(); var subscriber = this.CreateManualSubscriberProbe(); Source.FromPublisher(publisher) @@ -99,14 +99,14 @@ public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_current_open_s upstream.SendError(TestException); subscriber.ExpectError().Should().Be(TestException); subUpstream.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_opening_substream_and_signal_error() + public async Task ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_opening_substream_and_signal_error() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var publisher = this.CreateManualPublisherProbe>(); var subscriber = this.CreateManualSubscriberProbe(); Source.FromPublisher(publisher) @@ -130,18 +130,18 @@ public void ConcatAll_must_on_OnError_on_master_stream_cancel_the_currently_open subscriber.ExpectError().Should().Be(TestException); subUpstream.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_stream_and_signal_error() + public async Task ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_stream_and_signal_error() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var publisher = this.CreateManualPublisherProbe>(); var subscriber = this.CreateManualSubscriberProbe(); Source.FromPublisher(publisher) - .ConcatMany,int,NotUsed>(x => { throw TestException; }) + .ConcatMany, int, NotUsed>(x => { throw TestException; }) .To(Sink.FromSubscriber(subscriber)) .Run(Materializer); @@ -155,14 +155,14 @@ public void ConcatAll_must_on_OnError_on_opening_substream_cancel_the_master_str upstream.SendNext(substreamFlow); subscriber.ExpectError().Should().Be(TestException); upstream.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream_and_signal_error() + public async Task ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream_and_signal_error() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var publisher = this.CreateManualPublisherProbe>(); var subscriber = this.CreateManualSubscriberProbe(); Source.FromPublisher(publisher) @@ -183,14 +183,14 @@ public void ConcatAll_must_on_OnError_on_open_substream_cancel_the_master_stream subUpstream.SendError(TestException); subscriber.ExpectError().Should().Be(TestException); upstream.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and_the_master_stream() + public async Task ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and_the_master_stream() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var publisher = this.CreateManualPublisherProbe>(); var subscriber = this.CreateManualSubscriberProbe(); Source.FromPublisher(publisher) @@ -212,14 +212,14 @@ public void ConcatAll_must_on_cancellation_cancel_the_current_open_substream_and subUpstream.ExpectCancellation(); upstream.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ConcatAll_must_on_cancellation_cancel_the_currently_opening_substream_and_the_master_stream() + public async Task ConcatAll_must_on_cancellation_cancel_the_currently_opening_substream_and_the_master_stream() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var publisher = this.CreateManualPublisherProbe>(); var subscriber = this.CreateManualSubscriberProbe(); Source.FromPublisher(publisher) @@ -243,14 +243,14 @@ public void ConcatAll_must_on_cancellation_cancel_the_currently_opening_substrea subUpstream.ExpectCancellation(); upstream.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ConcatAll_must_pass_along_early_cancellation() + public async Task ConcatAll_must_pass_along_early_cancellation() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var up = this.CreateManualPublisherProbe>(); var down = this.CreateManualSubscriberProbe(); @@ -264,6 +264,7 @@ public void ConcatAll_must_pass_along_early_cancellation() up.Subscribe(flowSubscriber); var upSub = up.ExpectSubscription(); upSub.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } }