Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Connected/Disconnected events #28379

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ public ServiceBusClient(string fullyQualifiedNamespace, Azure.Core.TokenCredenti
public ServiceBusClient(string connectionString, Azure.Messaging.ServiceBus.ServiceBusClientOptions options) { }
public virtual string FullyQualifiedNamespace { get { throw null; } }
public virtual bool IsClosed { get { throw null; } }
public virtual bool IsConnected { get { throw null; } }
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } }
public event System.Func<Azure.Messaging.ServiceBus.ServiceBusConnectionEventArgs, System.Threading.Tasks.Task> ConnectedAsync { add { } remove { } }
public event System.Func<Azure.Messaging.ServiceBus.ServiceBusConnectionEventArgs, System.Threading.Tasks.Task> DisconnectedAsync { add { } remove { } }
public virtual System.Threading.Tasks.Task<Azure.Messaging.ServiceBus.ServiceBusSessionReceiver> AcceptNextSessionAsync(string queueName, Azure.Messaging.ServiceBus.ServiceBusSessionReceiverOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.ServiceBus.ServiceBusSessionReceiver> AcceptNextSessionAsync(string topicName, string subscriptionName, Azure.Messaging.ServiceBus.ServiceBusSessionReceiverOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.ServiceBus.ServiceBusSessionReceiver> AcceptSessionAsync(string queueName, string sessionId, Azure.Messaging.ServiceBus.ServiceBusSessionReceiverOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand All @@ -92,6 +95,8 @@ public ServiceBusClient(string connectionString, Azure.Messaging.ServiceBus.Serv
public virtual Azure.Messaging.ServiceBus.ServiceBusSessionProcessor CreateSessionProcessor(string topicName, string subscriptionName, Azure.Messaging.ServiceBus.ServiceBusSessionProcessorOptions options = null) { throw null; }
public virtual System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusTransportMetrics GetTransportMetrics() { throw null; }
protected internal virtual System.Threading.Tasks.Task OnConnectedAsync(Azure.Messaging.ServiceBus.ServiceBusConnectionEventArgs args) { throw null; }
protected internal virtual System.Threading.Tasks.Task OnDisconnectedAsync(Azure.Messaging.ServiceBus.ServiceBusConnectionEventArgs args) { throw null; }
}
public partial class ServiceBusClientOptions
{
Expand All @@ -108,6 +113,13 @@ public ServiceBusClientOptions() { }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
}
public partial class ServiceBusConnectionEventArgs : System.EventArgs
{
public ServiceBusConnectionEventArgs(string fullyQualifiedNamespace, Azure.Messaging.ServiceBus.ServiceBusTransportType transportType, System.Net.IWebProxy proxy) { }
public string FullyQualifiedNamespace { get { throw null; } }
public System.Net.IWebProxy Proxy { get { throw null; } }
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } }
}
public partial class ServiceBusConnectionStringProperties
{
public ServiceBusConnectionStringProperties() { }
Expand Down
37 changes: 25 additions & 12 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Azure.Core;
using Azure.Messaging.ServiceBus.Authorization;
using Azure.Messaging.ServiceBus.Core;
using Microsoft.Azure.Amqp;

namespace Azure.Messaging.ServiceBus.Amqp
{
Expand Down Expand Up @@ -43,11 +44,7 @@ internal class AmqpClient : TransportClient
///
public override bool IsClosed => _closed;

/// <summary>
/// The endpoint for the Service Bus service to which the client is associated.
/// </summary>
///
public override Uri ServiceEndpoint { get; }
public override bool IsConnected => ConnectionScope.ActiveConnection.TryGetOpenedObject(out AmqpConnection _);

/// <summary>
/// Gets the credential to use for authorization with the Service Bus service.
Expand All @@ -63,6 +60,27 @@ internal class AmqpClient : TransportClient

public override ServiceBusTransportMetrics TransportMetrics { get; }

/// <summary>
/// An event that can be subscribed to for notification when the client connects to the service.
/// </summary>
public override event Func<ServiceBusConnectionEventArgs, Task> ConnectedAsync
{
add => ConnectionScope.ConnectedAsync += value;
remove => ConnectionScope.ConnectedAsync -= value;
}

/// <summary>
/// An event that can be subscribed to for notification when the client disconnects from the service.
/// No action is required when the client temporarily disconnects due to a transient network or service issue, as the client will attempt
/// to re-establish the connection automatically on the next operation. If <see cref="ServiceBusClient.DisposeAsync"/> is
/// called, then the connection will not be re-established.
/// </summary>
public override event Func<ServiceBusConnectionEventArgs, Task> DisconnectedAsync
{
add => ConnectionScope.DisconnectedAsync += value;
remove => ConnectionScope.DisconnectedAsync -= value;
}

/// <summary>
/// Initializes a new instance of the <see cref="AmqpClient"/> class.
/// </summary>
Expand All @@ -87,21 +105,16 @@ internal AmqpClient(
{
Argument.AssertNotNullOrEmpty(host, nameof(host));
Argument.AssertNotNull(credential, nameof(credential));
Argument.AssertNotNull(options, nameof(options));

ServiceEndpoint = new UriBuilder
{
Scheme = options.TransportType.GetUriScheme(),
Host = host
}.Uri;
Argument.AssertNotNull(options, nameof(options));

Credential = credential;
if (options.EnableTransportMetrics)
{
TransportMetrics = new ServiceBusTransportMetrics();
}
ConnectionScope = new AmqpConnectionScope(
ServiceEndpoint,
host,
credential,
options.TransportType,
options.WebProxy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,15 @@ public override bool IsDisposed
/// <summary>
/// The AMQP connection that is active for the current scope.
/// </summary>
private FaultTolerantAmqpObject<AmqpConnection> ActiveConnection { get; }
internal FaultTolerantAmqpObject<AmqpConnection> ActiveConnection { get; }

/// <summary>
/// The controller responsible for managing transactions.
/// </summary>
internal FaultTolerantAmqpObject<Controller> TransactionController { get; }

private readonly string _fullyQualifiedNamespace;

private readonly bool _useSingleSession;

private readonly FaultTolerantAmqpObject<AmqpSession> _singletonSession;
Expand All @@ -169,33 +171,43 @@ public override bool IsDisposed
private readonly object _syncLock = new();
private readonly TimeSpan _operationTimeout;

public event Func<ServiceBusConnectionEventArgs, Task> ConnectedAsync;
public event Func<ServiceBusConnectionEventArgs, Task> DisconnectedAsync;

/// <summary>
/// Initializes a new instance of the <see cref="AmqpConnectionScope"/> class.
/// </summary>
/// <param name="serviceEndpoint">Endpoint for the Service Bus service to which the scope is associated.</param>
/// <param name="fullyQualifiedNamespace">Endpoint for the Service Bus service to which the scope is associated.</param>
/// <param name="credential">The credential to use for authorization with the Service Bus service.</param>
/// <param name="transport">The transport to use for communication.</param>
/// <param name="proxy">The proxy, if any, to use for communication.</param>
/// <param name="useSingleSession">If true, all links will use a single session.</param>
/// <param name="operationTimeout">The timeout for operations associated with the connection.</param>
/// <param name="metrics">The metrics instance to populate transport metrics. May be null.</param>
public AmqpConnectionScope(
Uri serviceEndpoint,
string fullyQualifiedNamespace,
ServiceBusTokenCredential credential,
ServiceBusTransportType transport,
IWebProxy proxy,
bool useSingleSession,
TimeSpan operationTimeout,
ServiceBusTransportMetrics metrics)
{
Argument.AssertNotNull(serviceEndpoint, nameof(serviceEndpoint));
Argument.AssertNotNull(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
Argument.AssertNotNull(credential, nameof(credential));

ValidateTransport(transport);

_operationTimeout = operationTimeout;
ServiceEndpoint = serviceEndpoint;
_fullyQualifiedNamespace = fullyQualifiedNamespace;
Transport = transport;
ServiceEndpoint = new UriBuilder
{
Scheme = Transport.GetUriScheme(),
Host = _fullyQualifiedNamespace
}.Uri;
Proxy = proxy;

Id = $"{ ServiceEndpoint }-{ Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture).Substring(0, 8) }";
TokenProvider = new CbsTokenProvider(new ServiceBusTokenCredential(credential), AuthorizationTokenExpirationBuffer, OperationCancellationSource.Token);
_useSingleSession = useSingleSession;
Expand Down Expand Up @@ -483,18 +495,20 @@ protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(
// When the connection is closed, close each of the links associated with it.

EventHandler closeHandler = null;

closeHandler = (snd, args) =>
var connectionArgs = new ServiceBusConnectionEventArgs(_fullyQualifiedNamespace, transportType, proxy);
closeHandler = async (snd, args) =>
{
foreach (var link in ActiveLinks.Keys)
{
link.SafeClose();
}

// async void is okay in EventHandlers
await DisconnectedAsync(connectionArgs).ConfigureAwait(false);
connection.Closed -= closeHandler;
};

connection.Closed += closeHandler;
await ConnectedAsync(connectionArgs).ConfigureAwait(false);
return connection;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License.

using System;
using System.Data;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -25,10 +26,9 @@ public class ServiceBusClient : IAsyncDisposable
private volatile bool _closed;

/// <summary>
/// The fully qualified Service Bus namespace that the connection is associated with. This is likely
/// The fully qualified Service Bus namespace that the connection is associated with. This is likely
/// to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
/// </summary>
///
public virtual string FullyQualifiedNamespace => Connection.FullyQualifiedNamespace;

/// <summary>
Expand All @@ -44,6 +44,67 @@ public virtual bool IsClosed
private set => _closed = value;
}

/// <summary>
/// An event that can be subscribed to for notification when the client connects to the service.
/// </summary>
#pragma warning disable AZC0002
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Comments for what these warnings are would be helpful.

#pragma warning disable AZC0003
public event Func<ServiceBusConnectionEventArgs, Task> ConnectedAsync;
#pragma warning restore AZC0003
#pragma warning restore AZC0002

/// <summary>
/// An event that can be subscribed to for notification when the client disconnects from the service.
/// No action is required when the client temporarily disconnects due to a transient network or service issue, as the client will attempt
/// to re-establish the connection automatically on the next operation. If <see cref="DisposeAsync"/> is
/// called, then the connection will not be re-established.
/// </summary>
#pragma warning disable AZC0003
#pragma warning disable AZC0002
public event Func<ServiceBusConnectionEventArgs, Task> DisconnectedAsync;
#pragma warning restore AZC0002
#pragma warning restore AZC0003

/// <summary>
/// Gets whether or not the client is currently connected to the service. No action is required when the client temporarily
/// disconnects due to a transient network or service issue, as the client will attempt to re-establish the connection
/// automatically on the next operation. As such, it is not recommended that this property is checked before performing operations with the <see cref="ServiceBusClient"/>
/// or any of the other Service Bus types. This property should be used only for diagnostic information.
/// </summary>
public virtual bool IsConnected => Connection.IsConnected;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also consider noting that this property does not indicate that read/publish operations are actively taking place.


/// <summary>
/// Invokes the <see cref="ConnectedAsync"/> event handler when the client connects to the service.
/// This method can be overridden to raise an event manually for testing purposes.
/// </summary>
/// <param name="args">The event args containing information related to the connection.</param>
protected virtual async Task OnConnectedAsync(ServiceBusConnectionEventArgs args)
{
var handler = ConnectedAsync;
if (handler != null)
{
await handler(args).ConfigureAwait(false);
}
}

/// <summary>
/// Invokes the <see cref="DisconnectedAsync"/> event handler when the client disconnects from the service.
/// This method can be overridden to raise an event manually for testing purposes.
/// </summary>
/// <param name="args">The event args containing information related to the connection.</param>
protected virtual async Task OnDisconnectedAsync(ServiceBusConnectionEventArgs args)
{
var handler = DisconnectedAsync;
if (handler != null)
{
await handler(args).ConfigureAwait(false);
}
}

private async Task DisconnectHandlerAsync(ServiceBusConnectionEventArgs args) => await OnDisconnectedAsync(args).ConfigureAwait(false);

private async Task ConnectedHandlerAsync(ServiceBusConnectionEventArgs args) => await OnConnectedAsync(args).ConfigureAwait(false);

/// <summary>
/// The transport type used for this <see cref="ServiceBusClient"/>.
/// </summary>
Expand Down Expand Up @@ -153,6 +214,8 @@ public ServiceBusClient(string connectionString, ServiceBusClientOptions options
Logger.ClientCreateStart(typeof(ServiceBusClient), FullyQualifiedNamespace);
Identifier = DiagnosticUtilities.GenerateIdentifier(FullyQualifiedNamespace);
TransportType = _options.TransportType;
Connection.InnerClient.ConnectedAsync += ConnectedHandlerAsync;
Connection.InnerClient.DisconnectedAsync += DisconnectHandlerAsync;
Logger.ClientCreateComplete(typeof(ServiceBusClient), Identifier);
}

Expand Down Expand Up @@ -237,6 +300,8 @@ private ServiceBusClient(
credential,
_options);
TransportType = _options.TransportType;
Connection.InnerClient.ConnectedAsync += ConnectedHandlerAsync;
Connection.InnerClient.DisconnectedAsync += DisconnectHandlerAsync;
Logger.ClientCreateComplete(typeof(ServiceBusClient), Identifier);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Net;

namespace Azure.Messaging.ServiceBus
{
/// <summary>
/// The event args specified when the state of the connection with the service changes.
/// </summary>
public class ServiceBusConnectionEventArgs : EventArgs
{
/// <inheritdoc cref="ServiceBusClient.FullyQualifiedNamespace"/>
public string FullyQualifiedNamespace { get; }

/// <summary>
/// The proxy being used when communicating with the service.
/// </summary>
public IWebProxy Proxy { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proxy and TransportType are interesting additions. What's the scenario where they'd be useful? I'd see the client identifier being more helpful to differentiate between multiple instances.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I considered that but was thinking it might be better to wait until we actually expose Identifier on the client. Mostly, I just wanted to add the event args type now so that we can evolve it in the future.


/// <inheritdoc cref="ServiceBusClient.TransportType"/>
public ServiceBusTransportType TransportType { get; }

/// <summary>
/// Initializes a new instance of <see cref="ServiceBusConnectionEventArgs"/>.
/// </summary>
///
/// <param name="fullyQualifiedNamespace">The fully qualified namespace.</param>
/// <param name="transportType">The transport type in use when communicating with the service.</param>
/// <param name="proxy">The proxy being used when communicating with the service.</param>
public ServiceBusConnectionEventArgs(string fullyQualifiedNamespace, ServiceBusTransportType transportType, IWebProxy proxy)
{
FullyQualifiedNamespace = fullyQualifiedNamespace;
TransportType = transportType;
Proxy = proxy;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Amqp;
using Azure.Messaging.ServiceBus.Primitives;

namespace Azure.Messaging.ServiceBus.Core
{
Expand All @@ -29,6 +26,16 @@ internal abstract class TransportClient : IAsyncDisposable
///
public virtual bool IsClosed { get; }

/// <summary>
/// Indicates whether or not this client is currently connected to the service endpoint.
/// </summary>
///
/// <value>
/// <c>true</c> if the client is closed; otherwise, <c>false</c>.
/// </value>
///
public virtual bool IsConnected { get; }

/// <summary>
/// The endpoint for the Service Bus service to which the client is associated.
/// </summary>
Expand All @@ -40,6 +47,19 @@ internal abstract class TransportClient : IAsyncDisposable
/// </summary>
public virtual ServiceBusTransportMetrics TransportMetrics { get; }

/// <summary>
/// An event that can be subscribed to for notification when the client connects to the service.
/// </summary>
public abstract event Func<ServiceBusConnectionEventArgs, Task> ConnectedAsync;

/// <summary>
/// An event that can be subscribed to for notification when the client disconnects from the service.
/// No action is required when the client temporarily disconnects due to a transient network or service issue, as the client will attempt
/// to re-establish the connection automatically on the next operation. If <see cref="DisposeAsync"/> is
/// called, then the connection will not be re-established.
/// </summary>
public abstract event Func<ServiceBusConnectionEventArgs, Task> DisconnectedAsync;

/// <summary>
/// Creates a sender strongly aligned with the active protocol and transport,
/// responsible for sending <see cref="ServiceBusMessage" /> to the entity.
Expand Down
Loading