Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Badly encoded handler in Webjobs #18326

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7d7b268
wip
kasobol-msft Nov 16, 2020
74a3175
export api.
kasobol-msft Nov 16, 2020
f8d31c8
Merge remote-tracking branch 'upstream/master' into failed-message-ha…
kasobol-msft Nov 19, 2020
f0755e0
that works.
kasobol-msft Nov 19, 2020
f91dc77
handle peeked messages
kasobol-msft Nov 19, 2020
fe4a3ce
api
kasobol-msft Nov 19, 2020
6185993
merge master
kasobol-msft Dec 4, 2020
2b92bbb
post merge.
kasobol-msft Dec 4, 2020
cf90918
propagate queueclient.
kasobol-msft Dec 4, 2020
7fb3036
merge upstream/master
kasobol-msft Dec 18, 2020
dc3e36a
Merge remote-tracking branch 'upstream/master' into failed-message-ha…
kasobol-msft Jan 7, 2021
e489930
fire and forget callback.
kasobol-msft Jan 7, 2021
415e4fd
tweaks.
kasobol-msft Jan 7, 2021
31279da
merge master.
kasobol-msft Jan 26, 2021
87c5c9e
re-record.
kasobol-msft Jan 26, 2021
4d5eeac
hack core temporarily.
kasobol-msft Jan 26, 2021
451c330
use event hander from core.
kasobol-msft Jan 26, 2021
3f8aeec
revert test change.
kasobol-msft Jan 26, 2021
13cf452
remove direct core reference from test package.
kasobol-msft Jan 26, 2021
af47e2c
that won't be necessary.
kasobol-msft Jan 26, 2021
7cad45d
more tests.
kasobol-msft Jan 26, 2021
918349f
readme.
kasobol-msft Jan 26, 2021
ea14fa1
whitespace.
kasobol-msft Jan 26, 2021
b860b98
change options creation.
kasobol-msft Jan 27, 2021
a078234
merge master
kasobol-msft Jan 27, 2021
27fb680
some pr feedback.
kasobol-msft Jan 27, 2021
8eb362e
api
kasobol-msft Jan 27, 2021
7872397
Merge branch 'failed-message-handler' into failed-message-handler-in-…
kasobol-msft Jan 27, 2021
23c5214
get parent queue service.
kasobol-msft Jan 28, 2021
d5f4003
Merge branch 'failed-message-handler' into failed-message-handler-in-…
kasobol-msft Jan 28, 2021
50988ae
can handle bad message.
kasobol-msft Jan 28, 2021
4a607e9
blob trigger.
kasobol-msft Jan 28, 2021
3263066
merge master.
kasobol-msft Feb 1, 2021
4ed7343
post-merge.
kasobol-msft Feb 1, 2021
8b00355
post-merge.
kasobol-msft Feb 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,64 @@
// Licensed under the MIT License.

using System;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs
{
internal class QueueServiceClientProvider : StorageClientProvider<QueueServiceClient, QueueClientOptions>
{
private readonly QueuesOptions _queuesOptions;
private readonly ILogger<QueueServiceClientProvider> _logger;

public QueueServiceClientProvider(
IConfiguration configuration,
AzureComponentFactory componentFactory,
AzureEventSourceLogForwarder logForwarder,
IOptions<QueuesOptions> queueOptions)
IOptions<QueuesOptions> queueOptions,
ILogger<QueueServiceClientProvider> logger)
: base(configuration, componentFactory, logForwarder)
{
_queuesOptions = queueOptions?.Value;
_logger = logger;
}

protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options)
protected override QueueClientOptions CreateClientOptions(IConfiguration configuration)
{
var options = base.CreateClientOptions(configuration);
options.MessageEncoding = _queuesOptions.MessageEncoding;
options.MessageDecodingFailed += HandleMessageDecodingFailed;
return options;
}

protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options)
{
return new QueueServiceClient(connectionString, options);
}

protected override QueueServiceClient CreateClientFromTokenCredential(Uri endpointUri, TokenCredential tokenCredential, QueueClientOptions options)
{
options.MessageEncoding = _queuesOptions.MessageEncoding;
return new QueueServiceClient(endpointUri, tokenCredential, options);
}

