Skip to content

Commit

Permalink
[5-74]FlowAggregateSpec: Change AssertAllStagesStopped to `Assert…
Browse files Browse the repository at this point in the history
…AllStagesStoppedAsync`
  • Loading branch information
eaba committed Mar 23, 2023
1 parent fd41c70 commit 1adfbb4
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions src/core/Akka.Streams.Tests/Dsl/FlowAggregateSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public FlowAggregateSpec(ITestOutputHelper helper) : base(helper)
private static Sink<int, Task<int>> AggregateSink => Sink.Aggregate<int, int>(0, (sum, i) => sum + i);

[Fact]
public void A_Aggregate_must_work_when_using_Source_RunAggregate()
public async Task A_Aggregate_must_work_when_using_Source_RunAggregate()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = InputSource.RunAggregate(0, (sum, i) => sum + i, Materializer);
var complete = await task.ShouldCompleteWithin(3.Seconds());
Expand All @@ -49,9 +49,9 @@ public void A_Aggregate_must_work_when_using_Source_RunAggregate()
}

[Fact]
public void A_Aggregate_must_work_when_using_Source_Aggregate()
public async Task A_Aggregate_must_work_when_using_Source_Aggregate()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = AggregateSource.RunWith(Sink.First<int>(), Materializer);
var complete = await task.ShouldCompleteWithin(3.Seconds());
Expand All @@ -60,9 +60,9 @@ public void A_Aggregate_must_work_when_using_Source_Aggregate()
}

[Fact]
public void A_Aggregate_must_work_when_using_Sink_Aggregate()
public async Task A_Aggregate_must_work_when_using_Sink_Aggregate()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = InputSource.RunWith(AggregateSink, Materializer);
var complete = await task.ShouldCompleteWithin(3.Seconds());
Expand All @@ -71,9 +71,9 @@ public void A_Aggregate_must_work_when_using_Sink_Aggregate()
}

[Fact]
public void A_Aggregate_must_work_when_using_Flow_Aggregate()
public async Task A_Aggregate_must_work_when_using_Flow_Aggregate()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = InputSource.Via(AggregateFlow).RunWith(Sink.First<int>(), Materializer);
var complete = await task.ShouldCompleteWithin(3.Seconds());
Expand All @@ -82,9 +82,9 @@ public void A_Aggregate_must_work_when_using_Flow_Aggregate()
}

[Fact]
public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate()
public async Task A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate_and_Sink_Aggregate()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = AggregateSource.Via(AggregateFlow).RunWith(AggregateSink, Materializer);
var complete = await task.ShouldCompleteWithin(3.Seconds());
Expand All @@ -93,10 +93,9 @@ public void A_Aggregate_must_work_when_using_Source_Aggregate_and_Flow_Aggregate
}

[Fact]
public void A_Aggregate_must_propagate_an_error()
public async Task A_Aggregate_must_propagate_an_error()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var error = new TestException("buh");
var future = InputSource.Select(x =>
{
Expand All @@ -109,15 +108,15 @@ public void A_Aggregate_must_propagate_an_error()
.Should().Throw<TestException>()
.And.Should()
.Be(error);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void
public async Task
A_Aggregate_must_complete_task_with_failure_when_the_aggregateing_function_throws_and_the_supervisor_strategy_decides_to_stop()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var error = new TestException("buh");
var future = InputSource.RunAggregate(0, (x, y) =>
{
Expand All @@ -130,13 +129,14 @@ public void
.Should().Throw<TestException>()
.And.Should()
.Be(error);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume()
public async Task A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_resume()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var error = new Exception("boom");
var aggregate = Sink.Aggregate(0, (int x, int y) =>
Expand All @@ -155,9 +155,9 @@ public void A_Aggregate_must_resume_with_the_accumulated_state_when_the_aggregat
}

[Fact]
public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart()
public async Task A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_funtion_throws_and_the_supervisor_strategy_decides_to_restart()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var error = new Exception("boom");
var aggregate = Sink.Aggregate(0, (int x, int y) =>
Expand All @@ -176,9 +176,9 @@ public void A_Aggregate_must_resume_and_reset_the_state_when_the_aggregating_fun
}

[Fact]
public void A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream()
public async Task A_Aggregate_must_complete_task_and_return_zero_given_an_empty_stream()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var task = Source.From(Enumerable.Empty<int>())
.RunAggregate(0, (acc, element) => acc + element, Materializer);
Expand Down

0 comments on commit 1adfbb4

Please sign in to comment.