Skip to content

Commit

Permalink
Azure queues config (#425)
Browse files Browse the repository at this point in the history
## Motivation and Context (Why the change? What's the scenario?)
Currently, Azure Queues settings like message processing retries are
hard-coded.

## High level description (Approach, Design)
This PR moves the settings to the `AzureQueuesConfig` class.

---------

Co-authored-by: Devis Lucato <devis@microsoft.com>
  • Loading branch information
marcominerva and dluc authored Apr 23, 2024
1 parent ce815a0 commit bcf07e4
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 24 deletions.
59 changes: 59 additions & 0 deletions extensions/AzureQueues/AzureQueuesConfig.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Text.Json.Serialization;
using System.Text.RegularExpressions;
using Azure;
using Azure.Core;
using Azure.Storage;
Expand All @@ -17,6 +19,8 @@ public class AzureQueuesConfig
private AzureSasCredential? _azureSasCredential;
private TokenCredential? _tokenCredential;

private static readonly Regex s_validPoisonQueueSuffixRegex = new(@"^[a-z0-9-]{1}(?!.*--)[a-z0-9-]{0,28}[a-z0-9]$");

[JsonConverter(typeof(JsonStringEnumConverter))]
public enum AuthTypes
{
Expand All @@ -35,6 +39,38 @@ public enum AuthTypes
public string AccountKey { get; set; } = "";
public string EndpointSuffix { get; set; } = "core.windows.net";

/// <summary>
/// How often to check if there are new messages.
/// </summary>
public int PollDelayMsecs { get; set; } = 100;

/// <summary>
/// How many messages to fetch at a time.
/// </summary>
public int FetchBatchSize { get; set; } = 3;

/// <summary>
/// How long to lock messages once fetched. Azure Queue default is 30 secs.
/// </summary>
public int FetchLockSeconds { get; set; } = 300;

/// <summary>
/// How many times to dequeue a messages and process before moving it to a poison queue.
/// </summary>
public int MaxRetriesBeforePoisonQueue { get; set; } = 20;

/// <summary>
/// Suffix used for the poison queues.
/// </summary>
private string? _poisonQueueSuffix = "-poison";

public string PoisonQueueSuffix
{
get => this._poisonQueueSuffix!;
// Queue names must be lowercase.
set => this._poisonQueueSuffix = value?.ToLowerInvariant() ?? string.Empty;
}

public void SetCredential(StorageSharedKeyCredential credential)
{
this.Auth = AuthTypes.ManualStorageSharedKeyCredential;
Expand Down Expand Up @@ -70,4 +106,27 @@ public TokenCredential GetTokenCredential()
return this._tokenCredential
?? throw new ConfigurationException("TokenCredential not defined");
}

/// <summary>
/// Verify that the current state is valid.
/// </summary>
public void Validate()
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(this.PollDelayMsecs);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(this.FetchBatchSize);
ArgumentOutOfRangeException.ThrowIfLessThan(this.FetchLockSeconds, 30);
ArgumentOutOfRangeException.ThrowIfNegative(this.MaxRetriesBeforePoisonQueue);
ArgumentException.ThrowIfNullOrWhiteSpace(this.PoisonQueueSuffix);

// Queue names must follow the rules described at
// https://learn.microsoft.com/rest/api/storageservices/naming-queues-and-metadata#queue-names.
// In this case, we need to validate only the suffix part, so rules are slightly different
// (for example, as it is a suffix, it can safely start with a dash (-) character).
// Queue names can be up to 63 characters long, so for the suffix we define a maximum length
// of 30, so there is room for the other name part.
if (!s_validPoisonQueueSuffixRegex.IsMatch(this.PoisonQueueSuffix))
{
throw new ArgumentException($"Invalid {nameof(this.PoisonQueueSuffix)} format.", nameof(this.PoisonQueueSuffix));
}
}
}
37 changes: 14 additions & 23 deletions extensions/AzureQueues/AzureQueuesPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,12 @@ private sealed class MessageEventArgs : EventArgs
/// </summary>
private event AsyncMessageHandler<MessageEventArgs>? Received;

// How often to check if there are new messages
private const int PollDelayMsecs = 100;

// How many messages to fetch at a time
private const int FetchBatchSize = 3;

// How long to lock messages once fetched. Azure Queue default is 30 secs.
private const int FetchLockSeconds = 300;

// How many times to dequeue a messages and process before moving it to a poison queue
private const int MaxRetryBeforePoisonQueue = 20;

// Suffix used for the poison queues
private const string PoisonQueueSuffix = "-poison";

// Queue client builder, requiring the queue name in input
private readonly Func<string, QueueClient> _clientBuilder;

// Queue confirguration
private readonly AzureQueuesConfig _config;

// Queue client, once connected
private QueueClient? _queue;

