diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/QueueServiceClientProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/QueueServiceClientProvider.cs index aaf00d65ddfb..31f4ca14a111 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/QueueServiceClientProvider.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/QueueServiceClientProvider.cs @@ -2,12 +2,15 @@ // Licensed under the MIT License. using System; +using System.Threading.Tasks; using Azure.Core; using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; using Microsoft.Azure.WebJobs.Extensions.Storage.Common; using Microsoft.Azure.WebJobs.Host; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs @@ -15,27 +18,48 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs internal class QueueServiceClientProvider : StorageClientProvider { private readonly QueuesOptions _queuesOptions; + private readonly ILogger _logger; public QueueServiceClientProvider( IConfiguration configuration, AzureComponentFactory componentFactory, AzureEventSourceLogForwarder logForwarder, - IOptions queueOptions) + IOptions queueOptions, + ILogger logger) : base(configuration, componentFactory, logForwarder) { _queuesOptions = queueOptions?.Value; + _logger = logger; } - protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options) + protected override QueueClientOptions CreateClientOptions(IConfiguration configuration) { + var options = base.CreateClientOptions(configuration); options.MessageEncoding = _queuesOptions.MessageEncoding; + options.MessageDecodingFailed += HandleMessageDecodingFailed; + return options; + } + + protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options) + { return new QueueServiceClient(connectionString, options); } protected override QueueServiceClient CreateClientFromTokenCredential(Uri endpointUri, TokenCredential tokenCredential, QueueClientOptions options) { - options.MessageEncoding = _queuesOptions.MessageEncoding; return new QueueServiceClient(endpointUri, tokenCredential, options); } + + private async Task HandleMessageDecodingFailed(QueueMessageDecodingFailedEventArgs args) + { + // SharedBlobQueueProcessor moves to poison queue only if message is parsable and has corresponding registration. + // Therefore, we log and discard garbage here. + if (args.ReceivedMessage != null) + { + _logger.LogWarning("Invalid message in blob trigger queue {QueueName}, messageId={messageId}, body={body}", + args.Queue.Name, args.ReceivedMessage.MessageId, args.ReceivedMessage.Body.ToString()); + await args.Queue.DeleteMessageAsync(args.ReceivedMessage.MessageId, args.ReceivedMessage.PopReceipt).ConfigureAwait(false); + } + } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/FakeQueueServiceClientProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/FakeQueueServiceClientProvider.cs index 6c8600f4339c..c1b897e884c0 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/FakeQueueServiceClientProvider.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/FakeQueueServiceClientProvider.cs @@ -14,7 +14,7 @@ internal class FakeQueueServiceClientProvider : QueueServiceClientProvider private readonly QueueServiceClient _queueServiceClient; public FakeQueueServiceClientProvider(QueueServiceClient queueServiceClient) - : base(null, null, null, null) + : base(null, null, null, null, null) { _queueServiceClient = queueServiceClient; } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs index c71f31de4789..c32e3c3a0c1c 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs @@ -110,7 +110,7 @@ internal protected virtual async Task CompleteProcessingMessageAsync(QueueMessag } } - private async Task HandlePoisonMessageAsync(QueueMessage message, CancellationToken cancellationToken) + internal async Task HandlePoisonMessageAsync(QueueMessage message, CancellationToken cancellationToken) { if (_poisonQueue != null) { diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/StorageClientProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/StorageClientProvider.cs index 0200dfa6b353..35604993e907 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/StorageClientProvider.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/StorageClientProvider.cs @@ -84,7 +84,7 @@ public virtual TClient GetHost() return this.Get(null); } - private TClientOptions CreateClientOptions(IConfiguration configuration) + protected virtual TClientOptions CreateClientOptions(IConfiguration configuration) { var clientOptions = (TClientOptions) _componentFactory.CreateClientOptions(typeof(TClientOptions), null, configuration); clientOptions.Diagnostics.ApplicationId ??= "AzureWebJobs"; diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Listeners/QueueListenerFactory.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Listeners/QueueListenerFactory.cs index 30a6362f580b..9869661ea002 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Listeners/QueueListenerFactory.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Listeners/QueueListenerFactory.cs @@ -93,7 +93,7 @@ internal static QueueProcessor CreateQueueProcessor(QueueClient queue, QueueClie return queueProcessor; } - private static QueueClient CreatePoisonQueueReference(QueueServiceClient client, string name) + internal static QueueClient CreatePoisonQueueReference(QueueServiceClient client, string name) { Debug.Assert(client != null); diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/QueueServiceClientProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/QueueServiceClientProvider.cs index f2001f85fb74..0ecb97a14961 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/QueueServiceClientProvider.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/QueueServiceClientProvider.cs @@ -4,10 +4,15 @@ using System; using Azure.Core; using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; using Microsoft.Azure.WebJobs.Extensions.Storage.Common; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; +using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Listeners; using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Queues; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues @@ -15,27 +20,66 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues internal class QueueServiceClientProvider : StorageClientProvider { private readonly QueuesOptions _queuesOptions; + private readonly ILoggerFactory _loggerFactory; + private readonly IQueueProcessorFactory _queueProcessorFactory; + private readonly SharedQueueWatcher _messageEnqueuedWatcher; public QueueServiceClientProvider( IConfiguration configuration, AzureComponentFactory componentFactory, AzureEventSourceLogForwarder logForwarder, - IOptions queueOptions) + IOptions queueOptions, + ILoggerFactory loggerFactory, + IQueueProcessorFactory queueProcessorFactory, + SharedQueueWatcher messageEnqueuedWatcher) : base(configuration, componentFactory, logForwarder) { _queuesOptions = queueOptions?.Value; + _loggerFactory = loggerFactory; + _queueProcessorFactory = queueProcessorFactory; + _messageEnqueuedWatcher = messageEnqueuedWatcher; } - protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options) + protected override QueueClientOptions CreateClientOptions(IConfiguration configuration) { + var options = base.CreateClientOptions(configuration); options.MessageEncoding = _queuesOptions.MessageEncoding; + return options; + } + + protected override QueueServiceClient CreateClientFromConnectionString(string connectionString, QueueClientOptions options) + { + var originalEncoding = options.MessageEncoding; + options.MessageEncoding = QueueMessageEncoding.None; + var nonEncodingClient = new QueueServiceClient(connectionString, options); + options.MessageDecodingFailed += CreateMessageDecodingFailedHandler(nonEncodingClient); + options.MessageEncoding = originalEncoding; return new QueueServiceClient(connectionString, options); } protected override QueueServiceClient CreateClientFromTokenCredential(Uri endpointUri, TokenCredential tokenCredential, QueueClientOptions options) { - options.MessageEncoding = _queuesOptions.MessageEncoding; + var originalEncoding = options.MessageEncoding; + options.MessageEncoding = QueueMessageEncoding.None; + var nonEncodingClient = new QueueServiceClient(endpointUri, tokenCredential, options); + options.MessageDecodingFailed += CreateMessageDecodingFailedHandler(nonEncodingClient); + options.MessageEncoding = originalEncoding; return new QueueServiceClient(endpointUri, tokenCredential, options); } + + private SyncAsyncEventHandler CreateMessageDecodingFailedHandler(QueueServiceClient nonEncodingQueueServiceClient) + { + return async (QueueMessageDecodingFailedEventArgs args) => + { + // This event is raised only in async paths hence args.RunSynchronously is ignored. + if (args.ReceivedMessage != null) + { + var queueClient = args.Queue; + var poisonQueueClient = QueueListenerFactory.CreatePoisonQueueReference(nonEncodingQueueServiceClient, queueClient.Name); + var queueProcessor = QueueListenerFactory.CreateQueueProcessor(queueClient, poisonQueueClient, _loggerFactory, _queueProcessorFactory, _queuesOptions, _messageEnqueuedWatcher); + await queueProcessor.HandlePoisonMessageAsync(args.ReceivedMessage, args.CancellationToken).ConfigureAwait(false); + } + }; + } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/FakeQueueServiceClientProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/FakeQueueServiceClientProvider.cs index ec5b21c87c83..7de9ac565498 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/FakeQueueServiceClientProvider.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/FakeQueueServiceClientProvider.cs @@ -12,7 +12,7 @@ internal class FakeQueueServiceClientProvider : QueueServiceClientProvider private readonly QueueServiceClient _queueServiceClient; public FakeQueueServiceClientProvider(QueueServiceClient queueServiceClient) - : base(null, null, null, null) + : base(null, null, null, null, null, null, null) { _queueServiceClient = queueServiceClient; } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/AzureStorageEndToEndTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/AzureStorageEndToEndTests.cs index 00e627530607..8441d1ce0c8b 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/AzureStorageEndToEndTests.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/AzureStorageEndToEndTests.cs @@ -37,15 +37,14 @@ public class AzureStorageEndToEndTests : LiveTestBase private const string TestQueueNameEtag = TestArtifactsPrefix + "etag2equeue%rnd%"; private const string DoneQueueName = TestArtifactsPrefix + "donequeue%rnd%"; - private const string BadMessageQueue1 = TestArtifactsPrefix + "-badmessage1-%rnd%"; - private const string BadMessageQueue2 = TestArtifactsPrefix + "-badmessage2-%rnd%"; + private const string BadMessageQueue = TestArtifactsPrefix + "-badmessage-%rnd%"; - private static int _badMessage1Calls; - private static int _badMessage2Calls; + private static int _badMessageCalls; private static EventWaitHandle _startWaitHandle; private static EventWaitHandle _functionChainWaitHandle; private QueueServiceClient _queueServiceClient; + private QueueServiceClient _queueServiceClientWithoutEncoding; private BlobServiceClient _blobServiceClient; private RandomNameResolver _resolver; @@ -59,6 +58,7 @@ public void SetUp() { _fixture = new AzureStorageEndToEndTests.TestFixture(TestEnvironment); _queueServiceClient = _fixture.QueueServiceClient; + _queueServiceClientWithoutEncoding = _fixture.QueueServiceClientWithoutEncoding; _blobServiceClient = _fixture.BlobServiceClient; } @@ -138,28 +138,13 @@ public static void NotifyCompletion( _functionChainWaitHandle.Set(); } - /// - /// We'll insert a bad message. It should get here okay. It will - /// then pass it on to the next trigger. - /// - public static void BadMessage_CloudQueueMessage( - [QueueTrigger(BadMessageQueue1)] QueueMessage badMessageIn, - [Queue(BadMessageQueue2)] out string badMessageOut, -#pragma warning disable CS0618 // Type or member is obsolete - TraceWriter log) -#pragma warning restore CS0618 // Type or member is obsolete - { - _badMessage1Calls++; - badMessageOut = badMessageIn.MessageText; - } - public static void BadMessage_String( - [QueueTrigger(BadMessageQueue2)] string message, + [QueueTrigger(BadMessageQueue)] string message, #pragma warning disable CS0618 // Type or member is obsolete TraceWriter log) #pragma warning restore CS0618 // Type or member is obsolete { - _badMessage2Calls++; + _badMessageCalls++; } // Uncomment the Fact attribute to run @@ -220,14 +205,12 @@ private async Task EndToEndTest(bool uploadBlobBeforeHostStart) } [Test] - [Ignore("TODO (kasobol-msft) revisit this test when base64/BinaryData is supported in SDK")] public async Task BadQueueMessageE2ETests() { // This test ensures that the host does not crash on a bad message (it previously did) // Insert a bad message into a queue that should: - // - trigger BadMessage_CloudQueueMessage, which will put it into a second queue that will // - trigger BadMessage_String, which should fail - // - BadMessage_String should fail repeatedly until it is moved to the poison queue + // - BadMessage_String should be transfered to poison queue. // The test will watch that poison queue to know when to complete // Reinitialize the name resolver to avoid conflicts @@ -254,14 +237,12 @@ public async Task BadQueueMessageE2ETests() // - use a GUID as the content, which is not a valid base64 string // - pass 'true', to indicate that it is a base64 string string messageContent = Guid.NewGuid().ToString(); - // var message = new CloudQueueMessage(messageContent, true); // TODO (kasobol-msft) check this base64 thing - var queue = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue1)); + var queue = _queueServiceClientWithoutEncoding.GetQueueClient(_resolver.ResolveInString(BadMessageQueue)); await queue.CreateIfNotExistsAsync(); await queue.ClearMessagesAsync(); - // the poison queue will end up off of the second queue - var poisonQueue = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue2) + "-poison"); + var poisonQueue = _queueServiceClientWithoutEncoding.GetQueueClient(_resolver.ResolveInString(BadMessageQueue) + "-poison"); await poisonQueue.DeleteIfExistsAsync(); await queue.SendMessageAsync(messageContent); @@ -272,22 +253,8 @@ await TestHelpers.Await(async () => bool done = false; if (await poisonQueue.ExistsAsync()) { - poisonMessage = (await poisonQueue.ReceiveMessagesAsync(1)).Value.FirstOrDefault(); + poisonMessage = await poisonQueue.ReceiveMessageAsync(); done = poisonMessage != null; - - if (done) - { - // Sleep briefly, then make sure the other message has been deleted. - // If so, trying to delete it again will throw an error. - Thread.Sleep(1000); - - // The message is in the second queue - var queue2 = _queueServiceClient.GetQueueClient(_resolver.ResolveInString(BadMessageQueue2)); - - RequestFailedException ex = Assert.ThrowsAsync( - () => queue2.DeleteMessageAsync(_lastMessageId, _lastMessagePopReceipt)); - Assert.AreEqual("MessageNotFound", ex.ErrorCode); - } } var logs = loggerProvider.GetAllLogMessages(); return done; @@ -300,8 +267,7 @@ await TestHelpers.Await(async () => Assert.AreEqual(messageContent, poisonMessage.MessageText); // Make sure the functions were called correctly - Assert.AreEqual(1, _badMessage1Calls); - Assert.AreEqual(0, _badMessage2Calls); + Assert.AreEqual(0, _badMessageCalls); // Validate Logger var loggerErrors = loggerProvider.GetAllLogMessages().Where(l => l.Level == Microsoft.Extensions.Logging.LogLevel.Error); @@ -376,6 +342,7 @@ public TestFixture(WebJobsTestEnvironment testEnvironment) var queueOptions = new QueueClientOptions() { MessageEncoding = QueueMessageEncoding.Base64 }; this.QueueServiceClient = new QueueServiceClient(testEnvironment.PrimaryStorageAccountConnectionString, queueOptions); + this.QueueServiceClientWithoutEncoding = new QueueServiceClient(testEnvironment.PrimaryStorageAccountConnectionString); this.BlobServiceClient = new BlobServiceClient(testEnvironment.PrimaryStorageAccountConnectionString); } @@ -385,6 +352,12 @@ public QueueServiceClient QueueServiceClient private set; } + public QueueServiceClient QueueServiceClientWithoutEncoding + { + get; + private set; + } + public BlobServiceClient BlobServiceClient { get; diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/BlobTriggerEndToEndTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/BlobTriggerEndToEndTests.cs index f89502e1dd6b..3971c4445a44 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/BlobTriggerEndToEndTests.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/BlobTriggerEndToEndTests.cs @@ -10,8 +10,10 @@ using Azure.Core.TestFramework; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Specialized; +using Azure.Storage.Queues; using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests; using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Newtonsoft.Json.Linq; @@ -37,6 +39,7 @@ public class BlobTriggerEndToEndTests : LiveTestBase, ID private readonly BlobContainerClient _testContainer; private readonly BlobServiceClient _blobServiceClient; + private readonly QueueServiceClient _queueServiceClient; private readonly RandomNameResolver _nameResolver; private static object _syncLock = new object(); @@ -53,6 +56,7 @@ public BlobTriggerEndToEndTests() }) .Build(); _blobServiceClient = new BlobServiceClient(TestEnvironment.PrimaryStorageAccountConnectionString); + _queueServiceClient = new QueueServiceClient(TestEnvironment.PrimaryStorageAccountConnectionString); // No encoding _testContainer = _blobServiceClient.GetBlobContainerClient(_nameResolver.ResolveInString(SingleTriggerContainerName)); Assert.False(_testContainer.ExistsAsync().Result); _testContainer.CreateAsync().Wait(); @@ -275,6 +279,43 @@ public async Task BlobChainTest() } } + [Test] + public async Task GarbageMessageDoesNotCrashHost() + { + // write the initial trigger blob to start the chain + var container = _blobServiceClient.GetBlobContainerClient(_nameResolver.ResolveInString(BlobChainContainerName)); + await container.CreateIfNotExistsAsync(); + var blob = container.GetBlockBlobClient(BlobChainTriggerBlobName); + await blob.UploadTextAsync("0"); + + var prog = new BlobChainTest_Program(); + var host = NewBuilder(prog).Build(); + + var hostId = await host.Services.GetService().GetHostIdAsync(CancellationToken.None); + string hostBlobTriggerQueueName = "azure-webjobs-blobtrigger-" + hostId; + + var blobTriggerQueue = _queueServiceClient.GetQueueClient(hostBlobTriggerQueueName); + await blobTriggerQueue.CreateIfNotExistsAsync(); + + // Inject garbage message, i.e. not base-64 encoded + await blobTriggerQueue.SendMessageAsync("KABOOM"); + + using (prog._completedEvent = new ManualResetEvent(initialState: false)) + using (host) + { + host.Start(); + Assert.True(prog._completedEvent.WaitOne(TimeSpan.FromSeconds(60))); + + // make sure it was logged. + string invalidMessageLog = host.GetTestLoggerProvider().GetAllLogMessages() + .Where(p => p.FormattedMessage != null) + .SelectMany(p => p.FormattedMessage.Split(Environment.NewLine.ToCharArray(), StringSplitOptions.None)) + .Where(s => s.Contains("KABOOM")) + .FirstOrDefault(); + StringAssert.Contains("KABOOM", invalidMessageLog); + } + } + [Test] public async Task BlobGetsProcessedOnlyOnce_MultipleHosts() {