Skip to content

Commit

Permalink
Allow clients to report if they own or share the underlying connectio…
Browse files Browse the repository at this point in the history
…n string (Part 1) (Azure#485)

Fixes Azure#482 

- Allow clients to report if they own or share the underlying connection string.
- Remove unnecessary `ownsConnection` field from all clients

This is part 1 out of 2 parts PRs. Follow up PR Azure#486 will be completing this change in the next major release.
  • Loading branch information
SeanFeldman authored and nemakam committed Jun 18, 2018
1 parent 58a8e7c commit e3b1b93
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 31 deletions.
5 changes: 5 additions & 0 deletions src/Microsoft.Azure.ServiceBus/ClientEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ internal set
/// </summary>
public abstract ServiceBusConnection ServiceBusConnection { get; }

/// <summary>
/// Returns true if connection is owned and false if connection is shared.
/// </summary>
public bool OwnsConnection { get; internal set; }

/// <summary>
/// Gets the name of the entity.
/// </summary>
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/Core/MessageReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class MessageReceiver : ClientEntity, IMessageReceiver
readonly ConcurrentExpiringSet<Guid> requestResponseLockedMessages;
readonly bool isSessionReceiver;
readonly object messageReceivePumpSyncLock;
readonly bool ownsConnection;
readonly ActiveClientLinkManager clientLinkManager;
readonly ServiceBusDiagnosticSource diagnosticSource;

Expand Down Expand Up @@ -99,7 +98,7 @@ public MessageReceiver(
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -124,7 +123,7 @@ public MessageReceiver(
int prefetchCount = Constants.DefaultClientPrefetchCount)
: this(entityPath, null, receiveMode, new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, null, retryPolicy, prefetchCount)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -145,7 +144,7 @@ public MessageReceiver(
int prefetchCount = Constants.DefaultClientPrefetchCount)
: this(entityPath, null, receiveMode, serviceBusConnection, null, retryPolicy, prefetchCount)
{
this.ownsConnection = false;
this.OwnsConnection = false;
}

internal MessageReceiver(
Expand Down Expand Up @@ -1009,7 +1008,7 @@ protected override async Task OnClosingAsync()
await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
11 changes: 5 additions & 6 deletions src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ namespace Microsoft.Azure.ServiceBus.Core
public class MessageSender : ClientEntity, IMessageSender
{
int deliveryCount;
readonly bool ownsConnection;
readonly ActiveClientLinkManager clientLinkManager;
readonly ServiceBusDiagnosticSource diagnosticSource;
readonly bool isViaSender;
Expand Down Expand Up @@ -74,7 +73,7 @@ public MessageSender(
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -94,7 +93,7 @@ public MessageSender(
RetryPolicy retryPolicy = null)
: this(entityPath, null, null, new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, null, retryPolicy)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -109,7 +108,7 @@ public MessageSender(
RetryPolicy retryPolicy = null)
: this(entityPath, null, null, serviceBusConnection, null, retryPolicy)
{
this.ownsConnection = false;
this.OwnsConnection = false;
}

/// <summary>
Expand All @@ -132,7 +131,7 @@ public MessageSender(
RetryPolicy retryPolicy = null)
:this(viaEntityPath, entityPath, null, serviceBusConnection, null, retryPolicy)
{
this.ownsConnection = false;
this.OwnsConnection = false;
}

internal MessageSender(
Expand Down Expand Up @@ -444,7 +443,7 @@ protected override async Task OnClosingAsync()
await this.SendLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ namespace Microsoft.Azure.ServiceBus
/// It uses AMQP protocol for communicating with servicebus.</remarks>
public class QueueClient : ClientEntity, IQueueClient
{
readonly bool ownsConnection;
readonly object syncLock;

int prefetchCount;
Expand Down Expand Up @@ -91,7 +90,7 @@ public QueueClient(string connectionString, string entityPath, ReceiveMode recei
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -113,7 +112,7 @@ public QueueClient(
RetryPolicy retryPolicy = null)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, entityPath, receiveMode, retryPolicy)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -137,7 +136,7 @@ public QueueClient(ServiceBusConnection serviceBusConnection, string entityPath,
this.syncLock = new object();
this.QueueName = entityPath;
this.ReceiveMode = receiveMode;
this.ownsConnection = false;
this.OwnsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
Expand Down Expand Up @@ -535,7 +534,7 @@ protected override async Task OnClosingAsync()
await this.sessionClient.CloseAsync().ConfigureAwait(false);
}

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/SessionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ namespace Microsoft.Azure.ServiceBus
public sealed class SessionClient : ClientEntity, ISessionClient
{
const int DefaultPrefetchCount = 0;
readonly bool ownsConnection;
readonly ServiceBusDiagnosticSource diagnosticSource;

/// <summary>
Expand Down Expand Up @@ -97,7 +96,7 @@ public SessionClient(
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand Down Expand Up @@ -130,7 +129,7 @@ public SessionClient(
retryPolicy,
null)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand Down Expand Up @@ -158,7 +157,7 @@ public SessionClient(
retryPolicy,
null)
{
this.ownsConnection = false;
this.OwnsConnection = false;
}

internal SessionClient(
Expand Down Expand Up @@ -397,7 +396,7 @@ public override void UnregisterPlugin(string serviceBusPluginName)

protected override async Task OnClosingAsync()
{
if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/SubscriptionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class SubscriptionClient : ClientEntity, ISubscriptionClient
{
int prefetchCount;
readonly object syncLock;
readonly bool ownsConnection;
readonly ServiceBusDiagnosticSource diagnosticSource;

IInnerSubscriptionClient innerSubscriptionClient;
Expand Down Expand Up @@ -86,7 +85,7 @@ public SubscriptionClient(string connectionString, string topicPath, string subs
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -110,7 +109,7 @@ public SubscriptionClient(
RetryPolicy retryPolicy = null)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, topicPath, subscriptionName, receiveMode, retryPolicy)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand Down Expand Up @@ -142,7 +141,7 @@ public SubscriptionClient(ServiceBusConnection serviceBusConnection, string topi
this.Path = EntityNameHelper.FormatSubscriptionPath(this.TopicPath, this.SubscriptionName);
this.ReceiveMode = receiveMode;
this.diagnosticSource = new ServiceBusDiagnosticSource(this.Path, serviceBusConnection.Endpoint);
this.ownsConnection = false;
this.OwnsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
Expand Down Expand Up @@ -637,7 +636,7 @@ protected override async Task OnClosingAsync()
await this.sessionClient.CloseAsync().ConfigureAwait(false);
}

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/TopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ namespace Microsoft.Azure.ServiceBus
/// <remarks>It uses AMQP protocol for communicating with servicebus.</remarks>
public class TopicClient : ClientEntity, ITopicClient
{
readonly bool ownsConnection;
readonly object syncLock;
MessageSender innerSender;

Expand Down Expand Up @@ -61,7 +60,7 @@ public TopicClient(string connectionString, string entityPath, RetryPolicy retry
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -81,7 +80,7 @@ public TopicClient(
RetryPolicy retryPolicy = null)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, entityPath, retryPolicy)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -102,7 +101,7 @@ public TopicClient(ServiceBusConnection serviceBusConnection, string entityPath,
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.syncLock = new object();
this.TopicName = entityPath;
this.ownsConnection = false;
this.OwnsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
Expand Down Expand Up @@ -236,7 +235,7 @@ protected override async Task OnClosingAsync()
await this.innerSender.CloseAsync().ConfigureAwait(false);
}

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Microsoft.Azure.ServiceBus
public string ClientId { get; }
public bool IsClosedOrClosing { get; }
public abstract System.TimeSpan OperationTimeout { get; set; }
public bool OwnsConnection { get; }
public abstract string Path { get; }
public abstract System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public Microsoft.Azure.ServiceBus.RetryPolicy RetryPolicy { get; }
Expand Down

0 comments on commit e3b1b93

Please sign in to comment.