Skip to content

Commit

Permalink
Add Stateful methods for circuitbreaker (#5650)
Browse files Browse the repository at this point in the history
* Add Stateful methods for circuitbreaker

* api docs

* fix api docs

Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
3 people authored Mar 7, 2022
1 parent 60d95ac commit 5296c64
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4045,7 +4045,9 @@ namespace Akka.Pattern
public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { }
public void Succeed() { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T, TState>(TState state, System.Func<TState, System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker<TState>(TState state, System.Func<TState, System.Threading.Tasks.Task> body) { }
public Akka.Pattern.CircuitBreaker WithExponentialBackoff(System.TimeSpan maxResetTimeout) { }
public Akka.Pattern.CircuitBreaker WithRandomFactor(double randomFactor) { }
public void WithSyncCircuitBreaker(System.Action body) { }
Expand Down
14 changes: 12 additions & 2 deletions src/core/Akka/Pattern/CircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ public Task<T> WithCircuitBreaker<T>(Func<Task<T>> body)
return CurrentState.Invoke(body);
}

public Task<T> WithCircuitBreaker<T, TState>(TState state,
Func<TState, Task<T>> body)
{
return CurrentState.InvokeState(state, body);
}

/// <summary>
/// Wraps invocation of asynchronous calls that need to be protected
/// </summary>
Expand All @@ -241,14 +247,18 @@ public Task WithCircuitBreaker(Func<Task> body)
{
return CurrentState.Invoke(body);
}
public Task WithCircuitBreaker<TState>(TState state, Func<TState, Task> body)
{
return CurrentState.InvokeState(state, body);
}

/// <summary>
/// The failure will be recorded farther down.
/// </summary>
/// <param name="body">TBD</param>
public void WithSyncCircuitBreaker(Action body)
{
var cbTask = WithCircuitBreaker(() => Task.Factory.StartNew(body));
var cbTask = WithCircuitBreaker(body,(b) => Task.Factory.StartNew(b));
if (!cbTask.Wait(CallTimeout))
{
//throw new TimeoutException( string.Format( "Execution did not complete within the time allotted {0} ms", CallTimeout.TotalMilliseconds ) );
Expand All @@ -275,7 +285,7 @@ public void WithSyncCircuitBreaker(Action body)
/// <returns><typeparamref name="T"/> or default(<typeparamref name="T"/>)</returns>
public T WithSyncCircuitBreaker<T>(Func<T> body)
{
var cbTask = WithCircuitBreaker(() => Task.Factory.StartNew(body));
var cbTask = WithCircuitBreaker(body,(b) => Task.Factory.StartNew(b));
return cbTask.Wait(CallTimeout) ? cbTask.Result : default(T);
}

Expand Down
52 changes: 44 additions & 8 deletions src/core/Akka/Pattern/CircuitBreakerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ private TimeSpan RemainingDuration()
public override Task<T> Invoke<T>(Func<Task<T>> body) =>
Task.FromException<T>(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration()));

public override Task
InvokeState<TState>(TState state, Func<TState, Task> body) =>
Task.FromException(
new OpenCircuitException(_breaker.LastCaughtException,
RemainingDuration()));

public override Task<T> InvokeState<T, TState>(TState state,
Func<TState, Task<T>> body) => Task.FromException<T>(
new OpenCircuitException(_breaker.LastCaughtException,
RemainingDuration()));

/// <summary>
/// Fail-fast on any invocation
/// </summary>
Expand Down Expand Up @@ -121,6 +132,14 @@ public HalfOpen(CircuitBreaker breaker)
_lock = new AtomicBoolean();
}

private void CheckState()
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero);
}
}

/// <summary>
/// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens.
/// If the call succeeds, the breaker closes.
Expand All @@ -130,12 +149,15 @@ public HalfOpen(CircuitBreaker breaker)
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override async Task<T> Invoke<T>(Func<Task<T>> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero);
}
CheckState();
return await CallThrough(body);
}

public override async Task<T> InvokeState<T,TState>(TState state, Func<TState, Task<T>> body)
{
CheckState();
return await CallThrough(state,body);
}

