diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs index dbe85f404c71..a65a9c4b73e1 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs @@ -429,47 +429,52 @@ public virtual async Task StartProcessingAsync( CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); - if (ActiveReceiveTask == null) + bool releaseGuard = false; + try { - Logger.StartProcessingStart(Identifier); - bool releaseGuard = false; + await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + releaseGuard = true; - try + if (ActiveReceiveTask == null) { - await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - releaseGuard = true; - ValidateMessageHandler(); - ValidateErrorHandler(); - cancellationToken.ThrowIfCancellationRequested(); + Logger.StartProcessingStart(Identifier); - InitializeReceiverManagers(); + try + { + ValidateMessageHandler(); + ValidateErrorHandler(); + cancellationToken.ThrowIfCancellationRequested(); - // We expect the token source to be null, but we are playing safe. + InitializeReceiverManagers(); - RunningTaskTokenSource?.Cancel(); - RunningTaskTokenSource?.Dispose(); - RunningTaskTokenSource = new CancellationTokenSource(); + // We expect the token source to be null, but we are playing safe. - // Start the main running task. - ActiveReceiveTask = RunReceiveTaskAsync(RunningTaskTokenSource.Token); - } - catch (Exception exception) - { - Logger.StartProcessingException(Identifier, exception.ToString()); - throw; - } - finally - { - if (releaseGuard) + RunningTaskTokenSource?.Cancel(); + RunningTaskTokenSource?.Dispose(); + RunningTaskTokenSource = new CancellationTokenSource(); + + // Start the main running task. + ActiveReceiveTask = RunReceiveTaskAsync(RunningTaskTokenSource.Token); + } + catch (Exception exception) { - ProcessingStartStopSemaphore.Release(); + Logger.StartProcessingException(Identifier, exception.ToString()); + throw; } + + Logger.StartProcessingComplete(Identifier); + } + else + { + throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation); } - Logger.StartProcessingComplete(Identifier); } - else + finally { - throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation); + if (releaseGuard) + { + ProcessingStartStopSemaphore.Release(); + } } } @@ -554,19 +559,18 @@ private void ValidateMessageHandler() /// A instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the will keep running. public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default) { + cancellationToken.ThrowIfCancellationRequested(); bool releaseGuard = false; try { + await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + releaseGuard = true; + if (ActiveReceiveTask != null) { Logger.StopProcessingStart(Identifier); cancellationToken.ThrowIfCancellationRequested(); - await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - releaseGuard = true; - - cancellationToken.ThrowIfCancellationRequested(); - // Cancel the current running task. RunningTaskTokenSource.Cancel(); @@ -594,6 +598,10 @@ await receiverManager.CloseReceiverIfNeeded( .ConfigureAwait(false); } } + else + { + throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation); + } } catch (Exception exception) { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs index a501bc3fd7f0..afabc34e5d35 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.Designer.cs @@ -357,6 +357,15 @@ internal static string MessageLockLost { } } + /// + /// Looks up a localized string similar to The message processor is not currently running. It needs to be started before it can be stopped.. + /// + internal static string MessageProcessorIsNotRunning { + get { + return ResourceManager.GetString("MessageProcessorIsNotRunning", resourceCulture); + } + } + /// /// Looks up a localized string similar to The message (id:{0}, size:{1} bytes) is larger than is currently allowed ({2} bytes).. /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx index da67f889361c..aca585e42e7f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx @@ -294,4 +294,7 @@ When sending via a different entity, an entity path is not allowed to specified in the connection string. - + + The message processor is not currently running. It needs to be started before it can be stopped. + + \ No newline at end of file diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs index 3acc8c8614c7..d154665b0c74 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs @@ -1303,11 +1303,11 @@ async Task MessageHandler(ProcessMessageEventArgs arg) } await processor.StartProcessingAsync(); - var cts = new CancellationTokenSource(); - cts.Cancel(); + await processor.StopProcessingAsync(); + Assert.That( - async () => await processor.StopProcessingAsync(cts.Token), - Throws.InstanceOf()); + async () => await processor.StopProcessingAsync(), + Throws.InstanceOf()); mockLogger .Verify( @@ -1320,9 +1320,6 @@ async Task MessageHandler(ProcessMessageEventArgs arg) processor.Identifier, It.IsAny()), Times.Once); - - // actually stop processing - await processor.StopProcessingAsync(); } private Mock GetMockConnection(Mock mockTransportReceiver) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index e6c6f21a14c1..c8a444541952 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System; +using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; @@ -414,6 +415,35 @@ Task ProcessErrors(ProcessErrorEventArgs args) await processor.StopProcessingAsync(); } + [Test] + public void StartStopMultipleTimes() + { + var invalidQueueName = "nonexistentqueuename"; + var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName); + TaskCompletionSource taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + processor.ProcessMessageAsync += eventArgs => Task.CompletedTask; + processor.ProcessErrorAsync += eventArgs => Task.CompletedTask; + + var startTasks = new List + { + processor.StartProcessingAsync(), + processor.StartProcessingAsync() + }; + Assert.That( + async () => await Task.WhenAll(startTasks), + Throws.InstanceOf()); + + var stopTasks = new List() + { + processor.StopProcessingAsync(), + processor.StopProcessingAsync() + }; + Assert.That( + async () => await Task.WhenAll(stopTasks), + Throws.InstanceOf()); + } + [Test] public async Task CannotAddHandlerWhileProcessorIsRunning() {