Skip to content

Commit

Permalink
[Event Hubs] Client Identifier Option (#22614)
Browse files Browse the repository at this point in the history
The focus of these changes is to add an option for setting an Identifier
for each Event Hubs client type.  The identifier is informational and is
associated with the AMQP links used, allowing the service to provide additional
context in error messages and the SDK logs to provide an additional point
of correlation.
  • Loading branch information
jsquire authored Jul 13, 2021
1 parent 02f27a2 commit 0899d47
Show file tree
Hide file tree
Showing 34 changed files with 1,084 additions and 501 deletions.
2 changes: 2 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Each Event Hubs client type now offers an option to set an Identifier. The identifier is informational and is associated with the AMQP links used, allowing the service to provide additional context in error messages and the SDK logs to provide an additional point of correlation.

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public EventHubConsumerClient(string consumerGroup, string connectionString, str
public string ConsumerGroup { get { throw null; } }
public string EventHubName { get { throw null; } }
public string FullyQualifiedNamespace { get { throw null; } }
public string Identifier { get { throw null; } }
public bool IsClosed { get { throw null; } protected set { } }
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
Expand All @@ -226,6 +227,7 @@ public partial class EventHubConsumerClientOptions
{
public EventHubConsumerClientOptions() { }
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
public string Identifier { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
Expand Down Expand Up @@ -400,6 +402,7 @@ public PartitionReceiver(string consumerGroup, string partitionId, Azure.Messagi
public string ConsumerGroup { get { throw null; } }
public string EventHubName { get { throw null; } }
public string FullyQualifiedNamespace { get { throw null; } }
public string Identifier { get { throw null; } }
public Azure.Messaging.EventHubs.Consumer.EventPosition InitialPosition { get { throw null; } }
public bool IsClosed { get { throw null; } protected set { } }
public string PartitionId { get { throw null; } }
Expand All @@ -421,6 +424,7 @@ public partial class PartitionReceiverOptions
public PartitionReceiverOptions() { }
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
public System.TimeSpan? DefaultMaximumReceiveWaitTime { get { throw null; } set { } }
public string Identifier { get { throw null; } set { } }
public long? OwnerLevel { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
Expand Down Expand Up @@ -519,6 +523,7 @@ public EventHubProducerClient(string fullyQualifiedNamespace, string eventHubNam
public EventHubProducerClient(string connectionString, string eventHubName, Azure.Messaging.EventHubs.Producer.EventHubProducerClientOptions clientOptions) { }
public string EventHubName { get { throw null; } }
public string FullyQualifiedNamespace { get { throw null; } }
public string Identifier { get { throw null; } }
public bool IsClosed { get { throw null; } protected set { } }
public virtual System.Threading.Tasks.Task CloseAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.ValueTask<Azure.Messaging.EventHubs.Producer.EventDataBatch> CreateBatchAsync(Azure.Messaging.EventHubs.Producer.CreateBatchOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand All @@ -541,6 +546,7 @@ public partial class EventHubProducerClientOptions
{
public EventHubProducerClientOptions() { }
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
public string Identifier { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
Expand Down
6 changes: 6 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,15 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
/// </summary>
///
/// <param name="partitionId">The identifier of the partition to which the transport producer should be bound; if <c>null</c>, the producer is unbound.</param>
/// <param name="producerIdentifier">The identifier to associate with the consumer; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
///
/// <returns>A <see cref="TransportProducer"/> configured in the requested manner.</returns>
///
public override TransportProducer CreateProducer(string partitionId,
string producerIdentifier,
TransportProducerFeatures requestedFeatures,
PartitionPublishingOptions partitionOptions,
EventHubsRetryPolicy retryPolicy)
Expand All @@ -420,6 +422,7 @@ public override TransportProducer CreateProducer(string partitionId,
(
EventHubName,
partitionId,
producerIdentifier,
ConnectionScope,
MessageConverter,
retryPolicy,
Expand Down Expand Up @@ -447,6 +450,7 @@ public override TransportProducer CreateProducer(string partitionId,
///
/// <param name="consumerGroup">The name of the consumer group this consumer is associated with. Events are read in the context of this group.</param>
/// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</param>
/// <param name="consumerIdentifier">The identifier to associate with the consumer; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
/// <param name="eventPosition">The position within the partition where the consumer should begin reading events.</param>
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
Expand All @@ -459,6 +463,7 @@ public override TransportProducer CreateProducer(string partitionId,
///
public override TransportConsumer CreateConsumer(string consumerGroup,
string partitionId,
string consumerIdentifier,
EventPosition eventPosition,
EventHubsRetryPolicy retryPolicy,
bool trackLastEnqueuedEventProperties,
Expand All @@ -474,6 +479,7 @@ public override TransportConsumer CreateConsumer(string consumerGroup,
EventHubName,
consumerGroup,
partitionId,
consumerIdentifier,
eventPosition,
trackLastEnqueuedEventProperties,
invalidateConsumerWhenPartitionStolen,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(TimeS
/// <param name="prefetchSizeInBytes">The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size.</param>
/// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this value should be <c>null</c>.</param>
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
/// <param name="linkIdentifier">The identifier to assign to the link; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A link for use with consumer operations.</returns>
Expand All @@ -320,6 +321,7 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
long? prefetchSizeInBytes,
long? ownerLevel,
bool trackLastEnqueuedEventProperties,
string linkIdentifier,
CancellationToken cancellationToken)
{
Argument.AssertNotDisposed(_disposed, nameof(AmqpConnectionScope));
Expand All @@ -341,6 +343,11 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

if (string.IsNullOrEmpty(linkIdentifier))
{
linkIdentifier = Guid.NewGuid().ToString();
}

var link = await CreateReceivingLinkAsync(
connection,
consumerEndpoint,
Expand All @@ -350,6 +357,7 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
prefetchSizeInBytes,
ownerLevel,
trackLastEnqueuedEventProperties,
linkIdentifier,
cancellationToken
).ConfigureAwait(false);

Expand Down Expand Up @@ -379,6 +387,7 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum
/// <param name="features">The set of features which are active for the producer requesting the link.</param>
/// <param name="options">The set of options to consider when creating the link.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="linkIdentifier">The identifier to assign to the link; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A link for use with producer operations.</returns>
Expand All @@ -387,6 +396,7 @@ public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitio
TransportProducerFeatures features,
PartitionPublishingOptions options,
TimeSpan timeout,
string linkIdentifier,
CancellationToken cancellationToken)
{
Argument.AssertNotDisposed(_disposed, nameof(AmqpConnectionScope));
Expand All @@ -405,7 +415,12 @@ public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitio
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var link = await CreateSendingLinkAsync(connection, producerEndpoint, features, options, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
if (string.IsNullOrEmpty(linkIdentifier))
{
linkIdentifier = Guid.NewGuid().ToString();
}

var link = await CreateSendingLinkAsync(connection, producerEndpoint, features, options, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), linkIdentifier, cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
Expand Down Expand Up @@ -587,12 +602,13 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
///
/// <param name="connection">The active and opened AMQP connection to use for this link.</param>
/// <param name="endpoint">The fully qualified endpoint to open the link for.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="eventPosition">The position of the event in the partition where the link should be filtered to.</param>
/// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whether an operation was requested.</param>
/// <param name="prefetchSizeInBytes">The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size.</param>
/// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this value should be <c>null</c>.</param>
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="linkIdentifier">The identifier to assign to the link; this is assumed to be a non-null value.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A link for use for operations related to receiving events.</returns>
Expand All @@ -605,6 +621,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
long? prefetchSizeInBytes,
long? ownerLevel,
bool trackLastEnqueuedEventProperties,
string linkIdentifier,
CancellationToken cancellationToken)
{
Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
Expand Down Expand Up @@ -641,7 +658,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
AutoSendFlow = prefetchCount > 0,
SettleType = SettleMode.SettleOnSend,
Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters },
Target = new Target { Address = Guid.NewGuid().ToString() },
Target = new Target { Address = linkIdentifier },
TotalCacheSizeInBytes = prefetchSizeInBytes
};

Expand All @@ -652,6 +669,11 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
linkSettings.AddProperty(AmqpProperty.ConsumerOwnerLevel, ownerLevel.Value);
}

if (linkIdentifier != null)
{
linkSettings.AddProperty(AmqpProperty.ConsumerIdentifier, linkIdentifier);
}

if (trackLastEnqueuedEventProperties)
{
linkSettings.DesiredCapabilities ??= new Multiple<AmqpSymbol>();
Expand Down Expand Up @@ -705,6 +727,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
/// <param name="features">The set of features which are active for the producer for which the link is being created.</param>
/// <param name="options">The set of options to consider when creating the link.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="linkIdentifier">The identifier to assign to the link; this is assumed to be a non-null value.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A link for use for operations related to receiving events.</returns>
Expand All @@ -714,6 +737,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(AmqpConnect
TransportProducerFeatures features,
PartitionPublishingOptions options,
TimeSpan timeout,
string linkIdentifier,
CancellationToken cancellationToken)
{
Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
Expand Down Expand Up @@ -744,7 +768,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(AmqpConnect
{
Role = false,
InitialDeliveryCount = 0,
Source = new Source { Address = Guid.NewGuid().ToString() },
Source = new Source { Address = linkIdentifier },
Target = new Target { Address = endpoint.AbsolutePath }
};

Expand Down
Loading

0 comments on commit 0899d47

Please sign in to comment.