Skip to content

Commit

Permalink
Exceptions and Event Source Logging (#10460)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft authored Mar 12, 2020
1 parent 44e3a88 commit 33b7e31
Show file tree
Hide file tree
Showing 50 changed files with 3,201 additions and 2,198 deletions.
61 changes: 18 additions & 43 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Azure.Core;
using Azure.Messaging.ServiceBus.Authorization;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
using Azure.Messaging.ServiceBus.Primitives;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
Expand Down Expand Up @@ -73,7 +74,7 @@ internal class AmqpClient : TransportClient
///
/// <param name="host">The fully qualified host name for the Service Bus namespace. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.</param>
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
/// <param name="options">A set of options to apply when configuring the client.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
Expand All @@ -84,58 +85,29 @@ internal class AmqpClient : TransportClient
/// caller.
/// </remarks>
///
public AmqpClient(string host,
ServiceBusTokenCredential credential,
ServiceBusClientOptions clientOptions) : this(host, credential, clientOptions, null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="AmqpClient"/> class.
/// </summary>
///
/// <param name="host">The fully qualified host name for the Service Bus namespace. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.</param>
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
/// <param name="connectionScope">The optional scope to use for AMQP connection management. If <c>null</c>, a new scope will be created.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
/// is assumed that callers are trusted and have performed deep validation.
///
/// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
/// creation of clones or otherwise protecting the parameters is assumed to be the purview of the
/// caller.
/// </remarks>
///
protected AmqpClient(
internal AmqpClient(
string host,
ServiceBusTokenCredential credential,
ServiceBusClientOptions clientOptions,
AmqpConnectionScope connectionScope)
ServiceBusClientOptions options)
{
Argument.AssertNotNullOrEmpty(host, nameof(host));
Argument.AssertNotNull(credential, nameof(credential));
Argument.AssertNotNull(clientOptions, nameof(clientOptions));
Argument.AssertNotNull(options, nameof(options));

try
ServiceEndpoint = new UriBuilder
{
//TODO add event ServiceBusEventSource.Log.ClientCreateStart(host, entityName);
Scheme = options.TransportType.GetUriScheme(),
Host = host

ServiceEndpoint = new UriBuilder
{
Scheme = clientOptions.TransportType.GetUriScheme(),
Host = host
}.Uri;

}.Uri;
Credential = credential;
ConnectionScope = new AmqpConnectionScope(
ServiceEndpoint,
credential,
options.TransportType,
options.Proxy);

Credential = credential;
ConnectionScope = connectionScope ?? new AmqpConnectionScope(ServiceEndpoint, credential, clientOptions.TransportType, clientOptions.Proxy);
}
finally
{
// TODO add event ServiceBusEventSource.Log.ServiceBusClientCreateComplete(host, entityName);
}
}

/// <summary>
Expand Down Expand Up @@ -169,6 +141,7 @@ public override TransportSender CreateSender(string entityName, ServiceBusRetryP
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
/// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.</param>
/// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whether an operation was requested. If <c>null</c> a default will be used.</param>
/// <param name="identifier"></param>
/// <param name="sessionId"></param>
/// <param name="isSessionReceiver"></param>
///
Expand All @@ -179,6 +152,7 @@ public override TransportReceiver CreateReceiver(
ServiceBusRetryPolicy retryPolicy,
ReceiveMode receiveMode,
uint prefetchCount,
string identifier,
string sessionId,
bool isSessionReceiver)
{
Expand All @@ -191,6 +165,7 @@ public override TransportReceiver CreateReceiver(
prefetchCount,
ConnectionScope,
retryPolicy,
identifier,
sessionId,
isSessionReceiver
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;

namespace Azure.Messaging.ServiceBus.Amqp
{
using System;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;

internal class AmqpClientConstants
{
// AMQP Management Operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using Azure.Messaging.ServiceBus.Diagnostics;
using Azure.Messaging.ServiceBus.Primitives;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Amqp.Sasl;
using Microsoft.Azure.Amqp.Transport;
Expand Down Expand Up @@ -78,13 +77,6 @@ internal class AmqpConnectionScope : TransportConnectionScope
///
private static TimeSpan AuthorizationRefreshTimeout { get; } = TimeSpan.FromMinutes(3);

/// <summary>
/// The recommended timeout to associate with an AMQP session. It is recommended that this
/// interval be used when creating or opening AMQP links and related constructs.
/// </summary>
///
public override TimeSpan SessionTimeout { get; } = TimeSpan.FromSeconds(30);

/// <summary>
/// Indicates whether this <see cref="AmqpConnectionScope"/> has been disposed.
/// </summary>
Expand Down Expand Up @@ -251,7 +243,7 @@ public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var link = await CreateReceivingLinkAsync(
ReceivingAmqpLink link = await CreateReceivingLinkAsync(
entityName,
connection,
consumerEndpoint,
Expand Down Expand Up @@ -292,10 +284,10 @@ public virtual async Task<SendingAmqpLink> OpenSenderLinkAsync(
var stopWatch = Stopwatch.StartNew();
var producerEndpoint = new Uri(ServiceEndpoint, entityName);

var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
AmqpConnection connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var link = await CreateSendingLinkAsync(
SendingAmqpLink link = await CreateSendingLinkAsync(
entityName,
connection,
producerEndpoint,
Expand Down Expand Up @@ -437,14 +429,18 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(

var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, TokenProvider, ServiceEndpoint, endpointUri.AbsoluteUri, endpointUri.AbsoluteUri, claims, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);

var link = new RequestResponseAmqpLink(/*AmqpManagement.LinkType*/"entity-mgmt", session, entityPath, linkSettings.Properties);
var link = new RequestResponseAmqpLink(
AmqpClientConstants.EntityTypeManagement,
session,
entityPath,
linkSettings.Properties);
linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}:{link.Identifier}";
stopWatch.Stop();

// Track the link before returning it, so that it can be managed with the scope.
var refreshTimer = default(Timer);

var refreshHandler = CreateAuthorizationRefreshHandler
TimerCallback refreshHandler = CreateAuthorizationRefreshHandler
(
entityName,
connection,
Expand Down Expand Up @@ -535,7 +531,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
var linkSettings = new AmqpLinkSettings
{
Role = true,
TotalLinkCredit = (uint) prefetchCount,
TotalLinkCredit = prefetchCount,
AutoSendFlow = prefetchCount > 0,
SettleType = (receiveMode == ReceiveMode.PeekLock) ? SettleMode.SettleOnDispose : SettleMode.SettleOnSend,
Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters },
Expand All @@ -553,7 +549,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(

var refreshTimer = default(Timer);

var refreshHandler = CreateAuthorizationRefreshHandler
TimerCallback refreshHandler = CreateAuthorizationRefreshHandler
(
entityName,
connection,
Expand Down Expand Up @@ -647,7 +643,7 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(

var refreshTimer = default(Timer);

var refreshHandler = CreateAuthorizationRefreshHandler
TimerCallback refreshHandler = CreateAuthorizationRefreshHandler
(
entityName,
connection,
Expand Down Expand Up @@ -780,7 +776,7 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(
return async _ =>
{
ServiceBusEventSource.Log.AmqpLinkAuthorizationRefreshStart(entityName, endpoint.AbsoluteUri);
var refreshTimer = refreshTimerFactory();
Timer refreshTimer = refreshTimerFactory();

try
{
Expand All @@ -789,7 +785,7 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(
return;
}

var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout).ConfigureAwait(false);
DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout).ConfigureAwait(false);

// Reset the timer for the next refresh.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static Exception CreateExceptionForResponse(
///
/// <returns>The exception that most accurately represents the error that was encountered.</returns>
///
public static Exception CreateExceptionForError(
private static Exception CreateExceptionForError(
Error error,
string serviceBusResource)
{
Expand Down
Loading

0 comments on commit 33b7e31

Please sign in to comment.