diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowWireTapSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowWireTapSpec.cs index c598ce014c3..b248d6e8009 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowWireTapSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowWireTapSpec.cs @@ -7,6 +7,7 @@ using System; using System.Linq; +using System.Threading.Tasks; using Akka.Actor; using Akka.Streams.Dsl; using Akka.Streams.TestKit; @@ -30,38 +31,33 @@ public FlowWireTapSpec(ITestOutputHelper helper) } [Fact] - public void A_wireTap_must_call_the_procedure_for_each_element() + public async Task A_wireTap_must_call_the_procedure_for_each_element() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 100)) - .WireTap(i => TestActor.Tell(i)) - .RunWith(Sink.Ignore(), Materializer).Wait(); - - Enumerable.Range(1, 100).Select(i => ExpectMsg(i)); + await this.AssertAllStagesStoppedAsync(async () => { + Source.From(Enumerable.Range(1, 100)) + .WireTap(i => TestActor.Tell(i)) + .RunWith(Sink.Ignore(), Materializer).Wait(); + foreach (var i in Enumerable.Range(1, 100)) + await ExpectMsgAsync(i); }, Materializer); } [Fact] - public void A_wireTap_must_complete_the_future_for_an_empty_stream() + public async Task A_wireTap_must_complete_the_future_for_an_empty_stream() { - this.AssertAllStagesStopped(() => - { - Source.Empty() - .WireTap(i => TestActor.Tell(i)) - .RunWith(Sink.Ignore(), Materializer) - .ContinueWith(_ => TestActor.Tell("done")); - - ExpectMsg("done"); - + await this.AssertAllStagesStoppedAsync(async() => { + await Source.Empty() + .WireTap(i => TestActor.Tell(i)) + .RunWith(Sink.Ignore(), Materializer) + .ContinueWith(_ => TestActor.Tell("done")); + await ExpectMsgAsync("done"); }, Materializer); } [Fact] - public void A_wireTap_must_yield_the_first_error() + public async Task A_wireTap_must_yield_the_first_error() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var p = this.CreateManualPublisherProbe(); Source.FromPublisher(p) @@ -69,23 +65,22 @@ public void A_wireTap_must_yield_the_first_error() .RunWith(Sink.Ignore(), Materializer) .ContinueWith(t => TestActor.Tell(t.Exception.InnerException)); - var proc = p.ExpectSubscription(); - proc.ExpectRequest(); + var proc = await p.ExpectSubscriptionAsync(); + await proc.ExpectRequestAsync(); var rte = new Exception("ex"); proc.SendError(rte); - ExpectMsg(rte); - + await ExpectMsgAsync(rte); }, Materializer); } [Fact] - public void A_wireTap_must_no_cause_subsequent_stages_to_be_failed_if_throws() + public async Task A_wireTap_must_no_cause_subsequent_stages_to_be_failed_if_throws() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var error = new TestException("Boom!"); var future = Source.Single(1).WireTap(_ => throw error).RunWith(Sink.Ignore(), Materializer); Invoking(() => future.Wait()).Should().NotThrow(); + return Task.CompletedTask; }, Materializer); } }