Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Added UnauthorizedException, MessageNotFoundException and SessionCann…
Browse files Browse the repository at this point in the history
…otBeLockedException (#444)

Introduction of `MessageNotFoundException` and `SessionCannotBeLockedException` brings more parity of this client with the older .net client.

`UnauthorizedException` is being introduced to replace `System.UnauthorizedAccessException` so that all the exceptions are derived from `ServiceBusException`. This is a breaking change.
  • Loading branch information
nemakam authored Apr 18, 2018
1 parent e693fa6 commit bb3339a
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 49 deletions.
16 changes: 14 additions & 2 deletions src/Microsoft.Azure.ServiceBus/Amqp/AmqpExceptionHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static class AmqpExceptionHelper
{ AmqpClientConstants.PartitionNotOwnedError.Value, AmqpResponseStatusCode.Gone },
{ AmqpClientConstants.EntityDisabledError.Value, AmqpResponseStatusCode.BadRequest },
{ AmqpClientConstants.PublisherRevokedError.Value, AmqpResponseStatusCode.Unauthorized },
{ AmqpClientConstants.AuthorizationFailedError.Value, AmqpResponseStatusCode.Unauthorized},
{ AmqpErrorCode.Stolen.Value, AmqpResponseStatusCode.Gone }
};

Expand Down Expand Up @@ -115,9 +116,10 @@ static Exception ToMessagingContractException(string condition, string message,
return new InvalidOperationException(message);
}

if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value))
if (string.Equals(condition, AmqpErrorCode.UnauthorizedAccess.Value) ||
string.Equals(condition, AmqpClientConstants.AuthorizationFailedError.Value))
{
return new UnauthorizedAccessException(message);
return new UnauthorizedException(message);
}

if (string.Equals(condition, AmqpClientConstants.ServerBusyError.Value))
Expand Down Expand Up @@ -160,6 +162,16 @@ static Exception ToMessagingContractException(string condition, string message,
return new MessageSizeExceededException(message);
}

if (string.Equals(condition, AmqpClientConstants.MessageNotFoundError.Value))
{
return new MessageNotFoundException(message);
}

if (string.Equals(condition, AmqpClientConstants.SessionCannotBeLockedError.Value))
{
return new SessionCannotBeLockedException(message);
}

return new ServiceBusException(true, message);
}

Expand Down
94 changes: 55 additions & 39 deletions src/Microsoft.Azure.ServiceBus/Core/MessageSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -526,50 +526,58 @@ async Task<long> OnScheduleMessageAsync(Message message)
using (var amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message))
{
var request = AmqpRequestMessage.CreateRequest(
ManagementConstants.Operations.ScheduleMessageOperation,
this.OperationTimeout,
null);
ManagementConstants.Operations.ScheduleMessageOperation,
this.OperationTimeout,
null);

SendingAmqpLink sendLink;
if(this.SendLinkManager.TryGetOpenedObject(out sendLink))
{
request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = sendLink.Name;
}
SendingAmqpLink sendLink = null;

ArraySegment<byte>[] payload = amqpMessage.GetPayload();
var buffer = new BufferListStream(payload);
ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);

var entry = new AmqpMap();
try
{
entry[ManagementConstants.Properties.Message] = value;
entry[ManagementConstants.Properties.MessageId] = message.MessageId;

if (!string.IsNullOrWhiteSpace(message.SessionId))
if (this.SendLinkManager.TryGetOpenedObject(out sendLink))
{
entry[ManagementConstants.Properties.SessionId] = message.SessionId;
request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = sendLink.Name;
}

if (!string.IsNullOrWhiteSpace(message.PartitionKey))
ArraySegment<byte>[] payload = amqpMessage.GetPayload();
var buffer = new BufferListStream(payload);
ArraySegment<byte> value = buffer.ReadBytes((int)buffer.Length);

var entry = new AmqpMap();
{
entry[ManagementConstants.Properties.PartitionKey] = message.PartitionKey;
entry[ManagementConstants.Properties.Message] = value;
entry[ManagementConstants.Properties.MessageId] = message.MessageId;

if (!string.IsNullOrWhiteSpace(message.SessionId))
{
entry[ManagementConstants.Properties.SessionId] = message.SessionId;
}

if (!string.IsNullOrWhiteSpace(message.PartitionKey))
{
entry[ManagementConstants.Properties.PartitionKey] = message.PartitionKey;
}
}
}

request.Map[ManagementConstants.Properties.Messages] = new List<AmqpMap> { entry };
request.Map[ManagementConstants.Properties.Messages] = new List<AmqpMap> { entry };

