Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry pattern with delay calculation support #4895

Merged
merged 2 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3985,6 +3985,13 @@ namespace Akka.Pattern
public OpenCircuitException(System.Exception cause, System.TimeSpan remainingDuration) { }
public System.TimeSpan RemainingDuration { get; }
}
public class static RetrySupport
{
public static System.Threading.Tasks.Task<T> Retry<T>(System.Func<System.Threading.Tasks.Task<T>> attempt, int attempts) { }
public static System.Threading.Tasks.Task<T> Retry<T>(System.Func<System.Threading.Tasks.Task<T>> attempt, int attempts, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, int randomFactor, Akka.Actor.IScheduler scheduler) { }
public static System.Threading.Tasks.Task<T> Retry<T>(System.Func<System.Threading.Tasks.Task<T>> attempt, int attempts, System.TimeSpan delay, Akka.Actor.IScheduler scheduler) { }
public static System.Threading.Tasks.Task<T> Retry<T>(System.Func<System.Threading.Tasks.Task<T>> attempt, int attempts, System.Func<int, Akka.Util.Option<System.TimeSpan>> delayFunction, Akka.Actor.IScheduler scheduler) { }
}
public class UserCalledFailException : Akka.Actor.AkkaException
{
public UserCalledFailException() { }
Expand Down
204 changes: 204 additions & 0 deletions src/core/Akka.Tests/Pattern/RetrySpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
//-----------------------------------------------------------------------
// <copyright file="RetrySpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.TestKit;
using Xunit;
using static Akka.Pattern.RetrySupport;

namespace Akka.Tests.Pattern
{
public class RetrySpec : AkkaSpec
{
[Fact]
public Task Pattern_Retry_must_run_a_successful_task_immediately()
{
var retried = Retry(() => Task.FromResult(5), 5, TimeSpan.FromSeconds(1), Sys.Scheduler);

return WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
var remaining = await retried;
Assert.Equal(5, remaining);
});
}

[Fact]
public Task Pattern_Retry_must_run_a_successful_task_only_once()
{
var counter = 0;
var retried = Retry(() =>
{
counter++;
return Task.FromResult(counter);
}, 5, TimeSpan.FromSeconds(1), Sys.Scheduler);

return WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
var remaining = await retried;
Assert.Equal(1, remaining);
});
}

[Fact]
public Task Pattern_Retry_must_eventually_return_a_failure_for_a_task_that_will_never_succeed()
{
var retried = Retry(() => Task.FromException<int>(new InvalidOperationException("Mexico")), 5, TimeSpan.FromMilliseconds(100), Sys.Scheduler);

return WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => retried);
Assert.Equal("Mexico", exception.Message);
});
}

[Fact]
public Task Pattern_Retry_must_return_a_success_for_a_task_that_succeeds_eventually()
{
var failCount = 0;

Task<int> Attempt()
{
if (failCount < 5)
{
failCount += 1;
return Task.FromException<int>(new InvalidOperationException(failCount.ToString()));
}
else
{
return Task.FromResult(5);
}
}

var retried = Retry(() => Attempt(), 10, TimeSpan.FromMilliseconds(100), Sys.Scheduler);

return WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
var remaining = await retried;
Assert.Equal(5, remaining);
});
}

[Fact]
public Task Pattern_Retry_must_return_a_failure_for_a_task_that_would_have_succeeded_but_retries_were_exhausted()
{
var failCount = 0;

Task<int> Attempt()
{
if (failCount < 10)
{
failCount += 1;
return Task.FromException<int>(new InvalidOperationException(failCount.ToString()));
}
else
{
return Task.FromResult(5);
}
}

var retried = Retry(() => Attempt(), 5, TimeSpan.FromMilliseconds(100), Sys.Scheduler);

return WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => retried);
Assert.Equal("6", exception.Message);
});
}

