Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Queues - Failed message handler #17001

Merged
34 commits merged into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7d7b268
wip
kasobol-msft Nov 16, 2020
74a3175
export api.
kasobol-msft Nov 16, 2020
f8d31c8
Merge remote-tracking branch 'upstream/master' into failed-message-ha…
kasobol-msft Nov 19, 2020
f0755e0
that works.
kasobol-msft Nov 19, 2020
f91dc77
handle peeked messages
kasobol-msft Nov 19, 2020
fe4a3ce
api
kasobol-msft Nov 19, 2020
6185993
merge master
kasobol-msft Dec 4, 2020
2b92bbb
post merge.
kasobol-msft Dec 4, 2020
cf90918
propagate queueclient.
kasobol-msft Dec 4, 2020
7fb3036
merge upstream/master
kasobol-msft Dec 18, 2020
dc3e36a
Merge remote-tracking branch 'upstream/master' into failed-message-ha…
kasobol-msft Jan 7, 2021
e489930
fire and forget callback.
kasobol-msft Jan 7, 2021
415e4fd
tweaks.
kasobol-msft Jan 7, 2021
31279da
merge master.
kasobol-msft Jan 26, 2021
87c5c9e
re-record.
kasobol-msft Jan 26, 2021
4d5eeac
hack core temporarily.
kasobol-msft Jan 26, 2021
451c330
use event hander from core.
kasobol-msft Jan 26, 2021
3f8aeec
revert test change.
kasobol-msft Jan 26, 2021
13cf452
remove direct core reference from test package.
kasobol-msft Jan 26, 2021
af47e2c
that won't be necessary.
kasobol-msft Jan 26, 2021
7cad45d
more tests.
kasobol-msft Jan 26, 2021
918349f
readme.
kasobol-msft Jan 26, 2021
ea14fa1
whitespace.
kasobol-msft Jan 26, 2021
a078234
merge master
kasobol-msft Jan 27, 2021
27fb680
some pr feedback.
kasobol-msft Jan 27, 2021
8eb362e
api
kasobol-msft Jan 27, 2021
23c5214
get parent queue service.
kasobol-msft Jan 28, 2021
40e56ec
use project ref.
kasobol-msft Jan 29, 2021
30aad88
readme tweaks.
kasobol-msft Jan 29, 2021
13d53d5
renaming.
kasobol-msft Jan 29, 2021
6585e9f
misc.
kasobol-msft Jan 29, 2021
46f69cb
protected onevent thing.
kasobol-msft Jan 29, 2021
59c3d24
merge master.
kasobol-msft Jan 29, 2021
523d886
post merge.
kasobol-msft Jan 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/core/Azure.Core.TestFramework/src/ClientTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ internal object InstrumentClient(Type clientType, object client, IEnumerable<IIn
{
foreach (MethodInfo methodInfo in clientType.GetMethods(BindingFlags.Instance | BindingFlags.Public))
{
if (methodInfo.Name.EndsWith("Async") && !methodInfo.IsVirtual)
if (!methodInfo.Name.StartsWith("add_") && !methodInfo.Name.StartsWith("remove_") // exclude events
&& methodInfo.Name.EndsWith("Async") && !methodInfo.IsVirtual)
{
validationException = new InvalidOperationException($"Client type contains public non-virtual async method {methodInfo.Name}");

Expand Down
22 changes: 22 additions & 0 deletions sdk/storage/Azure.Storage.Common/tests/Shared/StorageTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,28 @@ protected async Task WaitForProgressAsync(System.Collections.Concurrent.Concurre
Assert.Warn("Progress notifications never completed!");
}

protected async Task WaitForCondition(Func<bool> predicate, int tries = 5, TimeSpan delay = default)
{
if (delay == default)
{
delay = TimeSpan.FromSeconds(2);
}

for (int i = 0; i < tries; i++)
{
if (predicate())
{
return;
}
else
{
await Task.Delay(delay);
}
}

throw new Exception("Condition not reached");
}

protected void AssertSecondaryStorageFirstRetrySuccessful(string primaryHost, string secondaryHost, TestExceptionPolicy testExceptionPolicy)
{
Assert.AreEqual(primaryHost, testExceptionPolicy.HostsSetInRequests[0]);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
namespace Azure.Storage.Queues
{
public partial class InvalidQueueMessageEventArgs : System.EventArgs
{
public InvalidQueueMessageEventArgs(Azure.Storage.Queues.QueueClient sender, Azure.Storage.Queues.Models.PeekedMessage message, System.Threading.CancellationToken cancellationToken) { }
public InvalidQueueMessageEventArgs(Azure.Storage.Queues.QueueClient sender, Azure.Storage.Queues.Models.QueueMessage message, System.Threading.CancellationToken cancellationToken) { }
public System.Threading.CancellationToken CancellationToken { get { throw null; } }
public object Message { get { throw null; } }
public Azure.Storage.Queues.QueueClient Sender { get { throw null; } }
}
public partial class QueueClient
{
protected QueueClient() { }
Expand All @@ -15,6 +23,7 @@ public QueueClient(System.Uri queueUri, Azure.Storage.StorageSharedKeyCredential
protected virtual System.Uri MessagesUri { get { throw null; } }
public virtual string Name { get { throw null; } }
public virtual System.Uri Uri { get { throw null; } }
public event System.Func<Azure.Storage.Queues.InvalidQueueMessageEventArgs, System.Threading.Tasks.Task> InvalidQueueMessageAsync { add { } remove { } }
public virtual Azure.Response ClearMessages(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> ClearMessagesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response Create(System.Collections.Generic.IDictionary<string, string> metadata = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;
using Azure.Storage.Queues.Models;

namespace Azure.Storage.Queues
{
/// <summary>
/// Contains information about invalid message.
/// See also <see cref="QueueClient.InvalidQueueMessageAsync"/>.
/// </summary>
public class InvalidQueueMessageEventArgs : EventArgs
{
/// <summary>
/// Gets the <see cref="QueueClient"/> that has received invalid message.
/// </summary>
public QueueClient Sender { get; }

/// <summary>
/// Gets the invalid message which can be either <see cref="QueueMessage"/> or <see cref="PeekedMessage"/>.
/// The body of the message is as received, i.e. no decoding is attempted.
/// </summary>
public object Message { get; }

/// <summary>
/// A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.
/// </summary>
public CancellationToken CancellationToken { get; }

/// <summary>
/// Initializes a new instance of the <see cref="InvalidQueueMessageEventArgs"/>.
/// </summary>
/// <param name="sender">The <see cref="QueueClient"/> that has received invalid message.</param>
/// <param name="message">The invalid message</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public InvalidQueueMessageEventArgs(QueueClient sender, QueueMessage message, CancellationToken cancellationToken)
{
Sender = sender;
Message = message;
CancellationToken = cancellationToken;
}

/// <summary>
/// Initializes a new instance of the <see cref="InvalidQueueMessageEventArgs"/>.
/// </summary>
/// <param name="sender">The <see cref="QueueClient"/> that has received invalid message.</param>
/// <param name="message">The invalid message</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public InvalidQueueMessageEventArgs(QueueClient sender, PeekedMessage message, CancellationToken cancellationToken)
{
Sender = sender;
Message = message;
CancellationToken = cancellationToken;
}
}
}
145 changes: 126 additions & 19 deletions sdk/storage/Azure.Storage.Queues/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Globalization;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -126,6 +125,29 @@ public virtual string AccountName

internal virtual QueueMessageEncoding MessageEncoding => _messageEncoding;

/// <summary>
/// Optional. Performs the tasks needed when an invalid message is received or peaked from the queue.
///
/// <para>Invalid message can be received or peaked when <see cref="QueueClient"/> is expecting certain <see cref="QueueMessageEncoding"/>
/// but there's another producer that is not encoding messages in expected way. I.e. the queue contains messages with different encoding.</para>
///
/// <para><see cref="InvalidQueueMessageEventArgs"/> contains <see cref="QueueClient"/> that has received invalid message as well as the message
/// which can be either <see cref="QueueMessage"/> or <see cref="PeekedMessage"/> with raw body, i.e. no decoding will be attempted so that
/// body can be inspected as has been received from the queue.</para>
///
/// <para>The <see cref="QueueClient"/> won't attempt to remove invalid message from the queue. Therefore such handling should be included into
/// the event handler itself.</para>
/// </summary>
#pragma warning disable AZC0002 // DO ensure all service methods, both asynchronous and synchronous, take an optional CancellationToken parameter called cancellationToken.
#pragma warning disable AZC0003 // DO make service methods virtual.
#pragma warning disable AZC0004 // DO provide both asynchronous and synchronous variants for all service methods.
#pragma warning disable AZC0015 // Unexpected client method return type.
public event Func<InvalidQueueMessageEventArgs, Task> InvalidQueueMessageAsync;
#pragma warning restore AZC0015 // Unexpected client method return type.
#pragma warning restore AZC0004 // DO provide both asynchronous and synchronous variants for all service methods.
#pragma warning restore AZC0003 // DO make service methods virtual.
#pragma warning restore AZC0002 // DO ensure all service methods, both asynchronous and synchronous, take an optional CancellationToken parameter called cancellationToken.

/// <summary>
/// Gets the name of the queue.
/// </summary>
Expand Down Expand Up @@ -2016,8 +2038,11 @@ private async Task<Response<QueueMessage[]>> ReceiveMessagesInternal(
$"Uri: {MessagesUri}\n" +
$"{nameof(maxMessages)}: {maxMessages}\n" +
$"{nameof(visibilityTimeout)}: {visibilityTimeout}");
DiagnosticScope scope = ClientDiagnostics.CreateScope(operationName);
try
{
scope.Start();

var response = await QueueRestClient.Messages.DequeueAsync(
ClientDiagnostics,
Pipeline,
Expand All @@ -2034,31 +2059,69 @@ private async Task<Response<QueueMessage[]>> ReceiveMessagesInternal(
if (response.IsUnavailable())
{
return response.GetRawResponse().AsNoBodyResponse<QueueMessage[]>();
}
else if (UsingClientSideEncryption)
{
return Response.FromValue(
await new QueueClientSideDecryptor(ClientSideEncryption)
.ClientSideDecryptMessagesInternal(response.Value.Select(x => QueueMessage.ToQueueMessage(x, _messageEncoding)).ToArray(), async, cancellationToken).ConfigureAwait(false),
response.GetRawResponse());
}
else
} else
{
return Response.FromValue(response.Value.Select(x => QueueMessage.ToQueueMessage(x, _messageEncoding)).ToArray(), response.GetRawResponse());
QueueMessage[] queueMessages;
if (InvalidQueueMessageAsync != null)
{
queueMessages = ToQueueMessagesWithInvalidMessageHandling(response.Value, cancellationToken);
} else
{
queueMessages = response.Value.Select(x => QueueMessage.ToQueueMessage(x, _messageEncoding)).ToArray();
}

if (UsingClientSideEncryption)
{
return Response.FromValue(
await new QueueClientSideDecryptor(ClientSideEncryption)
.ClientSideDecryptMessagesInternal(queueMessages, async, cancellationToken).ConfigureAwait(false),
response.GetRawResponse());
}
else
{
return Response.FromValue(queueMessages, response.GetRawResponse());
}
}
}
catch (Exception ex)
{
scope.Failed(ex);
Pipeline.LogException(ex);
throw;
}
finally
{
scope.Dispose();
Pipeline.LogMethodExit(nameof(QueueClient));
}
}
}

private QueueMessage[] ToQueueMessagesWithInvalidMessageHandling(
IEnumerable<DequeuedMessageItem> dequeuedMessageItems,
CancellationToken cancellationToken)
{
List<QueueMessage> queueMessages = new List<QueueMessage>();

foreach (var dequeuedMessageItem in dequeuedMessageItems)
{
try
{
queueMessages.Add(QueueMessage.ToQueueMessage(dequeuedMessageItem, _messageEncoding));
}
catch (FormatException)
{
_ = Task.Run(() => InvalidQueueMessageAsync.Invoke(
new InvalidQueueMessageEventArgs(
this,
QueueMessage.ToQueueMessage(dequeuedMessageItem, QueueMessageEncoding.None),
cancellationToken)), cancellationToken);
}
}

return queueMessages.ToArray();
}

#endregion ReceiveMessages

#region ReceiveMessage
Expand Down Expand Up @@ -2308,8 +2371,11 @@ private async Task<Response<PeekedMessage[]>> PeekMessagesInternal(
message:
$"Uri: {MessagesUri}\n" +
$"{nameof(maxMessages)}: {maxMessages}");
DiagnosticScope scope = ClientDiagnostics.CreateScope(operationName);
try
{
scope.Start();

Response<IEnumerable<PeekedMessageItem>> response = await QueueRestClient.Messages.PeekAsync(
ClientDiagnostics,
Pipeline,
Expand All @@ -2326,29 +2392,70 @@ private async Task<Response<PeekedMessage[]>> PeekMessagesInternal(
{
return response.GetRawResponse().AsNoBodyResponse<PeekedMessage[]>();
}
else if (UsingClientSideEncryption)
{
return Response.FromValue(
await new QueueClientSideDecryptor(ClientSideEncryption)
.ClientSideDecryptMessagesInternal(response.Value.Select(x => PeekedMessage.ToPeekedMessage(x, _messageEncoding)).ToArray(), async, cancellationToken).ConfigureAwait(false),
response.GetRawResponse());
}
else
{
return Response.FromValue(response.Value.Select(x => PeekedMessage.ToPeekedMessage(x, _messageEncoding)).ToArray(), response.GetRawResponse());
PeekedMessage[] peekedMessages;
if (InvalidQueueMessageAsync != null)
{
peekedMessages = ToPeekedMessagesWithInvalidMessageHandling(response.Value, cancellationToken);
}
else
{
peekedMessages = response.Value.Select(x => PeekedMessage.ToPeekedMessage(x, _messageEncoding)).ToArray();
}

if (UsingClientSideEncryption)
{
return Response.FromValue(
await new QueueClientSideDecryptor(ClientSideEncryption)
.ClientSideDecryptMessagesInternal(peekedMessages, async, cancellationToken).ConfigureAwait(false),
response.GetRawResponse());
}
else
{
return Response.FromValue(peekedMessages, response.GetRawResponse());
}
}
}
catch (Exception ex)
{
scope.Failed(ex);
Pipeline.LogException(ex);
throw;
}
finally
{
scope.Dispose();
Pipeline.LogMethodExit(nameof(QueueClient));
}
}
}

private PeekedMessage[] ToPeekedMessagesWithInvalidMessageHandling(
IEnumerable<PeekedMessageItem> peekedMessageItems,
CancellationToken cancellationToken)
{
List<PeekedMessage> peekedMessages = new List<PeekedMessage>();

foreach (var peekedMessageItem in peekedMessageItems)
{
try
{
peekedMessages.Add(PeekedMessage.ToPeekedMessage(peekedMessageItem, _messageEncoding));
}
catch (FormatException)
{
_ = Task.Run(() => InvalidQueueMessageAsync.Invoke(
new InvalidQueueMessageEventArgs(
this,
PeekedMessage.ToPeekedMessage(peekedMessageItem, QueueMessageEncoding.None),
cancellationToken)), cancellationToken);
}
}

return peekedMessages.ToArray();
}

#endregion PeekMessages

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/Azure.Storage.Queues/src/QueueClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
// Licensed under the MIT License.

using System;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Pipeline;
using Azure.Storage.Queues.Models;
using Azure.Storage.Queues.Specialized;

namespace Azure.Storage.Queues
{
Expand Down
Loading