Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Allow clients to report if they own or share the underlying connectio…
Browse files Browse the repository at this point in the history
…n string (Part 1) (#485)

Fixes #482 

- Allow clients to report if they own or share the underlying connection string.
- Remove unnecessary `ownsConnection` field from all clients

This is part 1 out of 2 parts PRs. Follow up PR #486 will be completing this change in the next major release.
  • Loading branch information
SeanFeldman authored and nemakam committed Jun 18, 2018
1 parent 58a8e7c commit e3b1b93
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 31 deletions.
5 changes: 5 additions & 0 deletions src/Microsoft.Azure.ServiceBus/ClientEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ internal set
/// </summary>
public abstract ServiceBusConnection ServiceBusConnection { get; }

/// <summary>
/// Returns true if connection is owned and false if connection is shared.
/// </summary>
public bool OwnsConnection { get; internal set; }

/// <summary>
/// Gets the name of the entity.
/// </summary>
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/Core/MessageReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class MessageReceiver : ClientEntity, IMessageReceiver
readonly ConcurrentExpiringSet<Guid> requestResponseLockedMessages;
readonly bool isSessionReceiver;
readonly object messageReceivePumpSyncLock;
readonly bool ownsConnection;
readonly ActiveClientLinkManager clientLinkManager;
readonly ServiceBusDiagnosticSource diagnosticSource;

Expand Down Expand Up @@ -99,7 +98,7 @@ public MessageReceiver(
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -124,7 +123,7 @@ public MessageReceiver(
int prefetchCount = Constants.DefaultClientPrefetchCount)
: this(entityPath, null, receiveMode, new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, null, retryPolicy, prefetchCount)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -145,7 +144,7 @@ public MessageReceiver(
int prefetchCount = Constants.DefaultClientPrefetchCount)
: this(entityPath, null, receiveMode, serviceBusConnection, null, retryPolicy, prefetchCount)
{
this.ownsConnection = false;
this.OwnsConnection = false;
}

internal MessageReceiver(
Expand Down Expand Up @@ -1009,7 +1008,7 @@ protected override async Task OnClosingAsync()
await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
11 changes: 5 additions & 6 deletions src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ namespace Microsoft.Azure.ServiceBus.Core
public class MessageSender : ClientEntity, IMessageSender
{
int deliveryCount;
readonly bool ownsConnection;
readonly ActiveClientLinkManager clientLinkManager;
readonly ServiceBusDiagnosticSource diagnosticSource;
readonly bool isViaSender;
Expand Down Expand Up @@ -74,7 +73,7 @@ public MessageSender(
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -94,7 +93,7 @@ public MessageSender(
RetryPolicy retryPolicy = null)
: this(entityPath, null, null, new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, null, retryPolicy)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -109,7 +108,7 @@ public MessageSender(
RetryPolicy retryPolicy = null)
: this(entityPath, null, null, serviceBusConnection, null, retryPolicy)
{
this.ownsConnection = false;
this.OwnsConnection = false;
}

/// <summary>
Expand All @@ -132,7 +131,7 @@ public MessageSender(
RetryPolicy retryPolicy = null)
:this(viaEntityPath, entityPath, null, serviceBusConnection, null, retryPolicy)
{
this.ownsConnection = false;
this.OwnsConnection = false;
}

internal MessageSender(
Expand Down Expand Up @@ -444,7 +443,7 @@ protected override async Task OnClosingAsync()
await this.SendLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ namespace Microsoft.Azure.ServiceBus
/// It uses AMQP protocol for communicating with servicebus.</remarks>
public class QueueClient : ClientEntity, IQueueClient
{
readonly bool ownsConnection;
readonly object syncLock;

int prefetchCount;
Expand Down Expand Up @@ -91,7 +90,7 @@ public QueueClient(string connectionString, string entityPath, ReceiveMode recei
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -113,7 +112,7 @@ public QueueClient(
RetryPolicy retryPolicy = null)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, entityPath, receiveMode, retryPolicy)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -137,7 +136,7 @@ public QueueClient(ServiceBusConnection serviceBusConnection, string entityPath,
this.syncLock = new object();
this.QueueName = entityPath;
this.ReceiveMode = receiveMode;
this.ownsConnection = false;
this.OwnsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
Expand Down Expand Up @@ -535,7 +534,7 @@ protected override async Task OnClosingAsync()
await this.sessionClient.CloseAsync().ConfigureAwait(false);
}

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/SessionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ namespace Microsoft.Azure.ServiceBus
public sealed class SessionClient : ClientEntity, ISessionClient
{
const int DefaultPrefetchCount = 0;
readonly bool ownsConnection;
readonly ServiceBusDiagnosticSource diagnosticSource;

/// <summary>
Expand Down Expand Up @@ -97,7 +96,7 @@ public SessionClient(
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand Down Expand Up @@ -130,7 +129,7 @@ public SessionClient(
retryPolicy,
null)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand Down Expand Up @@ -158,7 +157,7 @@ public SessionClient(
retryPolicy,
null)
{
this.ownsConnection = false;
this.OwnsConnection = false;
}

internal SessionClient(
Expand Down Expand Up @@ -397,7 +396,7 @@ public override void UnregisterPlugin(string serviceBusPluginName)

protected override async Task OnClosingAsync()
{
if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/SubscriptionClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class SubscriptionClient : ClientEntity, ISubscriptionClient
{
int prefetchCount;
readonly object syncLock;
readonly bool ownsConnection;
readonly ServiceBusDiagnosticSource diagnosticSource;

IInnerSubscriptionClient innerSubscriptionClient;
Expand Down Expand Up @@ -86,7 +85,7 @@ public SubscriptionClient(string connectionString, string topicPath, string subs
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -110,7 +109,7 @@ public SubscriptionClient(
RetryPolicy retryPolicy = null)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, topicPath, subscriptionName, receiveMode, retryPolicy)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand Down Expand Up @@ -142,7 +141,7 @@ public SubscriptionClient(ServiceBusConnection serviceBusConnection, string topi
this.Path = EntityNameHelper.FormatSubscriptionPath(this.TopicPath, this.SubscriptionName);
this.ReceiveMode = receiveMode;
this.diagnosticSource = new ServiceBusDiagnosticSource(this.Path, serviceBusConnection.Endpoint);
this.ownsConnection = false;
this.OwnsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
Expand Down Expand Up @@ -637,7 +636,7 @@ protected override async Task OnClosingAsync()
await this.sessionClient.CloseAsync().ConfigureAwait(false);
}

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
9 changes: 4 additions & 5 deletions src/Microsoft.Azure.ServiceBus/TopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ namespace Microsoft.Azure.ServiceBus
/// <remarks>It uses AMQP protocol for communicating with servicebus.</remarks>
public class TopicClient : ClientEntity, ITopicClient
{
readonly bool ownsConnection;
readonly object syncLock;
MessageSender innerSender;

Expand Down Expand Up @@ -61,7 +60,7 @@ public TopicClient(string connectionString, string entityPath, RetryPolicy retry
throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
}

this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -81,7 +80,7 @@ public TopicClient(
RetryPolicy retryPolicy = null)
: this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, entityPath, retryPolicy)
{
this.ownsConnection = true;
this.OwnsConnection = true;
}

/// <summary>
Expand All @@ -102,7 +101,7 @@ public TopicClient(ServiceBusConnection serviceBusConnection, string entityPath,
this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnection));
this.syncLock = new object();
this.TopicName = entityPath;
this.ownsConnection = false;
this.OwnsConnection = false;
if (this.ServiceBusConnection.TokenProvider != null)
{
this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBusConnection.OperationTimeout);
Expand Down Expand Up @@ -236,7 +235,7 @@ protected override async Task OnClosingAsync()
await this.innerSender.CloseAsync().ConfigureAwait(false);
}

if (this.ownsConnection)
if (this.OwnsConnection)
{
await this.ServiceBusConnection.CloseAsync().ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Microsoft.Azure.ServiceBus
public string ClientId { get; }
public bool IsClosedOrClosing { get; }
public abstract System.TimeSpan OperationTimeout { get; set; }
public bool OwnsConnection { get; }
public abstract string Path { get; }
public abstract System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin> RegisteredPlugins { get; }
public Microsoft.Azure.ServiceBus.RetryPolicy RetryPolicy { get; }
Expand Down

0 comments on commit e3b1b93

Please sign in to comment.