Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/extraction rate limiting #1945

Merged
merged 11 commits into from
Sep 26, 2024
4 changes: 4 additions & 0 deletions news/1945-feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add a publish timeout backoff mechanism to ProducerModel, allowing control over message publishing timeout behaviour. This can be enabled by setting `BackoffProviderType` in any `ProducerOptions` config. Currently implemented types are:

- StaticBackoffProvider (1 minute flat timeout)
- ExponentialBackoffProvider (1 minute initial, doubling after each timeout)
19 changes: 19 additions & 0 deletions src/SmiServices/Common/Messaging/BackoffProviderFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;

namespace SmiServices.Common.Messaging;

internal static class BackoffProviderFactory
{
public static IBackoffProvider Create(string typename)
{
if (!Enum.TryParse(typename, ignoreCase: true, out BackoffProviderType backoffProviderType))
throw new ArgumentException($"Could not parse '{typename}' to a valid BackoffProviderType");

return backoffProviderType switch
{
BackoffProviderType.StaticBackoffProvider => new StaticBackoffProvider(),
BackoffProviderType.ExponentialBackoffProvider => new ExponentialBackoffProvider(),
_ => throw new NotImplementedException($"No case for BackoffProviderType '{backoffProviderType}'"),

Check warning on line 16 in src/SmiServices/Common/Messaging/BackoffProviderFactory.cs

View check run for this annotation

Codecov / codecov/patch

src/SmiServices/Common/Messaging/BackoffProviderFactory.cs#L15-L16

Added lines #L15 - L16 were not covered by tests
};
}
}
8 changes: 8 additions & 0 deletions src/SmiServices/Common/Messaging/BackoffProviderType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace SmiServices.Common.Messaging;