[Fact]
public Task Pattern_Retry_must_return_a_failure_for_a_task_that_would_have_succeeded_but_retries_were_exhausted_with_delay_function()
{
var failCount = 0;
var attemptedCount = 0;

Task<int> Attempt()
{
if (failCount < 10)
{
failCount += 1;
return Task.FromException<int>(new InvalidOperationException(failCount.ToString()));
}
else
{
return Task.FromResult(5);
}
}

var retried = Retry(() => Attempt(), 5, attempted =>
{
attemptedCount = attempted;
return TimeSpan.FromMilliseconds(100 + attempted);
}, Sys.Scheduler);

return WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => retried);
Assert.Equal("6", exception.Message);
Assert.Equal(5, attemptedCount);
});
}

[Fact]
public Task Pattern_Retry_can_be_attempted_without_any_delay()
{
var failCount = 0;

Task<int> Attempt()
{
if (failCount < 1000)
{
failCount += 1;
return Task.FromException<int>(new InvalidOperationException(failCount.ToString()));
}
else
{
return Task.FromResult(1);
}
}

var start = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var retried = Retry(() => Attempt(), 999);

return WithinAsync(TimeSpan.FromSeconds(1), async () =>
{
var exception = await Assert.ThrowsAsync<InvalidOperationException>(() => retried);
Assert.Equal("1000", exception.Message);

var elapse = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - start;
Assert.True(elapse <= 100);
});
}

[Fact]
public Task Pattern_Retry_must_handle_thrown_exceptions_in_same_way_as_failed_task()
{
var failCount = 0;

Task<int> Attempt()
{
if (failCount < 5)
{
failCount += 1;
return Task.FromException<int>(new InvalidOperationException(failCount.ToString()));
}
else
{
return Task.FromResult(5);
}
}

var retried = Retry(() => Attempt(), 10, TimeSpan.FromMilliseconds(100), Sys.Scheduler);

return WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
var remaining = await retried;
Assert.Equal(5, remaining);
});
}
}
}
138 changes: 138 additions & 0 deletions src/core/Akka/Pattern/RetrySupport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//-----------------------------------------------------------------------
// <copyright file="RetrySupport.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Util;
using static Akka.Pattern.FutureTimeoutSupport;

