From 8d5fafca6d0cc39626ddf114ca9c13cf967f959a Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Wed, 22 Mar 2023 17:35:44 +0100 Subject: [PATCH] Change `AssertAllStagesStopped` to `AssertAllStagesStoppedAsync` - `ActorRefBackpressureSinkSpec`! --- .../Dsl/ActorRefBackpressureSinkSpec.cs | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/ActorRefBackpressureSinkSpec.cs b/src/core/Akka.Streams.Tests/Dsl/ActorRefBackpressureSinkSpec.cs index 675f3d7829b..890f327b6c4 100644 --- a/src/core/Akka.Streams.Tests/Dsl/ActorRefBackpressureSinkSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/ActorRefBackpressureSinkSpec.cs @@ -7,6 +7,7 @@ using System; using System.Linq; +using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; using Akka.Streams.Dsl; @@ -77,10 +78,9 @@ public ActorRefBackpressureSinkSpec(ITestOutputHelper output) : base(output, Str private IActorRef CreateActor() => Sys.ActorOf(Props.Create(typeof(T), TestActor).WithDispatcher("akka.test.stream-dispatcher")); [Fact] - public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef() + public async Task ActorBackpressureSink_should_send_the_elements_to_the_ActorRef() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var fw = CreateActor(); Source.From(Enumerable.Range(1, 3)) .RunWith(Sink.ActorRefWithAck(fw, InitMessage, AckMessage, CompleteMessage), Materializer); @@ -89,14 +89,14 @@ public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef() ExpectMsg(2); ExpectMsg(3); ExpectMsg(CompleteMessage); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2() + public async Task ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var fw = CreateActor(); var probe = this.SourceProbe() @@ -111,14 +111,14 @@ public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2() ExpectMsg(3); probe.SendComplete(); ExpectMsg(CompleteMessage); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ActorBackpressureSink_should_cancel_stream_when_actor_terminates() + public async Task ActorBackpressureSink_should_cancel_stream_when_actor_terminates() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var fw = CreateActor(); var publisher = this.SourceProbe() @@ -129,14 +129,14 @@ public void ActorBackpressureSink_should_cancel_stream_when_actor_terminates() ExpectMsg(1); Sys.Stop(fw); publisher.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ActorBackpressureSink_should_send_message_only_when_backpressure_received() + public async Task ActorBackpressureSink_should_send_message_only_when_backpressure_received() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var fw = CreateActor(); var publisher = this.SourceProbe() .To(Sink.ActorRefWithAck(fw, InitMessage, AckMessage, CompleteMessage)) @@ -156,14 +156,14 @@ public void ActorBackpressureSink_should_send_message_only_when_backpressure_rec ExpectMsg(3); ExpectMsg(CompleteMessage); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_has_been_full() + public async Task ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_has_been_full() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var bufferSize = 16; var streamElementCount = bufferSize + 4; var fw = CreateActor(); @@ -187,14 +187,14 @@ public void ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_h fw.Tell(TriggerAckMessage.Instance); } ExpectMsg(CompleteMessage); + return Task.CompletedTask; }, Materializer); } [Fact] - public void ActorBackpressureSink_should_work_with_one_element_buffer() + public async Task ActorBackpressureSink_should_work_with_one_element_buffer() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var fw = CreateActor(); var publisher = this.SourceProbe() @@ -216,6 +216,7 @@ public void ActorBackpressureSink_should_work_with_one_element_buffer() publisher.SendComplete(); ExpectMsg(CompleteMessage); + return Task.CompletedTask; }, Materializer); }