Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processor bug #14280

Merged
merged 2 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -429,47 +429,52 @@ public virtual async Task StartProcessingAsync(
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
if (ActiveReceiveTask == null)
bool releaseGuard = false;
try
{
Logger.StartProcessingStart(Identifier);
bool releaseGuard = false;
await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
releaseGuard = true;

try
if (ActiveReceiveTask == null)
{
await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
releaseGuard = true;
ValidateMessageHandler();
ValidateErrorHandler();
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.StartProcessingStart(Identifier);

InitializeReceiverManagers();
try
{
ValidateMessageHandler();
ValidateErrorHandler();
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// We expect the token source to be null, but we are playing safe.
InitializeReceiverManagers();

RunningTaskTokenSource?.Cancel();
RunningTaskTokenSource?.Dispose();
RunningTaskTokenSource = new CancellationTokenSource();
// We expect the token source to be null, but we are playing safe.

// Start the main running task.
ActiveReceiveTask = RunReceiveTaskAsync(RunningTaskTokenSource.Token);
}
catch (Exception exception)
{
Logger.StartProcessingException(Identifier, exception.ToString());
throw;
}
finally
{
if (releaseGuard)
RunningTaskTokenSource?.Cancel();
RunningTaskTokenSource?.Dispose();
RunningTaskTokenSource = new CancellationTokenSource();

// Start the main running task.
ActiveReceiveTask = RunReceiveTaskAsync(RunningTaskTokenSource.Token);
}
catch (Exception exception)
{
ProcessingStartStopSemaphore.Release();
Logger.StartProcessingException(Identifier, exception.ToString());
throw;
}

Logger.StartProcessingComplete(Identifier);
}
else
{
throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
}
Logger.StartProcessingComplete(Identifier);
}
else
finally
{
throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
if (releaseGuard)
{
ProcessingStartStopSemaphore.Release();
}
}
}

Expand Down Expand Up @@ -554,19 +559,18 @@ private void ValidateMessageHandler()
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the <see cref="ServiceBusProcessor" /> will keep running.</param>
public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
bool releaseGuard = false;
try
{
await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
releaseGuard = true;

if (ActiveReceiveTask != null)
{
Logger.StopProcessingStart(Identifier);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await ProcessingStartStopSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
releaseGuard = true;

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// Cancel the current running task.

RunningTaskTokenSource.Cancel();
Expand Down Expand Up @@ -594,6 +598,10 @@ await receiverManager.CloseReceiverIfNeeded(
.ConfigureAwait(false);
}
}
else
{
throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation);
}
}
catch (Exception exception)
{
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion sdk/servicebus/Azure.Messaging.ServiceBus/src/Resources.resx
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,7 @@
<data name="SendViaCannotBeUsedWithEntityInConnectionString" xml:space="preserve">
<value>When sending via a different entity, an entity path is not allowed to specified in the connection string.</value>
</data>
</root>
<data name="MessageProcessorIsNotRunning" xml:space="preserve">
<value>The message processor is not currently running. It needs to be started before it can be stopped.</value>
</data>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -1303,11 +1303,11 @@ async Task MessageHandler(ProcessMessageEventArgs arg)
}

await processor.StartProcessingAsync();
var cts = new CancellationTokenSource();
cts.Cancel();
await processor.StopProcessingAsync();

Assert.That(
async () => await processor.StopProcessingAsync(cts.Token),
Throws.InstanceOf<TaskCanceledException>());
async () => await processor.StopProcessingAsync(),
Throws.InstanceOf<InvalidOperationException>());

mockLogger
.Verify(
Expand All @@ -1320,9 +1320,6 @@ async Task MessageHandler(ProcessMessageEventArgs arg)
processor.Identifier,
It.IsAny<string>()),
Times.Once);

// actually stop processing
await processor.StopProcessingAsync();
}

private Mock<ServiceBusConnection> GetMockConnection(Mock<TransportReceiver> mockTransportReceiver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
Expand Down Expand Up @@ -414,6 +415,35 @@ Task ProcessErrors(ProcessErrorEventArgs args)
await processor.StopProcessingAsync();
}

[Test]
public void StartStopMultipleTimes()
{
var invalidQueueName = "nonexistentqueuename";
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName);
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
processor.ProcessMessageAsync += eventArgs => Task.CompletedTask;
processor.ProcessErrorAsync += eventArgs => Task.CompletedTask;

var startTasks = new List<Task>
{
processor.StartProcessingAsync(),
processor.StartProcessingAsync()
};
Assert.That(
async () => await Task.WhenAll(startTasks),
Throws.InstanceOf<InvalidOperationException>());

var stopTasks = new List<Task>()
{
processor.StopProcessingAsync(),
processor.StopProcessingAsync()
};
Assert.That(
async () => await Task.WhenAll(stopTasks),
Throws.InstanceOf<InvalidOperationException>());
}

[Test]
public async Task CannotAddHandlerWhileProcessorIsRunning()
{
Expand Down