From 345c601a0d0263aa3ba0e069af2f9188d1261ece Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Tue, 17 Mar 2020 15:05:39 +0100 Subject: [PATCH] Expose success and fail methods in CircuitBreaker (#4339) --- docs/articles/utilities/circuit-breaker.md | 9 +++ .../CoreAPISpec.ApproveCore.approved.txt | 5 ++ .../Utilities/CircuitBreakerDocSpec.cs | 43 +++++++++++- .../Akka.Tests/Pattern/CircuitBreakerSpec.cs | 65 +++++++++++++++++++ src/core/Akka/Pattern/CircuitBreaker.cs | 38 +++++++++++ src/core/Akka/Pattern/CircuitBreakerState.cs | 19 +++--- src/core/Akka/Util/Internal/AtomicState.cs | 4 +- 7 files changed, 172 insertions(+), 11 deletions(-) diff --git a/docs/articles/utilities/circuit-breaker.md b/docs/articles/utilities/circuit-breaker.md index 24a26114410..669dd3c72a9 100644 --- a/docs/articles/utilities/circuit-breaker.md +++ b/docs/articles/utilities/circuit-breaker.md @@ -74,3 +74,12 @@ dangerousActor.Tell("is my middle name"); // My CircuitBreaker is now closed // This really isn't that dangerous of a call after all ``` + +### Tell Pattern + +The above ``Call Protection`` pattern works well when the return from a remote call is wrapped in a ``Future``. However, when a remote call sends back a message or timeout to the caller ``Actor``, the ``Call Protection`` pattern is awkward. CircuitBreaker doesn't support it natively at the moment, so you need to use below low-level power-user APIs, ``succeed`` and ``fail`` methods, as well as ``isClose``, ``isOpen``, ``isHalfOpen``. + +>[!NOTE] +>The below examples doesn't make a remote call when the state is `HalfOpen`. Using the power-user APIs, it is your responsibility to judge when to make remote calls in `HalfOpen`. + +[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs?name=circuit-breaker-tell-pattern)] \ No newline at end of file diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 5521dafe2c0..ef3f1302f11 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -3813,12 +3813,17 @@ namespace Akka.Pattern public CircuitBreaker(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { } public System.TimeSpan CallTimeout { get; } public long CurrentFailureCount { get; } + public bool IsClosed { get; } + public bool IsHalfOpen { get; } + public bool IsOpen { get; } public int MaxFailures { get; } public System.TimeSpan ResetTimeout { get; } public static Akka.Pattern.CircuitBreaker Create(int maxFailures, System.TimeSpan callTimeout, System.TimeSpan resetTimeout) { } + public void Fail() { } public Akka.Pattern.CircuitBreaker OnClose(System.Action callback) { } public Akka.Pattern.CircuitBreaker OnHalfOpen(System.Action callback) { } public Akka.Pattern.CircuitBreaker OnOpen(System.Action callback) { } + public void Succeed() { } public System.Threading.Tasks.Task WithCircuitBreaker(System.Func> body) { } public System.Threading.Tasks.Task WithCircuitBreaker(System.Func body) { } public void WithSyncCircuitBreaker(System.Action body) { } diff --git a/src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs b/src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs index ddd3ac18dde..70a183f748a 100644 --- a/src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs +++ b/src/core/Akka.Docs.Tests/Utilities/CircuitBreakerDocSpec.cs @@ -13,7 +13,7 @@ namespace DocsExamples.Utilities.CircuitBreakers { - #region circuit-breaker-usage + #region circuit-breaker-usage public class DangerousActor : ReceiveActor { private readonly ILoggingAdapter _log = Context.GetLogger(); @@ -66,4 +66,45 @@ private void NotifyMeOnOpen() } } #endregion + + public class TellPatternActor : UntypedActor + { + private readonly IActorRef _recipient; + private readonly CircuitBreaker _breaker; + private readonly ILoggingAdapter _log = Context.GetLogger(); + + public TellPatternActor(IActorRef recipient ) + { + _recipient = recipient; + _breaker = new CircuitBreaker( + maxFailures: 5, + callTimeout: TimeSpan.FromSeconds(10), + resetTimeout: TimeSpan.FromMinutes(1)).OnOpen(NotifyMeOnOpen); + } + + private void NotifyMeOnOpen() => _log.Warning("My CircuitBreaker is now open, and will not close for one minute"); + + #region circuit-breaker-tell-pattern + + protected override void OnReceive(object message) + { + switch (message) + { + case "call" when _breaker.IsClosed: + _recipient.Tell("message"); + break; + case "response": + _breaker.Succeed(); + break; + case Exception _: + _breaker.Fail(); + break; + case ReceiveTimeout _: + _breaker.Fail(); + break; + } + } + + #endregion + } } diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs index 54d23728c9f..0ad0e4eb044 100644 --- a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs @@ -63,6 +63,27 @@ public void Should_Increment_FailureCount_When_Call_Times_Out( ) Assert.True( CheckLatch( breaker.OpenLatch ) ); Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); } + + [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on fail method")] + public void Must_increment_failure_count_on_fail_method() + { + var breaker = LongCallTimeoutCb(); + Assert.True(breaker.Instance.CurrentFailureCount == 0); + breaker.Instance.Fail(); + Assert.True(CheckLatch(breaker.OpenLatch)); + Assert.True(breaker.Instance.CurrentFailureCount == 1); + } + + [Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count after success method")] + public void Must_reset_failure_count_after_success_method() + { + var breaker = MultiFailureCb(); + Assert.True(breaker.Instance.CurrentFailureCount == 0); + Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + Assert.True(breaker.Instance.CurrentFailureCount == 1); + breaker.Instance.Succeed(); + Assert.True(breaker.Instance.CurrentFailureCount == 0); + } } public class ASynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase @@ -92,6 +113,28 @@ public void Should_Pass_Call_And_Transition_To_Open_On_Exception( ) Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); Assert.True( CheckLatch( breaker.OpenLatch ) ); } + + [Fact(DisplayName = "A synchronous circuit breaker that is half open must open on calling fail method")] + public void Must_open_on_calling_fail_method() + { + var breaker = ShortCallTimeoutCb(); + + Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.Instance.Fail(); + Assert.True(CheckLatch(breaker.OpenLatch)); + } + + [Fact(DisplayName = "A synchronous circuit breaker that is half open must close on calling success method")] + public void Must_close_on_calling_success_method() + { + var breaker = ShortCallTimeoutCb(); + + Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.Instance.Succeed(); + Assert.True(CheckLatch(breaker.ClosedLatch)); + } } public class ASynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase @@ -114,6 +157,28 @@ public void Should_Transition_To_Half_Open_When_Reset_Times_Out( ) Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); } + + [Fact(DisplayName = "A synchronous circuit breaker that is open must still be in open state after calling success method")] + public void Must_still_be_in_open_state_after_calling_success_method() + { + var breaker = LongCallTimeoutCb(); + + Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.Succeed(); + Assert.True(CheckLatch(breaker.OpenLatch)); + } + + [Fact(DisplayName = "A synchronous circuit breaker that is open must still be in open state after calling fail method")] + public void Must_still_be_in_open_state_after_calling_fail_method() + { + var breaker = LongCallTimeoutCb(); + + Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.Fail(); + Assert.True(CheckLatch(breaker.OpenLatch)); + } } public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase diff --git a/src/core/Akka/Pattern/CircuitBreaker.cs b/src/core/Akka/Pattern/CircuitBreaker.cs index 9dbc1aa4359..ddc6aba8362 100644 --- a/src/core/Akka/Pattern/CircuitBreaker.cs +++ b/src/core/Akka/Pattern/CircuitBreaker.cs @@ -190,6 +190,44 @@ public T WithSyncCircuitBreaker(Func body) return cbTask.Wait(CallTimeout) ? cbTask.Result : default(T); } + /// + /// Mark a successful call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the + /// caller Actor. In such a case, it is convenient to mark a successful call instead of using Future + /// via + /// + public void Succeed() => _currentState.CallSucceeds(); + + /// + /// Mark a failed call through CircuitBreaker. Sometimes the callee of CircuitBreaker sends back a message to the + /// caller Actor. In such a case, it is convenient to mark a failed call instead of using Future + /// via + /// + public void Fail() => _currentState.CallFails(); + + /// + /// Return true if the internal state is Closed. WARNING: It is a "power API" call which you should use with care. + /// Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in . + /// So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should + /// manage the state yourself. + /// + public bool IsClosed => _currentState is Closed; + + /// + /// Return true if the internal state is Open. WARNING: It is a "power API" call which you should use with care. + /// Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in . + /// So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should + /// manage the state yourself. + /// + public bool IsOpen => _currentState is Open; + + /// + /// Return true if the internal state is HalfOpen. WARNING: It is a "power API" call which you should use with care. + /// Ordinal use cases of CircuitBreaker expects a remote call to return Future, as in withCircuitBreaker. + /// So, if you check the state by yourself, and make a remote call outside CircuitBreaker, you should + /// manage the state yourself. + /// + public bool IsHalfOpen => _currentState is HalfOpen; + /// /// Adds a callback to execute when circuit breaker opens /// diff --git a/src/core/Akka/Pattern/CircuitBreakerState.cs b/src/core/Akka/Pattern/CircuitBreakerState.cs index af7bc97cf8b..53064bce917 100644 --- a/src/core/Akka/Pattern/CircuitBreakerState.cs +++ b/src/core/Akka/Pattern/CircuitBreakerState.cs @@ -56,17 +56,15 @@ public override async Task Invoke(Func body) /// /// No-op for open, calls are never executed so cannot succeed or fail /// - protected override void CallFails() + protected internal override void CallFails() { - //throw new NotImplementedException(); } /// /// No-op for open, calls are never executed so cannot succeed or fail /// - protected override void CallSucceeds() + protected internal override void CallSucceeds() { - //throw new NotImplementedException(); } /// @@ -77,6 +75,11 @@ protected override void EnterInternal() { Task.Delay(_breaker.ResetTimeout).ContinueWith(task => _breaker.AttemptReset()); } + + /// + /// Override for more descriptive toString + /// + public override string ToString() => "Open"; } /// @@ -134,7 +137,7 @@ public override async Task Invoke(Func body) /// /// Reopen breaker on failed call. /// - protected override void CallFails() + protected internal override void CallFails() { _breaker.TripBreaker(this); } @@ -142,7 +145,7 @@ protected override void CallFails() /// /// Reset breaker on successful call. /// - protected override void CallSucceeds() + protected internal override void CallSucceeds() { _breaker.ResetBreaker(); } @@ -207,7 +210,7 @@ public override Task Invoke(Func body) /// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and /// the breaker is tripped if we have reached maxFailures. /// - protected override void CallFails() + protected internal override void CallFails() { if (IncrementAndGet() == _breaker.MaxFailures) { @@ -218,7 +221,7 @@ protected override void CallFails() /// /// On successful call, the failure count is reset to 0 /// - protected override void CallSucceeds() + protected internal override void CallSucceeds() { Reset(); } diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs index 38022a36325..fc6673e82b1 100644 --- a/src/core/Akka/Util/Internal/AtomicState.cs +++ b/src/core/Akka/Util/Internal/AtomicState.cs @@ -167,12 +167,12 @@ public async Task CallThrough(Func task) /// /// Invoked when call fails /// - protected abstract void CallFails(); + protected internal abstract void CallFails(); /// /// Invoked when call succeeds /// - protected abstract void CallSucceeds(); + protected internal abstract void CallSucceeds(); /// /// Invoked on the transitioned-to state during transition. Notifies listeners after invoking subclass template method _enter