IEnumerable<long> sequenceNumbers = null;
var response = await this.ExecuteRequestResponseAsync(request).ConfigureAwait(false);
if (response.StatusCode == AmqpResponseStatusCode.OK)
{
sequenceNumbers = response.GetValue<long[]>(ManagementConstants.Properties.SequenceNumbers);
IEnumerable<long> sequenceNumbers = null;
var response = await this.ExecuteRequestResponseAsync(request).ConfigureAwait(false);
if (response.StatusCode == AmqpResponseStatusCode.OK)
{
sequenceNumbers = response.GetValue<long[]>(ManagementConstants.Properties.SequenceNumbers);
}
else
{
response.ToMessagingContractException();
}

return sequenceNumbers?.FirstOrDefault() ?? 0;
}
else
catch (Exception exception)
{
response.ToMessagingContractException();
throw AmqpExceptionHelper.GetClientException(exception, sendLink?.GetTrackingId(), null, sendLink?.Session.IsClosing() ?? false);
}

return sequenceNumbers?.FirstOrDefault() ?? 0;
}
}

Expand All @@ -581,19 +589,27 @@ async Task OnCancelScheduledMessageAsync(long sequenceNumber)
this.OperationTimeout,
null);

SendingAmqpLink sendLink;
if (this.SendLinkManager.TryGetOpenedObject(out sendLink))
SendingAmqpLink sendLink = null;

try
{
request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = sendLink.Name;
}
if (this.SendLinkManager.TryGetOpenedObject(out sendLink))
{
request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = sendLink.Name;
}

request.Map[ManagementConstants.Properties.SequenceNumbers] = new[] { sequenceNumber };
request.Map[ManagementConstants.Properties.SequenceNumbers] = new[] { sequenceNumber };

var response = await this.ExecuteRequestResponseAsync(request).ConfigureAwait(false);
var response = await this.ExecuteRequestResponseAsync(request).ConfigureAwait(false);

if (response.StatusCode != AmqpResponseStatusCode.OK)
if (response.StatusCode != AmqpResponseStatusCode.OK)
{
throw response.ToMessagingContractException();
}
}
catch (Exception exception)
{
throw response.ToMessagingContractException();
throw AmqpExceptionHelper.GetClientException(exception, sendLink?.GetTrackingId(), null, sendLink?.Session.IsClosing() ?? false);
}
}