private async Task HandleMessageDecodingFailed(QueueMessageDecodingFailedEventArgs args)
{
// SharedBlobQueueProcessor moves to poison queue only if message is parsable and has corresponding registration.
// Therefore, we log and discard garbage here.
if (args.ReceivedMessage != null)
{
_logger.LogWarning("Invalid message in blob trigger queue {QueueName}, messageId={messageId}, body={body}",
args.Queue.Name, args.ReceivedMessage.MessageId, args.ReceivedMessage.Body.ToString());
await args.Queue.DeleteMessageAsync(args.ReceivedMessage.MessageId, args.ReceivedMessage.PopReceipt).ConfigureAwait(false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal class FakeQueueServiceClientProvider : QueueServiceClientProvider
private readonly QueueServiceClient _queueServiceClient;

public FakeQueueServiceClientProvider(QueueServiceClient queueServiceClient)
: base(null, null, null, null)
: base(null, null, null, null, null)
{
_queueServiceClient = queueServiceClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ internal protected virtual async Task CompleteProcessingMessageAsync(QueueMessag
}
}

private async Task HandlePoisonMessageAsync(QueueMessage message, CancellationToken cancellationToken)
internal async Task HandlePoisonMessageAsync(QueueMessage message, CancellationToken cancellationToken)
{
if (_poisonQueue != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public virtual TClient GetHost()
return this.Get(null);
}

private TClientOptions CreateClientOptions(IConfiguration configuration)
protected virtual TClientOptions CreateClientOptions(IConfiguration configuration)
{
var clientOptions = (TClientOptions) _componentFactory.CreateClientOptions(typeof(TClientOptions), null, configuration);
clientOptions.Diagnostics.ApplicationId ??= "AzureWebJobs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ internal static QueueProcessor CreateQueueProcessor(QueueClient queue, QueueClie
return queueProcessor;
}

private static QueueClient CreatePoisonQueueReference(QueueServiceClient client, string name)
internal static QueueClient CreatePoisonQueueReference(QueueServiceClient client, string name)
{
Debug.Assert(client != null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,82 @@
using System;
using Azure.Core;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners;
using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Listeners;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues
{
internal class QueueServiceClientProvider : StorageClientProvider<QueueServiceClient, QueueClientOptions>
{
private readonly QueuesOptions _queuesOptions;
private readonly ILoggerFactory _loggerFactory;
private readonly IQueueProcessorFactory _queueProcessorFactory;
private readonly SharedQueueWatcher _messageEnqueuedWatcher;

public QueueServiceClientProvider(
IConfiguration configuration,
AzureComponentFactory componentFactory,
AzureEventSourceLogForwarder logForwarder,
IOptions<QueuesOptions> queueOptions)
IOptions<QueuesOptions> queueOptions,
ILoggerFactory loggerFactory,
IQueueProcessorFactory queueProcessorFactory,
SharedQueueWatcher messageEnqueuedWatcher)
: base(configuration, componentFactory, logForwarder)
{
_queuesOptions = queueOptions?.Value;
_loggerFactory = loggerFactory;
_queueProcessorFactory = queueProcessorFactory;
_messageEnqueuedWatcher = messageEnqueuedWatcher;
}

protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options)
protected override QueueClientOptions CreateClientOptions(IConfiguration configuration)
{
var options = base.CreateClientOptions(configuration);
options.MessageEncoding = _queuesOptions.MessageEncoding;
return options;
}

protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options)
{
var originalEncoding = options.MessageEncoding;
options.MessageEncoding = QueueMessageEncoding.None;
var nonEncodingClient = new QueueServiceClient(connectionString, options);
options.MessageDecodingFailed += CreateMessageDecodingFailedHandler(nonEncodingClient);
options.MessageEncoding = originalEncoding;
return new QueueServiceClient(connectionString, options);
}

protected override QueueServiceClient CreateClientFromTokenCredential(Uri endpointUri, TokenCredential tokenCredential, QueueClientOptions options)
{
options.MessageEncoding = _queuesOptions.MessageEncoding;
var originalEncoding = options.MessageEncoding;
options.MessageEncoding = QueueMessageEncoding.None;
var nonEncodingClient = new QueueServiceClient(endpointUri, tokenCredential, options);
options.MessageDecodingFailed += CreateMessageDecodingFailedHandler(nonEncodingClient);
options.MessageEncoding = originalEncoding;
return new QueueServiceClient(endpointUri, tokenCredential, options);
}

private SyncAsyncEventHandler<QueueMessageDecodingFailedEventArgs> CreateMessageDecodingFailedHandler(QueueServiceClient nonEncodingQueueServiceClient)
{
return async (QueueMessageDecodingFailedEventArgs args) =>
{
// This event is raised only in async paths hence args.RunSynchronously is ignored.
if (args.ReceivedMessage != null)
{
var queueClient = args.Queue;
var poisonQueueClient = QueueListenerFactory.CreatePoisonQueueReference(nonEncodingQueueServiceClient, queueClient.Name);
var queueProcessor = QueueListenerFactory.CreateQueueProcessor(queueClient, poisonQueueClient, _loggerFactory, _queueProcessorFactory, _queuesOptions, _messageEnqueuedWatcher);
await queueProcessor.HandlePoisonMessageAsync(args.ReceivedMessage, args.CancellationToken).ConfigureAwait(false);
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ internal class FakeQueueServiceClientProvider : QueueServiceClientProvider
private readonly QueueServiceClient _queueServiceClient;

public FakeQueueServiceClientProvider(QueueServiceClient queueServiceClient)
: base(null, null, null, null)
: base(null, null, null, null, null, null, null)
{
_queueServiceClient = queueServiceClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ public class AzureStorageEndToEndTests : LiveTestBase<WebJobsTestEnvironment>
private const string TestQueueNameEtag = TestArtifactsPrefix + "etag2equeue%rnd%";
private const string DoneQueueName = TestArtifactsPrefix + "donequeue%rnd%";

private const string BadMessageQueue1 = TestArtifactsPrefix + "-badmessage1-%rnd%";
private const string BadMessageQueue2 = TestArtifactsPrefix + "-badmessage2-%rnd%";
private const string BadMessageQueue = TestArtifactsPrefix + "-badmessage-%rnd%";

private static int _badMessage1Calls;
private static int _badMessage2Calls;
private static int _badMessageCalls;

private static EventWaitHandle _startWaitHandle;
private static EventWaitHandle _functionChainWaitHandle;
private QueueServiceClient _queueServiceClient;
private QueueServiceClient _queueServiceClientWithoutEncoding;
private BlobServiceClient _blobServiceClient;
private RandomNameResolver _resolver;

Expand All @@ -59,6 +58,7 @@ public void SetUp()
{
_fixture = new AzureStorageEndToEndTests.TestFixture(TestEnvironment);
_queueServiceClient = _fixture.QueueServiceClient;
_queueServiceClientWithoutEncoding = _fixture.QueueServiceClientWithoutEncoding;
_blobServiceClient = _fixture.BlobServiceClient;
}

Expand Down Expand Up @@ -138,28 +138,13 @@ public static void NotifyCompletion(
_functionChainWaitHandle.Set();
}

/// <summary>
/// We'll insert a bad message. It should get here okay. It will
/// then pass it on to the next trigger.
/// </summary>
public static void BadMessage_CloudQueueMessage(
[QueueTrigger(BadMessageQueue1)] QueueMessage badMessageIn,
[Queue(BadMessageQueue2)] out string badMessageOut,
#pragma warning disable CS0618 // Type or member is obsolete
TraceWriter log)
#pragma warning restore CS0618 // Type or member is obsolete
{
_badMessage1Calls++;
badMessageOut = badMessageIn.MessageText;
}

public static void BadMessage_String(
[QueueTrigger(BadMessageQueue2)] string message,
[QueueTrigger(BadMessageQueue)] string message,
#pragma warning disable CS0618 // Type or member is obsolete
TraceWriter log)
#pragma warning restore CS0618 // Type or member is obsolete
{
_badMessage2Calls++;
_badMessageCalls++;
}

// Uncomment the Fact attribute to run
Expand Down Expand Up @@ -220,14 +205,12 @@ private async Task EndToEndTest(bool uploadBlobBeforeHostStart)
}

[Test]
[Ignore("TODO (kasobol-msft) revisit this test when base64/BinaryData is supported in SDK")]
public async Task BadQueueMessageE2ETests()
{
// This test ensures that the host does not crash on a bad message (it previously did)
// Insert a bad message into a queue that should:
// - trigger BadMessage_CloudQueueMessage, which will put it into a second queue that will
// - trigger BadMessage_String, which should fail
// - BadMessage_String should fail repeatedly until it is moved to the poison queue
// - BadMessage_String should be transfered to poison queue.
// The test will watch that poison queue to know when to complete

// Reinitialize the name resolver to avoid conflicts
Expand All @@ -254,14 +237,12 @@ public async Task BadQueueMessageE2ETests()
// - use a GUID as the content, which is not a valid base64 string
// - pass 'true', to indicate that it is a base64 string
string messageContent = Guid.NewGuid().ToString();
// var message = new CloudQueueMessage(messageContent, true); // TODO (kasobol-msft) check this base64 thing

var queue = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue1));
var queue = _queueServiceClientWithoutEncoding.GetQueueClient(_resolver.ResolveInString(BadMessageQueue));
await queue.CreateIfNotExistsAsync();
await queue.ClearMessagesAsync();

// the poison queue will end up off of the second queue
var poisonQueue = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue2) + "-poison");
var poisonQueue = _queueServiceClientWithoutEncoding.GetQueueClient(_resolver.ResolveInString(BadMessageQueue) + "-poison");
await poisonQueue.DeleteIfExistsAsync();

await queue.SendMessageAsync(messageContent);
Expand All @@ -272,22 +253,8 @@ await TestHelpers.Await(async () =>
bool done = false;
if (await poisonQueue.ExistsAsync())
{
poisonMessage = (await poisonQueue.ReceiveMessagesAsync(1)).Value.FirstOrDefault();
poisonMessage = await poisonQueue.ReceiveMessageAsync();
done = poisonMessage != null;

if (done)
{
// Sleep briefly, then make sure the other message has been deleted.
// If so, trying to delete it again will throw an error.
Thread.Sleep(1000);

// The message is in the second queue
var queue2 = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue2));

RequestFailedException ex = Assert.ThrowsAsync<RequestFailedException>(
() => queue2.DeleteMessageAsync(_lastMessageId, _lastMessagePopReceipt));
Assert.AreEqual("MessageNotFound", ex.ErrorCode);
}
}
var logs = loggerProvider.GetAllLogMessages();
return done;
Expand All @@ -300,8 +267,7 @@ await TestHelpers.Await(async () =>
Assert.AreEqual(messageContent, poisonMessage.MessageText);

// Make sure the functions were called correctly
Assert.AreEqual(1, _badMessage1Calls);
Assert.AreEqual(0, _badMessage2Calls);
Assert.AreEqual(0, _badMessageCalls);

// Validate Logger
var loggerErrors = loggerProvider.GetAllLogMessages().Where(l => l.Level == Microsoft.Extensions.Logging.LogLevel.Error);
Expand Down Expand Up @@ -376,6 +342,7 @@ public TestFixture(WebJobsTestEnvironment testEnvironment)

var queueOptions = new QueueClientOptions() { MessageEncoding = QueueMessageEncoding.Base64 };
this.QueueServiceClient = new QueueServiceClient(testEnvironment.PrimaryStorageAccountConnectionString, queueOptions);
this.QueueServiceClientWithoutEncoding = new QueueServiceClient(testEnvironment.PrimaryStorageAccountConnectionString);
this.BlobServiceClient = new BlobServiceClient(testEnvironment.PrimaryStorageAccountConnectionString);
}

Expand All @@ -385,6 +352,12 @@ public QueueServiceClient QueueServiceClient
private set;
}

public QueueServiceClient QueueServiceClientWithoutEncoding
{
get;
private set;
}

public BlobServiceClient BlobServiceClient
{
get;
Expand Down
Loading