diff --git a/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs b/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs index 95c0d63d053..9e890ef8698 100644 --- a/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/QueueSourceSpec.cs @@ -6,7 +6,6 @@ //----------------------------------------------------------------------- using System; -using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Pattern; @@ -40,18 +39,18 @@ private static void AssertSuccess(Task task) } [Fact] - public void QueueSource_should_emit_received_message_to_the_stream() + public async Task QueueSource_should_emit_received_message_to_the_stream() { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(10, OverflowStrategy.Fail).To(Sink.FromSubscriber(s)).Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); sub.Request(2); AssertSuccess(queue.OfferAsync(1)); - s.ExpectNext(1); + await s.ExpectNextAsync(1); AssertSuccess(queue.OfferAsync(2)); - s.ExpectNext(2); + await s.ExpectNextAsync(2); AssertSuccess(queue.OfferAsync(3)); sub.Cancel(); } @@ -92,14 +91,14 @@ public void QueueSource_should_reject_elements_when_backpressuring_with_maxBuffe } [Fact] - public void QueueSource_should_buffer_when_needed() + public async Task QueueSource_should_buffer_when_needed() { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(100, OverflowStrategy.DropHead) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); for (var i = 1; i <= 20; i++) AssertSuccess(queue.OfferAsync(i)); sub.Request(10); @@ -114,117 +113,110 @@ public void QueueSource_should_buffer_when_needed() } [Fact] - public void QueueSource_should_not_fail_when_0_buffer_space_and_demand_is_signalled() + public async Task QueueSource_should_not_fail_when_0_buffer_space_and_demand_is_signalled() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(0, OverflowStrategy.DropHead) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); sub.Request(1); AssertSuccess(queue.OfferAsync(1)); sub.Cancel(); - }, _materializer); } [Fact] - public void QueueSource_should_wait_for_demand_when_buffer_is_0() + public async Task QueueSource_should_wait_for_demand_when_buffer_is_0() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(0, OverflowStrategy.DropHead) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); queue.OfferAsync(1).PipeTo(TestActor); - ExpectNoMsg(_pause); + await ExpectNoMsgAsync(_pause); sub.Request(1); - ExpectMsg(); - s.ExpectNext(1); + await ExpectMsgAsync(); + await s.ExpectNextAsync(1); sub.Cancel(); }, _materializer); } [Fact] - public void QueueSource_should_finish_offer_and_complete_futures_when_stream_completed() + public async Task QueueSource_should_finish_offer_and_complete_futures_when_stream_completed() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(0, OverflowStrategy.DropHead) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); queue.WatchCompletionAsync() .ContinueWith(t => "done", TaskContinuationOptions.OnlyOnRanToCompletion) .PipeTo(TestActor); queue.OfferAsync(1).PipeTo(TestActor); - ExpectNoMsg(_pause); + await ExpectNoMsgAsync(_pause); sub.Cancel(); - ExpectMsgAllOf(new object[]{ QueueClosed.Instance, "done" }); + ExpectMsgAllOf(new object[] { QueueClosed.Instance, "done" }); }, _materializer); } [Fact] - public void QueueSource_should_fail_stream_on_buffer_overflow_in_fail_mode() + public async Task QueueSource_should_fail_stream_on_buffer_overflow_in_fail_mode() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(1, OverflowStrategy.Fail) .To(Sink.FromSubscriber(s)) .Run(_materializer); - s.ExpectSubscription(); + await s.ExpectSubscriptionAsync(); - queue.OfferAsync(1); - queue.OfferAsync(1); + await queue.OfferAsync(1); + await queue.OfferAsync(1); s.ExpectError(); }, _materializer); } [Fact] - public void QueueSource_should_remember_pull_from_downstream_to_send_offered_element_immediately() + public async Task QueueSource_should_remember_pull_from_downstream_to_send_offered_element_immediately() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var s = this.CreateManualSubscriberProbe(); var probe = CreateTestProbe(); var queue = TestSourceStage>.Create( new QueueSource(1, OverflowStrategy.DropHead), probe) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); sub.Request(1); - probe.ExpectMsg(); + await probe.ExpectMsgAsync(); AssertSuccess(queue.OfferAsync(1)); - s.ExpectNext(1); + await s.ExpectNextAsync(1); sub.Cancel(); }, _materializer); } [Fact] - public void QueueSource_should_fail_offer_future_if_user_does_not_wait_in_backpressure_mode() + public async Task QueueSource_should_fail_offer_future_if_user_does_not_wait_in_backpressure_mode() { - this.AssertAllStagesStopped(() => - { - var tuple = - Source.Queue(5, OverflowStrategy.Backpressure) - .ToMaterialized(this.SinkProbe(), Keep.Both) - .Run(_materializer); + await this.AssertAllStagesStoppedAsync(async() => { + var tuple = + Source.Queue(5, OverflowStrategy.Backpressure) + .ToMaterialized(this.SinkProbe(), Keep.Both) + .Run(_materializer); var queue = tuple.Item1; var probe = tuple.Item2; @@ -233,39 +225,38 @@ public void QueueSource_should_fail_offer_future_if_user_does_not_wait_in_backpr queue.OfferAsync(6).PipeTo(TestActor); queue.OfferAsync(7).PipeTo(TestActor); - ExpectMsg().Cause.Should().BeOfType(); - probe.RequestNext(1); - ExpectMsg(Enqueued.Instance); + var expect = await ExpectMsgAsync(); + expect.Cause.Should().BeOfType(); + await probe.RequestNextAsync(1); + await ExpectMsgAsync(Enqueued.Instance); queue.Complete(); - probe.Request(6) - .ExpectNext( 2, 3, 4, 5, 6) - .ExpectComplete(); + await probe.Request(6) + .ExpectNext(2, 3, 4, 5, 6) + .ExpectCompleteAsync(); }, _materializer); } [Fact] - public void QueueSource_should_complete_watching_future_with_failure_if_stream_failed() + public async Task QueueSource_should_complete_watching_future_with_failure_if_stream_failed() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(1, OverflowStrategy.Fail) .To(Sink.FromSubscriber(s)) .Run(_materializer); queue.WatchCompletionAsync().PipeTo(TestActor); - queue.OfferAsync(1); // need to wait when first offer is done as initialization can be done in this moment - queue.OfferAsync(2); - ExpectMsg(); + await queue.OfferAsync(1); // need to wait when first offer is done as initialization can be done in this moment + await queue.OfferAsync(2); + await ExpectMsgAsync(); }, _materializer); } [Fact] - public void QueueSource_should_complete_watching_future_with_failure_if_materializer_shut_down() + public async Task QueueSource_should_complete_watching_future_with_failure_if_materializer_shut_down() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var tempMap = ActorMaterializer.Create(Sys, ActorMaterializerSettings.Create(Sys)); // need to create a new materializer to be able to shutdown it var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(1, OverflowStrategy.Fail) @@ -273,74 +264,71 @@ public void QueueSource_should_complete_watching_future_with_failure_if_material .Run(tempMap); queue.WatchCompletionAsync().PipeTo(TestActor); tempMap.Shutdown(); - ExpectMsg(); + await ExpectMsgAsync(); }, _materializer); } [Fact] - public void QueueSource_should_return_false_when_element_was_not_added_to_buffer() + public async Task QueueSource_should_return_false_when_element_was_not_added_to_buffer() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(1, OverflowStrategy.DropNew) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); - queue.OfferAsync(1); + await queue.OfferAsync(1); queue.OfferAsync(2).PipeTo(TestActor); - ExpectMsg(); + await ExpectMsgAsync(); sub.Request(1); - s.ExpectNext(1); + await s.ExpectNextAsync(1); sub.Cancel(); }, _materializer); } [Fact] - public void QueueSource_should_wait_when_buffer_is_full_and_backpressure_is_on() + public async Task QueueSource_should_wait_when_buffer_is_full_and_backpressure_is_on() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async () => { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(1, OverflowStrategy.Backpressure) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); AssertSuccess(queue.OfferAsync(1)); queue.OfferAsync(2).PipeTo(TestActor); - ExpectNoMsg(_pause); + await ExpectNoMsgAsync(_pause); sub.Request(1); - s.ExpectNext(1); + await s.ExpectNextAsync(1); sub.Request(1); - s.ExpectNext(2); - ExpectMsg(); + await s.ExpectNextAsync(2); + await ExpectMsgAsync(); sub.Cancel(); }, _materializer); } [Fact] - public void QueueSource_should_fail_offer_future_when_stream_is_completed() + public async Task QueueSource_should_fail_offer_future_when_stream_is_completed() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var s = this.CreateManualSubscriberProbe(); var queue = Source.Queue(1, OverflowStrategy.DropNew) .To(Sink.FromSubscriber(s)) .Run(_materializer); - var sub = s.ExpectSubscription(); + var sub = await s.ExpectSubscriptionAsync(); queue.WatchCompletionAsync().ContinueWith(t => Done.Instance).PipeTo(TestActor); sub.Cancel(); - ExpectMsg(Done.Instance); + await ExpectMsgAsync(Done.Instance); var exception = Record.ExceptionAsync(async () => await queue.OfferAsync(1)).Result; exception.Should().BeOfType(); @@ -348,7 +336,7 @@ public void QueueSource_should_fail_offer_future_when_stream_is_completed() } [Fact] - public void QueueSource_should_not_share_future_across_materializations() + public async Task QueueSource_should_not_share_future_across_materializations() { var source = Source.Queue(1, OverflowStrategy.Fail); @@ -357,21 +345,21 @@ public void QueueSource_should_not_share_future_across_materializations() var sourceQueue1 = source.To(Sink.FromSubscriber(mat1Subscriber)).Run(_materializer); var sourceQueue2 = source.To(Sink.FromSubscriber(mat2Subscriber)).Run(_materializer); - mat1Subscriber.EnsureSubscription(); - mat2Subscriber.EnsureSubscription(); + await mat1Subscriber.EnsureSubscriptionAsync(); + await mat2Subscriber.EnsureSubscriptionAsync(); mat1Subscriber.Request(1); - sourceQueue1.OfferAsync("hello"); - mat1Subscriber.ExpectNext("hello"); + await sourceQueue1.OfferAsync("hello"); + await mat1Subscriber.ExpectNextAsync("hello"); mat1Subscriber.Cancel(); - sourceQueue1.WatchCompletionAsync().ContinueWith(task => task.IsCompleted).PipeTo(TestActor); - ExpectMsg(true); + await sourceQueue1.WatchCompletionAsync().ContinueWith(task => task.IsCompleted).PipeTo(TestActor); + await ExpectMsgAsync(true); sourceQueue2.WatchCompletionAsync().IsCompleted.Should().BeFalse(); } [Fact] - public void QueueSource_should_complete_the_stream_when_buffer_is_empty() + public async Task QueueSource_should_complete_the_stream_when_buffer_is_empty() { var tuple = Source.Queue(1, OverflowStrategy.Fail) @@ -384,11 +372,11 @@ public void QueueSource_should_complete_the_stream_when_buffer_is_empty() var task = source.WatchCompletionAsync(); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - probe.EnsureSubscription().ExpectComplete(); + await probe.EnsureSubscription().ExpectCompleteAsync(); } [Fact] - public void QueueSource_should_complete_the_stream_when_buffer_is_full() + public async Task QueueSource_should_complete_the_stream_when_buffer_is_full() { var tuple = Source.Queue(1, OverflowStrategy.Fail) @@ -397,15 +385,15 @@ public void QueueSource_should_complete_the_stream_when_buffer_is_full() var source = tuple.Item1; var probe = tuple.Item2; - source.OfferAsync(1); + await source.OfferAsync(1); source.Complete(); - probe.RequestNext(1).ExpectComplete(); + await probe.RequestNext(1).ExpectCompleteAsync(); var task = source.WatchCompletionAsync(); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); } [Fact] - public void QueueSource_should_complete_the_stream_when_buffer_is_full_and_element_is_pending() + public async Task QueueSource_should_complete_the_stream_when_buffer_is_full_and_element_is_pending() { var tuple = Source.Queue(1, OverflowStrategy.Backpressure) @@ -417,16 +405,16 @@ public void QueueSource_should_complete_the_stream_when_buffer_is_full_and_eleme source.OfferAsync(1); source.OfferAsync(2); source.Complete(); - probe.RequestNext(1) + await probe.RequestNext(1) .RequestNext(2) - .ExpectComplete(); + .ExpectCompleteAsync(); var task = source.WatchCompletionAsync(); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); } [Fact] - public void QueueSource_should_complete_the_stream_when_no_buffer_is_used() + public async Task QueueSource_should_complete_the_stream_when_no_buffer_is_used() { var tuple = Source.Queue(0, OverflowStrategy.Fail) @@ -439,11 +427,11 @@ public void QueueSource_should_complete_the_stream_when_no_buffer_is_used() var task = source.WatchCompletionAsync(); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - probe.EnsureSubscription().ExpectComplete(); + await probe.EnsureSubscription().ExpectCompleteAsync(); } [Fact] - public void QueueSource_should_complete_the_stream_when_no_buffer_is_used_and_element_is_pending() + public async Task QueueSource_should_complete_the_stream_when_no_buffer_is_used_and_element_is_pending() { var tuple = Source.Queue(0, OverflowStrategy.Fail) @@ -454,7 +442,7 @@ public void QueueSource_should_complete_the_stream_when_no_buffer_is_used_and_el source.OfferAsync(1); source.Complete(); - probe.RequestNext(1).ExpectComplete(); + await probe.RequestNext(1).ExpectCompleteAsync(); var task = source.WatchCompletionAsync(); task.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); } @@ -478,7 +466,7 @@ public void QueueSource_should_fail_the_stream_when_buffer_is_empty() } [Fact] - public void QueueSource_should_fail_the_stream_when_buffer_is_full() + public async Task QueueSource_should_fail_the_stream_when_buffer_is_full() { var tuple = Source.Queue(1, OverflowStrategy.Fail) @@ -487,7 +475,7 @@ public void QueueSource_should_fail_the_stream_when_buffer_is_full() var source = tuple.Item1; var probe = tuple.Item2; - source.OfferAsync(1); + await source.OfferAsync(1); source.Fail(Ex); var task = source.WatchCompletionAsync(); task.Invoking(_ => _.Wait(TimeSpan.FromSeconds(3))).Should().Throw().And.Should().Be(Ex);