diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs
index 49c1c9966aaf..d83c3aad339d 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs
@@ -341,10 +341,10 @@ public virtual void UpdateCheckpointStart(string partitionId,
/// The name of the consumer group the checkpoint is associated with.
///
[Event(33, Level = EventLevel.Informational, Message = "Completed the attempt to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'.")]
- public virtual void UpdateCheckpointComplete(string partitionId,
- string fullyQualifiedNamespace,
- string eventHubName,
- string consumerGroup)
+ public virtual void UpdateCheckpointComplete(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup)
{
if (IsEnabled())
{
@@ -363,11 +363,11 @@ public virtual void UpdateCheckpointComplete(string partitionId,
/// The message for the exception that occurred.
///
[Event(34, Level = EventLevel.Error, Message = "An exception occurred when creating/updating a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'. ErrorMessage: '{4}'.")]
- public virtual void UpdateCheckpointError(string partitionId,
- string fullyQualifiedNamespace,
- string eventHubName,
- string consumerGroup,
- string errorMessage)
+ public virtual void UpdateCheckpointError(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string errorMessage)
{
if (IsEnabled())
{
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs
index 9056902721c5..217bac1b9a3b 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs
@@ -37,7 +37,10 @@ static BlobsCheckpointStore()
/// The name of the consumer group the ownership are associated with.
/// The amount of ownership received from the storage service.
///
- partial void ListOwnershipComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int ownershipCount) =>
+ partial void ListOwnershipComplete(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ int ownershipCount) =>
Logger.ListOwnershipComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, ownershipCount);
///
@@ -49,7 +52,10 @@ partial void ListOwnershipComplete(string fullyQualifiedNamespace, string eventH
/// The name of the consumer group the ownership are associated with.
/// The exception that occurred.
///
- partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) =>
+ partial void ListOwnershipError(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ Exception exception) =>
Logger.ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message);
///
@@ -60,7 +66,9 @@ partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubN
/// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the ownership are associated with.
///
- partial void ListOwnershipStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup) =>
+ partial void ListOwnershipStart(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup) =>
Logger.ListOwnershipStart(fullyQualifiedNamespace, eventHubName, consumerGroup);
///
@@ -72,7 +80,10 @@ partial void ListOwnershipStart(string fullyQualifiedNamespace, string eventHubN
/// The name of the consumer group the checkpoints are associated with.
/// The amount of checkpoints received from the storage service.
///
- partial void ListCheckpointsComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int checkpointCount) =>
+ partial void ListCheckpointsComplete(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ int checkpointCount) =>
Logger.ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpointCount);
///
@@ -84,7 +95,10 @@ partial void ListCheckpointsComplete(string fullyQualifiedNamespace, string even
/// The name of the consumer group the ownership are associated with.
/// The exception that occurred.
///
- partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) =>
+ partial void ListCheckpointsError(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ Exception exception) =>
Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message);
///
@@ -96,7 +110,10 @@ partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHu
/// The name of the specific Event Hub the data is associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the data is associated with.
///
- partial void InvalidCheckpointFound(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup) =>
+ partial void InvalidCheckpointFound(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup) =>
Logger.InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup);
///
@@ -107,7 +124,9 @@ partial void InvalidCheckpointFound(string partitionId, string fullyQualifiedNam
/// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the checkpoints are associated with.
///
- partial void ListCheckpointsStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup) =>
+ partial void ListCheckpointsStart(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup) =>
Logger.ListCheckpointsStart(fullyQualifiedNamespace, eventHubName, consumerGroup);
///
@@ -120,7 +139,11 @@ partial void ListCheckpointsStart(string fullyQualifiedNamespace, string eventHu
/// The name of the consumer group the checkpoint is associated with.
/// The exception that occurred.
///
- partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) =>
+ partial void UpdateCheckpointError(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ Exception exception) =>
Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message);
///
@@ -132,7 +155,10 @@ partial void UpdateCheckpointError(string partitionId, string fullyQualifiedName
/// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the checkpoint is associated with.
///
- partial void UpdateCheckpointComplete(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup) =>
+ partial void UpdateCheckpointComplete(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup) =>
Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup);
///
@@ -144,7 +170,10 @@ partial void UpdateCheckpointComplete(string partitionId, string fullyQualifiedN
/// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the checkpoint is associated with.
///
- partial void UpdateCheckpointStart(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup) =>
+ partial void UpdateCheckpointStart(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup) =>
Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup);
///
@@ -157,7 +186,11 @@ partial void UpdateCheckpointStart(string partitionId, string fullyQualifiedName
/// The name of the consumer group the ownership is associated with.
/// The identifier of the processor that attempted to claim the ownership for.
///
- partial void ClaimOwnershipComplete(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier) =>
+ partial void ClaimOwnershipComplete(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier) =>
Logger.ClaimOwnershipComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier);
///
@@ -171,7 +204,12 @@ partial void ClaimOwnershipComplete(string partitionId, string fullyQualifiedNam
/// The identifier of the processor that attempted to claim the ownership for.
/// The exception that occurred.
///
- partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, Exception exception) =>
+ partial void ClaimOwnershipError(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier,
+ Exception exception) =>
Logger.ClaimOwnershipError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, exception.Message);
///
@@ -185,7 +223,12 @@ partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamesp
/// The identifier of the processor that attempted to claim the ownership for.
/// The message for the failure.
///
- partial void OwnershipNotClaimable(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, string message) =>
+ partial void OwnershipNotClaimable(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier,
+ string message) =>
Logger.OwnershipNotClaimable(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, message);
///
@@ -211,7 +254,11 @@ partial void OwnershipClaimed(string partitionId, string fullyQualifiedNamespace
/// The name of the consumer group the ownership is associated with.
/// The identifier of the processor that attempted to claim the ownership for.
///
- partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier) =>
+ partial void ClaimOwnershipStart(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier) =>
Logger.ClaimOwnershipStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier);
///
@@ -222,7 +269,9 @@ partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamesp
/// The Storage account name corresponding to the associated container client.
/// The name of the associated container client.
///
- partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName) =>
+ partial void BlobsCheckpointStoreCreated(string typeName,
+ string accountName,
+ string containerName) =>
Logger.BlobsCheckpointStoreCreated(typeName, accountName, containerName);
///
@@ -234,7 +283,10 @@ partial void BlobsCheckpointStoreCreated(string typeName, string accountName, st
/// The name of the consumer group the checkpoint is associated with.
/// The partition id the specific checkpoint is associated with.
///
- partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) =>
+ partial void GetCheckpointStart(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string partitionId) =>
Logger.GetCheckpointStart(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);
///
@@ -246,7 +298,10 @@ partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubN
/// The name of the consumer group the checkpoint is associated with.
/// The partition id the specific checkpoint is associated with.
///
- partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) =>
+ partial void GetCheckpointComplete(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string partitionId) =>
Logger.GetCheckpointComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);
///
@@ -259,7 +314,11 @@ partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventH
/// The partition id the specific checkpoint is associated with.
/// The message for the exception that occurred.
///
- partial void GetCheckpointError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Exception exception) =>
+ partial void GetCheckpointError(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string partitionId,
+ Exception exception) =>
Logger.GetCheckpointError(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, exception.Message);
}
-}
\ No newline at end of file
+}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs
index 4b4fca6b58d4..7f70beee1e50 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs
@@ -1466,7 +1466,7 @@ public async Task GetCheckpointLogsStartAndComplete()
await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken());
mockLog.Verify(m => m.GetCheckpointStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
- mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
+ mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0"));
}
///
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs
index 19f4741375b3..8b3bbc6d42dc 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs
@@ -13,7 +13,7 @@ namespace Azure.Messaging.EventHubs.Processor.Tests
/// being run, offering access to information such as environment variables.
///
///
- public class StorageTestEnvironment: TestEnvironment
+ public class StorageTestEnvironment : TestEnvironment
{
/// The singleton instance of the , lazily created.
private static readonly Lazy Singleton = new Lazy(() => new StorageTestEnvironment(), LazyThreadSafetyMode.ExecutionAndPublication);
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs
index f83add6dff32..c5d72a888549 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs
@@ -47,7 +47,7 @@ public async Task EventsCanBeReadByOneProcessorClient(LoadBalancingStrategy load
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
using var cancellationSource = new CancellationTokenSource();
- cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
// Send a set of events.
@@ -79,7 +79,7 @@ public async Task EventsCanBeReadByOneProcessorClient(LoadBalancingStrategy load
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -97,7 +97,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingAnIdentityCredential()
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
using var cancellationSource = new CancellationTokenSource();
- cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
// Send a set of events.
@@ -129,7 +129,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingAnIdentityCredential()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -148,7 +148,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingTheSharedKeyCredential
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
using var cancellationSource = new CancellationTokenSource();
- cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
// Send a set of events.
@@ -180,7 +180,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingTheSharedKeyCredential
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -244,7 +244,7 @@ public async Task EventsCanBeReadByMultipleProcessorClients()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -288,7 +288,7 @@ public async Task ProcessorClientCreatesOwnership()
var processedEvents = new ConcurrentDictionary();
var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- var storageManager = new InMemoryStorageManager(_ => {});
+ var storageManager = new InMemoryStorageManager(_ => { });
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) };
var processor = CreateProcessorWithIdentity(scope.ConsumerGroups.First(), scope.EventHubName, storageManager, options);
@@ -346,7 +346,7 @@ public async Task ProcessorClientCanStartFromAnInitialPosition()
await using (var consumer = new EventHubConsumerClient(scope.ConsumerGroups.First(), connectionString))
{
- await foreach (var partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = null }, cancellationSource.Token))
+ await foreach (var partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = null }, cancellationSource.Token))
{
if (partitionEvent.Data.IsEquivalentTo(lastSourceEvent))
{
@@ -393,7 +393,7 @@ public async Task ProcessorClientCanStartFromAnInitialPosition()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -438,7 +438,7 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing()
var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var beforeCheckpointProcessHandler = CreateEventTrackingHandler(segmentEventCount, processedEvents, completionSource, cancellationSource.Token, processedEventCallback);
var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) };
- var storageManager = new InMemoryStorageManager(_ => {});
+ var storageManager = new InMemoryStorageManager(_ => { });
var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, storageManager, options);
processor.ProcessErrorAsync += CreateAssertingErrorHandler();
@@ -476,7 +476,7 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing()
foreach (var sourceEvent in afterCheckpointEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -500,7 +500,7 @@ private EventProcessorClient CreateProcessor(string consumerGroup,
{
EventHubConnection createConnection() => new EventHubConnection(connectionString);
- storageManager ??= new InMemoryStorageManager(_=> {});
+ storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, "fakeNamespace", "fakeEventHub", Mock.Of(), createConnection, options);
}
@@ -524,7 +524,7 @@ private EventProcessorClient CreateProcessorWithIdentity(string consumerGroup,
var credential = EventHubsTestEnvironment.Instance.Credential;
EventHubConnection createConnection() => new EventHubConnection(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential);
- storageManager ??= new InMemoryStorageManager(_=> {});
+ storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential, createConnection, options);
}
@@ -548,7 +548,7 @@ private EventProcessorClient CreateProcessorWithSharedAccessKey(string consumerG
var credential = new EventHubsSharedAccessKeyCredential(EventHubsTestEnvironment.Instance.SharedAccessKeyName, EventHubsTestEnvironment.Instance.SharedAccessKey);
EventHubConnection createConnection() => null; //new EventHubConnection(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential);
- storageManager ??= new InMemoryStorageManager(_=> {});
+ storageManager ??= new InMemoryStorageManager(_ => { });
return new TestEventProcessorClient(storageManager, consumerGroup, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential, createConnection, options);
}
@@ -621,7 +621,7 @@ private Func CreateEventTrackingHandler(int targetCount,
if (processedEvents.Count >= targetCount)
{
- completionSource.TrySetResult(true);
+ completionSource.TrySetResult(true);
}
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs
index 4d268db208f9..87a5ec601271 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs
@@ -131,27 +131,27 @@ void assertOptionsMatch(EventProcessorOptions expected,
Assert.That(actual, Is.Not.Null, $"The processor options should have been created for the { constructorDescription } constructor.");
Assert.That(actual.ConnectionOptions.TransportType, Is.EqualTo(expected.ConnectionOptions.TransportType), $"The connection options are incorrect for the { constructorDescription } constructor.");
Assert.That(actual.RetryOptions.MaximumRetries, Is.EqualTo(expected.RetryOptions.MaximumRetries), $"The retry options are incorrect for the { constructorDescription } constructor.");
- Assert.That(actual.Identifier, Is.EqualTo(expected.Identifier), $"The identifier is incorrect for the { constructorDescription } constructor.");
- Assert.That(actual.MaximumWaitTime, Is.EqualTo(expected.MaximumWaitTime), $"The maximum wait time is incorrect for the { constructorDescription } constructor.");
- Assert.That(actual.TrackLastEnqueuedEventProperties, Is.EqualTo(expected.TrackLastEnqueuedEventProperties), $"The last event tracking flag is incorrect for the { constructorDescription } constructor.");
- Assert.That(actual.DefaultStartingPosition, Is.EqualTo(expected.DefaultStartingPosition), $"The default starting position is incorrect for the { constructorDescription } constructor.");
- Assert.That(actual.LoadBalancingUpdateInterval, Is.EqualTo(expected.LoadBalancingUpdateInterval), $"The load balancing interval is incorrect for the { constructorDescription } constructor.");
- Assert.That(actual.PartitionOwnershipExpirationInterval, Is.EqualTo(expected.PartitionOwnershipExpirationInterval), $"The ownership expiration interval incorrect for the { constructorDescription } constructor.");
- Assert.That(actual.PrefetchCount, Is.EqualTo(expected.PrefetchCount), $"The prefetch count is incorrect for the { constructorDescription } constructor.");
- Assert.That(actual.PrefetchSizeInBytes, Is.EqualTo(expected.PrefetchSizeInBytes), $"The prefetch byte size is incorrect for the { constructorDescription } constructor.");
+ Assert.That(actual.Identifier, Is.EqualTo(expected.Identifier), $"The identifier is incorrect for the { constructorDescription } constructor.");
+ Assert.That(actual.MaximumWaitTime, Is.EqualTo(expected.MaximumWaitTime), $"The maximum wait time is incorrect for the { constructorDescription } constructor.");
+ Assert.That(actual.TrackLastEnqueuedEventProperties, Is.EqualTo(expected.TrackLastEnqueuedEventProperties), $"The last event tracking flag is incorrect for the { constructorDescription } constructor.");
+ Assert.That(actual.DefaultStartingPosition, Is.EqualTo(expected.DefaultStartingPosition), $"The default starting position is incorrect for the { constructorDescription } constructor.");
+ Assert.That(actual.LoadBalancingUpdateInterval, Is.EqualTo(expected.LoadBalancingUpdateInterval), $"The load balancing interval is incorrect for the { constructorDescription } constructor.");
+ Assert.That(actual.PartitionOwnershipExpirationInterval, Is.EqualTo(expected.PartitionOwnershipExpirationInterval), $"The ownership expiration interval incorrect for the { constructorDescription } constructor.");
+ Assert.That(actual.PrefetchCount, Is.EqualTo(expected.PrefetchCount), $"The prefetch count is incorrect for the { constructorDescription } constructor.");
+ Assert.That(actual.PrefetchSizeInBytes, Is.EqualTo(expected.PrefetchSizeInBytes), $"The prefetch byte size is incorrect for the { constructorDescription } constructor.");
}
var clientOptions = new EventProcessorClientOptions
{
- ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets },
- RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 },
- Identifier = "OMG, HAI!",
- MaximumWaitTime = TimeSpan.FromDays(54),
- TrackLastEnqueuedEventProperties = true,
- PrefetchCount = 5,
- PrefetchSizeInBytes = 500,
- LoadBalancingUpdateInterval = TimeSpan.FromDays(65),
- PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(65)
+ ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets },
+ RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 },
+ Identifier = "OMG, HAI!",
+ MaximumWaitTime = TimeSpan.FromDays(54),
+ TrackLastEnqueuedEventProperties = true,
+ PrefetchCount = 5,
+ PrefetchSizeInBytes = 500,
+ LoadBalancingUpdateInterval = TimeSpan.FromDays(65),
+ PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(65)
};
var expectedOptions = InvokeCreateOptions(clientOptions);
@@ -608,7 +608,7 @@ public void ProcessorAllowsRemovingEventHandlers()
processorClient.PartitionInitializingAsync += initHandler;
processorClient.PartitionClosingAsync += closeHandler;
processorClient.ProcessEventAsync += eventHandler;
- processorClient.ProcessErrorAsync +=errorHandler;
+ processorClient.ProcessErrorAsync += errorHandler;
Assert.That(() => processorClient.PartitionInitializingAsync -= initHandler, Throws.Nothing, "The initializing handler should allow removing registrations.");
Assert.That(() => processorClient.PartitionClosingAsync -= closeHandler, Throws.Nothing, "The closing handler should allow removing registrations.");
@@ -630,7 +630,8 @@ public async Task ProcessorDoesNotAllowEventHandlerChangesWhenRunning()
Func errorHandler = eventArgs => Task.CompletedTask;
var processorClient = new TestEventProcessorClient(Mock.Of(), "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), default);
- processorClient.ProcessEventAsync += eventHandler;;
+ processorClient.ProcessEventAsync += eventHandler;
+ ;
processorClient.ProcessErrorAsync += errorHandler;
// Handlers should not be allowed when the processor is running.
@@ -944,7 +945,7 @@ public async Task EventProcessingToleratesAndSurfacesMultipleExceptions()
var eventBatch = Enumerable
.Range(0, eventCount)
- .Select(index => new MockEventData(Array.Empty(), offset: 1000 + index, sequenceNumber: 2000 + index))
+ .Select(index => new MockEventData(Array.Empty(), offset: 1000 + index, sequenceNumber: 2000 + index))
.ToList();
processorClient.ProcessEventAsync += eventArgs =>
@@ -1422,16 +1423,16 @@ public void ClientOptionsCanBeTranslated()
{
var clientOptions = new EventProcessorClientOptions
{
- ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets },
- RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 },
- Identifier = "OMG, HAI!",
- MaximumWaitTime = TimeSpan.FromDays(54),
- TrackLastEnqueuedEventProperties = true,
- LoadBalancingStrategy = LoadBalancingStrategy.Greedy,
- PrefetchCount = 9990,
- PrefetchSizeInBytes = 400,
- LoadBalancingUpdateInterval = TimeSpan.FromSeconds(45),
- PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(44)
+ ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets },
+ RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 },
+ Identifier = "OMG, HAI!",
+ MaximumWaitTime = TimeSpan.FromDays(54),
+ TrackLastEnqueuedEventProperties = true,
+ LoadBalancingStrategy = LoadBalancingStrategy.Greedy,
+ PrefetchCount = 9990,
+ PrefetchSizeInBytes = 400,
+ LoadBalancingUpdateInterval = TimeSpan.FromSeconds(45),
+ PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(44)
};
var defaultOptions = new EventProcessorOptions();
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs
index 81c33dfb65ee..20e9e47dccd0 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs
@@ -49,7 +49,16 @@ internal partial class BlobsCheckpointStore : StorageManager
/// with the specified prefix.
///
///
- private const string LegacyCheckpointPrefix = "{0}/{1}/{2}/";
+ ///
+ /// This pattern is specific to the prefix used by the Azure Functions extension. The legacy
+ /// EventProcessorHost allowed this value to be specified as an option, defaulting to
+ /// an empty prefix.
+ ///
+ /// For this to be general-purpose, it will need to be refactored into an option with this
+ /// pattern passed by the Functions extension.
+ ///
+ ///
+ private const string FunctionsLegacyCheckpointPrefix = "{0}/{1}/{2}/";
///
/// Specifies a string that filters the results to return only ownership blobs whose name begins
@@ -295,7 +304,7 @@ public override async Task> ListCheckpoint
if (InitializeWithLegacyCheckpoints)
{
// Legacy checkpoints are not normalized to lowercase
- var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup);
+ var legacyPrefix = string.Format(CultureInfo.InvariantCulture, FunctionsLegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup);
await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: cancellationToken).ConfigureAwait(false))
{
@@ -351,7 +360,11 @@ public override async Task> ListCheckpoint
///
/// A initialized with checkpoint properties if the checkpoint exists, otherwise null
.
///
- public override async Task GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, CancellationToken cancellationToken)
+ public override async Task GetCheckpointAsync(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string partitionId,
+ CancellationToken cancellationToken)
{
try
{
@@ -376,7 +389,7 @@ public override async Task GetCheckpointAsync(string f
{
if (InitializeWithLegacyCheckpoints)
{
- var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup) + partitionId;
+ var legacyPrefix = string.Format(CultureInfo.InvariantCulture, FunctionsLegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup) + partitionId;
return await CreateLegacyCheckpoint(fullyQualifiedNamespace, eventHubName, consumerGroup, legacyPrefix, partitionId, cancellationToken).ConfigureAwait(false);
}
}
@@ -479,7 +492,7 @@ private async Task CreateLegacyCheckpoint(string fully
await blobClient.DownloadToAsync(memoryStream, cancellationToken).ConfigureAwait(false);
if (TryReadLegacyCheckpoint(
- memoryStream.GetBuffer().AsSpan(0, (int) memoryStream.Length),
+ memoryStream.GetBuffer().AsSpan(0, (int)memoryStream.Length),
out long? offset,
out long? sequenceNumber))
{
@@ -588,8 +601,11 @@ public override async Task UpdateCheckpointAsync(EventProcessorCheckpoint checkp
/// "Offset":"8591964920",
/// "SequenceNumber":960180
/// }
- /// ///
- private static bool TryReadLegacyCheckpoint(Span data, out long? offset, out long? sequenceNumber)
+ ///
+ ///
+ private static bool TryReadLegacyCheckpoint(Span data,
+ out long? offset,
+ out long? sequenceNumber)
{
offset = null;
sequenceNumber = null;
@@ -647,6 +663,7 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
catch (JsonException)
{
// Ignore this because if the data is malformed, it will be treated as if the checkpoint didn't exist.
+
return false;
}
@@ -662,7 +679,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the ownership are associated with.
/// The amount of ownership received from the storage service.
///
- partial void ListOwnershipComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int ownershipCount);
+ partial void ListOwnershipComplete(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ int ownershipCount);
///
/// Indicates that an unhandled exception was encountered while retrieving a list of ownership.
@@ -673,7 +693,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the ownership are associated with.
/// The message for the exception that occurred.
///
- partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception);
+ partial void ListOwnershipError(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ Exception exception);
///
/// Indicates that an attempt to retrieve a list of ownership has started.
@@ -683,7 +706,9 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the ownership are associated with.
///
- partial void ListOwnershipStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup);
+ partial void ListOwnershipStart(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup);
///
/// Indicates that an attempt to retrieve a list of checkpoints has completed.
@@ -694,7 +719,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the checkpoints are associated with.
/// The amount of checkpoints received from the storage service.
///
- partial void ListCheckpointsComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int checkpointCount);
+ partial void ListCheckpointsComplete(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ int checkpointCount);
///
/// Indicates that an unhandled exception was encountered while retrieving a list of checkpoints.
@@ -705,7 +733,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the ownership are associated with.
/// The message for the exception that occurred.
///
- partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception);
+ partial void ListCheckpointsError(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ Exception exception);
///
/// Indicates that an attempt to retrieve a checkpoint has started.
@@ -716,7 +747,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the checkpoint is associated with.
/// The partition id the specific checkpoint is associated with.
///
- partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId);
+ partial void GetCheckpointStart(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string partitionId);
///
/// Indicates that an attempt to retrieve a checkpoint has completed.
@@ -727,7 +761,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the checkpoint is associated with.
/// The partition id the specific checkpoint is associated with.
///
- partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId);
+ partial void GetCheckpointComplete(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string partitionId);
///
/// Indicates that an unhandled exception was encountered while retrieving a checkpoint.
@@ -739,7 +776,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The partition id the specific checkpoint is associated with.
/// The message for the exception that occurred.
///
- partial void GetCheckpointError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Exception exception);
+ partial void GetCheckpointError(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string partitionId,
+ Exception exception);
///
/// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints.
@@ -750,7 +791,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the specific Event Hub the data is associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the data is associated with.
///
- partial void InvalidCheckpointFound(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup);
+ partial void InvalidCheckpointFound(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup);
///
/// Indicates that an attempt to retrieve a list of checkpoints has started.
@@ -760,7 +804,9 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the checkpoints are associated with.
///
- partial void ListCheckpointsStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup);
+ partial void ListCheckpointsStart(string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup);
///
/// Indicates that an unhandled exception was encountered while updating a checkpoint.
@@ -772,7 +818,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the checkpoint is associated with.
/// The message for the exception that occurred.
///
- partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception);
+ partial void UpdateCheckpointError(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ Exception exception);
///
/// Indicates that an attempt to update a checkpoint has completed.
@@ -783,7 +833,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the checkpoint is associated with.
///
- partial void UpdateCheckpointComplete(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup);
+ partial void UpdateCheckpointComplete(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup);
///
/// Indicates that an attempt to create/update a checkpoint has started.
@@ -794,7 +847,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it.
/// The name of the consumer group the checkpoint is associated with.
///
- partial void UpdateCheckpointStart(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup);
+ partial void UpdateCheckpointStart(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup);
///
/// Indicates that an attempt to retrieve claim partition ownership has completed.
@@ -806,7 +862,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the ownership is associated with.
/// The identifier of the processor that attempted to claim the ownership for.
///
- partial void ClaimOwnershipComplete(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier);
+ partial void ClaimOwnershipComplete(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier);
///
/// Indicates that an exception was encountered while attempting to retrieve claim partition ownership.
@@ -819,7 +879,12 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The identifier of the processor that attempted to claim the ownership for.
/// The message for the exception that occurred.
///
- partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, Exception exception);
+ partial void ClaimOwnershipError(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier,
+ Exception exception);
///
/// Indicates that ownership was unable to be claimed.
@@ -832,7 +897,12 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The identifier of the processor that attempted to claim the ownership for.
/// The message for the failure.
///
- partial void OwnershipNotClaimable(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, string message);
+ partial void OwnershipNotClaimable(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier,
+ string message);
///
/// Indicates that ownership was successfully claimed.
@@ -844,7 +914,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the ownership is associated with.
/// The identifier of the processor that attempted to claim the ownership for.
///
- partial void OwnershipClaimed(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier);
+ partial void OwnershipClaimed(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier);
///
/// Indicates that an attempt to claim a partition ownership has started.
@@ -856,7 +930,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The name of the consumer group the ownership is associated with.
/// The identifier of the processor that attempted to claim the ownership for.
///
- partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier);
+ partial void ClaimOwnershipStart(string partitionId,
+ string fullyQualifiedNamespace,
+ string eventHubName,
+ string consumerGroup,
+ string ownerIdentifier);
///
/// Indicates that a was created.
@@ -866,12 +944,15 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o
/// The Storage account name corresponding to the associated container client.
/// The name of the associated container client.
///
- partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName);
+ partial void BlobsCheckpointStoreCreated(string typeName,
+ string accountName,
+ string containerName);
///
/// Contains the information to reflect the state of event processing for a given Event Hub partition.
/// Provides access to the offset and the sequence number retrieved from the blob.
///
+ ///
public class BlobStorageCheckpoint : EventProcessorCheckpoint
{
public long? Offset { get; set; }
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs
index 008b531ddd0d..82d78116243c 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs
@@ -55,7 +55,7 @@ internal class PartitionLoadBalancer
/// read only in the context of this group.
///
///
- public string ConsumerGroup { get; }
+ public string ConsumerGroup { get; }
///
/// The identifier of the EventProcessorClient that owns this load balancer.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs
index da3384a44484..141bbcb989c4 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs
@@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Producer;
@@ -134,7 +133,7 @@ public static async Task> BuildBatchesAsync(IEnumera
}
else
{
- queuedEvents.Dequeue();
+ queuedEvents.Dequeue();
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs
index 6ca33dd94615..efbf0cb58cd5 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs
@@ -5,8 +5,6 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.TestFramework;
-using Azure.Messaging.EventHubs.Authorization;
-using Azure.Messaging.EventHubs.Core;
namespace Azure.Messaging.EventHubs.Tests
{
@@ -16,7 +14,7 @@ namespace Azure.Messaging.EventHubs.Tests
/// variables.
///
///
- public sealed class EventHubsTestEnvironment: TestEnvironment
+ public sealed class EventHubsTestEnvironment : TestEnvironment
{
/// The name of the shared access key to be used for accessing an Event Hubs namespace.
public const string EventHubsDefaultSharedAccessKey = "RootManageSharedAccessKey";
@@ -131,7 +129,7 @@ public sealed class EventHubsTestEnvironment: TestEnvironment
/// The location of the resource manager for the active cloud environment.
///
///
- public new string ResourceManagerUrl => base.ResourceManagerUrl ?? "https://management.azure.com/";
+ public new string ResourceManagerUrl => base.ResourceManagerUrl ?? "https://management.azure.com/";
///
/// Initializes a new instance of .
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs
index 72c41640e906..140cba1b05d5 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs
@@ -636,7 +636,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealOwnershipAsRecovery()
//
// Assign the processor ownership over half of the partitions in storage, but do not formally claim them.
- await storageManager.ClaimOwnershipAsync(CreatePartitionOwnership( partitionIds.Skip(MinimumPartitionCount).Take(OrphanedPartitionCount), loadBalancer.OwnerIdentifier));
+ await storageManager.ClaimOwnershipAsync(CreatePartitionOwnership(partitionIds.Skip(MinimumPartitionCount).Take(OrphanedPartitionCount), loadBalancer.OwnerIdentifier));
completeOwnership = await storageManager.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup);
Assert.That(completeOwnership.Count(), Is.EqualTo(OrphanedPartitionCount + MinimumPartitionCount), "Storage should be tracking half the partitions as owned by another processor as well as some orphans.");
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs
old mode 100755
new mode 100644
index d9b33d62e252..6d9ad4c9aff8
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs
@@ -6,7 +6,6 @@
using System.Linq;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Primitives;
-using Moq;
using NUnit.Framework;
namespace Azure.Messaging.EventHubs.Tests
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs
index 0224b4d86623..8fd0d57aa14f 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs
@@ -909,7 +909,7 @@ protected virtual async Task RequestAuthorizationUsingCbsAsync(AmqpCon
if (connection.IsClosing())
{
- throw new EventHubsException(true, EventHubName, Resources.UnknownCommunicationException, EventHubsException.FailureReason.ServiceCommunicationProblem);
+ throw new EventHubsException(true, EventHubName, Resources.UnknownCommunicationException, EventHubsException.FailureReason.ServiceCommunicationProblem);
}
var authLink = connection.Extensions.Find();
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
index c76f60a90839..26b646f50623 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
@@ -578,7 +578,7 @@ protected virtual async Task CreateLinkAndEnsureProducerStateAs
}
catch (Exception ex)
{
- ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw();
+ ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw();
}
return link;
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs
old mode 100755
new mode 100644
index 7c4eb0dddd35..203860809eaf
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs
@@ -32,7 +32,7 @@ internal static class ChannelReaderExtensions
///
public static async IAsyncEnumerable EnumerateChannel(this ChannelReader reader,
TimeSpan? maximumWaitTime,
- [EnumeratorCancellation]CancellationToken cancellationToken)
+ [EnumeratorCancellation] CancellationToken cancellationToken)
{
Argument.AssertNotNull(reader, nameof(reader));
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs
index 83497535dd7b..bc7d532ef075 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs
@@ -290,7 +290,7 @@ public PoolItem(string partitionId,
/// A class wrapping a , triggering a clean-up when the object is disposed.
///
///
- internal class PooledProducer: IAsyncDisposable
+ internal class PooledProducer : IAsyncDisposable
{
///
/// A function responsible of cleaning up the resources in use.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs
index ce9fb45124f3..02114d671ea6 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
-using System;
using System.Diagnostics.Tracing;
using Azure.Core.Diagnostics;
using Azure.Messaging.EventHubs.Consumer;
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs
index 7727f0e6e20e..c4130344291c 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs
@@ -783,7 +783,7 @@ async Task performProcessing()
protected abstract Task> ListCheckpointsAsync(CancellationToken cancellationToken);
///
- /// Returns a checkpoint for the Event Hub, consumer group, and identifier of the partition associated with the
+ /// Returns a checkpoint for the Event Hub, consumer group, and identifier of the partition associated with the
/// event processor instance, so that processing for a given partition can be properly initialized.
/// The default implementation calls the and filters results by .
/// It's recommended that this method is overriden in implementations to achieve an optimal performance.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs
index a509ac3b6235..ab9d9447bd70 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs
@@ -624,8 +624,8 @@ public virtual async Task SendAsync(IEnumerable eventBatch,
var events = eventBatch switch
{
- IReadOnlyList eventList => eventList,
- _ => eventBatch.ToList()
+ IReadOnlyList eventList => eventList,
+ _ => eventBatch.ToList()
};
if (events.Count == 0)
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs
index b76564afd12c..64fce1034895 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs
@@ -31,7 +31,7 @@ internal class PartitionPublishingOptions
///
///
///
- public long? ProducerGroupId { get; set; }
+ public long? ProducerGroupId { get; set; }
///
/// The owner level indicates that a publishing is intended to be performed exclusively for events in the
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs
index c19f4ea15be1..c4584957f738 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs
@@ -44,7 +44,7 @@ internal static PartitionPublishingProperties Empty
/// The identifier of the producer group for which this producer is publishing to the associated partition.
///
///
- public long? ProducerGroupId { get; }
+ public long? ProducerGroupId { get; }
///
/// The owner level of the producer publishing to the associated partition.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs
index 331471d8ee02..4455b436246b 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs
@@ -41,7 +41,7 @@ internal class PartitionPublishingState
/// The identifier of the producer group for which publishing is being performed.
///
///
- public long? ProducerGroupId { get; set; }
+ public long? ProducerGroupId { get; set; }
///
/// The owner level for which publishing is being performed.
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs
index 436873aced42..57fcb88e564c 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs
@@ -2,8 +2,8 @@
// Licensed under the MIT License.
using System;
-using System.Collections.Generic;
using System.Collections.Concurrent;
+using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection;
@@ -1966,7 +1966,7 @@ public async Task RequestAuthorizationUsingCbsAsyncRespectsTheConnectionClosing(
}
catch
{
- // Ignore any other exception; the assertions will fail with better context.
+ // Ignore any other exception; the assertions will fail with better context.
}
Assert.That(observedException, Is.Not.Null, "An Event Hubs exception should have been thrown when requesting authorization.");
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs
index 69dbb63f8ff4..9742d4723d4c 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs
@@ -12,7 +12,6 @@
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
-using Microsoft.Identity.Client;
using Moq;
using Moq.Protected;
using NUnit.Framework;
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Authorization/EventHubsSharedAccessKeyCredentialTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Authorization/EventHubsSharedAccessKeyCredentialTests.cs
index 5da17e8517e9..bcaec3f71f90 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Authorization/EventHubsSharedAccessKeyCredentialTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Authorization/EventHubsSharedAccessKeyCredentialTests.cs
@@ -3,7 +3,6 @@
using System;
using System.Reflection;
-using Azure.Core;
using Azure.Messaging.EventHubs.Authorization;
using NUnit.Framework;
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs
index cf764a957143..0cd77df67a91 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs
@@ -9,7 +9,6 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
-using Azure.Identity;
using Azure.Messaging.EventHubs.Authorization;
using Azure.Messaging.EventHubs.Core;
using NUnit.Framework;
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs
index a7316d86ff8d..ab5273c1fa2a 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs
@@ -52,7 +52,7 @@ public async Task ConsumerWithNoOptionsCanRead(EventHubsTransportType transportT
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
using var cancellationSource = new CancellationTokenSource();
- cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
@@ -237,7 +237,7 @@ public async Task ConsumerCanReadBatchOfZeroLengthEvents()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -275,7 +275,7 @@ public async Task ConsumerCanReadBatchOfEvents()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -314,7 +314,7 @@ public async Task ConsumerCanReadBatchOfEventsWithCustomPrefetchAndBatchCounts()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -346,14 +346,14 @@ public async Task ConsumerCanReadBatchOfEventsWithCustomPrefetchAndBatchCountsAn
// Read the events and validate the resulting state.
- var readOptions = new ReadEventOptions { PrefetchCount = 150, CacheEventCount = 50, PrefetchSizeInBytes = 128 };
+ var readOptions = new ReadEventOptions { PrefetchCount = 150, CacheEventCount = 50, PrefetchSizeInBytes = 128 };
var readState = await ReadEventsFromPartitionAsync(consumer, partition, sourceEvents.Count, cancellationSource.Token, readOptions: readOptions);
Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled.");
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -392,7 +392,7 @@ public async Task ConsumerCanReadEventsWithPrefetchDisabled()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -439,7 +439,7 @@ public async Task ConsumerCanReadEventsWithCustomProperties()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -479,7 +479,7 @@ public async Task ConsumerCanReadEventsUsingAnIdentityCredential()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -519,7 +519,7 @@ public async Task ConsumerCanReadEventsUsingTheSharedKeyCredential()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -557,7 +557,7 @@ public async Task ConsumerCanReadFromEarliest()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -608,7 +608,7 @@ public async Task ConsumerCanReadFromLatest()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -667,7 +667,7 @@ public async Task ConsumerCanReadFromOffset(bool isInclusive)
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -726,7 +726,7 @@ public async Task ConsumerCanReadFromSequenceNumber(bool isInclusive)
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -780,7 +780,7 @@ public async Task ConsumerCanReadFromEnqueuedTime()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -826,10 +826,10 @@ public async Task ConsumerCanReadFromMultipleConsumerGroups()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState[0].Events.TryGetValue(sourceId, out var customReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the custom consumer group." );
+ Assert.That(readState[0].Events.TryGetValue(sourceId, out var customReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the custom consumer group.");
Assert.That(sourceEvent.IsEquivalentTo(customReadEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the custom consumer group.");
- Assert.That(readState[1].Events.TryGetValue(sourceId, out var defaultReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the default consumer group." );
+ Assert.That(readState[1].Events.TryGetValue(sourceId, out var defaultReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the default consumer group.");
Assert.That(sourceEvent.IsEquivalentTo(defaultReadEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the default consumer group.");
}
}
@@ -935,7 +935,7 @@ public async Task ConsumerCannotReadFromInvalidPartition()
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
using var cancellationSource = new CancellationTokenSource();
- cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
await using (var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, EventHubsTestEnvironment.Instance.EventHubsConnectionString, scope.EventHubName))
{
@@ -961,7 +961,7 @@ public async Task ConsumerCannotReadFromInvalidConsumerGroup()
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
using var cancellationSource = new CancellationTokenSource();
- cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
+ cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
var invalidConsumerGroup = "ThisIsFake";
@@ -1257,7 +1257,7 @@ public async Task ExclusiveConsumerSupercedesNonExclusiveActiveReader()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -1319,7 +1319,7 @@ public async Task ConsumerWithHigherOwnerLevelSupercedesActiveReader()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -1484,7 +1484,7 @@ public async Task ConsumerIsNotCompromisedByFailureToReadFromInvalidPartition()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -1559,10 +1559,10 @@ await Task.WhenAll
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(higherReadState.Events.TryGetValue(sourceId, out var higherEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed by the higher reader." );
+ Assert.That(higherReadState.Events.TryGetValue(sourceId, out var higherEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed by the higher reader.");
Assert.That(sourceEvent.IsEquivalentTo(higherEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the higher reader.");
- Assert.That(lowerReadState.Events.TryGetValue(sourceId, out var lowerEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed by the lower reader." );
+ Assert.That(lowerReadState.Events.TryGetValue(sourceId, out var lowerEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed by the lower reader.");
Assert.That(sourceEvent.IsEquivalentTo(lowerEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the lower reader.");
}
}
@@ -1833,7 +1833,7 @@ public async Task ConsumerCanReadFromAllPartitions()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -1874,7 +1874,7 @@ public async Task ConsumerCanReadFromAllPartitionsWithCustomPrefetchAndBatchCoun
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -1915,7 +1915,7 @@ public async Task ConsumerCanReadFromAllPartitionsUsingAnIdentityCredential()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -1956,7 +1956,7 @@ public async Task ConsumerCanReadFromAllPartitionsUsingTheSharedKeyCredential()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
@@ -2007,7 +2007,7 @@ public async Task ConsumerCanReadFromAllPartitionsStartingWithLatest()
foreach (var sourceEvent in sourceEvents)
{
var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString();
- Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." );
+ Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed.");
Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event.");
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs
index 7baf8dd98f94..b97ff8219092 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs
@@ -87,8 +87,8 @@ public void CloneProducesACopy()
{
var sourceEvent = new EventData(
new byte[] { 0x21, 0x22 },
- new Dictionary { {"Test", 123 } },
- new Dictionary { { "System", "Hello" }},
+ new Dictionary { { "Test", 123 } },
+ new Dictionary { { "System", "Hello" } },
33334444,
666777,
DateTimeOffset.Parse("2015-10-27T00:00:00Z"),
@@ -117,8 +117,8 @@ public void CloneIsolatesPropertyChanges()
{
var sourceEvent = new EventData(
new byte[] { 0x21, 0x22 },
- new Dictionary { {"Test", 123 } },
- new Dictionary { { "System", "Hello" }},
+ new Dictionary { { "Test", 123 } },
+ new Dictionary { { "System", "Hello" } },
33334444,
666777,
DateTimeOffset.Parse("2015-10-27T00:00:00Z"),
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsConnectionStringPropertiesTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsConnectionStringPropertiesTests.cs
index abf2f81074b6..b1db18c4b43f 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsConnectionStringPropertiesTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsConnectionStringPropertiesTests.cs
@@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Reflection;
-using Azure.Messaging.EventHubs;
using NUnit.Framework;
using NUnit.Framework.Constraints;
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsModelFactoryTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsModelFactoryTests.cs
index 7ef24491be14..18babe16d950 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsModelFactoryTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsModelFactoryTests.cs
@@ -148,8 +148,8 @@ public void PartitionContextDefaultsLastEnqueuedEventProperties()
public void EventDataInitializesProperties()
{
var body = new BinaryData("Hello");
- var properties = new Dictionary {{ "id", 12 }};
- var systemProperties = new Dictionary {{ "custom", "sys-value" }};
+ var properties = new Dictionary { { "id", 12 } };
+ var systemProperties = new Dictionary { { "custom", "sys-value" } };
var sequenceNumber = long.MaxValue - 512;
var offset = long.MaxValue - 1024;
var enqueueTime = new DateTimeOffset(2015, 10, 27, 12, 0, 0, TimeSpan.Zero);
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs
index 64406bc975ff..41a8c9e5ae90 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs
@@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
@@ -736,10 +735,10 @@ public async Task BackgroundProcessingLogsHandlerErrorWhenPartitionProcessingSto
.Callback(() => startCompletionSource.TrySetResult(true))
.CallBase();
- mockProcessor
- .Protected()
- .Setup("OnPartitionProcessingStoppedAsync", ItExpr.IsAny(), ItExpr.IsAny(), ItExpr.IsAny())
- .Throws(expectedException);
+ mockProcessor
+ .Protected()
+ .Setup("OnPartitionProcessingStoppedAsync", ItExpr.IsAny(), ItExpr.IsAny(), ItExpr.IsAny())
+ .Throws(expectedException);
await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token);
Assert.That(mockProcessor.Object.Status, Is.EqualTo(EventProcessorStatus.Running), "The processor should have started.");
@@ -1308,9 +1307,9 @@ public async Task BackgroundProcessingDispatchesExceptionsWhenStartingToProcessC
var mockConnection = new Mock();
var mockProcessor = new Mock>(65, "consumerGroup", "namespace", "eventHub", Mock.Of(), options, mockLoadBalancer.Object) { CallBase = true };
- mockLoadBalancer
- .SetupGet(lb => lb.OwnedPartitionIds)
- .Returns(ownedPartitions);
+ mockLoadBalancer
+ .SetupGet(lb => lb.OwnedPartitionIds)
+ .Returns(ownedPartitions);
mockLoadBalancer
.SetupSequence(lb => lb.RunLoadBalancingAsync(partitionIds, It.IsAny()))
@@ -1670,8 +1669,8 @@ public async Task LoadBalancingWhenGreedyAppliesTheTimeoutAfterBalance()
})
.Returns(() =>
{
- completionSource.TrySetResult(true);
- return new ValueTask(default(EventProcessorPartitionOwnership));
+ completionSource.TrySetResult(true);
+ return new ValueTask(default(EventProcessorPartitionOwnership));
});
mockConnection
@@ -1762,8 +1761,8 @@ public async Task LoadBalancingAppliesTheBalancedStrategy()
})
.Returns(() =>
{
- completionSource.TrySetResult(true);
- return new ValueTask(default(EventProcessorPartitionOwnership));
+ completionSource.TrySetResult(true);
+ return new ValueTask(default(EventProcessorPartitionOwnership));
});
mockConnection
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientOptionsTests.cs
index 3310c39bad7b..fd8a585f8039 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientOptionsTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientOptionsTests.cs
@@ -118,7 +118,7 @@ public void CreateFeatureFlagsDetectsIdempotentPublishing()
public void GetPublishingOptionsOrDefaultForPartitionDefaultsWhenNoPartitionIsSpecified(string partitionId)
{
var options = new EventHubProducerClientOptions();
- options.PartitionOptions.Add("1", new PartitionPublishingOptions{ ProducerGroupId = 1 });
+ options.PartitionOptions.Add("1", new PartitionPublishingOptions { ProducerGroupId = 1 });
Assert.That(options.GetPublishingOptionsOrDefaultForPartition(partitionId), Is.EqualTo(default(PartitionPublishingOptions)));
}
@@ -132,7 +132,7 @@ public void GetPublishingOptionsOrDefaultForPartitionDefaultsWhenNoPartitionIsSp
public void GetPublishingOptionsOrDefaultForPartitionDefaultsWhenNoPartitionIsFound()
{
var options = new EventHubProducerClientOptions();
- options.PartitionOptions.Add("1", new PartitionPublishingOptions{ ProducerGroupId = 1 });
+ options.PartitionOptions.Add("1", new PartitionPublishingOptions { ProducerGroupId = 1 });
Assert.That(options.GetPublishingOptionsOrDefaultForPartition("0"), Is.EqualTo(default(PartitionPublishingOptions)));
}
@@ -146,7 +146,7 @@ public void GetPublishingOptionsOrDefaultForPartitionDefaultsWhenNoPartitionIsFo
public void GetPublishingOptionsOrDefaultForPartitionReturnsTheOptionsWhenThePartitionIsFound()
{
var partitionId = "12";
- var expectedPartitionOptions = new PartitionPublishingOptions{ ProducerGroupId = 1 };
+ var expectedPartitionOptions = new PartitionPublishingOptions { ProducerGroupId = 1 };
var options = new EventHubProducerClientOptions();
options.PartitionOptions.Add(partitionId, expectedPartitionOptions);
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs
index 51ce9ac7125e..ed54bea17ca0 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs
@@ -52,7 +52,7 @@ public void ConstructorValidatesTheConnectionStringIsPopulated(string connection
[TestCase("HostName=value.azure-devices.net;SharedAccessKeyName=[value];SharedAccessKey=[value];EntityPath=[value]")]
public void ConstructorValidatesConnectionString(string connectionString)
{
- Assert.That(() =>new EventHubProducerClient(connectionString), Throws.ArgumentException.And.Message.StartsWith(Resources.MissingConnectionInformation));
+ Assert.That(() => new EventHubProducerClient(connectionString), Throws.ArgumentException.And.Message.StartsWith(Resources.MissingConnectionInformation));
}
///
@@ -406,9 +406,9 @@ public async Task ReadPartitionPublishingPropertiesAsyncInitializesPartitionStat
clientOptions.PartitionOptions.Add(expectedPartition, new PartitionPublishingOptions
{
- ProducerGroupId = 999,
- OwnerLevel = 999,
- StartingSequenceNumber = 999
+ ProducerGroupId = 999,
+ OwnerLevel = 999,
+ StartingSequenceNumber = 999
});
var producer = new EventHubProducerClient(connection, clientOptions);
@@ -786,10 +786,10 @@ public void SendIdempotentDoesNotAllowResending()
var events = EventGenerator.CreateEvents(5).Select(item =>
{
- item.PendingPublishSequenceNumber = 5;
- item.CommitPublishingState();
+ item.PendingPublishSequenceNumber = 5;
+ item.CommitPublishingState();
- return item;
+ return item;
});
var sendOptions = new SendEventOptions { PartitionId = "0" };
@@ -880,9 +880,9 @@ public async Task SendIdempotentInitializesPartitionState()
clientOptions.PartitionOptions.Add(expectedPartition, new PartitionPublishingOptions
{
- ProducerGroupId = 999,
- OwnerLevel = 999,
- StartingSequenceNumber = 999
+ ProducerGroupId = 999,
+ OwnerLevel = 999,
+ StartingSequenceNumber = 999
});
var producer = new EventHubProducerClient(connection, clientOptions);
@@ -1295,7 +1295,8 @@ public void SendIdempotentRequiresThePartitionWithABatch()
var batchOptions = new CreateBatchOptions { PartitionKey = "testKey" };
batch = new EventDataBatch(new MockTransportBatch(1), "ns", "eh", batchOptions);
- Assert.That(async () => await producer.SendAsync(batch), Throws.InstanceOf(), "A partition key cannot be used with idempotent publishing.");;
+ Assert.That(async () => await producer.SendAsync(batch), Throws.InstanceOf(), "A partition key cannot be used with idempotent publishing.");
+ ;
}
///
@@ -1340,10 +1341,10 @@ public void SendIdempotentDoesNotAllowResendingWithABatchContainingPublishedEven
var events = EventGenerator.CreateEvents(5).Skip(4).Select(item =>
{
- item.PendingPublishSequenceNumber = 5;
- item.CommitPublishingState();
+ item.PendingPublishSequenceNumber = 5;
+ item.CommitPublishingState();
- return item;
+ return item;
});
var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", new CreateBatchOptions { PartitionId = "0" });
@@ -1434,9 +1435,9 @@ public async Task SendIdempotentInitializesPartitionStateWithABatch()
clientOptions.PartitionOptions.Add(expectedPartition, new PartitionPublishingOptions
{
- ProducerGroupId = 999,
- OwnerLevel = 999,
- StartingSequenceNumber = 999
+ ProducerGroupId = 999,
+ OwnerLevel = 999,
+ StartingSequenceNumber = 999
});
var producer = new EventHubProducerClient(connection, clientOptions);
@@ -2749,7 +2750,7 @@ private class MockPooledProducer : TransportProducerPool.PooledProducer
{
public bool WasClosed { get; set; } = false;
- public MockPooledProducer(TransportProducer transportProducer): base(transportProducer, (_) => Task.CompletedTask)
+ public MockPooledProducer(TransportProducer transportProducer) : base(transportProducer, (_) => Task.CompletedTask)
{
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs
index d32d8fa3e2cd..14beba232074 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs
@@ -60,7 +60,7 @@ public async Task ProducerCanPublishEvents(EventHubsTransportType transportType)
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
- var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true, ConnectionOptions = new EventHubConnectionOptions { TransportType = transportType }};
+ var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true, ConnectionOptions = new EventHubConnectionOptions { TransportType = transportType } };
await using var producer = new EventHubProducerClient(connectionString, options);
@@ -88,7 +88,7 @@ public async Task ProducerCanPublishBatches(EventHubsTransportType transportType
await using (EventHubScope scope = await EventHubScope.CreateAsync(1))
{
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName);
- var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true, ConnectionOptions = new EventHubConnectionOptions { TransportType = transportType }};
+ var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true, ConnectionOptions = new EventHubConnectionOptions { TransportType = transportType } };
await using var producer = new EventHubProducerClient(connectionString, options);
@@ -193,7 +193,7 @@ public async Task ProducerManagesConcurrencyWhenPublishingEvents()
var partition = (await producer.GetPartitionIdsAsync(cancellationSource.Token)).First();
var sendOptions = new SendEventOptions { PartitionId = partition };
- async Task sendEvents (int delayMilliseconds)
+ async Task sendEvents(int delayMilliseconds)
{
await Task.Delay(delayMilliseconds);
await producer.SendAsync(EventGenerator.CreateEvents(2), sendOptions, cancellationSource.Token);
@@ -230,7 +230,7 @@ public async Task ProducerManagesConcurrencyWhenPublishingBatches()
var partition = (await producer.GetPartitionIdsAsync(cancellationSource.Token)).First();
var batchOptions = new CreateBatchOptions { PartitionId = partition };
- async Task sendBatch (int delayMilliseconds)
+ async Task sendBatch(int delayMilliseconds)
{
await Task.Delay(delayMilliseconds);
@@ -462,7 +462,7 @@ public async Task ProducerCanInitializeWithPartitionOptions()
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
var partition = default(string);
- var partitionProperties = default(PartitionPublishingProperties);
+ var partitionProperties = default(PartitionPublishingProperties);
// Create a producer for a small scope that will Send some events and read the properties.
@@ -505,7 +505,7 @@ public async Task ProducerCanInitializeWithPartialPartitionOptions()
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
var partition = default(string);
- var partitionProperties = default(PartitionPublishingProperties);
+ var partitionProperties = default(PartitionPublishingProperties);
// Create a producer for a small scope that will Send some events and read the properties.
@@ -560,7 +560,7 @@ public async Task ProducerIsRejectedWithPartitionOptionsForInvalidState()
cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit);
var partition = default(string);
- var partitionProperties = default(PartitionPublishingProperties);
+ var partitionProperties = default(PartitionPublishingProperties);
// Create a producer for a small scope that will Send some events and read the properties.