diff --git a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs index 0a1da3d8c82..392b717f4db 100644 --- a/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/UnfoldResourceSourceSpec.cs @@ -16,7 +16,6 @@ using Akka.Streams.Implementation; using Akka.Streams.Supervision; using Akka.Streams.TestKit; -using Akka.Streams.Util; using Akka.TestKit; using Akka.Util; using Akka.Util.Internal; @@ -66,95 +65,96 @@ public UnfoldResourceSourceSpec(ITestOutputHelper helper) : base(Utils.Unbounded [Fact] - public void A_UnfoldResourceSource_must_read_contents_from_a_file() + public async Task A_UnfoldResourceSource_must_read_contents_from_a_file() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = Source.UnfoldResource(_open, Read, Close).RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(1); - c.ExpectNext().Should().Be(ManyLinesArray[0]); + var next = await c.ExpectNextAsync(); + next.Should().Be(ManyLinesArray[0]); sub.Request(1); - c.ExpectNext().Should().Be(ManyLinesArray[1]); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + next = await c.ExpectNextAsync(); + next.Should().Be(ManyLinesArray[1]); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300)); for (var i = 2; i < ManyLinesArray.Length; i++) { sub.Request(1); - c.ExpectNext().Should().Be(ManyLinesArray[i]); + next = await c.ExpectNextAsync(); + next.Should().Be(ManyLinesArray[i]); } sub.Request(1); - c.ExpectComplete(); + await c.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_continue_when_strategy_is_resume_and_exception_happened() + public async Task A_UnfoldResourceSource_must_continue_when_strategy_is_resume_and_exception_happened() { - this.AssertAllStagesStopped(() => - { - var p = Source.UnfoldResource(_open, reader => - { - var s = reader.ReadLine(); - if (s != null && s.Contains("b")) - throw new TestException(""); - return s ?? Option.None; - }, Close) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + await this.AssertAllStagesStoppedAsync(async() => { + var p = Source.UnfoldResource(_open, reader => + { + var s = reader.ReadLine(); + if (s != null && s.Contains("b")) + throw new TestException(""); + return s ?? Option.None; + }, Close) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) .RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); - + p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); - Enumerable.Range(0,50).ForEach(i => + foreach (var i in Enumerable.Range(0, 50)) { sub.Request(1); - c.ExpectNext().Should().Be(i < 10 ? ManyLinesArray[i] : ManyLinesArray[i + 10]); - }); + var next = await c.ExpectNextAsync(); + next.Should().Be(i < 10 ? ManyLinesArray[i] : ManyLinesArray[i + 10]); + } sub.Request(1); - c.ExpectComplete(); + await c.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_close_and_open_stream_again_when_strategy_is_restart() + public async Task A_UnfoldResourceSource_must_close_and_open_stream_again_when_strategy_is_restart() { - this.AssertAllStagesStopped(() => - { - var p = Source.UnfoldResource(_open, reader => - { - var s = reader.ReadLine(); - if (s != null && s.Contains("b")) - throw new TestException(""); - return s ?? Option.None; - }, Close) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + await this.AssertAllStagesStoppedAsync(async() => { + var p = Source.UnfoldResource(_open, reader => + { + var s = reader.ReadLine(); + if (s != null && s.Contains("b")) + throw new TestException(""); + return s ?? Option.None; + }, Close) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) .RunWith(Sink.AsPublisher(false), Materializer); var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); - - Enumerable.Range(0, 20).ForEach(i => + var sub = await c.ExpectSubscriptionAsync(); + foreach (var i in Enumerable.Range(0, 20)) { sub.Request(1); - c.ExpectNext().Should().Be(ManyLinesArray[0]); - }); + var next = await c.ExpectNextAsync(); + next.Should().Be(ManyLinesArray[0]); + } + sub.Cancel(); }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_work_with_ByteString_as_well() + public async Task A_UnfoldResourceSource_must_work_with_ByteString_as_well() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var chunkSize = 50; var buffer = new char[chunkSize]; @@ -172,7 +172,7 @@ public void A_UnfoldResourceSource_must_work_with_ByteString_as_well() var remaining = ManyLines; Func nextChunk = () => { - if(remaining.Length <= chunkSize) + if (remaining.Length <= chunkSize) return remaining; var chunk = remaining.Take(chunkSize).Aggregate("", (s, c1) => s + c1); remaining = remaining.Substring(chunkSize); @@ -180,23 +180,24 @@ public void A_UnfoldResourceSource_must_work_with_ByteString_as_well() }; p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); - Enumerable.Range(0, 122).ForEach(i => + foreach (var i in Enumerable.Range(0, 122)) { sub.Request(1); - c.ExpectNext().ToString().Should().Be(nextChunk()); - }); + var next = await c.ExpectNextAsync(); + next.ToString().Should().Be(nextChunk()); + } + sub.Request(1); - c.ExpectComplete(); + await c.ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_default() + public async Task A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_default() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var sys = ActorSystem.Create("dispatcher-testing", Utils.UnboundedMailboxConfig); var materializer = sys.Materializer(); @@ -205,9 +206,10 @@ public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_ var p = Source.UnfoldResource(_open, Read, Close) .RunWith(this.SinkProbe(), materializer); - ((ActorMaterializerImpl) materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, + ((ActorMaterializerImpl)materializer).Supervisor.Tell(StreamSupervisor.GetChildren.Instance, TestActor); - var refs = ExpectMsg().Refs; + var msg = await ExpectMsgAsync(); + var refs = msg.Refs; var actorRef = refs.First(@ref => @ref.Path.ToString().Contains("unfoldResourceSource")); try { @@ -223,14 +225,14 @@ public void A_UnfoldResourceSource_must_use_dedicated_blocking_io_dispatcher_by_ { Shutdown(sys); } + }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_fail_when_create_throws_exception() + public async Task A_UnfoldResourceSource_must_fail_when_create_throws_exception() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var testException = new TestException(""); var p = Source.UnfoldResource(() => { @@ -239,16 +241,15 @@ public void A_UnfoldResourceSource_must_fail_when_create_throws_exception() var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - c.ExpectSubscription(); + await c.ExpectSubscriptionAsync(); c.ExpectError().Should().Be(testException); }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_fail_when_close_throws_exception() + public async Task A_UnfoldResourceSource_must_fail_when_close_throws_exception() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var testException = new TestException(""); var p = Source.UnfoldResource(_open, Read, reader => { @@ -258,16 +259,15 @@ public void A_UnfoldResourceSource_must_fail_when_close_throws_exception() var c = this.CreateManualSubscriberProbe(); p.Subscribe(c); - var sub = c.ExpectSubscription(); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(61); c.ExpectNextN(60); c.ExpectError().Should().Be(testException); - }, Materializer); } [Fact] - public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails() + public async Task A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails() { var closedCounter = new AtomicCounter(0); var testException = new TestException("failing read"); @@ -279,12 +279,13 @@ public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_f ).RunWith(this.SinkProbe(), Materializer); probe.Request(1); - probe.ExpectError().Should().Be(testException); + var error = await probe.ExpectErrorAsync(); + error.Should().Be(testException); closedCounter.Current.Should().Be(1); } [Fact] - public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails_and_then_close_fails() + public async Task A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_fails_and_then_close_fails() { var closedCounter = new AtomicCounter(0); var testException = new TestException("boom"); @@ -299,10 +300,11 @@ public void A_UnfoldResourceSource_must_not_close_the_resource_twice_when_read_f } ).RunWith(this.SinkProbe(), Materializer); - EventFilter.Exception().Expect(1, () => + await EventFilter.Exception().ExpectAsync(1, async () => { probe.Request(1); - probe.ExpectError().Should().Be(testException); + var error = await probe.ExpectErrorAsync(); + error.Should().Be(testException); }); closedCounter.Current.Should().Be(1);