/// <summary>
/// Allows a single call through, during which all other callers fail-fast. If the call fails, the breaker reopens.
Expand All @@ -145,13 +167,17 @@ public override async Task<T> Invoke<T>(Func<Task<T>> body)
/// <returns><see cref="Task"/> containing result of protected call</returns>
public override async Task Invoke(Func<Task> body)
{
if (!_lock.CompareAndSet(true, false))
{
throw new OpenCircuitException("Circuit breaker is half open, only one call is allowed; this call is failing fast.", _breaker.LastCaughtException, TimeSpan.Zero);
}
CheckState();
await CallThrough(body);
}

public override async Task InvokeState<TState>(TState state,
Func<TState, Task> body)
{
CheckState();
await CallThrough(state,body);
}

/// <summary>
/// Reopen breaker on failed call.
/// </summary>
Expand Down Expand Up @@ -216,6 +242,11 @@ public override Task<T> Invoke<T>(Func<Task<T>> body)
return CallThrough(body);
}

public override Task<T> InvokeState<T, TState>(TState state, Func<TState, Task<T>> body)
{
return CallThrough(state, body);
}

/// <summary>
/// Implementation of invoke, which simply attempts the call
/// </summary>
Expand All @@ -226,6 +257,11 @@ public override Task Invoke(Func<Task> body)
return CallThrough(body);
}

public override Task InvokeState<TState>(TState state, Func<TState, Task> body)
{
return CallThrough(state, body);
}

/// <summary>
/// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and
/// the breaker is tripped if we have reached maxFailures.
Expand Down
68 changes: 68 additions & 0 deletions src/core/Akka/Util/Internal/AtomicState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,38 @@ public async Task<T> CallThrough<T>(Func<Task<T>> task)
}
return result;
}

public async Task<T> CallThrough<T,TState>(TState state, Func<TState,Task<T>> task)
{
var deadline = DateTime.UtcNow.Add(_callTimeout);
ExceptionDispatchInfo capturedException = null;
T result = default(T);
try
{
result = await task(state).ConfigureAwait(false);
}
catch (Exception ex)
{
capturedException = ExceptionDispatchInfo.Capture(ex);
}

// Need to make sure that timeouts are reported as timeouts
if (capturedException != null)
{
CallFails(capturedException.SourceException);
capturedException.Throw();
}
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
{
CallFails(new TimeoutException(
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
}
else
{
CallSucceeds();
}
return result;
}

/// <summary>
/// Shared implementation of call across all states. Thrown exception or execution of the call beyond the allowed
Expand Down Expand Up @@ -154,8 +186,37 @@ public async Task CallThrough(Func<Task> task)
{
CallSucceeds();
}
}

public async Task CallThrough<TState>(TState state, Func<TState, Task> task)
{
var deadline = DateTime.UtcNow.Add(_callTimeout);
ExceptionDispatchInfo capturedException = null;

try
{
await task(state).ConfigureAwait(false);
}
catch (Exception ex)
{
capturedException = ExceptionDispatchInfo.Capture(ex);
}

// Need to make sure that timeouts are reported as timeouts
if (capturedException != null)
{
CallFails(capturedException?.SourceException);
capturedException.Throw();
}
else if (DateTime.UtcNow.CompareTo(deadline) >= 0)
{
CallFails(new TimeoutException(
$"Execution did not complete within the time allotted {_callTimeout.TotalMilliseconds} ms"));
}
else
{
CallSucceeds();
}
}

/// <summary>
Expand All @@ -166,13 +227,20 @@ public async Task CallThrough(Func<Task> task)
/// <returns><see cref="Task"/> containing result of protected call</returns>
public abstract Task<T> Invoke<T>(Func<Task<T>> body);

public abstract Task<T> InvokeState<T, TState>(TState state,
Func<TState, Task<T>> body);

/// <summary>
/// Abstract entry point for all states
/// </summary>
/// <param name="body">Implementation of the call that needs protected</param>
/// <returns><see cref="Task"/> containing result of protected call</returns>
public abstract Task Invoke(Func<Task> body);

public abstract Task InvokeState<TState>(TState state,
Func<TState, Task> body);


/// <summary>
/// Invoked when call fails
/// </summary>
Expand Down

0 comments on commit 5296c64

Please sign in to comment.