diff --git a/src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs b/src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs index 5d8ddcc74d6..e2e56e4e530 100644 --- a/src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs +++ b/src/core/Akka.Tests/Delivery/ReliableDeliveryRandomSpecs.cs @@ -20,11 +20,13 @@ namespace Akka.Tests.Delivery; public class ReliableDeliveryRandomSpecs : TestKit.Xunit2.TestKit { - internal static readonly Config Config = @"akka.reliable-delivery.consumer-controller{ + private static readonly Config Config = @"akka.reliable-delivery.consumer-controller{ flow-control-window = 20 resend-interval-min = 500ms resend-interval-max = 2s - }"; + } + akka.loglevel = DEBUG + "; public ReliableDeliveryRandomSpecs(ITestOutputHelper output) : this(output, Config) { @@ -57,16 +59,6 @@ private async Task Test(int numberOfMessages, double producerDropProbability, do consumerDropProbability, producerDropProbability, consumerDelay, producerDelay, durableFailProbability, durableDelay); - // RandomFlakyNetwork to simulate lost messages from producerController to consumerController - double ConsumerDrop(object msg) - { - return msg switch - { - ConsumerController.SequencedMessage _ => consumerDropProbability, - _ => 0 - }; - } - var consumerEndProbe = CreateTestProbe(); var consumerController = Sys.ActorOf(ConsumerController.CreateWithFuzzing(Sys, Option.None, ConsumerDrop, consumerControllerSettings), $"consumer-controller-{_idCount}"); @@ -75,18 +67,6 @@ double ConsumerDrop(object msg) TestConsumer.PropsFor(consumerDelay, numberOfMessages, consumerEndProbe.Ref, consumerController), $"consumer-{_idCount}"); - // RandomFlakyNetwork to simulate lost messages from consumerController to producerController - double ProducerDrop(object msg) - { - return msg switch - { - ProducerController.Request _ => producerDropProbability, - ProducerController.Resend _ => producerDropProbability, - ProducerController.RegisterConsumer _ => producerDropProbability, - _ => 0 - }; - } - var stateHolder = new AtomicReference>(DurableProducerQueueStateHolder.Empty); var durableQueue = durableFailProbability.Select(p => { @@ -105,31 +85,54 @@ double ProducerDrop(object msg) new ConsumerController.RegisterToProducerController(producerController)); await consumerEndProbe.ExpectMsgAsync(TimeSpan.FromSeconds(120)); + return; + + // RandomFlakyNetwork to simulate lost messages from producerController to consumerController + double ConsumerDrop(object msg) + { + return msg switch + { + ConsumerController.SequencedMessage _ => consumerDropProbability, + _ => 0 + }; + } + + // RandomFlakyNetwork to simulate lost messages from consumerController to producerController + double ProducerDrop(object msg) + { + return msg switch + { + ProducerController.Request _ => producerDropProbability, + ProducerController.Resend _ => producerDropProbability, + ProducerController.RegisterConsumer _ => producerDropProbability, + _ => 0 + }; + } } [Fact] - public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network() + public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network() { NextId(); var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.2; var producerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.2; - await Test(numberOfMessages: 63, producerDropProbability: producerDropProbability, + return Test(numberOfMessages: 63, producerDropProbability: producerDropProbability, consumerDropProbability: consumerDropProbability, Option.None, true); } [Fact] - public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_DurableProducerQueue() + public Task ReliableDelivery_with_random_failures_must_work_with_flaky_DurableProducerQueue() { NextId(); var durableFailProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1; - await Test(numberOfMessages: 31, producerDropProbability: 0.0, + return Test(numberOfMessages: 31, producerDropProbability: 0.0, consumerDropProbability:0.0, durableFailProbability, true); } [Fact] - public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_and_flaky_DurableProducerQueue() + public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_and_flaky_DurableProducerQueue() { NextId(); var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1; @@ -137,18 +140,18 @@ public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_net var durableFailProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.1; - await Test(numberOfMessages: 17, producerDropProbability: producerDropProbability, + return Test(numberOfMessages: 17, producerDropProbability: producerDropProbability, consumerDropProbability: consumerDropProbability, durableFailProbability, true); } [Fact] - public async Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_without_resending() + public Task ReliableDelivery_with_random_failures_must_work_with_flaky_network_without_resending() { NextId(); var consumerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.4; var producerDropProbability = 0.1 + ThreadLocalRandom.Current.NextDouble() * 0.3; - await Test(numberOfMessages: 63, producerDropProbability: producerDropProbability, + return Test(numberOfMessages: 63, producerDropProbability: producerDropProbability, consumerDropProbability: consumerDropProbability, Option.None, false); } } \ No newline at end of file diff --git a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs index 89053c457ad..94db04dc68c 100644 --- a/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ConsumerControllerImpl.cs @@ -57,7 +57,11 @@ protected internal override bool AroundReceive(Receive receive, object message) { // TESTING PURPOSES ONLY - used to simulate network failures. if (_fuzzingControl != null && ThreadLocalRandom.Current.NextDouble() < _fuzzingControl(message)) + { + _log.Debug("[Testing] dropping message [{0}] due to fuzzing factor", message); return true; + } + return base.AroundReceive(receive, message); } @@ -281,38 +285,6 @@ private void WaitingForConfirmation(SequencedMessage sequencedMessage) if (_log.IsDebugEnabled) _log.Debug("Received Confirmed seqNr [{0}] from consumer, stashed size [{1}].", seqNr, Stash.Count); - long ComputeNextSeqNr() - { - if (sequencedMessage.First) - { - // confirm the first message immediately to cancel resending of first - var newRequestedSeqNr = seqNr - 1 + Settings.FlowControlWindow; - _log.Debug("Sending Request after first with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr, - newRequestedSeqNr); - CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false)); - return newRequestedSeqNr; - } - - if (CurrentState.RequestedSeqNr - seqNr == Settings.FlowControlWindow / 2) - { - var newRequestedSeqNr = CurrentState.RequestedSeqNr + Settings.FlowControlWindow / 2; - _log.Debug("Sending Request with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr, - newRequestedSeqNr); - CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false)); - _retryTimer.Start(); // reset interval since Request was just sent - return newRequestedSeqNr; - } - - if (sequencedMessage.Ack) - { - if (_log.IsDebugEnabled) - _log.Debug("Sending Ack seqNr [{0}]", seqNr); - CurrentState.ProducerController.Tell(new Ack(seqNr)); - } - - return CurrentState.RequestedSeqNr; - } - var requestedSeqNr = ComputeNextSeqNr(); if (CurrentState.Stopping && Stash.IsEmpty) { @@ -351,6 +323,40 @@ async Task ShutDownAndStop() Stash.Unstash(); Become(Active); } + + return; + + long ComputeNextSeqNr() + { + if (sequencedMessage.First) + { + // confirm the first message immediately to cancel resending of first + var newRequestedSeqNr = seqNr - 1 + Settings.FlowControlWindow; + _log.Debug("Sending Request after first with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr, + newRequestedSeqNr); + CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false)); + return newRequestedSeqNr; + } + + if (CurrentState.RequestedSeqNr - seqNr == Settings.FlowControlWindow / 2) + { + var newRequestedSeqNr = CurrentState.RequestedSeqNr + Settings.FlowControlWindow / 2; + _log.Debug("Sending Request with confirmedSeqNr [{0}], requestUpToSeqNr [{1}]", seqNr, + newRequestedSeqNr); + CurrentState.ProducerController.Tell(new Request(seqNr, newRequestedSeqNr, ResendLost, false)); + _retryTimer.Start(); // reset interval since Request was just sent + return newRequestedSeqNr; + } + + if (sequencedMessage.Ack) + { + if (_log.IsDebugEnabled) + _log.Debug("Sending Ack seqNr [{0}]", seqNr); + CurrentState.ProducerController.Tell(new Ack(seqNr)); + } + + return CurrentState.RequestedSeqNr; + } }); Receive>(msg => diff --git a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs index 29e61035044..d011d2b5c51 100644 --- a/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs +++ b/src/core/Akka/Delivery/Internal/ProducerControllerImpl.cs @@ -490,10 +490,10 @@ private void ReceiveRequest(long newConfirmedSeqNr, long newRequestedSeqNr, bool : newRequestedSeqNr; if (newRequestedSeqNr2 != newRequestedSeqNr) - _log.Debug("Expanded requestedSeqNr from [{0}] to [{1}], because current [{3}] and all were probably lost.", + _log.Debug("Expanded requestedSeqNr from [{0}] to [{1}], because current [{2}] and all were probably lost.", newRequestedSeqNr, newRequestedSeqNr2, stateAfterAck.CurrentSeqNr); - if (newRequestedSeqNr > CurrentState.RequestedSeqNr) + if (newRequestedSeqNr2 > CurrentState.RequestedSeqNr) { bool newRequested; if (CurrentState.StoreMessageSentInProgress != 0)