Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispose processor in tests #14784

Merged
merged 1 commit into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -42,7 +41,7 @@ public async Task ProcessMessages(int numThreads, bool autoComplete)
AutoComplete = autoComplete,
MaxReceiveWaitTime = TimeSpan.FromSeconds(30)
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -114,7 +113,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads)
AutoComplete = true,
MaxReceiveWaitTime = TimeSpan.FromSeconds(30)
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -195,7 +194,7 @@ public async Task AutoLockRenewalWorks(int numThreads)
MaxConcurrentCalls = numThreads,
AutoComplete = false
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -268,7 +267,7 @@ public async Task MaxAutoLockRenewalDurationRespected(int numThreads, int autoLo
AutoComplete = false,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(autoLockRenewalDuration)
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -330,7 +329,7 @@ public async Task CanStopProcessingFromHandler(int numThreads)
MaxConcurrentCalls = numThreads,
ReceiveMode = ReceiveMode.ReceiveAndDelete
};
var processor = client.CreateProcessor(scope.QueueName, options);
await using var processor = client.CreateProcessor(scope.QueueName, options);
int messageProcessedCt = 0;

// stop processing halfway through
Expand Down Expand Up @@ -420,11 +419,11 @@ Task ProcessErrors(ProcessErrorEventArgs args)
}

[Test]
public void StartStopMultipleTimes()
public async Task StartStopMultipleTimes()
{
var invalidQueueName = "nonexistentqueuename";
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName);
await using ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName);
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
processor.ProcessMessageAsync += eventArgs => Task.CompletedTask;
processor.ProcessErrorAsync += eventArgs => Task.CompletedTask;
Expand Down Expand Up @@ -455,7 +454,7 @@ public async Task CannotAddHandlerWhileProcessorIsRunning()
{
await using var client = GetClient();

var processor = client.CreateProcessor(scope.QueueName);
await using var processor = client.CreateProcessor(scope.QueueName);

Func<ProcessMessageEventArgs, Task> eventHandler = eventArgs => Task.CompletedTask;
Func<ProcessErrorEventArgs, Task> errorHandler = eventArgs => Task.CompletedTask;
Expand Down Expand Up @@ -488,7 +487,7 @@ public async Task StopProcessingDoesNotCancelAutoCompletion()
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
AutoComplete = true
});
Expand Down Expand Up @@ -527,7 +526,7 @@ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(str
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
AutoComplete = true
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public async Task CannotRemoveHandlersWhileProcessorIsRunning()
{
await using var client = GetClient();

var processor = client.CreateSessionProcessor(scope.QueueName);
await using var processor = client.CreateSessionProcessor(scope.QueueName);

Func<ProcessSessionMessageEventArgs, Task> eventHandler = eventArgs => Task.CompletedTask;
Func<ProcessErrorEventArgs, Task> errorHandler = eventArgs => Task.CompletedTask;
Expand Down Expand Up @@ -65,7 +65,7 @@ public async Task CannotAddHandlersWhileProcessorIsRunning()
{
await using var client = GetClient();

var processor = client.CreateSessionProcessor(scope.QueueName);
await using var processor = client.CreateSessionProcessor(scope.QueueName);

Func<ProcessSessionMessageEventArgs, Task> eventHandler = eventArgs => Task.CompletedTask;
Func<ProcessErrorEventArgs, Task> errorHandler = eventArgs => Task.CompletedTask;
Expand Down Expand Up @@ -120,7 +120,7 @@ public async Task ProcessSessionMessage(int numThreads, bool autoComplete)
MaxConcurrentSessions = numThreads,
AutoComplete = autoComplete
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;
int sessionOpenEventCt = 0;
int sessionCloseEventCt = 0;
Expand Down Expand Up @@ -216,7 +216,7 @@ await sender.SendMessageAsync(new ServiceBusMessage()
MaxAutoLockRenewalDuration = lockDuration,
AutoComplete = false
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
bool receivedDelayMsg = false;
List<string> receivedMessages = new List<string>();
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -276,7 +276,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads)
MaxConcurrentSessions = numThreads,
AutoComplete = true
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
Expand Down Expand Up @@ -362,7 +362,7 @@ public async Task ProcessConsumesAllMessages(int numThreads, bool autoComplete)
AutoComplete = autoComplete
};

ServiceBusSessionProcessor processor = GetNoRetryClient().CreateSessionProcessor(scope.QueueName, options);
await using ServiceBusSessionProcessor processor = GetNoRetryClient().CreateSessionProcessor(scope.QueueName, options);

processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;
Expand Down Expand Up @@ -423,7 +423,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulQu
var exceptionReceivedHandlerCalled = false;
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);

var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 1
});
Expand Down Expand Up @@ -465,7 +465,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulTo
{
var exceptionReceivedHandlerCalled = false;
var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString);
var processor = client.CreateSessionProcessor(scope.TopicName, scope.SubscriptionNames.First(), new ServiceBusSessionProcessorOptions
await using var processor = client.CreateSessionProcessor(scope.TopicName, scope.SubscriptionNames.First(), new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 1
});
Expand Down Expand Up @@ -535,7 +535,7 @@ public async Task ProcessSessionMessageUsingNamedSessionId(int numThreads, bool
SessionIds = new string[] { sessionId }
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -605,7 +605,7 @@ public async Task AutoLockRenewalWorks(int numThreads, int maxCallsPerSession)
AutoComplete = false,
MaxConcurrentCallsPerSession = maxCallsPerSession
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -682,7 +682,7 @@ public async Task MaxAutoLockRenewalDurationRespected(
AutoComplete = false,
MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(autoLockRenewalDuration)
};
var processor = client.CreateSessionProcessor(scope.QueueName, options);
await using var processor = client.CreateSessionProcessor(scope.QueueName, options);
int messageCt = 0;

TaskCompletionSource<bool>[] completionSources = Enumerable
Expand Down Expand Up @@ -759,7 +759,7 @@ public async Task StopProcessingDoesNotCancelAutoCompletion()
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage("sessionId"));
var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
AutoComplete = true
});
Expand Down Expand Up @@ -799,7 +799,7 @@ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(str
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage("sessionId"));
var processor = client.CreateSessionProcessor(scope.QueueName);
await using var processor = client.CreateSessionProcessor(scope.QueueName);
var tcs = new TaskCompletionSource<bool>();