namespace Akka.Pattern
{
/// <summary>
/// This class provides the retry utility functions.
/// </summary>
public static class RetrySupport
{
/// <summary>
/// <para>
/// Given a function, returns an internally retrying Task.
/// The first attempt will be made immediately, each subsequent attempt will be made immediately
/// if the previous attempt failed.
/// </para>
/// If attempts are exhausted the returned Task is simply the result of invoking attempt.
/// </summary>
/// <param name="attempt">TBD</param>
/// <param name="attempts">TBD</param>
public static Task<T> Retry<T>(Func<Task<T>> attempt, int attempts) =>
Retry(attempt, attempts, attempted: 0);

/// <summary>
/// <para>
/// Given a function, returns an internally retrying Task.
/// The first attempt will be made immediately, each subsequent attempt will be made with a backoff time,
/// if the previous attempt failed.
/// </para>
/// If attempts are exhausted the returned Task is simply the result of invoking attempt.
/// </summary>
/// <param name="attempt">TBD</param>
/// <param name="attempts">TBD</param>
/// <param name="minBackoff">minimum (initial) duration until the child actor will started again, if it is terminated.</param>
/// <param name="maxBackoff">the exponential back-off is capped to this duration.</param>
/// <param name="randomFactor">after calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`.</param>
/// <param name="scheduler">The scheduler instance to use.</param>
public static Task<T> Retry<T>(Func<Task<T>> attempt, int attempts, TimeSpan minBackoff, TimeSpan maxBackoff, int randomFactor, IScheduler scheduler)
{
if (attempt == null) throw new ArgumentNullException("Parameter attempt should not be null.");
if (minBackoff <= TimeSpan.Zero) throw new ArgumentException("Parameter minBackoff must be > 0");
if (maxBackoff < minBackoff) throw new ArgumentException("Parameter maxBackoff must be >= minBackoff");
if (randomFactor < 0.0 || randomFactor > 1.0) throw new ArgumentException("RandomFactor must be between 0.0 and 1.0");

return Retry(attempt, attempts, attempted => BackoffSupervisor.CalculateDelay(attempted, minBackoff, maxBackoff, randomFactor), scheduler);
}

/// <summary>
/// <para>
/// Given a function, returns an internally retrying Task.
/// The first attempt will be made immediately, each subsequent attempt will be made after 'delay'.
/// A scheduler (eg Context.System.Scheduler) must be provided to delay each retry.
/// </para>
/// If attempts are exhausted the returned future is simply the result of invoking attempt.
/// </summary>
/// <param name="attempt">TBD</param>
/// <param name="attempts">TBD</param>
/// <param name="delay">TBD</param>
/// <param name="scheduler">The scheduler instance to use.</param>
public static Task<T> Retry<T>(Func<Task<T>> attempt, int attempts, TimeSpan delay, IScheduler scheduler) =>
Retry(attempt, attempts, _ => delay, scheduler);

/// <summary>
/// <para>
/// Given a function, returns an internally retrying Task.
/// The first attempt will be made immediately, each subsequent attempt will be made after
/// the 'delay' return by `delayFunction`(the input next attempt count start from 1).
/// Returns <see cref="Option{TimeSpan}.None"/> for no delay.
/// A scheduler (eg Context.System.Scheduler) must be provided to delay each retry.
/// You could provide a function to generate the next delay duration after first attempt,
/// this function should never return `null`, otherwise an <see cref="InvalidOperationException"/> will be through.
/// </para>
/// If attempts are exhausted the returned Task is simply the result of invoking attempt.
/// </summary>
/// <param name="attempt">TBD</param>
/// <param name="attempts">TBD</param>
/// <param name="delayFunction">TBD</param>
/// <param name="scheduler">The scheduler instance to use.</param>
public static Task<T> Retry<T>(Func<Task<T>> attempt, int attempts, Func<int, Option<TimeSpan>> delayFunction, IScheduler scheduler) =>
Retry(attempt, attempts, delayFunction, attempted: 0, scheduler);

private static Task<T> Retry<T>(Func<Task<T>> attempt, int maxAttempts, int attempted) =>
Retry(attempt, maxAttempts, _ => Option<TimeSpan>.None, attempted);

private static Task<T> Retry<T>(Func<Task<T>> attempt, int maxAttempts, Func<int, Option<TimeSpan>> delayFunction, int attempted, IScheduler scheduler = null)
{
Task<T> tryAttempt()
{
try
{
return attempt();
}
catch (Exception ex)
{
return Task.FromException<T>(ex); // in case the `attempt` function throws
}
}

if (maxAttempts < 0) throw new ArgumentException("Parameter maxAttempts must >= 0.");
if (attempt == null) throw new ArgumentNullException(nameof(attempt), "Parameter attempt should not be null.");

if (maxAttempts - attempted > 0)
{
return tryAttempt().ContinueWith(t =>
{
if (t.IsFaulted)
{
var nextAttempt = attempted + 1;
switch (delayFunction(nextAttempt))
{
case Option<TimeSpan> delay when delay.HasValue:
return delay.Value.Ticks < 1
? Retry(attempt, maxAttempts, delayFunction, nextAttempt, scheduler)
: After(delay.Value, scheduler, () => Retry(attempt, maxAttempts, delayFunction, nextAttempt, scheduler));
case Option<TimeSpan> _:
return Retry(attempt, maxAttempts, delayFunction, nextAttempt, scheduler);
default:
throw new InvalidOperationException("The delayFunction of Retry should not return null.");
}
}
return t;
}).Unwrap();
}

return tryAttempt();
}
}
}