From 7727f16e40760c89a2f4ffba1b2351f5d9a55227 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Sat, 25 Mar 2023 16:31:14 +0100 Subject: [PATCH 1/2] [48-74] `GraphUnzipWithSpec` --- .../Dsl/GraphUnzipWithSpec.cs | 70 +++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs index 343ca8550d2..40cb189822f 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs @@ -33,50 +33,49 @@ public GraphUnzipWithSpec(ITestOutputHelper helper) : base(helper) } [Fact] - public void UnzipWith_must_work_with_immediately_completed_publisher() + public async Task UnzipWith_must_work_with_immediately_completed_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var subscribers = Setup(TestPublisher.Empty()); ValidateSubscriptionAndComplete(subscribers); + return Task.CompletedTask; }, Materializer); } [Fact] - public void UnzipWith_must_work_with_delayed_completed_publisher() + public async Task UnzipWith_must_work_with_delayed_completed_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var subscribers = Setup(TestPublisher.LazyEmpty()); ValidateSubscriptionAndComplete(subscribers); + return Task.CompletedTask; }, Materializer); } [Fact] - public void UnzipWith_must_work_with_two_immediately_failed_publisher() + public async Task UnzipWith_must_work_with_two_immediately_failed_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var subscribers = Setup(TestPublisher.Error(TestException)); ValidateSubscriptionAndError(subscribers); + return Task.CompletedTask; }, Materializer); } [Fact] - public void UnzipWith_must_work_with_two_delayed_failed_publisher() + public async Task UnzipWith_must_work_with_two_delayed_failed_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var subscribers = Setup(TestPublisher.LazyError(TestException)); ValidateSubscriptionAndError(subscribers); + return Task.CompletedTask; }, Materializer); } [Fact] - public void UnzipWith_must_work_in_the_happy_case() + public async Task UnzipWith_must_work_in_the_happy_case() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var leftProbe = this.CreateManualSubscriberProbe(); var rightProbe = this.CreateManualSubscriberProbe(); @@ -102,19 +101,19 @@ public void UnzipWith_must_work_in_the_happy_case() leftSubscription.Request(2); rightSubscription.Request(1); - leftProbe.ExpectNext( 2, 4); + leftProbe.ExpectNext(2, 4); leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); rightProbe.ExpectNext("1+1"); rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - + leftSubscription.Request(1); rightSubscription.Request(2); leftProbe.ExpectNext(6); leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - rightProbe.ExpectNext( "2+2", "3+3"); + rightProbe.ExpectNext("2+2", "3+3"); rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); leftSubscription.Request(1); @@ -125,20 +124,20 @@ public void UnzipWith_must_work_in_the_happy_case() leftProbe.ExpectComplete(); rightProbe.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void UnzipWith_must_work_in_the_sad_case() + public async Task UnzipWith_must_work_in_the_sad_case() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var leftProbe = this.CreateManualSubscriberProbe(); var rightProbe = this.CreateManualSubscriberProbe(); RunnableGraph.FromGraph(GraphDsl.Create(b => { - var unzip = b.Add(new UnzipWith(i => (1/i, 1 + "/" + i))); + var unzip = b.Add(new UnzipWith(i => (1 / i, 1 + "/" + i))); var source = Source.From(Enumerable.Range(-2, 5)); b.From(source).To(unzip.In); @@ -158,7 +157,7 @@ public void UnzipWith_must_work_in_the_sad_case() }; requestFromBoth(); - leftProbe.ExpectNext(1/-2); + leftProbe.ExpectNext(1 / -2); rightProbe.ExpectNext("1/-2"); requestFromBoth(); @@ -172,14 +171,14 @@ public void UnzipWith_must_work_in_the_sad_case() leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + return Task.CompletedTask; }, Materializer); } [Fact] - public void UnzipWith_must_propagate_last_downstream_cancellation_cause_once_all_downstream_have_cancelled() + public async Task UnzipWith_must_propagate_last_downstream_cancellation_cause_once_all_downstream_have_cancelled() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = CreateTestProbe(); RunnableGraph.FromGraph(GraphDsl.Create(b => { @@ -196,7 +195,7 @@ public void UnzipWith_must_propagate_last_downstream_cancellation_cause_once_all var unzip = b.Add(new UnzipWith(i => (1 / i, $"1 / {i}"))); b.From(source).To(unzip.In); - + Flow KillSwitchFlow() => Flow.Create() .ViaMaterialized(KillSwitches.Single(), Keep.Right) @@ -223,14 +222,14 @@ Flow KillSwitchFlow() t.Exception.Should().NotBeNull(); t.Exception.InnerException.Should().Be(boom); }); + return Task.CompletedTask; }, Materializer); } [Fact] - public void UnzipWith_must_unzipWith_expanded_Person_unapply_3_outputs() + public async Task UnzipWith_must_unzipWith_expanded_Person_unapply_3_outputs() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe0 = this.CreateManualSubscriberProbe(); var probe1 = this.CreateManualSubscriberProbe(); var probe2 = this.CreateManualSubscriberProbe(); @@ -263,16 +262,16 @@ public void UnzipWith_must_unzipWith_expanded_Person_unapply_3_outputs() probe0.ExpectComplete(); probe1.ExpectComplete(); probe2.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void UnzipWith_must_work_with_up_to_6_outputs() + public async Task UnzipWith_must_work_with_up_to_6_outputs() { // the jvm version uses 20 outputs but we have only 7 so changed this spec a little bit - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe0 = this.CreateManualSubscriberProbe(); var probe1 = this.CreateManualSubscriberProbe(); var probe2 = this.CreateManualSubscriberProbe(); @@ -290,8 +289,8 @@ public void UnzipWith_must_work_with_up_to_6_outputs() (ints[0], ints[0].ToString(), ints[1], ints[1].ToString(), ints[2], ints[2].ToString()))); - var source = Source.Single(Enumerable.Range(1,3).ToList()); - + var source = Source.Single(Enumerable.Range(1, 3).ToList()); + b.From(source).To(unzip.In); b.From(unzip.Out0).To(Sink.FromSubscriber(probe0)); b.From(unzip.Out1).To(Sink.FromSubscriber(probe1)); @@ -323,6 +322,7 @@ public void UnzipWith_must_work_with_up_to_6_outputs() probe3.ExpectComplete(); probe4.ExpectComplete(); probe5.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } From 56ff340eb86bff8cf9f76c2b029fd639aec49375 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Fri, 31 Mar 2023 00:56:46 +0100 Subject: [PATCH 2/2] Changes to `async` TestKit --- .../Dsl/GraphUnzipWithSpec.cs | 110 +++++++++--------- 1 file changed, 53 insertions(+), 57 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs index 40cb189822f..8955edbbda9 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphUnzipWithSpec.cs @@ -75,7 +75,7 @@ await this.AssertAllStagesStoppedAsync(() => { [Fact] public async Task UnzipWith_must_work_in_the_happy_case() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var leftProbe = this.CreateManualSubscriberProbe(); var rightProbe = this.CreateManualSubscriberProbe(); @@ -95,43 +95,42 @@ await this.AssertAllStagesStoppedAsync(() => { return ClosedShape.Instance; })).Run(Materializer); - var leftSubscription = leftProbe.ExpectSubscription(); - var rightSubscription = rightProbe.ExpectSubscription(); + var leftSubscription = await leftProbe.ExpectSubscriptionAsync(); + var rightSubscription = await rightProbe.ExpectSubscriptionAsync(); leftSubscription.Request(2); rightSubscription.Request(1); leftProbe.ExpectNext(2, 4); - leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await leftProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); - rightProbe.ExpectNext("1+1"); - rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await rightProbe.ExpectNextAsync("1+1"); + await rightProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); leftSubscription.Request(1); rightSubscription.Request(2); leftProbe.ExpectNext(6); - leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await leftProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); rightProbe.ExpectNext("2+2", "3+3"); - rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await rightProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); leftSubscription.Request(1); rightSubscription.Request(1); - leftProbe.ExpectNext(8); - rightProbe.ExpectNext("4+4"); + await leftProbe.ExpectNextAsync(8); + await rightProbe.ExpectNextAsync("4+4"); - leftProbe.ExpectComplete(); - rightProbe.ExpectComplete(); - return Task.CompletedTask; + await leftProbe.ExpectCompleteAsync(); + await rightProbe.ExpectCompleteAsync(); }, Materializer); } [Fact] public async Task UnzipWith_must_work_in_the_sad_case() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async () => { var leftProbe = this.CreateManualSubscriberProbe(); var rightProbe = this.CreateManualSubscriberProbe(); @@ -147,8 +146,8 @@ await this.AssertAllStagesStoppedAsync(() => { return ClosedShape.Instance; })).Run(Materializer); - var leftSubscription = leftProbe.ExpectSubscription(); - var rightSubscription = rightProbe.ExpectSubscription(); + var leftSubscription = await leftProbe.ExpectSubscriptionAsync(); + var rightSubscription = await rightProbe.ExpectSubscriptionAsync(); Action requestFromBoth = () => { @@ -157,21 +156,20 @@ await this.AssertAllStagesStoppedAsync(() => { }; requestFromBoth(); - leftProbe.ExpectNext(1 / -2); - rightProbe.ExpectNext("1/-2"); + await leftProbe.ExpectNextAsync(1 / -2); + await rightProbe.ExpectNextAsync("1/-2"); requestFromBoth(); - leftProbe.ExpectNext(1 / -1); - rightProbe.ExpectNext("1/-1"); + await leftProbe.ExpectNextAsync(1 / -1); + await rightProbe.ExpectNextAsync("1/-1"); - EventFilter.Exception().ExpectOne(requestFromBoth); + await EventFilter.Exception().ExpectOneAsync(requestFromBoth); leftProbe.ExpectError().Should().BeOfType(); rightProbe.ExpectError().Should().BeOfType(); - leftProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - rightProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - return Task.CompletedTask; + await leftProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); + await rightProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); }, Materializer); } @@ -229,7 +227,7 @@ Flow KillSwitchFlow() [Fact] public async Task UnzipWith_must_unzipWith_expanded_Person_unapply_3_outputs() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var probe0 = this.CreateManualSubscriberProbe(); var probe1 = this.CreateManualSubscriberProbe(); var probe2 = this.CreateManualSubscriberProbe(); @@ -247,22 +245,21 @@ await this.AssertAllStagesStoppedAsync(() => { return ClosedShape.Instance; })).Run(Materializer); - var subscription0 = probe0.ExpectSubscription(); - var subscription1 = probe1.ExpectSubscription(); - var subscription2 = probe2.ExpectSubscription(); + var subscription0 = await probe0.ExpectSubscriptionAsync(); + var subscription1 = await probe1.ExpectSubscriptionAsync(); + var subscription2 = await probe2.ExpectSubscriptionAsync(); subscription0.Request(1); subscription1.Request(1); subscription2.Request(1); - probe0.ExpectNext("Caplin"); - probe1.ExpectNext("Capybara"); - probe2.ExpectNext(55); + await probe0.ExpectNextAsync("Caplin"); + await probe1.ExpectNextAsync("Capybara"); + await probe2.ExpectNextAsync(55); - probe0.ExpectComplete(); - probe1.ExpectComplete(); - probe2.ExpectComplete(); - return Task.CompletedTask; + await probe0.ExpectCompleteAsync(); + await probe1.ExpectCompleteAsync(); + await probe2.ExpectCompleteAsync(); }, Materializer); } @@ -271,7 +268,7 @@ public async Task UnzipWith_must_work_with_up_to_6_outputs() { // the jvm version uses 20 outputs but we have only 7 so changed this spec a little bit - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var probe0 = this.CreateManualSubscriberProbe(); var probe1 = this.CreateManualSubscriberProbe(); var probe2 = this.CreateManualSubscriberProbe(); @@ -302,27 +299,26 @@ await this.AssertAllStagesStoppedAsync(() => { return ClosedShape.Instance; })).Run(Materializer); - probe0.ExpectSubscription().Request(1); - probe1.ExpectSubscription().Request(1); - probe2.ExpectSubscription().Request(1); - probe3.ExpectSubscription().Request(1); - probe4.ExpectSubscription().Request(1); - probe5.ExpectSubscription().Request(1); - - probe0.ExpectNext(1); - probe1.ExpectNext("1"); - probe2.ExpectNext(2); - probe3.ExpectNext("2"); - probe4.ExpectNext(3); - probe5.ExpectNext("3"); - - probe0.ExpectComplete(); - probe1.ExpectComplete(); - probe2.ExpectComplete(); - probe3.ExpectComplete(); - probe4.ExpectComplete(); - probe5.ExpectComplete(); - return Task.CompletedTask; + (await probe0.ExpectSubscriptionAsync()).Request(1); + (await probe1.ExpectSubscriptionAsync()).Request(1); + (await probe2.ExpectSubscriptionAsync()).Request(1); + (await probe3.ExpectSubscriptionAsync()).Request(1); + (await probe4.ExpectSubscriptionAsync()).Request(1); + (await probe5.ExpectSubscriptionAsync()).Request(1); + + await probe0.ExpectNextAsync(1); + await probe1.ExpectNextAsync("1"); + await probe2.ExpectNextAsync(2); + await probe3.ExpectNextAsync("2"); + await probe4.ExpectNextAsync(3); + await probe5.ExpectNextAsync("3"); + + await probe0.ExpectCompleteAsync(); + await probe1.ExpectCompleteAsync(); + await probe2.ExpectCompleteAsync(); + await probe3.ExpectCompleteAsync(); + await probe4.ExpectCompleteAsync(); + await probe5.ExpectCompleteAsync(); }, Materializer); }