async Task ProcessMessage(ProcessSessionMessageEventArgs args)
Expand Down Expand Up @@ -920,7 +920,7 @@ public async Task ProcessMessagesFromMultipleNamedSessions(
SessionIds = sessionIds.ToArray()
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1018,7 +1018,7 @@ public async Task SessionLockLostWhenProcessSessionMessages(int numSessions, int
SessionIds = sessionIds.ToArray()
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1161,7 +1161,7 @@ public async Task UserErrorHandlerInvokedOnceIfSessionLockLost()
SessionIds = new string[] { sessionId }
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1268,7 +1268,7 @@ public async Task ErrorSourceRespected(ServiceBusErrorSource errorSource)
SessionIds = new string[] { sessionId }
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1441,7 +1441,7 @@ public async Task SessionOpenEventDoesNotLoseLock()
SessionIds = new string[] { sessionId }
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);

Expand Down Expand Up @@ -1513,7 +1513,7 @@ public async Task MaxCallsPerSessionRespected(int numSessions, int maxConcurrent
MaxConcurrentCallsPerSession = maxCallsPerSession
};

var processor = client.CreateSessionProcessor(
await using var processor = client.CreateSessionProcessor(
scope.QueueName,
options);
ConcurrentDictionary<string, int> sessionDict = new ConcurrentDictionary<string, int>();
Expand Down Expand Up @@ -1582,7 +1582,7 @@ public async Task StopProcessingDoesNotCloseLink()
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage("sessionId"));
var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
AutoComplete = false,
MaxConcurrentSessions = 1
Expand Down