From abd38fe7beab69ee88a60a04939462b64ac3c8c8 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Sun, 26 Mar 2023 12:52:37 +0100 Subject: [PATCH 1/3] [67-74] `UnfoldResourceSourceSpec` --- .../Dsl/UnfoldResourceSourceSpec.cs | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs index 0a1da3d8c82..f41d7c8da1d 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs @@ -66,10 +66,9 @@ public UnfoldResourceSourceSpec(ITestOutputHelper helper) : base(Utils.Unbounded [Fact] - public void A_UnfoldResourceSource_must_read_contents_from_a_file() + public async Task A_UnfoldResourceSource_must_read_contents_from_a_file() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var p = Source.UnfoldResource(_open, Read, Close).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); @@ -90,51 +89,51 @@ public void A_UnfoldResourceSource_must_read_contents_from_a_file() sub.Request(1); c.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_continue_when_strategy_is_resume_and_exception_happened() + public async Task A_UnfoldResourceSource_must_continue_when_strategy_is_resume_and_exception_happened() { - this.AssertAllStagesStopped(() => - { - var p = Source.UnfoldResource(_open, reader => - { - var s = reader.ReadLine(); - if (s != null && s.Contains("b")) - throw new TestException(""); - return s ?? Option.None; - }, Close) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + await this.AssertAllStagesStoppedAsync(() => { + var p = Source.UnfoldResource(_open, reader => + { + var s = reader.ReadLine(); + if (s != null && s.Contains("b")) + throw new TestException(""); + return s ?? Option.None; + }, Close) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) .RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); - + p.Subscribe(c); var sub = c.ExpectSubscription(); - Enumerable.Range(0,50).ForEach(i => + Enumerable.Range(0, 50).ForEach(i => { sub.Request(1); c.ExpectNext().Should().Be(i < 10 ? ManyLinesArray[i] : ManyLinesArray[i + 10]); }); sub.Request(1); c.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_close_and_open_stream_again_when_strategy_is_restart() + public async Task A_UnfoldResourceSource_must_close_and_open_stream_again_when_strategy_is_restart() { - this.AssertAllStagesStopped(() => - { - var p = Source.UnfoldResource(_open, reader => - { - var s = reader.ReadLine(); - if (s != null && s.Contains("b")) - throw new TestException(""); - return s ?? Option.None; - }, Close) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + await this.AssertAllStagesStoppedAsync(() => { + var p = Source.UnfoldResource(_open, reader => + { + var s = reader.ReadLine(); + if (s != null && s.Contains("b")) + throw new TestException(""); + return s ?? Option.None; + }, Close) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) .RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); @@ -147,14 +146,14 @@ public void A_UnfoldResourceSource_must_close_and_open_stream_again_when_strateg c.ExpectNext().Should().Be(ManyLinesArray[0]); }); sub.Cancel(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_work_with_ByteString_as_well() + public async Task A_UnfoldResourceSource_must_work_with_ByteString_as_well() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var chunkSize = 50; var buffer = new char[chunkSize]; @@ -172,7 +171,7 @@ public void A_UnfoldResourceSource_must_work_with_ByteString_as_well() var remaining = ManyLines; Func nextChunk = () => { - if(remaining.Length <= chunkSize) + if (remaining.Length <= chunkSize) return remaining; var chunk = remaining.Take(chunkSize).Aggregate("", (s, c1) => s + c1); remaining = remaining.Substring(chunkSize); @@ -189,14 +188,14 @@ public void A_UnfoldResourceSource_must_work_with_ByteString_as_well() }); sub.Request(1); c.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_default() + public async Task A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_default() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var sys = ActorSystem.Create("dispatcher-testing", Utils.UnboundedMailboxConfig); var materializer = sys.Materializer(); @@ -205,7 +204,7 @@ public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_ var p = Source.UnfoldResource(_open, Read, Close) .RunWith(this.SinkProbe(), materializer); - ((ActorMaterializerImpl) materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, + ((ActorMaterializerImpl)materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor); var refs = ExpectMsg().Refs; var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSource")); @@ -223,14 +222,15 @@ public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_ { Shutdown(sys); } + + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_fail_when_create_throws_exception() + public async Task A_UnfoldResourceSource_must_fail_when_create_throws_exception() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var testException = new TestException(""); var p = Source.UnfoldResource(() => { @@ -241,14 +241,14 @@ public void A_UnfoldResourceSource_must_fail_when_create_throws_exception() c.ExpectSubscription(); c.ExpectError().Should().Be(testException); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_fail_when_close_throws_exception() + public async Task A_UnfoldResourceSource_must_fail_when_close_throws_exception() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var testException = new TestException(""); var p = Source.UnfoldResource(_open, Read, reader => { @@ -262,7 +262,7 @@ public void A_UnfoldResourceSource_must_fail_when_close_throws_exception() sub.Request(61); c.ExpectNextN(60); c.ExpectError().Should().Be(testException); - + return Task.CompletedTask; }, Materializer); } From db08c7706a0aba7bc0b34adb58979869320d0a91 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 28 Mar 2023 16:53:17 +0100 Subject: [PATCH 2/3] Changes --- .../Dsl/UnfoldResourceSourceSpec.cs | 89 ++++++++++--------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs index f41d7c8da1d..1d7930c9853 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs @@ -16,7 +16,6 @@ using Akka.Streams.Implementation; using Akka.Streams.Supervision; using Akka.Streams.TestKit; -using Akka.Streams.Util; using Akka.TestKit; using Akka.Util; using Akka.Util.Internal; @@ -68,35 +67,37 @@ public UnfoldResourceSourceSpec(ITestOutputHelper helper) : base(Utils.Unbounded [Fact] public async Task A_UnfoldResourceSource_must_read_contents_from_a_file() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var p = Source.UnfoldResource(_open, Read, Close).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(1); - c.ExpectNext().Should().Be(ManyLinesArray[0]); + var next = await c.ExpectNextAsync(); + next.Should().Be(ManyLinesArray[0]); sub.Request(1); - c.ExpectNext().Should().Be(ManyLinesArray[1]); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + next = await c.ExpectNextAsync(); + next.Should().Be(ManyLinesArray[1]); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300)); for (var i = 2; i < ManyLinesArray.Length; i++) { sub.Request(1); - c.ExpectNext().Should().Be(ManyLinesArray[i]); + next = await c.ExpectNextAsync(); + next.Should().Be(ManyLinesArray[i]); } sub.Request(1); - c.ExpectComplete(); - return Task.CompletedTask; + await c.ExpectCompleteAsync(); }, Materializer); } [Fact] public async Task A_UnfoldResourceSource_must_continue_when_strategy_is_resume_and_exception_happened() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var p = Source.UnfoldResource(_open, reader => { var s = reader.ReadLine(); @@ -109,23 +110,23 @@ await this.AssertAllStagesStoppedAsync(() => { var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); - Enumerable.Range(0, 50).ForEach(i => + Enumerable.Range(0, 50).ForEach(async i => { sub.Request(1); - c.ExpectNext().Should().Be(i < 10 ? ManyLinesArray[i] : ManyLinesArray[i + 10]); + var next = await c.ExpectNextAsync(); + next.Should().Be(i < 10 ? ManyLinesArray[i] : ManyLinesArray[i + 10]); }); sub.Request(1); - c.ExpectComplete(); - return Task.CompletedTask; + await c.ExpectCompleteAsync(); }, Materializer); } [Fact] public async Task A_UnfoldResourceSource_must_close_and_open_stream_again_when_strategy_is_restart() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var p = Source.UnfoldResource(_open, reader => { var s = reader.ReadLine(); @@ -138,22 +139,22 @@ await this.AssertAllStagesStoppedAsync(() => { var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); - Enumerable.Range(0, 20).ForEach(i => + Enumerable.Range(0, 20).ForEach(async i => { sub.Request(1); - c.ExpectNext().Should().Be(ManyLinesArray[0]); + var next = await c.ExpectNextAsync(); + next.Should().Be(ManyLinesArray[0]); }); sub.Cancel(); - return Task.CompletedTask; }, Materializer); } [Fact] public async Task A_UnfoldResourceSource_must_work_with_ByteString_as_well() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var chunkSize = 50; var buffer = new char[chunkSize]; @@ -179,23 +180,23 @@ await this.AssertAllStagesStoppedAsync(() => { }; p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); - Enumerable.Range(0, 122).ForEach(i => + Enumerable.Range(0, 122).ForEach(async i => { sub.Request(1); - c.ExpectNext().ToString().Should().Be(nextChunk()); + var next = await c.ExpectNextAsync(); + next.ToString().Should().Be(nextChunk()); }); sub.Request(1); - c.ExpectComplete(); - return Task.CompletedTask; + await c.ExpectCompleteAsync(); }, Materializer); } [Fact] public async Task A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_default() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var sys = ActorSystem.Create("dispatcher-testing", Utils.UnboundedMailboxConfig); var materializer = sys.Materializer(); @@ -206,7 +207,8 @@ await this.AssertAllStagesStoppedAsync(() => { ((ActorMaterializerImpl)materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor); - var refs = ExpectMsg().Refs; + var msg = await ExpectMsgAsync(); + var refs = msg.Refs; var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSource")); try { @@ -223,14 +225,13 @@ await this.AssertAllStagesStoppedAsync(() => { Shutdown(sys); } - return Task.CompletedTask; }, Materializer); } [Fact] public async Task A_UnfoldResourceSource_must_fail_when_create_throws_exception() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var testException = new TestException(""); var p = Source.UnfoldResource(() => { @@ -239,16 +240,16 @@ await this.AssertAllStagesStoppedAsync(() => { var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - c.ExpectSubscription(); - c.ExpectError().Should().Be(testException); - return Task.CompletedTask; + await c.ExpectSubscriptionAsync(); + var error = await c.ExpectErrorAsync(); + error.Should().Be(testException); }, Materializer); } [Fact] public async Task A_UnfoldResourceSource_must_fail_when_close_throws_exception() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var testException = new TestException(""); var p = Source.UnfoldResource(_open, Read, reader => { @@ -258,16 +259,16 @@ await this.AssertAllStagesStoppedAsync(() => { var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(61); - c.ExpectNextN(60); - c.ExpectError().Should().Be(testException); - return Task.CompletedTask; + c.ExpectNextNAsync(60); + var error = await c.ExpectErrorAsync(); + error.Should().Be(testException); }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails() + public async Task A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails() { var closedCounter = new AtomicCounter(0); var testException = new TestException("failing read"); @@ -279,12 +280,13 @@ public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_f ).RunWith(this.SinkProbe(), Materializer); probe.Request(1); - probe.ExpectError().Should().Be(testException); + var error = await probe.ExpectErrorAsync(); + error.Should().Be(testException); closedCounter.Current.Should().Be(1); } [Fact] - public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails_and_then_close_fails() + public async Task A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails_and_then_close_fails() { var closedCounter = new AtomicCounter(0); var testException = new TestException("boom"); @@ -299,10 +301,11 @@ public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_f } ).RunWith(this.SinkProbe(), Materializer); - EventFilter.Exception().Expect(1, () => + await EventFilter.Exception().ExpectAsync(1, async () => { probe.Request(1); - probe.ExpectError().Should().Be(testException); + var error = await probe.ExpectErrorAsync(); + error.Should().Be(testException); }); closedCounter.Current.Should().Be(1); From 7aed3471f25727210a03a9ebf1908c9c545e4ed1 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 28 Mar 2023 17:33:39 +0100 Subject: [PATCH 3/3] fix --- .../Dsl/UnfoldResourceSourceSpec.cs | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs index 1d7930c9853..392b717f4db 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs @@ -112,12 +112,12 @@ await this.AssertAllStagesStoppedAsync(async() => { p.Subscribe(c); var sub = await c.ExpectSubscriptionAsync(); - Enumerable.Range(0, 50).ForEach(async i => + foreach (var i in Enumerable.Range(0, 50)) { sub.Request(1); var next = await c.ExpectNextAsync(); next.Should().Be(i < 10 ? ManyLinesArray[i] : ManyLinesArray[i + 10]); - }); + } sub.Request(1); await c.ExpectCompleteAsync(); }, Materializer); @@ -140,13 +140,13 @@ await this.AssertAllStagesStoppedAsync(async() => { p.Subscribe(c); var sub = await c.ExpectSubscriptionAsync(); - - Enumerable.Range(0, 20).ForEach(async i => + foreach (var i in Enumerable.Range(0, 20)) { sub.Request(1); var next = await c.ExpectNextAsync(); next.Should().Be(ManyLinesArray[0]); - }); + } + sub.Cancel(); }, Materializer); } @@ -182,12 +182,13 @@ await this.AssertAllStagesStoppedAsync(async() => { p.Subscribe(c); var sub = await c.ExpectSubscriptionAsync(); - Enumerable.Range(0, 122).ForEach(async i => + foreach (var i in Enumerable.Range(0, 122)) { sub.Request(1); var next = await c.ExpectNextAsync(); next.ToString().Should().Be(nextChunk()); - }); + } + sub.Request(1); await c.ExpectCompleteAsync(); }, Materializer); @@ -241,8 +242,7 @@ await this.AssertAllStagesStoppedAsync(async() => { p.Subscribe(c); await c.ExpectSubscriptionAsync(); - var error = await c.ExpectErrorAsync(); - error.Should().Be(testException); + c.ExpectError().Should().Be(testException); }, Materializer); } @@ -261,9 +261,8 @@ await this.AssertAllStagesStoppedAsync(async() => { var sub = await c.ExpectSubscriptionAsync(); sub.Request(61); - c.ExpectNextNAsync(60); - var error = await c.ExpectErrorAsync(); - error.Should().Be(testException); + c.ExpectNextN(60); + c.ExpectError().Should().Be(testException); }, Materializer); }