From 982d96d205c578e20c985a39027306f3018c3389 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 17 Aug 2023 05:27:28 +0700 Subject: [PATCH] Improve Streams SelectAsync. Log errors and improve test (#6884) * Improve Streams SelectAsync. Log errors and improve test * Fix unit test to fit the test intent better --------- Co-authored-by: Aaron Stannard --- .../Dsl/FlowSelectAsyncSpec.cs | 190 ++++++++++++------ .../Akka.Streams/Implementation/Fusing/Ops.cs | 49 ++++- 2 files changed, 168 insertions(+), 71 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs index f981473cbbd..5bd7e42d746 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs @@ -25,6 +25,8 @@ using Xunit; using Xunit.Abstractions; using FluentAssertions.Extensions; +using static FluentAssertions.FluentActions; +using Directive = Akka.Streams.Supervision.Directive; // ReSharper disable InvokeAsExtensionMethod #pragma warning disable 162 @@ -87,63 +89,96 @@ public async void A_Flow_with_SelectAsync_must_produce_task_elements_in_order() await c.ExpectCompleteAsync(); } - [LocalFact(SkipLocal = "Racy on Azure DevOps")] + // Turning this on in CI/CD for now + [Fact] public async Task A_Flow_with_SelectAsync_must_not_run_more_futures_than_requested_parallelism() { - var probe = CreateTestProbe(); - var c = this.CreateManualSubscriberProbe(); - Source.From(Enumerable.Range(1, 20)) - .SelectAsync(8, n => Task.Run(() => - { - probe.Ref.Tell(n); - return n; - })) - .RunWith(Sink.FromSubscriber(c), Materializer); - var sub = await c.ExpectSubscriptionAsync(); - await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500)); - sub.Request(1); - probe.ReceiveN(9).Should().BeEquivalentTo(Enumerable.Range(1, 9)); - await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500)); - sub.Request(2); - probe.ReceiveN(2).Should().BeEquivalentTo(Enumerable.Range(10, 2)); - await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500)); - sub.Request(10); - probe.ReceiveN(9).Should().BeEquivalentTo(Enumerable.Range(12, 9)); - await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); - - foreach (var n in Enumerable.Range(1, 13)) - await c.ExpectNextAsync(n); - //Enumerable.Range(1, 13).ForEach(n => c.ExpectNext(n)); - await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = CreateTestProbe(); + var c = this.CreateManualSubscriberProbe(); + + Source.From(Enumerable.Range(1, 20)) + .SelectAsync(8, async n => + { + await Task.Yield(); + probe.Ref.Tell(n); + return n; + }) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + sub.Request(1); + (await probe.ReceiveNAsync(9).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 9)); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + sub.Request(2); + (await probe.ReceiveNAsync(2).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(10, 2)); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + sub.Request(10); + (await probe.ReceiveNAsync(9).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(12, 9)); + await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + + foreach (var n in Enumerable.Range(1, 13)) + await c.ExpectNextAsync(n); + + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); + }, Materializer).ShouldCompleteWithin(RemainingOrDefault); } - [LocalFact(SkipLocal = "Racy on Azure DevOps")] - public async Task A_Flow_with_SelectAsync_must_signal_task_failure() + // Turning this on in CI/CD for now + [Fact] + public async Task A_Flow_with_parallel_execution_SelectAsync_must_signal_task_failure() { await this.AssertAllStagesStoppedAsync(async() => { - var latch = new TestLatch(1); var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) - .SelectAsync(4, n => Task.Run(() => + .SelectAsync(4, async n => { - if (n == 3) + if (n == 4) throw new TestException("err1"); + await Task.Delay(10.Seconds()); - latch.Ready(TimeSpan.FromSeconds(10)); return n; - })) + }) .To(Sink.FromSubscriber(c)).Run(Materializer); + var sub = await c.ExpectSubscriptionAsync(); sub.Request(10); - c.ExpectError().InnerException.Message.Should().Be("err1"); - latch.CountDown(); - }, Materializer); + + var exception = await c.ExpectErrorAsync(); + exception.InnerException!.Message.Should().Be("err1"); + }, Materializer).ShouldCompleteWithin(RemainingOrDefault); } + [Fact] + public async Task A_Flow_with_SelectAsync_must_signal_task_failure() + { + await this.AssertAllStagesStoppedAsync(async() => { + var probe = Source.From(Enumerable.Range(1, 5)) + .SelectAsync(1, async n => + { + await Task.Delay(10); + if (n == 3) + throw new TestException("err1"); + + return n; + }) + .RunWith(this.SinkProbe(), Materializer); + + var exception = await probe.AsyncBuilder() + .Request(10) + .ExpectNextN(new[]{1, 2}) + .ExpectErrorAsync() + .ShouldCompleteWithin(RemainingOrDefault); + exception.InnerException!.Message.Should().Be("err1"); + }, Materializer); + } + [Fact] public async Task A_Flow_with_SelectAsync_must_signal_task_failure_asap() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async () => { var latch = CreateTestLatch(); var done = Source.From(Enumerable.Range(1, 5)) .Select(n => @@ -165,17 +200,19 @@ await this.AssertAllStagesStoppedAsync(() => { return Task.FromResult(n); }).RunWith(Sink.Ignore(), Materializer); - done.Invoking(d => d.Wait(RemainingOrDefault)).Should().Throw().WithMessage("err1"); + await Awaiting(async () => await done).Should() + .ThrowAsync() + .WithMessage("err1") + .ShouldCompleteWithin(RemainingOrDefault); + latch.CountDown(); - return Task.CompletedTask; }, Materializer); } [Fact] public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync() { - await this.AssertAllStagesStoppedAsync(async() => { - var latch = new TestLatch(1); + await this.AssertAllStagesStoppedAsync(async () => { var c = this.CreateManualSubscriberProbe(); Source.From(Enumerable.Range(1, 5)) .SelectAsync(4, n => @@ -183,9 +220,9 @@ await this.AssertAllStagesStoppedAsync(async() => { if (n == 3) throw new TestException("err2"); - return Task.Run(() => + return Task.Run(async () => { - latch.Ready(TimeSpan.FromSeconds(10)); + await Task.Delay(10.Seconds()); return n; }); }) @@ -193,32 +230,57 @@ await this.AssertAllStagesStoppedAsync(async() => { var sub = await c.ExpectSubscriptionAsync(); sub.Request(10); c.ExpectError().Message.Should().Be("err2"); - latch.CountDown(); }, Materializer); } [Fact] - public async Task A_Flow_with_SelectAsync_must_resume_after_task_failure() + public async Task A_Flow_with_SelectAsync_must_invoke_supervision_strategy_on_task_failure() { - await this.AssertAllStagesStoppedAsync(async() => + await this.AssertAllStagesStoppedAsync(async () => { - await this.AssertAllStagesStoppedAsync(async () => { - var c = this.CreateManualSubscriberProbe(); - Source.From(Enumerable.Range(1, 5)) - .SelectAsync(4, n => Task.Run(() => - { - if (n == 3) - throw new TestException("err3"); - return n; - })) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) - .RunWith(Sink.FromSubscriber(c), Materializer); - var sub = await c.ExpectSubscriptionAsync(); - sub.Request(10); - foreach (var i in new[] { 1, 2, 4, 5 }) - await c.ExpectNextAsync(i); - await c.ExpectCompleteAsync(); - }, Materializer); + var invoked = false; + var probe = Source.From(Enumerable.Range(1, 5)) + .SelectAsync(1, n => Task.Run(() => + { + if (n == 3) + throw new TestException("err3"); + return n; + })) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(_ => + { + invoked = true; + return Directive.Stop; + })) + .RunWith(this.SinkProbe(), Materializer); + + await probe.AsyncBuilder() + .Request(10) + .ExpectNextN(new[] { 1, 2 }) + .ExpectErrorAsync(); + + invoked.Should().BeTrue(); + }, Materializer); + } + + [Fact] + public async Task A_Flow_with_SelectAsync_must_resume_after_task_failure() + { + await this.AssertAllStagesStoppedAsync(async () => { + var c = this.CreateManualSubscriberProbe(); + Source.From(Enumerable.Range(1, 5)) + .SelectAsync(4, n => Task.Run(() => + { + if (n == 3) + throw new TestException("err3"); + return n; + })) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(Sink.FromSubscriber(c), Materializer); + var sub = await c.ExpectSubscriptionAsync(); + sub.Request(10); + foreach (var i in new[] { 1, 2, 4, 5 }) + await c.ExpectNextAsync(i); + await c.ExpectCompleteAsync(); }, Materializer); } @@ -227,7 +289,7 @@ public async Task A_Flow_with_SelectAsync_must_resume_after_multiple_failures() { await this.AssertAllStagesStoppedAsync(() => { var futures = new[] - { + { Task.Run(() => { throw new TestException("failure1"); return "";}), Task.Run(() => { throw new TestException("failure2"); return "";}), Task.Run(() => { throw new TestException("failure3"); return "";}), diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index 280ff345fdb..59816ecf67c 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -2571,9 +2571,10 @@ public Logic(Attributes inheritedAttributes, SelectAsync stage) : bas public override void OnPush() { + var message = Grab(_stage.In); try { - var task = _stage._mapFunc(Grab(_stage.In)); + var task = _stage._mapFunc(message); var holder = new Holder(NotYetThere, _taskCallback); _buffer.Enqueue(holder); @@ -2590,8 +2591,21 @@ public override void OnPush() } catch (Exception e) { - if (_decider(e) == Directive.Stop) - FailStage(e); + var strategy = _decider(e); + Log.Error(e, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", message, strategy); + switch (strategy) + { + case Directive.Stop: + FailStage(e); + break; + + case Directive.Resume: + case Directive.Restart: + break; + + default: + throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", e); + } } if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In)) TryPull(_stage.In); @@ -2644,10 +2658,31 @@ private void PushOne() private void HolderCompleted(Holder holder) { var element = holder.Element; - if (!element.IsSuccess && _decider(element.Exception) == Directive.Stop) - FailStage(element.Exception); - else if (IsAvailable(_stage.Out)) - PushOne(); + if (element.IsSuccess) + { + if (IsAvailable(_stage.Out)) + PushOne(); + return; + } + + var exception = element.Exception; + var strategy = _decider(exception); + Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy); + switch (strategy) + { + case Directive.Stop: + FailStage(exception); + break; + + case Directive.Resume: + case Directive.Restart: + if (IsAvailable(_stage.Out)) + PushOne(); + break; + + default: + throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception); + } } public override string ToString() => $"SelectAsync.Logic(buffer={_buffer})";