Skip to content

Commit

Permalink
Add SessionReceiverOptions/SessionProcessorOptions (#12383)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft authored Jun 4, 2020
1 parent a338062 commit d07dcc5
Show file tree
Hide file tree
Showing 50 changed files with 536 additions and 209 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
/sdk/search/ @brjohnstmsft @arv100kri @bleroy @tg-msft @heaths
/sdk/search/Microsoft.*/ @brjohnstmsft @arv100kri @bleroy

/sdk/servicebus/ @JoshLove-msft @ShivangiReja @jsquire
/sdk/servicebus/ @JoshLove-msft @ShivangiReja @jsquire @MiYanni
/sdk/servicebus/Microsoft.*/ @nemakam

/sdk/storage/ @amishra-dev @seanmcc-msft @amnguye @kasobol-msft @tg-msft
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ byte[] state = await receiver.GetSessionStateAsync();
// create a receiver specifying a particular session
ServiceBusSessionReceiver receiver = await client.CreateSessionReceiverAsync(
queueName,
sessionId: "Session2");
new ServiceBusSessionReceiverOptions
{
SessionId = "Session2"
});

// the received message is a different type as it contains some service set properties
ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ messageBatch.TryAdd(
await sender.SendAsync(messageBatch);

// get the options to use for configuring the processor
var options = new ServiceBusProcessorOptions
var options = new ServiceBusSessionProcessorOptions
{
// By default after the message handler returns, the processor will complete the message
// If I want more fine-grained control over settlement, I can set this to false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ messageBatch.TryAdd(
await sender.SendAsync(messageBatch);

// get the options to use for configuring the processor
var options = new ServiceBusProcessorOptions
var options = new ServiceBusSessionProcessorOptions
{
// By default after the message handler returns, the processor will complete the message
// If I want more fine-grained control over settlement, I can set this to false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ internal class AmqpConnectionScope : TransportConnectionScope
/// <summary>The URI scheme to apply when using web sockets for service communication.</summary>
private const string WebSocketsUriScheme = "wss";

/// <summary>The string formatting mask to apply to the service endpoint to consume events for a given consumer group and partition.</summary>
private const string ConsumerPathSuffixMask = "{0}/ConsumerGroups/{1}/Partitions/{2}";

/// <summary>The string formatting mask to apply to the service endpoint to publish events for a given partition.</summary>
private const string PartitionProducerPathSuffixMask = "{0}/Partitions/{1}";

/// <summary>
/// The version of AMQP to use within the scope.
/// </summary>
Expand Down Expand Up @@ -160,9 +154,11 @@ public AmqpConnectionScope(Uri serviceEndpoint,
Transport = transport;
Proxy = proxy;
TokenProvider = new CbsTokenProvider(new ServiceBusTokenCredential(credential, serviceEndpoint.ToString()), OperationCancellationSource.Token);
Id = identifier ?? $"{ ServiceEndpoint }-{ Guid.NewGuid().ToString("D").Substring(0, 8) }";
Id = identifier ?? $"{ ServiceEndpoint }-{ Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture).Substring(0, 8) }";

#pragma warning disable CA2214 // Do not call overridable methods in constructors. This internal method is virtual for testing purposes.
Task<AmqpConnection> connectionFactory(TimeSpan timeout) => CreateAndOpenConnectionAsync(AmqpVersion, ServiceEndpoint, Transport, Proxy, Id, timeout);
#pragma warning restore CA2214 // Do not call overridable methods in constructors
ActiveConnection = new FaultTolerantAmqpObject<AmqpConnection>(
connectionFactory,
CloseConnection);
Expand Down Expand Up @@ -411,7 +407,9 @@ protected virtual async Task<AmqpConnection> CreateAndOpenConnectionAsync(
// Create the CBS link that will be used for authorization. The act of creating the link will associate
// it with the connection.

#pragma warning disable CA1806 // Do not ignore method results
new AmqpCbsLink(connection);
#pragma warning restore CA1806 // Do not ignore method results

// When the connection is closed, close each of the links associated with it.

Expand Down Expand Up @@ -1102,7 +1100,7 @@ private static void ValidateTransport(ServiceBusTransportType transport)
{
if ((transport != ServiceBusTransportType.AmqpTcp) && (transport != ServiceBusTransportType.AmqpWebSockets))
{
throw new ArgumentException(nameof(transport), string.Format(CultureInfo.CurrentCulture, Resources.UnknownConnectionType, transport));
throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, Resources.UnknownConnectionType, transport), nameof(transport));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ public static Exception ToMessagingContractException(this Error error, bool conn

public static Exception ToMessagingContractException(string condition, string message, bool connectionError = false)
{
if (string.Equals(condition, AmqpClientConstants.TimeoutError.Value))
if (string.Equals(condition, AmqpClientConstants.TimeoutError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.ServiceTimeout);
}

if (string.Equals(condition, AmqpErrorCode.NotFound.Value))
if (string.Equals(condition, AmqpErrorCode.NotFound.Value, StringComparison.InvariantCultureIgnoreCase))
{
if (connectionError)
{
Expand All @@ -108,68 +108,68 @@ public static Exception ToMessagingContractException(string condition, string me
return new ServiceBusException(message, ServiceBusException.FailureReason.MessagingEntityNotFound);
}

if (string.Equals(condition, AmqpErrorCode.NotImplemented.Value))
if (string.Equals(condition, AmqpErrorCode.NotImplemented.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new NotSupportedException(message);
}

if (string.Equals(condition, AmqpErrorCode.NotAllowed.Value))
if (string.Equals(condition, AmqpErrorCode.NotAllowed.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new InvalidOperationException(message);
}

if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value) ||
string.Equals(condition, AmqpClientConstants.AuthorizationFailedError.Value))
if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value, StringComparison.InvariantCultureIgnoreCase) ||
string.Equals(condition, AmqpClientConstants.AuthorizationFailedError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.Unauthorized);
}

if (string.Equals(condition, AmqpClientConstants.ServerBusyError.Value))
if (string.Equals(condition, AmqpClientConstants.ServerBusyError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.ServiceBusy);
}

if (string.Equals(condition, AmqpClientConstants.ArgumentError.Value))
if (string.Equals(condition, AmqpClientConstants.ArgumentError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ArgumentException(message);
}

if (string.Equals(condition, AmqpClientConstants.ArgumentOutOfRangeError.Value))
if (string.Equals(condition, AmqpClientConstants.ArgumentOutOfRangeError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ArgumentOutOfRangeException(message);
}

if (string.Equals(condition, AmqpClientConstants.EntityDisabledError.Value))
if (string.Equals(condition, AmqpClientConstants.EntityDisabledError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessagingEntityDisabled);
}

if (string.Equals(condition, AmqpClientConstants.MessageLockLostError.Value))
if (string.Equals(condition, AmqpClientConstants.MessageLockLostError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessageLockLost);
}

if (string.Equals(condition, AmqpClientConstants.SessionLockLostError.Value))
if (string.Equals(condition, AmqpClientConstants.SessionLockLostError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.SessionLockLost);
}

if (string.Equals(condition, AmqpErrorCode.ResourceLimitExceeded.Value))
if (string.Equals(condition, AmqpErrorCode.ResourceLimitExceeded.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.QuotaExceeded);
}

if (string.Equals(condition, AmqpErrorCode.MessageSizeExceeded.Value))
if (string.Equals(condition, AmqpErrorCode.MessageSizeExceeded.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessageSizeExceeded);
}

if (string.Equals(condition, AmqpClientConstants.MessageNotFoundError.Value))
if (string.Equals(condition, AmqpClientConstants.MessageNotFoundError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessageNotFound);
}

if (string.Equals(condition, AmqpClientConstants.SessionCannotBeLockedError.Value))
if (string.Equals(condition, AmqpClientConstants.SessionCannotBeLockedError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.SessionCannotBeLocked);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ internal static class AmqpMessageConverter
private const string SequenceNumberName = "x-opt-sequence-number";
private const string EnqueueSequenceNumberName = "x-opt-enqueue-sequence-number";
private const string LockedUntilName = "x-opt-locked-until";
private const string PublisherName = "x-opt-publisher";
private const string PartitionKeyName = "x-opt-partition-key";
private const string PartitionIdName = "x-opt-partition-id";
private const string ViaPartitionKeyName = "x-opt-via-partition-key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ namespace Azure.Messaging.ServiceBus.Amqp
/// </summary>
///
/// <seealso cref="Azure.Messaging.ServiceBus.Core.TransportReceiver" />
#pragma warning disable CA1001 // Types that own disposable fields should be disposable
internal class AmqpReceiver : TransportReceiver
#pragma warning restore CA1001 // Types that own disposable fields should be disposable
{
/// <summary>Indicates whether or not this instance has been closed.</summary>
private bool _closed = false;
Expand Down Expand Up @@ -181,13 +183,13 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
}
}

private void CloseLink(ReceivingAmqpLink link)
private static void CloseLink(ReceivingAmqpLink link)
{
link.Session?.SafeClose();
link.SafeClose();
}

private void CloseLink(RequestResponseAmqpLink link)
private static void CloseLink(RequestResponseAmqpLink link)
{
link.Session?.SafeClose();
link.SafeClose();
Expand Down Expand Up @@ -734,18 +736,18 @@ private async Task DisposeMessageRequestResponseAsync(
}
}

private Outcome GetAbandonOutcome(IDictionary<string, object> propertiesToModify) =>
private static Outcome GetAbandonOutcome(IDictionary<string, object> propertiesToModify) =>
GetModifiedOutcome(propertiesToModify, false);

private Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify) =>
private static Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify) =>
GetModifiedOutcome(propertiesToModify, true);

private List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
{
return lockTokens.Select(lockToken => new ArraySegment<byte>(lockToken.ToByteArray())).ToList();
}

private Outcome GetModifiedOutcome(
private static Outcome GetModifiedOutcome(
IDictionary<string, object> propertiesToModify,
bool undeliverableHere)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

namespace Azure.Messaging.ServiceBus.Amqp
{
#pragma warning disable CA1001 // Types that own disposable fields should be disposable. AmqpRuleManager does not own connection scope.
internal class AmqpRuleManager : TransportRuleManager
#pragma warning restore CA1001 // Types that own disposable fields should be disposable
{
/// <summary>
/// The path of the Service Bus subscription to which the rule manager is bound.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -25,7 +26,9 @@ namespace Azure.Messaging.ServiceBus.Amqp
///
/// <seealso cref="Azure.Messaging.ServiceBus.Core.TransportSender" />
///
#pragma warning disable CA1001 // Types that own disposable fields should be disposable. The AmqpSender doesn't own the connection scope.
internal class AmqpSender : TransportSender
#pragma warning restore CA1001 // Types that own disposable fields should be disposable
{
/// <summary>Indicates whether or not this instance has been closed.</summary>
private bool _closed = false;
Expand Down Expand Up @@ -239,7 +242,7 @@ internal virtual async Task SendBatchInternalAsync(
using (AmqpMessage batchMessage = messageFactory())
{

string messageHash = batchMessage.GetHashCode().ToString();
string messageHash = batchMessage.GetHashCode().ToString(CultureInfo.InvariantCulture);

ArraySegment<byte> transactionId = AmqpConstants.NullBinary;
Transaction ambientTransaction = Transaction.Current;
Expand All @@ -258,7 +261,7 @@ internal virtual async Task SendBatchInternalAsync(

if (batchMessage.SerializedMessageSize > MaxMessageSize)
{
throw new ServiceBusException(string.Format(Resources.MessageSizeExceeded, messageHash, batchMessage.SerializedMessageSize, MaxMessageSize, _entityPath), ServiceBusException.FailureReason.MessageSizeExceeded);
throw new ServiceBusException(string.Format(CultureInfo.InvariantCulture, Resources.MessageSizeExceeded, messageHash, batchMessage.SerializedMessageSize, MaxMessageSize, _entityPath), ServiceBusException.FailureReason.MessageSizeExceeded);
}

// Attempt to send the message batch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpCorrelationFilterCodec : AmqpFilterCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":correlation-filter:list";
public const string Name = AmqpConstants.Vendor + ":correlation-filter:list";
public const ulong Code = 0x000001370000009;
private const int Fields = 9;
private AmqpMap properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpEmptyRuleActionCodec : AmqpRuleActionCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":empty-rule-action:list";
public const string Name = AmqpConstants.Vendor + ":empty-rule-action:list";
public const ulong Code = 0x0000013700000005;
private const int Fields = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpFalseFilterCodec : AmqpFilterCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":false-filter:list";
public const string Name = AmqpConstants.Vendor + ":false-filter:list";
public const ulong Code = 0x000001370000008;

public AmqpFalseFilterCodec() : base(Name, Code) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpRuleDescriptionCodec : DescribedList
{
public static readonly string Name = AmqpConstants.Vendor + ":rule-description:list";
public const string Name = AmqpConstants.Vendor + ":rule-description:list";
public const ulong Code = 0x0000013700000004;
private const int Fields = 4;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpSqlFilterCodec : AmqpFilterCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":sql-filter:list";
public const string Name = AmqpConstants.Vendor + ":sql-filter:list";
public const ulong Code = 0x000001370000006;
private const int Fields = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpSqlRuleActionCodec : AmqpRuleActionCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":sql-rule-action:list";
public const string Name = AmqpConstants.Vendor + ":sql-rule-action:list";
public const ulong Code = 0x0000013700000006;
private const int Fields = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpTrueFilterCodec : AmqpFilterCodec
{
public static readonly string Name = AmqpConstants.Vendor + ":true-filter:list";
public const string Name = AmqpConstants.Vendor + ":true-filter:list";
public const ulong Code = 0x000001370000007;

public AmqpTrueFilterCodec() : base(Name, Code) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
<Version>7.0.0-preview.3</Version>
<PackageTags>Azure;Service Bus;ServiceBus;.NET;AMQP;$(PackageCommonTags)</PackageTags>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
<EnableFxCopAnalyzers>false</EnableFxCopAnalyzers>
<EnableApiCompat>false</EnableApiCompat>
<GenerateAPIListing>false</GenerateAPIListing>
</PropertyGroup>
Expand Down
Loading

0 comments on commit d07dcc5

Please sign in to comment.