Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/extraction rate limiting #1945

Merged
merged 11 commits into from
Sep 26, 2024
Prev Previous commit
Next Next commit
add IBackoffProvider support to ProducerModel
  • Loading branch information
rkm committed Sep 26, 2024
commit ffa796eda6c18ee49a99f1ff89e0a465b713a393
10 changes: 10 additions & 0 deletions src/SmiServices/Common/Messaging/IBackoffProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;

namespace SmiServices.Common.Messaging;

public interface IBackoffProvider
{
TimeSpan GetNextBackoff();

void Reset();
}
15 changes: 13 additions & 2 deletions src/SmiServices/Common/Messaging/ProducerModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace SmiServices.Common.Messaging
{
Expand All @@ -30,6 +31,7 @@ public class ProducerModel : IProducerModel
// Used to stop messages being produced if we are in the process of crashing out
private readonly object _oSendLock = new();

private readonly IBackoffProvider _backoffProvider;

/// <summary>
///
Expand All @@ -38,7 +40,8 @@ public class ProducerModel : IProducerModel
/// <param name="model"></param>
/// <param name="properties"></param>
/// <param name="maxRetryAttempts">Max number of times to retry message confirmations</param>
public ProducerModel(string exchangeName, IModel model, IBasicProperties properties, int maxRetryAttempts = 1)
/// <param name="backoffProvider"></param>
public ProducerModel(string exchangeName, IModel model, IBasicProperties properties, int maxRetryAttempts = 1, IBackoffProvider? backoffProvider = null)
{
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentException("exchangeName parameter is invalid: \"" + exchangeName + "\"");
Expand All @@ -63,6 +66,8 @@ public ProducerModel(string exchangeName, IModel model, IBasicProperties propert

// Handle RabbitMQ putting the queue into flow control mode
_model.FlowControl += (s, a) => _logger.Warn("FlowControl for " + exchangeName);

_backoffProvider = backoffProvider ?? new StaticBackoffProvider();
}


Expand Down Expand Up @@ -92,12 +97,18 @@ public void WaitForConfirms()
while (keepTrying)
{
if (_model.WaitForConfirms(TimeSpan.FromMilliseconds(ConfirmTimeoutMs), out var timedOut))
{
_backoffProvider.Reset();
return;
}

if (timedOut)
{
keepTrying = (++numAttempts < _maxRetryAttempts);
_logger.Warn($"RabbitMQ WaitForConfirms timed out. numAttempts: {numAttempts}");
var backoff = _backoffProvider.GetNextBackoff();

_logger.Warn($"RabbitMQ WaitForConfirms timed out. numAttempts: {numAttempts}. Backing off for {backoff}");
Thread.Sleep(backoff);

continue;
}
Expand Down
18 changes: 18 additions & 0 deletions src/SmiServices/Common/Messaging/StaticBackoffProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace SmiServices.Common.Messaging
{
public class StaticBackoffProvider : IBackoffProvider
{
private readonly TimeSpan _backoff;

public StaticBackoffProvider(TimeSpan? backoff = null)
{
_backoff = backoff ?? new TimeSpan(hours: 0, minutes: 1, seconds: 0);
}

public TimeSpan GetNextBackoff() => _backoff;

Check warning on line 14 in src/SmiServices/Common/Messaging/StaticBackoffProvider.cs

View check run for this annotation

Codecov / codecov/patch

src/SmiServices/Common/Messaging/StaticBackoffProvider.cs#L14

Added line #L14 was not covered by tests

public void Reset() { }
}
}
13 changes: 11 additions & 2 deletions tests/SmiServices.UnitTests/Common/Messaging/ProducerModelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@ public void SendMessage_HappyPath()
var mockBasicProperties = new Mock<IBasicProperties>();
mockBasicProperties.Setup(x => x.Headers).Returns(new Dictionary<string, object>());

var producerModel = new ProducerModel("Exchange", mockModel.Object, mockBasicProperties.Object);
var mockBackoffProvider = new Mock<IBackoffProvider>(MockBehavior.Strict);
mockBackoffProvider.Setup(x => x.GetNextBackoff()).Returns(TimeSpan.Zero);
mockBackoffProvider.Setup(x => x.Reset()).Verifiable();

var maxRetryAttempts = 1;
var producerModel = new ProducerModel("Exchange", mockModel.Object, mockBasicProperties.Object, maxRetryAttempts, mockBackoffProvider.Object);
var message = new TestMessage();

// Act
// Assert
Assert.DoesNotThrow(() => producerModel.SendMessage(message, inResponseTo: null, routingKey: null));
mockBackoffProvider.Verify();
}

[Test]
Expand All @@ -50,8 +56,11 @@ public void SendMessage_ThrowsException_OnTimeout()
var mockBasicProperties = new Mock<IBasicProperties>();
mockBasicProperties.Setup(x => x.Headers).Returns(new Dictionary<string, object>());

var mockBackoffProvider = new Mock<IBackoffProvider>(MockBehavior.Strict);
mockBackoffProvider.Setup(x => x.GetNextBackoff()).Returns(TimeSpan.Zero);

var maxRetryAttempts = 1;
var producerModel = new ProducerModel("Exchange", mockModel.Object, mockBasicProperties.Object, maxRetryAttempts);
var producerModel = new ProducerModel("Exchange", mockModel.Object, mockBasicProperties.Object, maxRetryAttempts, mockBackoffProvider.Object);
var message = new TestMessage();

// Act
Expand Down
Loading