Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1-74] ActorRefBackpressureSinkSpec: Change AssertAllStagesStopped to AssertAllStagesStoppedAsync #6539

Merged
merged 1 commit into from
Mar 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions src/core/Akka.Streams.Tests/Dsl/ActorRefBackpressureSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams.Dsl;
Expand Down Expand Up @@ -77,10 +78,9 @@ public ActorRefBackpressureSinkSpec(ITestOutputHelper output) : base(output, Str
private IActorRef CreateActor<T>() => Sys.ActorOf(Props.Create(typeof(T), TestActor).WithDispatcher("akka.test.stream-dispatcher"));

[Fact]
public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef()
public async Task ActorBackpressureSink_should_send_the_elements_to_the_ActorRef()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw>();
Source.From(Enumerable.Range(1, 3))
.RunWith(Sink.ActorRefWithAck<int>(fw, InitMessage, AckMessage, CompleteMessage), Materializer);
Expand All @@ -89,14 +89,14 @@ public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef()
ExpectMsg(2);
ExpectMsg(3);
ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2()
public async Task ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw>();
var probe =
this.SourceProbe<int>()
Expand All @@ -111,14 +111,14 @@ public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2()
ExpectMsg(3);
probe.SendComplete();
ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_cancel_stream_when_actor_terminates()
public async Task ActorBackpressureSink_should_cancel_stream_when_actor_terminates()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw>();
var publisher =
this.SourceProbe<int>()
Expand All @@ -129,14 +129,14 @@ public void ActorBackpressureSink_should_cancel_stream_when_actor_terminates()
ExpectMsg(1);
Sys.Stop(fw);
publisher.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_send_message_only_when_backpressure_received()
public async Task ActorBackpressureSink_should_send_message_only_when_backpressure_received()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw2>();
var publisher = this.SourceProbe<int>()
.To(Sink.ActorRefWithAck<int>(fw, InitMessage, AckMessage, CompleteMessage))
Expand All @@ -156,14 +156,14 @@ public void ActorBackpressureSink_should_send_message_only_when_backpressure_rec
ExpectMsg(3);

ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_has_been_full()
public async Task ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_has_been_full()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var bufferSize = 16;
var streamElementCount = bufferSize + 4;
var fw = CreateActor<Fw2>();
Expand All @@ -187,14 +187,14 @@ public void ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_h
fw.Tell(TriggerAckMessage.Instance);
}
ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_work_with_one_element_buffer()
public async Task ActorBackpressureSink_should_work_with_one_element_buffer()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw2>();
var publisher =
this.SourceProbe<int>()
Expand All @@ -216,6 +216,7 @@ public void ActorBackpressureSink_should_work_with_one_element_buffer()

publisher.SendComplete();
ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

Expand Down