Skip to content

Commit

Permalink
Expose success and fail methods in CircuitBreaker (#4339)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored Mar 17, 2020
1 parent 0017dd2 commit 345c601
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 11 deletions.
9 changes: 9 additions & 0 deletions docs/articles/utilities/circuit-breaker.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,12 @@ dangerousActor.Tell("is my middle name");
// My CircuitBreaker is now closed
// This really isn't that dangerous of a call after all
```

### Tell Pattern

The above ``Call Protection`` pattern works well when the return from a remote call is wrapped in a ``Future``. However, when a remote call sends back a message or timeout to the caller ``Actor``, the ``Call Protection`` pattern is awkward. CircuitBreaker doesn't support it natively at the moment, so you need to use below low-level power-user APIs, ``succeed`` and ``fail`` methods, as well as ``isClose``, ``isOpen``, ``isHalfOpen``.

>[!NOTE]
>The below examples doesn't make a remote call when the state is `HalfOpen`. Using the power-user APIs, it is your responsibility to judge when to make remote calls in `HalfOpen`.
[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs?name=circuit-breaker-tell-pattern)]
5 changes: 5 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3813,12 +3813,17 @@ namespace Akka.Pattern
public CircuitBreaker(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public System.TimeSpan CallTimeout { get; }
public long CurrentFailureCount { get; }
public bool IsClosed { get; }
public bool IsHalfOpen { get; }
public bool IsOpen { get; }
public int MaxFailures { get; }
public System.TimeSpan ResetTimeout { get; }
public static Akka.Pattern.CircuitBreaker Create(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { }
public void Fail() { }
public Akka.Pattern.CircuitBreaker OnClose(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { }
public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { }
public void Succeed() { }
public System.Threading.Tasks.Task<T> WithCircuitBreaker<T>(System.Func<System.Threading.Tasks.Task<T>> body) { }
public System.Threading.Tasks.Task WithCircuitBreaker(System.Func<System.Threading.Tasks.Task> body) { }
public void WithSyncCircuitBreaker(System.Action body) { }
Expand Down
43 changes: 42 additions & 1 deletion src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace DocsExamples.Utilities.CircuitBreakers
{
#region circuit-breaker-usage
#region circuit-breaker-usage
public class DangerousActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
Expand Down Expand Up @@ -66,4 +66,45 @@ private void NotifyMeOnOpen()
}
}
#endregion

public class TellPatternActor : UntypedActor
{
private readonly IActorRef _recipient;
private readonly CircuitBreaker _breaker;
private readonly ILoggingAdapter _log = Context.GetLogger();

public TellPatternActor(IActorRef recipient )
{
_recipient = recipient;
_breaker = new CircuitBreaker(
maxFailures: 5,
callTimeout: TimeSpan.FromSeconds(10),
resetTimeout: TimeSpan.FromMinutes(1)).OnOpen(NotifyMeOnOpen);
}

private void NotifyMeOnOpen() => _log.Warning("My CircuitBreaker is now open, and will not close for one minute");

#region circuit-breaker-tell-pattern

protected override void OnReceive(object message)
{
switch (message)
{
case "call" when _breaker.IsClosed:
_recipient.Tell("message");
break;
case "response":
_breaker.Succeed();
break;
case Exception _:
_breaker.Fail();
break;
case ReceiveTimeout _:
_breaker.Fail();
break;
}
}

#endregion
}
}
65 changes: 65 additions & 0 deletions src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ public void Should_Increment_FailureCount_When_Call_Times_Out( )
Assert.True( CheckLatch( breaker.OpenLatch ) );
Assert.Equal( 1, breaker.Instance.CurrentFailureCount );
}

[Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on fail method")]
public void Must_increment_failure_count_on_fail_method()
{
var breaker = LongCallTimeoutCb();
Assert.True(breaker.Instance.CurrentFailureCount == 0);
breaker.Instance.Fail();
Assert.True(CheckLatch(breaker.OpenLatch));
Assert.True(breaker.Instance.CurrentFailureCount == 1);
}

[Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count after success method")]
public void Must_reset_failure_count_after_success_method()
{
var breaker = MultiFailureCb();
Assert.True(breaker.Instance.CurrentFailureCount == 0);
Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)));
Assert.True(breaker.Instance.CurrentFailureCount == 1);
breaker.Instance.Succeed();
Assert.True(breaker.Instance.CurrentFailureCount == 0);
}
}

public class ASynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase
Expand Down Expand Up @@ -92,6 +113,28 @@ public void Should_Pass_Call_And_Transition_To_Open_On_Exception( )
Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
Assert.True( CheckLatch( breaker.OpenLatch ) );
}

[Fact(DisplayName = "A synchronous circuit breaker that is half open must open on calling fail method")]
public void Must_open_on_calling_fail_method()
{
var breaker = ShortCallTimeoutCb();

Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)));
Assert.True(CheckLatch(breaker.HalfOpenLatch));
breaker.Instance.Fail();
Assert.True(CheckLatch(breaker.OpenLatch));
}

[Fact(DisplayName = "A synchronous circuit breaker that is half open must close on calling success method")]
public void Must_close_on_calling_success_method()
{
var breaker = ShortCallTimeoutCb();

Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)));
Assert.True(CheckLatch(breaker.HalfOpenLatch));
breaker.Instance.Succeed();
Assert.True(CheckLatch(breaker.ClosedLatch));
}
}

public class ASynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase
Expand All @@ -114,6 +157,28 @@ public void Should_Transition_To_Half_Open_When_Reset_Times_Out( )
Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) );
Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
}

[Fact(DisplayName = "A synchronous circuit breaker that is open must still be in open state after calling success method")]
public void Must_still_be_in_open_state_after_calling_success_method()
{
var breaker = LongCallTimeoutCb();

Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)));
Assert.True(CheckLatch(breaker.OpenLatch));
breaker.Instance.Succeed();
Assert.True(CheckLatch(breaker.OpenLatch));
}

[Fact(DisplayName = "A synchronous circuit breaker that is open must still be in open state after calling fail method")]
public void Must_still_be_in_open_state_after_calling_fail_method()
{
var breaker = LongCallTimeoutCb();

Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)));
Assert.True(CheckLatch(breaker.OpenLatch));
breaker.Instance.Fail();
Assert.True(CheckLatch(breaker.OpenLatch));
}
}

public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase
Expand Down
38 changes: 38 additions & 0 deletions src/core/Akka/Pattern/CircuitBreaker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,44 @@ public T WithSyncCircuitBreaker<T>(Func<T> body)
return cbTask.Wait(CallTimeout) ? cbTask.Result : default(T);
}

/// <summary>
/// Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the
/// caller Actor. In such a case, it is convenient to mark a successful call instead of using Future
/// via <see cref="WithCircuitBreaker"/>
/// </summary>
public void Succeed() => _currentState.CallSucceeds();

/// <summary>
/// Mark a failed call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the
/// caller Actor. In such a case, it is convenient to mark a failed call instead of using Future
/// via <see cref="WithCircuitBreaker"/>
/// </summary>
public void Fail() => _currentState.CallFails();

/// <summary>
/// Return true if the internal state is Closed. WARNING: It is a "power API" call which you should use with care.
/// Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in <see cref="WithCircuitBreaker"/>.
/// So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should
/// manage the state yourself.
/// </summary>
public bool IsClosed => _currentState is Closed;

/// <summary>
/// Return true if the internal state is Open. WARNING: It is a "power API" call which you should use with care.
/// Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in <see cref="WithCircuitBreaker"/>.
/// So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should
/// manage the state yourself.
/// </summary>
public bool IsOpen => _currentState is Open;

/// <summary>
/// Return true if the internal state is HalfOpen. WARNING: It is a "power API" call which you should use with care.
/// Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in withCircuitBreaker.
/// So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should
/// manage the state yourself.
/// </summary>
public bool IsHalfOpen => _currentState is HalfOpen;

/// <summary>
/// Adds a callback to execute when circuit breaker opens
/// </summary>
Expand Down
19 changes: 11 additions & 8 deletions src/core/Akka/Pattern/CircuitBreakerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,15 @@ public override async Task Invoke(Func<Task> body)
/// <summary>
/// No-op for open, calls are never executed so cannot succeed or fail
/// </summary>
protected override void CallFails()
protected internal override void CallFails()
{
//throw new NotImplementedException();
}

/// <summary>
/// No-op for open, calls are never executed so cannot succeed or fail
/// </summary>
protected override void CallSucceeds()
protected internal override void CallSucceeds()
{
//throw new NotImplementedException();
}

/// <summary>
Expand All @@ -77,6 +75,11 @@ protected override void EnterInternal()
{
Task.Delay(_breaker.ResetTimeout).ContinueWith(task => _breaker.AttemptReset());
}

/// <summary>
/// Override for more descriptive toString
/// </summary>
public override string ToString() => "Open";
}

/// <summary>
Expand Down Expand Up @@ -134,15 +137,15 @@ public override async Task Invoke(Func<Task> body)
/// <summary>
/// Reopen breaker on failed call.
/// </summary>
protected override void CallFails()
protected internal override void CallFails()
{
_breaker.TripBreaker(this);
}

/// <summary>
/// Reset breaker on successful call.
/// </summary>
protected override void CallSucceeds()
protected internal override void CallSucceeds()
{
_breaker.ResetBreaker();
}
Expand Down Expand Up @@ -207,7 +210,7 @@ public override Task Invoke(Func<Task> body)
/// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and
/// the breaker is tripped if we have reached maxFailures.
/// </summary>
protected override void CallFails()
protected internal override void CallFails()
{
if (IncrementAndGet() == _breaker.MaxFailures)
{
Expand All @@ -218,7 +221,7 @@ protected override void CallFails()
/// <summary>
/// On successful call, the failure count is reset to 0
/// </summary>
protected override void CallSucceeds()
protected internal override void CallSucceeds()
{
Reset();
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka/Util/Internal/AtomicState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ public async Task CallThrough(Func<Task> task)
/// <summary>
/// Invoked when call fails
/// </summary>
protected abstract void CallFails();
protected internal abstract void CallFails();

/// <summary>
/// Invoked when call succeeds
/// </summary>
protected abstract void CallSucceeds();
protected internal abstract void CallSucceeds();

/// <summary>
/// Invoked on the transitioned-to state during transition. Notifies listeners after invoking subclass template method _enter
Expand Down

0 comments on commit 345c601

Please sign in to comment.