Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logging clean up #12222

Merged
merged 2 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
/// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.</param>
/// <param name="sessionId"></param>
/// <param name="isSessionReceiver"></param>
/// <param name="identifier">The identifier for the receive link.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
Expand All @@ -284,44 +283,34 @@ public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
ReceiveMode receiveMode,
string sessionId,
bool isSessionReceiver,
string identifier,
CancellationToken cancellationToken)
{
ServiceBusEventSource.Log.CreateReceiveLinkStart(identifier);
try
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var stopWatch = ValueStopwatch.StartNew();
var receiverEndpoint = new Uri(ServiceEndpoint, entityPath);
var stopWatch = ValueStopwatch.StartNew();
var receiverEndpoint = new Uri(ServiceEndpoint, entityPath);

var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

ReceivingAmqpLink link = await CreateReceivingLinkAsync(
entityPath,
connection,
receiverEndpoint,
timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
prefetchCount,
receiveMode,
sessionId,
isSessionReceiver,
cancellationToken
).ConfigureAwait(false);
ReceivingAmqpLink link = await CreateReceivingLinkAsync(
entityPath,
connection,
receiverEndpoint,
timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
prefetchCount,
receiveMode,
sessionId,
isSessionReceiver,
cancellationToken
).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
return link;

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
ServiceBusEventSource.Log.CreateReceiveLinkComplete(identifier);
return link;
}
catch (Exception ex)
{
ServiceBusEventSource.Log.CreateReceiveLinkException(identifier, ex.ToString());
throw;
}
}

/// <summary>
Expand Down Expand Up @@ -910,28 +899,30 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(
refreshTimeout)
.ConfigureAwait(false);

// Reset the timer for the next refresh.
// Reset the timer for the next refresh.

if (authExpirationUtc >= DateTimeOffset.UtcNow)
if (authExpirationUtc >= DateTimeOffset.UtcNow)
{
refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan);
}
}
catch (ObjectDisposedException)
{
// This can occur if the connection is closed or the scope disposed after the factory
// is called but before the timer is updated. The callback may also fire while the timer is
// in the act of disposing. Do not consider it an error.
}
// This can occur if the connection is closed or the scope disposed after the factory
// is called but before the timer is updated. The callback may also fire while the timer is
// in the act of disposing. Do not consider it an error.
}
catch (Exception ex)
{
ServiceBusEventSource.Log.AmqpLinkAuthorizationRefreshError(entityPath, endpoint.AbsoluteUri, ex.Message);

// Attempt to unset the timer; there's a decent chance that it has been disposed at this point or
// that the connection has been closed. Ignore potential exceptions, as they won't impact operation.
// At worse, another timer tick will occur and the operation will be retried.
// Attempt to unset the timer; there's a decent chance that it has been disposed at this point or
// that the connection has been closed. Ignore potential exceptions, as they won't impact operation.
// At worse, another timer tick will occur and the operation will be retried.

try { refreshTimer.Change(Timeout.Infinite, Timeout.Infinite); } catch {}
try
{ refreshTimer.Change(Timeout.Infinite, Timeout.Infinite); }
catch { }
}
finally
{
Expand Down
36 changes: 31 additions & 5 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,12 @@ public AmqpReceiver(

_receiveLink = new FaultTolerantAmqpObject<ReceivingAmqpLink>(
timeout =>
_connectionScope.OpenReceiverLinkAsync(
entityPath: _entityPath,
OpenReceiverLinkAsync(
timeout: timeout,
prefetchCount: prefetchCount,
receiveMode: receiveMode,
sessionId: sessionId,
isSessionReceiver: isSessionReceiver,
identifier: _identifier,
cancellationToken: CancellationToken.None),
isSessionReceiver: isSessionReceiver),
link => CloseLink(link));

_managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
Expand All @@ -155,6 +152,35 @@ public AmqpReceiver(
link => CloseLink(link));
}

private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
TimeSpan timeout,
uint prefetchCount,
ReceiveMode receiveMode,
string sessionId,
bool isSessionReceiver)
{
ServiceBusEventSource.Log.CreateReceiveLinkStart(_identifier);

try
{
ReceivingAmqpLink link = await _connectionScope.OpenReceiverLinkAsync(
entityPath: _entityPath,
timeout: timeout,
prefetchCount: prefetchCount,
receiveMode: receiveMode,
sessionId: sessionId,
isSessionReceiver: isSessionReceiver,
cancellationToken: CancellationToken.None).ConfigureAwait(false);
ServiceBusEventSource.Log.CreateReceiveLinkComplete(_identifier);
return link;
}
catch (Exception ex)
{
ServiceBusEventSource.Log.CreateReceiveLinkException(_identifier, ex.ToString());
throw;
}
}

private void CloseLink(ReceivingAmqpLink link)
{
link.Session?.SafeClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ internal ServiceBusEventSource() { }
internal const int AmqpLinkRefreshStartEvent = 39;
internal const int AmqpLinkRefreshCompleteEvent = 40;
internal const int AmqpLinkRefreshExceptionEvent = 41;

internal const int ManagementSerializedExceptionEvent = 42;
internal const int RunOperationExceptionEvent = 43;

internal const int ClientDisposeStartEvent = 44;
Expand Down Expand Up @@ -170,8 +172,6 @@ internal ServiceBusEventSource() { }

internal const int ProcessorErrorHandlerThrewExceptionEvent = 94;
internal const int ScheduleTaskFailedEvent = 95;
internal const int ManagementSerializedExceptionEvent = 96;


#endregion
// add new event numbers here incrementing from previous
Expand Down Expand Up @@ -724,8 +724,6 @@ public void ProcessorErrorHandlerThrewException(string exception)
}
}



#endregion region

#region Rule management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(retriableException);

Expand All @@ -180,7 +179,6 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Exactly(1 + retryOptions.MaxRetries));
}
Expand Down Expand Up @@ -218,7 +216,6 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(retriableException);

Expand All @@ -236,7 +233,6 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Exactly(1 + retryOptions.MaxRetries));
}
Expand Down Expand Up @@ -282,7 +278,6 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(retriableException);

Expand All @@ -300,7 +295,6 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Exactly(1 + retryOptions.MaxRetries));

Expand Down Expand Up @@ -342,7 +336,6 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(exception);

Expand All @@ -360,7 +353,6 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Once());
}
Expand Down Expand Up @@ -398,7 +390,6 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(exception);

Expand All @@ -416,7 +407,6 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Once());
}
Expand Down Expand Up @@ -453,7 +443,6 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(exception);

Expand All @@ -471,7 +460,6 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Once());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ public async Task LogsTransactionEvents()
await sender.SendAsync(message);
ts.Complete();
}
// Adding delay since transaction Commit/Rollback is an asynchronous operation.
await Task.Delay(TimeSpan.FromSeconds(2));
_listener.SingleEventById(ServiceBusEventSource.TransactionDeclaredEvent);
_listener.SingleEventById(ServiceBusEventSource.TransactionDischargedEvent);
};
Expand Down