Expand All @@ -77,6 +65,9 @@ public AzureQueuesPipeline(
AzureQueuesConfig config,
ILogger<AzureQueuesPipeline>? log = null)
{
this._config = config;
this._config.Validate();

this._log = log ?? DefaultLogger<AzureQueuesPipeline>.Instance;

switch (config.Auth)
Expand Down Expand Up @@ -161,14 +152,14 @@ public async Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions opt
Response? result = await this._queue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
this._log.LogTrace("Queue ready: status code {0}", result?.Status);

this._poisonQueue = this._clientBuilder(this._queueName + PoisonQueueSuffix);
this._poisonQueue = this._clientBuilder(this._queueName + this._config.PoisonQueueSuffix);
result = await this._poisonQueue.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
this._log.LogTrace("Poison queue ready: status code {0}", result?.Status);

if (options.DequeueEnabled)
{
this._log.LogTrace("Enabling dequeue on queue {0}, every {1} msecs", this._queueName, PollDelayMsecs);
this._dispatchTimer = new Timer(PollDelayMsecs); // milliseconds
this._log.LogTrace("Enabling dequeue on queue {0}, every {1} msecs", this._queueName, this._config.PollDelayMsecs);
this._dispatchTimer = new Timer(this._config.PollDelayMsecs); // milliseconds
this._dispatchTimer.Elapsed += this.DispatchMessages;
this._dispatchTimer.Start();
}
Expand Down Expand Up @@ -201,7 +192,7 @@ public void OnDequeue(Func<string, Task<bool>> processMessageAction)

try
{
if (message.DequeueCount <= MaxRetryBeforePoisonQueue)
if (message.DequeueCount <= this._config.MaxRetriesBeforePoisonQueue)
{
bool success = await processMessageAction.Invoke(message.MessageText).ConfigureAwait(false);
if (success)
Expand Down Expand Up @@ -271,7 +262,7 @@ private void DispatchMessages(object? sender, ElapsedEventArgs ev)
try
{
// Fetch and Hide N messages
Response<QueueMessage[]> receiveMessages = this._queue.ReceiveMessages(FetchBatchSize, visibilityTimeout: TimeSpan.FromSeconds(FetchLockSeconds));
Response<QueueMessage[]> receiveMessages = this._queue.ReceiveMessages(this._config.FetchBatchSize, visibilityTimeout: TimeSpan.FromSeconds(this._config.FetchLockSeconds));
if (receiveMessages.HasValue && receiveMessages.Value.Length > 0)
{
messages = receiveMessages.Value;
Expand Down Expand Up @@ -336,10 +327,10 @@ private async Task MoveMessageToPoisonQueueAsync(QueueMessage message, Cancellat

var poisonMsg = new
{
MessageText = message.MessageText,
message.MessageText,
Id = message.MessageId,
InsertedOn = message.InsertedOn,
DequeueCount = message.DequeueCount,
message.InsertedOn,
message.DequeueCount,
};

var neverExpire = TimeSpan.FromSeconds(-1);
Expand Down
4 changes: 4 additions & 0 deletions extensions/AzureQueues/DependencyInjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public static partial class KernelMemoryBuilderExtensions
{
public static IKernelMemoryBuilder WithAzureQueuesOrchestration(this IKernelMemoryBuilder builder, AzureQueuesConfig config)
{
config.Validate();

builder.Services.AddAzureQueuesOrchestration(config);
return builder;
}
Expand All @@ -22,6 +24,8 @@ public static partial class DependencyInjection
{
public static IServiceCollection AddAzureQueuesOrchestration(this IServiceCollection services, AzureQueuesConfig config)
{
config.Validate();

IQueue QueueFactory(IServiceProvider serviceProvider)
{
return serviceProvider.GetService<AzureQueuesPipeline>()
Expand Down
12 changes: 11 additions & 1 deletion service/Service/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,17 @@
// Note: you can use an env var 'KernelMemory__Services__AzureQueue__ConnectionString' to set this
"ConnectionString": "",
// Setting used only for country clouds
"EndpointSuffix": "core.windows.net"
"EndpointSuffix": "core.windows.net",
// How often to check if there are new messages
"PollDelayMsecs": 100,
// How many messages to fetch at a time
"FetchBatchSize": 3,
// How long to lock messages once fetched. Azure Queue default is 30 secs
"FetchLockSeconds": 300,
// How many times to dequeue a messages and process before moving it to a poison queue
"MaxRetriesBeforePoisonQueue": 20,
// Suffix used for the poison queues.
"PoisonQueueSuffix": "-poison"
},
"Elasticsearch": {
// SHA-256 fingerprint. When running the docker image this is printed after starting the server
Expand Down

0 comments on commit bcf07e4

Please sign in to comment.