Expand Down
23 changes: 23 additions & 0 deletions src/Microsoft.Azure.ServiceBus/MessageNotFoundException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus
{
using System;

/// <summary>
/// The exception that is thrown when the requested message is not found.
/// </summary>
public sealed class MessageNotFoundException : ServiceBusException
{
public MessageNotFoundException(string message)
: this(message, null)
{
}

public MessageNotFoundException(string message, Exception innerException)
: base(false, message, innerException)
{
}
}
}
23 changes: 23 additions & 0 deletions src/Microsoft.Azure.ServiceBus/SessionCannotBeLockedException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus
{
using System;

/// <summary>
/// The exception that is thrown when a session cannot be locked.
/// </summary>
public sealed class SessionCannotBeLockedException : ServiceBusException
{
public SessionCannotBeLockedException(string message)
: this(message, null)
{
}

public SessionCannotBeLockedException(string message, Exception innerException)
: base(false, message, innerException)
{
}
}
}
23 changes: 23 additions & 0 deletions src/Microsoft.Azure.ServiceBus/UnauthorizedException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus
{
using System;

/// <summary>
/// The exception that is thrown when user doesn't have access to the entity.
/// </summary>
public sealed class UnauthorizedException : ServiceBusException
{
public UnauthorizedException(string message)
: this(message, null)
{
}

public UnauthorizedException(string message, Exception innerException)
: base(false, message, innerException)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ namespace Microsoft.Azure.ServiceBus
public MessageLockLostException(string message) { }
public MessageLockLostException(string message, System.Exception innerException) { }
}
public sealed class MessageNotFoundException : Microsoft.Azure.ServiceBus.ServiceBusException
{
public MessageNotFoundException(string message) { }
public MessageNotFoundException(string message, System.Exception innerException) { }
}
public sealed class MessageSizeExceededException : Microsoft.Azure.ServiceBus.ServiceBusException
{
public MessageSizeExceededException(string message) { }
Expand Down Expand Up @@ -321,6 +326,11 @@ namespace Microsoft.Azure.ServiceBus
public ServiceBusTimeoutException(string message) { }
public ServiceBusTimeoutException(string message, System.Exception innerException) { }
}
public sealed class SessionCannotBeLockedException : Microsoft.Azure.ServiceBus.ServiceBusException
{
public SessionCannotBeLockedException(string message) { }
public SessionCannotBeLockedException(string message, System.Exception innerException) { }
}
public sealed class SessionClient : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.IClientEntity, Microsoft.Azure.ServiceBus.ISessionClient
{
public SessionClient(Microsoft.Azure.ServiceBus.ServiceBusConnectionStringBuilder connectionStringBuilder, Microsoft.Azure.ServiceBus.ReceiveMode receiveMode = 0, Microsoft.Azure.ServiceBus.RetryPolicy retryPolicy = null, int prefetchCount = 0) { }
Expand Down Expand Up @@ -428,6 +438,11 @@ namespace Microsoft.Azure.ServiceBus
public TrueFilter() { }
public override string ToString() { }
}
public sealed class UnauthorizedException : Microsoft.Azure.ServiceBus.ServiceBusException
{
public UnauthorizedException(string message) { }
public UnauthorizedException(string message, System.Exception innerException) { }
}
}
namespace Microsoft.Azure.ServiceBus.Core
{
Expand Down
28 changes: 24 additions & 4 deletions test/Microsoft.Azure.ServiceBus.UnitTests/QueueSessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public sealed class QueueSessionTests
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task SessionTest(string queueName)
public async Task SessionTest(string queueName)
{
var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName);
var sessionClient = new SessionClient(TestUtility.NamespaceConnectionString, queueName);
Expand Down Expand Up @@ -62,7 +62,7 @@ async Task SessionTest(string queueName)
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task GetAndSetSessionStateTest(string queueName)
public async Task GetAndSetSessionStateTest(string queueName)
{
var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName);
var sessionClient = new SessionClient(TestUtility.NamespaceConnectionString, queueName);
Expand Down Expand Up @@ -120,7 +120,7 @@ await sender.SendAsync(new Message
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task SessionRenewLockTest(string queueName)
public async Task SessionRenewLockTest(string queueName)
{
var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName);
var sessionClient = new SessionClient(TestUtility.NamespaceConnectionString, queueName);
Expand Down Expand Up @@ -173,7 +173,7 @@ async Task SessionRenewLockTest(string queueName)
[Theory]
[MemberData(nameof(TestPermutations))]
[DisplayTestMethodName]
async Task PeekSessionAsyncTest(string queueName)
public async Task PeekSessionAsyncTest(string queueName)
{
var sender = new MessageSender(TestUtility.NamespaceConnectionString, queueName);
var sessionClient = new SessionClient(TestUtility.NamespaceConnectionString, queueName, ReceiveMode.ReceiveAndDelete);
Expand Down Expand Up @@ -203,6 +203,26 @@ async Task PeekSessionAsyncTest(string queueName)
}
}

[Fact]
[DisplayTestMethodName]
public async Task AcceptSessionThrowsSessionCannotBeLockedException()
{
var sessionClient = new SessionClient(TestUtility.NamespaceConnectionString, TestConstants.SessionNonPartitionedQueueName);
var someSessionId = "someSessionId";
IMessageSession session = null;

try
{
session = await sessionClient.AcceptMessageSessionAsync(someSessionId);
await Assert.ThrowsAsync<SessionCannotBeLockedException>(async () =>
session = await sessionClient.AcceptMessageSessionAsync(someSessionId));
}
finally
{
await sessionClient.CloseAsync().ConfigureAwait(false);
await session?.CloseAsync();
}
}

async Task AcceptAndCompleteSessionsAsync(SessionClient sessionClient, string sessionId, string messageId)
{
Expand Down
43 changes: 43 additions & 0 deletions test/Microsoft.Azure.ServiceBus.UnitTests/SenderReceiverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,5 +316,48 @@ public async Task DispositionWithUpdatedPropertiesShouldPropagateToReceivedMessa
await receiver.CloseAsync();
}
}

[Fact]
[DisplayTestMethodName]
public async Task CancelScheduledMessageShouldThrowMessageNotFoundException()
{
var sender = new MessageSender(TestUtility.NamespaceConnectionString, TestConstants.NonPartitionedQueueName);

try
{
long nonExistingSequenceNumber = 1000;
await Assert.ThrowsAsync<MessageNotFoundException>(
async () => await sender.CancelScheduledMessageAsync(nonExistingSequenceNumber));
}
finally
{
await sender.CloseAsync().ConfigureAwait(false);
}
}

[Fact]
[DisplayTestMethodName]
public async Task ClientThrowsUnauthorizedExceptionWhenUserDoesntHaveAccess()
{
var csb = new ServiceBusConnectionStringBuilder(TestUtility.NamespaceConnectionString);
csb.SasKeyName = "nonExistingKey";
csb.EntityPath = TestConstants.NonPartitionedQueueName;

var sender = new MessageSender(csb);

try
{
await Assert.ThrowsAsync<UnauthorizedException>(
async () => await sender.SendAsync(new Message()));

long nonExistingSequenceNumber = 1000;
await Assert.ThrowsAsync<UnauthorizedException>(
async () => await sender.CancelScheduledMessageAsync(nonExistingSequenceNumber));
}
finally
{
await sender.CloseAsync().ConfigureAwait(false);
}
}
}
}
Loading

0 comments on commit bb3339a

Please sign in to comment.