public enum BackoffProviderType
{
None = 0,
StaticBackoffProvider,
ExponentialBackoffProvider,
}
4 changes: 2 additions & 2 deletions src/SmiServices/Common/Messaging/BatchProducerModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace SmiServices.Common.Messaging
/// </summary>
public class BatchProducerModel : ProducerModel
{
public BatchProducerModel(string exchangeName, IModel model, IBasicProperties properties, int maxPublishAttempts = 1)
: base(exchangeName, model, properties, maxPublishAttempts) { }
public BatchProducerModel(string exchangeName, IModel model, IBasicProperties properties, int maxPublishAttempts = 1, IBackoffProvider? backoffProvider = null)
: base(exchangeName, model, properties, maxPublishAttempts, backoffProvider) { }


/// <summary>
Expand Down
27 changes: 27 additions & 0 deletions src/SmiServices/Common/Messaging/ExponentialBackoffProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;

namespace SmiServices.Common.Messaging;

public class ExponentialBackoffProvider : IBackoffProvider
{
private readonly TimeSpan _initialBackoff;
private TimeSpan _currentBackoff;

public ExponentialBackoffProvider(TimeSpan? initialBackoff = null)
{
_initialBackoff = initialBackoff ?? new TimeSpan(hours: 0, minutes: 1, seconds: 0);
Reset();
}

public TimeSpan GetNextBackoff()
{
var b = _currentBackoff;
_currentBackoff *= 2;
return b;
}

public void Reset()
{
_currentBackoff = _initialBackoff;
}
}
10 changes: 10 additions & 0 deletions src/SmiServices/Common/Messaging/IBackoffProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;

namespace SmiServices.Common.Messaging;

public interface IBackoffProvider
{
TimeSpan GetNextBackoff();

void Reset();
}
24 changes: 18 additions & 6 deletions src/SmiServices/Common/Messaging/ProducerModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace SmiServices.Common.Messaging
{
Expand All @@ -30,6 +31,7 @@ public class ProducerModel : IProducerModel
// Used to stop messages being produced if we are in the process of crashing out
private readonly object _oSendLock = new();

private readonly IBackoffProvider? _backoffProvider;

/// <summary>
///
Expand All @@ -38,7 +40,8 @@ public class ProducerModel : IProducerModel
/// <param name="model"></param>
/// <param name="properties"></param>
/// <param name="maxRetryAttempts">Max number of times to retry message confirmations</param>
public ProducerModel(string exchangeName, IModel model, IBasicProperties properties, int maxRetryAttempts = 1)
/// <param name="backoffProvider"></param>
public ProducerModel(string exchangeName, IModel model, IBasicProperties properties, int maxRetryAttempts = 1, IBackoffProvider? backoffProvider = null)
{
if (string.IsNullOrWhiteSpace(exchangeName))
throw new ArgumentException("exchangeName parameter is invalid: \"" + exchangeName + "\"");
Expand All @@ -63,6 +66,8 @@ public ProducerModel(string exchangeName, IModel model, IBasicProperties propert

// Handle RabbitMQ putting the queue into flow control mode
_model.FlowControl += (s, a) => _logger.Warn("FlowControl for " + exchangeName);

_backoffProvider = backoffProvider;
}


Expand Down Expand Up @@ -91,20 +96,27 @@ public void WaitForConfirms()

while (keepTrying)
{
bool ok = _model.WaitForConfirms(TimeSpan.FromMilliseconds(ConfirmTimeoutMs), out var timedOut);
if (_model.WaitForConfirms(TimeSpan.FromMilliseconds(ConfirmTimeoutMs), out var timedOut))
{
_backoffProvider?.Reset();
return;
}

if (timedOut)
{
keepTrying = (++numAttempts < _maxRetryAttempts);
_logger.Warn($"RabbitMQ WaitForConfirms timed out. numAttempts: {numAttempts}");

TimeSpan? backoff = _backoffProvider?.GetNextBackoff();
if (backoff.HasValue)
{
_logger.Warn($"Backing off for {backoff}");
Thread.Sleep(backoff.Value);
}

continue;
}

// All good
if (ok)
return;

throw new ApplicationException("RabbitMQ got a Nack");
}

Expand Down
19 changes: 16 additions & 3 deletions src/SmiServices/Common/Messaging/RabbitMQBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,26 @@ public IProducerModel SetupProducer(ProducerOptions producerOptions, bool isBatc
props.ContentType = "application/json";
props.Persistent = true;

IProducerModel producerModel;
IBackoffProvider? backoffProvider = null;
if (producerOptions.BackoffProviderType != null)
{
try
{
backoffProvider = BackoffProviderFactory.Create(producerOptions.BackoffProviderType);
}
catch (Exception)
{
model.Close(200, "SetupProducer - Couldn't create BackoffProvider");
throw;
}
}

IProducerModel producerModel;
try
{
producerModel = isBatch ?
new BatchProducerModel(producerOptions.ExchangeName!, model, props, producerOptions.MaxConfirmAttempts) :
new ProducerModel(producerOptions.ExchangeName!, model, props, producerOptions.MaxConfirmAttempts);
new BatchProducerModel(producerOptions.ExchangeName!, model, props, producerOptions.MaxConfirmAttempts, backoffProvider) :
new ProducerModel(producerOptions.ExchangeName!, model, props, producerOptions.MaxConfirmAttempts, backoffProvider);
}
catch (Exception)
{
Expand Down
18 changes: 18 additions & 0 deletions src/SmiServices/Common/Messaging/StaticBackoffProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace SmiServices.Common.Messaging
{
public class StaticBackoffProvider : IBackoffProvider
{
private readonly TimeSpan _initialBackoff;

public StaticBackoffProvider(TimeSpan? initialBackoff = null)
{
_initialBackoff = initialBackoff ?? new TimeSpan(hours: 0, minutes: 1, seconds: 0);
}

public TimeSpan GetNextBackoff() => _initialBackoff;

public void Reset() { }

Check warning on line 16 in src/SmiServices/Common/Messaging/StaticBackoffProvider.cs

View check run for this annotation

Codecov / codecov/patch

src/SmiServices/Common/Messaging/StaticBackoffProvider.cs#L16

Added line #L16 was not covered by tests
}
}
11 changes: 7 additions & 4 deletions src/SmiServices/Common/Options/ProducerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Equ;
using SmiServices.Common.Messaging;

namespace SmiServices.Common.Options
{
Expand All @@ -17,6 +18,11 @@ public class ProducerOptions : MemberwiseEquatable<ProducerOptions>, IOptions
/// </summary>
public int MaxConfirmAttempts { get; set; } = 1;

/// <summary>
/// Specify the <see cref="IBackoffProvider"/> to use when handling publish failures
/// </summary>
public string? BackoffProviderType { get; set; }

/// <summary>
/// Verifies that the individual options have been populated
/// </summary>
Expand All @@ -26,9 +32,6 @@ public bool VerifyPopulated()
return !string.IsNullOrWhiteSpace(ExchangeName);
}

public override string ToString()
{
return "ExchangeName: " + ExchangeName + ", MaxConfirmAttempts: " + MaxConfirmAttempts;
}
public override string ToString() => $"ExchangeName={ExchangeName}, MaxConfirmAttempts={MaxConfirmAttempts}, BackoffProviderType={BackoffProviderType}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,70 @@ public void MessageHolds()
});
}

[Test]
public void SetupProducer_NullBackoffProvider_DoesNotThrow()
{
// Arrange

var exchangeName = $"TEST.{nameof(SetupProducer_NullBackoffProvider_DoesNotThrow)}Exchange";
_tester.CreateExchange(exchangeName);

var producerOptions = new ProducerOptions
{
ExchangeName = exchangeName,
BackoffProviderType = null,
};

var broker = new RabbitMQBroker(_testOptions.RabbitOptions!, "RabbitMqAdapterTests");

// Act
// Assert
Assert.DoesNotThrow(() => broker.SetupProducer(producerOptions));
}

[Test]
public void SetupProducer_InvalidBackoffProvider_Throws()
{
// Arrange

var exchangeName = $"TEST.{nameof(SetupProducer_InvalidBackoffProvider_Throws)}Exchange";
_tester.CreateExchange(exchangeName);

var producerOptions = new ProducerOptions
{
ExchangeName = exchangeName,
BackoffProviderType = "Foo",
};

var broker = new RabbitMQBroker(_testOptions.RabbitOptions!, "RabbitMqAdapterTests");

// Act
// Assert
var exc = Assert.Throws<ArgumentException>(() => broker.SetupProducer(producerOptions));
Assert.That(exc.Message, Is.EqualTo("Could not parse 'Foo' to a valid BackoffProviderType"));
}

[Test]
public void SetupProducer_ValidBackoffProvider()
{
// Arrange

var exchangeName = $"TEST.{nameof(SetupProducer_InvalidBackoffProvider_Throws)}Exchange";
_tester.CreateExchange(exchangeName);

var producerOptions = new ProducerOptions
{
ExchangeName = exchangeName,
BackoffProviderType = "StaticBackoffProvider",
};

var broker = new RabbitMQBroker(_testOptions.RabbitOptions!, "RabbitMqAdapterTests");

// Act
// Assert
Assert.DoesNotThrow(() => broker.SetupProducer(producerOptions));
}

private class ThrowingConsumer : Consumer<TestMessage>
{
public int HeldMessages { get => _heldMessages; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using NUnit.Framework;
using SmiServices.Common.Messaging;
using System;

namespace SmiServices.UnitTests.Common.Messaging;

internal class ExponentialBackoffProviderTests
{
[OneTimeSetUp]
public void OneTimeSetUp()
{
TestLogger.Setup();
}

[Test]
public void Constructor_WithTimeSpan_IsSet()
{
var provider = new ExponentialBackoffProvider(new TimeSpan(1, 2, 3));

Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(1, 2, 3)));
}

[Test]
public void Constructor_WithNoTimeSpan_UsesDefault()
{
var provider = new ExponentialBackoffProvider();

Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(0, 1, 0)));
}

[Test]
public void GetNextBackoff_ReturnsIncreasingTimeSpan()
{
var provider = new ExponentialBackoffProvider(new TimeSpan(1));
Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(1)));
Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(2)));
Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(4)));
Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(8)));
}

[Test]
public void Reset_ReturnsTimeoutToInitial()
{
var provider = new ExponentialBackoffProvider(new TimeSpan(1));
Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(1)));
Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(2)));
provider.Reset();
Assert.That(provider.GetNextBackoff(), Is.EqualTo(new TimeSpan(1)));
}
}
Loading
Loading