Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Do not retry when there's an ambient transaction #621

Merged
merged 4 commits into from
Dec 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Microsoft.Azure.ServiceBus/RetryExponential.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ namespace Microsoft.Azure.ServiceBus

/// <summary>
/// RetryPolicy implementation where the delay between retries will grow in a staggered exponential manner.
/// RetryIntervals will be computed using a retryFactor which is a function of deltaBackOff (MaximumBackoff - MinimumBackoff) and MaximumRetryCount
/// RetryIntervals will be computed using a retryFactor which is a function of deltaBackOff (MaximumBackoff - MinimumBackoff) and MaximumRetryCount.
/// <remarks>RetryPolicy will not be applied when an ambient transaction is found.</remarks>
/// </summary>
public sealed class RetryExponential : RetryPolicy
{
Expand Down
6 changes: 4 additions & 2 deletions src/Microsoft.Azure.ServiceBus/RetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ namespace Microsoft.Azure.ServiceBus
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Transactions;
using Primitives;

/// <summary>
/// Represents an abstraction for retrying messaging operations. Users should not
/// implement this class, and instead should use one of the provided implementations.
/// <remarks>RetryPolicy will not be applied when an ambient transaction is found.</remarks>
/// </summary>
public abstract class RetryPolicy
{
Expand Down Expand Up @@ -124,9 +126,9 @@ public virtual bool IsRetryableException(Exception exception)

internal bool ShouldRetry(TimeSpan remainingTime, int currentRetryCount, Exception lastException, out TimeSpan retryInterval)
{
if (lastException == null)
// There is no exception information or there's there's an ambient transaction - should not retry
if (lastException == null || Transaction.Current != null)
{
// there are no exceptions.
retryInterval = TimeSpan.Zero;
return false;
}
Expand Down
59 changes: 59 additions & 0 deletions test/Microsoft.Azure.ServiceBus.UnitTests/RetryPolicyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace Microsoft.Azure.ServiceBus.UnitTests
{
using System;
using System.Threading.Tasks;
using System.Transactions;
using Xunit;

public class RetryPolicyTests
{
[Fact]
[DisplayTestMethodName]
public async Task Should_retry_when_throttled_and_no_ambient_transaction_is_detected()
{
var retryPolicy = RetryPolicy.Default;

var numberOfExecutions = 0;

await retryPolicy.RunOperation(() =>
{
if (numberOfExecutions > 1)
{
return Task.CompletedTask;
}

numberOfExecutions++;

throw new ServerBusyException("Rico KABOOM!");
}, TimeSpan.FromSeconds(30));

Assert.Equal(2, numberOfExecutions);
}

[Fact]
[DisplayTestMethodName]
public async Task Should_not_retry_when_throttled_and_ambient_transaction_is_detected()
{
var retryPolicy = RetryPolicy.Default;
var numberOfExecutions = 0;

using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled))
{
await Assert.ThrowsAsync<ServerBusyException>(() =>
retryPolicy.RunOperation(() =>
{
if (numberOfExecutions > 1)
{
return Task.CompletedTask;
}

numberOfExecutions++;

throw new ServerBusyException("Rico KABOOM!");
}, TimeSpan.FromSeconds(30)));
}

Assert.Equal(1, numberOfExecutions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ internal async Task PeekLockTestCase(IMessageSender messageSender, IMessageRecei
internal async Task ReceiveDeleteTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
{
await TestUtility.SendMessagesAsync(messageSender, messageCount);
var receivedMessages = await TestUtility.ReceiveMessagesAsync(messageReceiver, messageCount);
Assert.True(messageCount == receivedMessages.Count);
var receivedMessages = await TestUtility.ReceiveMessagesAsync(messageReceiver, messageCount, TimeSpan.FromSeconds(10));
Assert.Equal(receivedMessages.Count, messageCount);
}

internal async Task PeekLockWithAbandonTestCase(IMessageSender messageSender, IMessageReceiver messageReceiver, int messageCount)
Expand Down
3 changes: 3 additions & 0 deletions test/Microsoft.Azure.ServiceBus.UnitTests/TestConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@

namespace Microsoft.Azure.ServiceBus.UnitTests
{
using System;

static class TestConstants
{
internal const int MaxAttemptsCount = 5;
internal readonly static TimeSpan WaitTimeBetweenAttempts = TimeSpan.FromSeconds(1);

internal const string ConnectionStringEnvironmentVariable = "azure-service-bus-dotnet/connectionstring";

Expand Down
19 changes: 14 additions & 5 deletions test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,27 @@ internal static async Task SendMessagesAsync(IMessageSender messageSender, int m
Log($"Sent {messageCount} messages");
}

internal static async Task<IList<Message>> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount)
internal static async Task<IList<Message>> ReceiveMessagesAsync(IMessageReceiver messageReceiver, int messageCount, TimeSpan timeout = default)
{
var receiveAttempts = 0;
var messagesToReturn = new List<Message>();
var stopwatch = Stopwatch.StartNew();

while (receiveAttempts++ < TestConstants.MaxAttemptsCount && messagesToReturn.Count < messageCount)
if (timeout == default)
{
timeout = TimeSpan.Zero;
}

while (messagesToReturn.Count < messageCount && (receiveAttempts++ < TestConstants.MaxAttemptsCount || stopwatch.Elapsed < timeout))
{
var messages = await messageReceiver.ReceiveAsync(messageCount - messagesToReturn.Count);
if (messages != null)
if (messages == null)
{
messagesToReturn.AddRange(messages);
await Task.Delay(TestConstants.WaitTimeBetweenAttempts);
continue;
}

messagesToReturn.AddRange(messages);
}

VerifyUniqueMessages(messagesToReturn);
Expand All @@ -98,7 +107,7 @@ internal static async Task<IList<Message>> ReceiveDeferredMessagesAsync(IMessage
var msg = await messageReceiver.ReceiveDeferredMessageAsync(sequenceNumber);
if (msg != null)
{
messagesToReturn.Add(msg);
messagesToReturn.Add(msg);
}
}

Expand Down