From 692ff7f8bb9b4e19db825048c8745b0454fb29d4 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Tue, 27 Sep 2022 08:05:28 +0200 Subject: [PATCH] Fixed unit tests --- .../Akka.Tests/Pattern/CircuitBreakerSpec.cs | 605 +++++++----------- .../Pattern/CircuitBreakerStressSpec.cs | 16 +- src/core/Akka/Pattern/CircuitBreakerState.cs | 69 +- .../Akka/Util/Extensions/TaskExtensions.cs | 6 +- src/core/Akka/Util/Internal/AtomicState.cs | 4 +- 5 files changed, 263 insertions(+), 437 deletions(-) diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs index aa5e1683b49..fe7608cbcde 100644 --- a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs @@ -7,174 +7,137 @@ using System; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Runtime.Serialization; using System.Threading; using System.Threading.Tasks; using Akka.Pattern; using Akka.TestKit; -using Akka.TestKit.Extensions; -using Akka.Tests.Util; +using Akka.Util.Internal; using Xunit; +using Xunit.Sdk; namespace Akka.Tests.Pattern { public class ASynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase { - [Fact(DisplayName = "A synchronous circuit breaker that is closed should allow call through")] - public void Should_Allow_Call_Through( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must allow calls through")] + public void Must_allow_calls_through() { - var breaker = LongCallTimeoutCb( ); - var result = breaker.Instance.WithSyncCircuitBreaker( ( ) => "Test" ); - - Assert.Equal( "Test", result ); + var breaker = LongCallTimeoutCb(); + breaker.Instance.WithSyncCircuitBreaker(SayHi).ShouldBe("hi"); } - [Fact( DisplayName = "A synchronous circuit breaker that is closed should increment failure count when call fails" )] - public void Should_Increment_FailureCount_When_Call_Fails( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on failure")] + public void Must_increment_failure_count_on_failure() { - var breaker = LongCallTimeoutCb( ); - - Assert.Equal(0, breaker.Instance.CurrentFailureCount); - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + var breaker = LongCallTimeoutCb(); + breaker.Instance.CurrentFailureCount.ShouldBe(0); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); } - [Fact( DisplayName = "A synchronous circuit breaker that is closed should reset failure count when call succeeds" )] - public void Should_Reset_FailureCount_When_Call_Succeeds( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on fail method")] + public void Must_increment_failure_count_on_fail_method() { - var breaker = MultiFailureCb( ); - - Assert.Equal(0, breaker.Instance.CurrentFailureCount); - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.Equal(1, breaker.Instance.CurrentFailureCount); - - breaker.Instance.WithSyncCircuitBreaker( ( ) => "Test" ); - - Assert.Equal( 0, breaker.Instance.CurrentFailureCount ); + var breaker = LongCallTimeoutCb(); + breaker.Instance.CurrentFailureCount.ShouldBe(0); + breaker.Instance.Fail(); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); } - [Fact(DisplayName = "A synchronous circuit breaker that is closed should increment failure count when call times out")] - public void Should_Increment_FailureCount_When_Call_Times_Out( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")] + public void Must_invoke_onOpen_if_call_fails_and_breaker_transits_to_open_state() { - var breaker = ShortCallTimeoutCb( ); - - breaker.Instance.WithSyncCircuitBreaker( ( ) => Thread.Sleep( 500 ) ); - - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + var breaker = ShortCallTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); } - [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on fail method")] - public void Must_increment_failure_count_on_fail_method() + [Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count after success")] + public void Must_reset_failure_count_after_success() { - var breaker = LongCallTimeoutCb(); - Assert.True(breaker.Instance.CurrentFailureCount == 0); - breaker.Instance.Fail(); - Assert.True(CheckLatch(breaker.OpenLatch)); - Assert.True(breaker.Instance.CurrentFailureCount == 1); + var breaker = MultiFailureCb(); + breaker.Instance.CurrentFailureCount.ShouldBe(0); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); + breaker.Instance.WithSyncCircuitBreaker(SayHi); + breaker.Instance.CurrentFailureCount.ShouldBe(0); } - [Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count and clears cached last exception after success method")] + [Fact(DisplayName = "A synchronous circuit breaker that is closed must reset failure count after success method")] public void Must_reset_failure_count_after_success_method() { var breaker = MultiFailureCb(); - Assert.True(breaker.Instance.CurrentFailureCount == 0); - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); - Assert.True(breaker.Instance.CurrentFailureCount == 1); - Assert.True(breaker.Instance.LastCaughtException is TestException); + breaker.Instance.CurrentFailureCount.ShouldBe(0); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); breaker.Instance.Succeed(); - Assert.True(breaker.Instance.CurrentFailureCount == 0); - Assert.True(breaker.Instance.LastCaughtException is null); + breaker.Instance.CurrentFailureCount.ShouldBe(0); } - } - public class ASynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase - { - [Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to close on success")] - public void Should_Pass_Call_And_Transition_To_Close_On_Success( ) + [Fact(DisplayName = "A synchronous circuit breaker that is closed must increment failure count on callTimeout before call finishes")] + public void Must_increment_failure_count_on_callTimeout_before_call_finishes() { - var breaker = ShortResetTimeoutCb( ); - InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); - - var result = breaker.Instance.WithSyncCircuitBreaker( ( ) => SayTest( ) ); - - Assert.True( CheckLatch( breaker.ClosedLatch ) ); - Assert.Equal( SayTest( ), result ); + var breaker = ShortCallTimeoutCb(); + Task.Run(() => breaker.Instance.WithSyncCircuitBreaker(() => Thread.Sleep(Dilated(TimeSpan.FromSeconds(1))))); + Within(TimeSpan.FromMilliseconds(900), + () => AwaitCondition(() => breaker.Instance.CurrentFailureCount == 1, Dilated(TimeSpan.FromMilliseconds(100)))); } + } - [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass only one call until it closes")] - public async Task Should_Pass_Only_One_Call_And_Transition_To_Close_On_Success() + public class ASynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase + { + [Fact(DisplayName = "A synchronous circuit breaker that is half open must pass through next call and close on success")] + public void Must_pass_through_call_and_close_on_success() { var breaker = ShortResetTimeoutCb(); - InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.HalfOpenLatch)); - - var task1 = breaker.Instance.WithCircuitBreaker(() => DelayedSayTest(TimeSpan.FromSeconds(0.1))); - var task2 = breaker.Instance.WithCircuitBreaker(() => DelayedSayTest(TimeSpan.FromSeconds(0.1))); - var combined = Task.WhenAny(task1, task2).Unwrap(); - - // One of the 2 tasks will throw, because the circuit breaker is half open - Exception caughtException = null; - try - { - await combined; - } - catch (Exception e) - { - caughtException = e; - } - Assert.True(caughtException is OpenCircuitException); - Assert.StartsWith("Circuit breaker is half open", caughtException.Message); - - // Wait until one of task completes - await Task.Delay(TimeSpan.FromSeconds(0.25)); + Assert.True("hi" == breaker.Instance.WithSyncCircuitBreaker(SayHi)); Assert.True(CheckLatch(breaker.ClosedLatch)); - - // We don't know which one of the task got faulted - string result = null; - if (task1.IsCompleted && !task1.IsFaulted) - result = task1.Result; - else if (task2.IsCompleted && !task2.IsFaulted) - result = task2.Result; - - Assert.Equal(SayTest(), result); } - - [Fact(DisplayName = "A synchronous circuit breaker that is half open should pass call and transition to open on exception")] - public void Should_Pass_Call_And_Transition_To_Open_On_Exception( ) + [Fact(DisplayName = "A synchronous circuit breaker that is half open must open on exception in call")] + public void Must_open_on_exception_in_call() { - var breaker = ShortResetTimeoutCb( ); - - - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); - - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.OpenLatch.Reset(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); } - [Fact(DisplayName = "A synchronous circuit breaker that is half open must open on calling fail method")] + [Fact(DisplayName = "A synchronous circuit breaker that is half open on calling fail method")] public void Must_open_on_calling_fail_method() { - var breaker = ShortCallTimeoutCb(); - - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + var breaker = ShortResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.OpenLatch.Reset(); breaker.Instance.Fail(); Assert.True(CheckLatch(breaker.OpenLatch)); } + [Fact(DisplayName = "A synchronous circuit breaker that is half open on calling success method")] + public void Must_open_on_calling_success_method() + { + var breaker = ShortResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.Instance.Succeed(); + Assert.True(CheckLatch(breaker.ClosedLatch)); + } + [Fact(DisplayName = "A synchronous circuit breaker that is half open must close on calling success method")] public void Must_close_on_calling_success_method() { - var breaker = ShortCallTimeoutCb(); + var breaker = ShortResetTimeoutCb(); - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.HalfOpenLatch)); breaker.Instance.Succeed(); Assert.True(CheckLatch(breaker.ClosedLatch)); @@ -183,380 +146,278 @@ public void Must_close_on_calling_success_method() public class ASynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase { - [Fact(DisplayName = "A synchronous circuit breaker that is open should throw exceptions before reset timeout")] - public void Should_Throw_Exceptions_Before_Reset_Timeout( ) + [Fact(DisplayName = "A synchronous circuit breaker that is open must throw exceptions when called before reset timeout")] + public void Must_throw_exceptions_before_reset_timeout() { - var breaker = LongResetTimeoutCb( ); + var breaker = LongResetTimeoutCb(); + + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); + var e = InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + e.RemainingDuration.ShouldBeGreaterThan(TimeSpan.Zero); + e.RemainingDuration.ShouldBeLessOrEqualTo(LongResetTimeout); } - [Fact(DisplayName = "A synchronous circuit breaker that is open should transition to half open when reset times out")] - public void Should_Transition_To_Half_Open_When_Reset_Times_Out( ) + [Fact(DisplayName = "A synchronous circuit breaker that is open must transition to half-open on reset timeout")] + public void Must_transition_to_half_open_on_reset_timeout() { - var breaker = ShortResetTimeoutCb( ); - - Assert.True( InterceptExceptionType( ( ) => breaker.Instance.WithSyncCircuitBreaker( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); } [Fact(DisplayName = "A synchronous circuit breaker that is open must still be in open state after calling success method")] public void Must_still_be_in_open_state_after_calling_success_method() { - var breaker = LongCallTimeoutCb(); - - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + var breaker = LongResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); breaker.Instance.Succeed(); - Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.IsOpen.ShouldBeTrue(); } [Fact(DisplayName = "A synchronous circuit breaker that is open must still be in open state after calling fail method")] public void Must_still_be_in_open_state_after_calling_fail_method() { - var breaker = LongCallTimeoutCb(); - - Assert.True(InterceptExceptionType(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException))); + var breaker = LongResetTimeoutCb(); + InterceptException(() => breaker.Instance.WithSyncCircuitBreaker(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); breaker.Instance.Fail(); - Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.IsOpen.ShouldBeTrue(); } } public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase { - [Fact(DisplayName = "An asynchronous circuit breaker that is closed should allow call through")] - public async Task Should_Allow_Call_Through( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must allow calls through")] + public async Task Must_allow_calls_through() { - var breaker = LongCallTimeoutCb( ); - var result = await breaker.Instance.WithCircuitBreaker( () => Task.Run( ( ) => SayTest( ) ) ); - - Assert.Equal( SayTest( ), result ); + var breaker = LongCallTimeoutCb(); + var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout); + Assert.Equal("hi", result); } - [Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call fails")] - public async Task Should_Increment_Failure_Count_When_Call_Fails( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on exception")] + public async Task Must_increment_failure_count_on_exception() { - var breaker = LongCallTimeoutCb( ); - - Assert.Equal(0, breaker.Instance.CurrentFailureCount); - Assert.True( await InterceptExceptionTypeAsync( async () => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Run( ThrowException ) ).AwaitWithTimeout(AwaitTimeout) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); + var breaker = LongCallTimeoutCb(); + await InterceptException(() => + breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout)); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); } - [Fact(DisplayName = "An asynchronous circuit breaker that is closed should reset failure count when call succeeds after failure")] - public async Task Should_Reset_Failure_Count_When_Call_Succeeds_After_Failure( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on async failure")] + public void Must_increment_failure_count_on_async_failure() { - var breaker = MultiFailureCb( ); - - Assert.Equal(0, breaker.Instance.CurrentFailureCount); + var breaker = LongCallTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); + } - var whenall = Task.WhenAll( - breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException)) - , breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException)) - , breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException)) - , breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException))); + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must reset failure count after success")] + public async Task Must_reset_failure_count_after_success() + { + var breaker = MultiFailureCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)); + Enumerable.Range(1, 4).ForEach(_ => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException))); + await AwaitAssertAsync(() => breaker.Instance.CurrentFailureCount.ShouldBe(4), AwaitTimeout); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)); + await AwaitAssertAsync(() => breaker.Instance.CurrentFailureCount.ShouldBe(0), AwaitTimeout); + } - Assert.True( await InterceptExceptionTypeAsync( async ( ) => - await whenall.AwaitWithTimeout(AwaitTimeout) ) ); + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must increment failure count on callTimeout")] + public async Task Must_increment_failure_count_on_callTimeout() + { + var breaker = ShortCallTimeoutCb(); - Assert.Equal(4, breaker.Instance.CurrentFailureCount); + var future = breaker.Instance.WithCircuitBreaker(async () => + { + await Task.Delay(150); + ThrowException(); + }); - var result = await breaker.Instance.WithCircuitBreaker(async () => await Task.Run( SayTest ) ); + Assert.True(CheckLatch(breaker.OpenLatch)); + breaker.Instance.CurrentFailureCount.ShouldBe(1); - Assert.Equal( SayTest( ), result ); - Assert.Equal( 0, breaker.Instance.CurrentFailureCount ); + // Since the timeout should have happened before the inner code finishes + // we expect a timeout, not TestException + await InterceptException(() => future.WaitAsync(AwaitTimeout)); } - [Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call times out")] - public async Task Should_Increment_Failure_Count_When_Call_Times_Out( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")] + public void Must_invoke_onOpen_if_call_fails_and_breaker_transits_to_open_state() { - var breaker = ShortCallTimeoutCb( ); - - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Run( async () => - { - await Task.Delay(500); - return SayTest( ); - } )); - - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.Equal( 1, breaker.Instance.CurrentFailureCount ); - Assert.True(breaker.Instance.LastCaughtException is TimeoutException); + var breaker = ShortCallTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); } } public class AnAsynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase { - [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to close on success")] - public async Task Should_Pass_Call_And_Transition_To_Close_On_Success( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is half open must pass through next call and close on success")] + public async Task Must_pass_through_next_call_and_close_on_success() { - var breaker = ShortResetTimeoutCb( ); - await InterceptExceptionTypeAsync( async ( ) => - await breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); - - var result = await breaker.Instance.WithCircuitBreaker( async - () => await Task.Factory.StartNew( SayTest ) ); - - Assert.True( CheckLatch( breaker.ClosedLatch ) ); - Assert.Equal( SayTest( ), result ); + var breaker = ShortResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout); + Assert.Equal("hi", result); + Assert.True(CheckLatch(breaker.ClosedLatch)); } - [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on exception")] - public async Task Should_Pass_Call_And_Transition_To_Open_On_Exception( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is half open must re-open on exception in call")] + public async Task Must_reopen_on_exception_in_call() { - var breaker = ShortResetTimeoutCb( ); - - Assert.True( await InterceptExceptionTypeAsync( async () => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException )))); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); - - Assert.True( await InterceptExceptionTypeAsync( async () => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException )))); - Assert.True( CheckLatch( breaker.OpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); + breaker.OpenLatch.Reset(); + await InterceptException(() => + breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout)); + Assert.True(CheckLatch(breaker.OpenLatch)); } - [Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on async failure")] - public void Should_Pass_Call_And_Transition_To_Open_On_Async_Failure( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is half open must re-open on async failure")] + public void Must_reopen_on_async_failure() { - var breaker = ShortResetTimeoutCb( ); - - breaker.Instance.WithCircuitBreaker( async () => await Task.Run( ThrowException ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); - breaker.Instance.WithCircuitBreaker( async () => await Task.Run( ThrowException ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); + breaker.OpenLatch.Reset(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.OpenLatch)); } } public class AnAsynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase { - [Fact(DisplayName = "An asynchronous circuit breaker that is open should throw exceptions when called before reset timeout")] - public async Task Should_Throw_Exceptions_When_Called_Before_Reset_Timeout( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is open must throw exceptions when called before reset timeout")] + public async Task Must_throw_exceptions_when_called_before_reset_timeout() { - var breaker = LongResetTimeoutCb( ); + var breaker = LongResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + + Assert.True(CheckLatch(breaker.OpenLatch)); - Assert.True( await InterceptExceptionTypeAsync(async ( ) => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException ) ) ) ); - Assert.True( CheckLatch( breaker.OpenLatch ) ); - Assert.True( await InterceptExceptionTypeAsync(async ( ) => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException ) ) ) ); + await InterceptException( + () => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout)); } - [Fact(DisplayName = "An asynchronous circuit breaker that is open should transition to half open when reset timeout")] - public async Task Should_Transition_To_Half_Open_When_Reset_Timeout( ) + [Fact(DisplayName = "An asynchronous circuit breaker that is open must transition to half-open on reset timeout")] + public void Must_transition_to_half_open_on_reset_timeout() { - var breaker = ShortResetTimeoutCb( ); - - Assert.True( await InterceptExceptionTypeAsync( async () => - await breaker.Instance.WithCircuitBreaker( async () => - await Task.Factory.StartNew( ThrowException ) ) ) ); - Assert.True( CheckLatch( breaker.HalfOpenLatch ) ); + var breaker = ShortResetTimeoutCb(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); + Assert.True(CheckLatch(breaker.HalfOpenLatch)); } - - [Fact(DisplayName = "An asynchronous circuit breaker that is open should increase the reset timeout after it transits to open again")] - public async Task Should_Reset_Timeout_After_It_Transits_To_Open_Again() + + [Fact(DisplayName = "An asynchronous circuit breaker that is open must increase the reset timeout after it transits to open again")] + public async Task Must_increase_reset_timeout_after_it_transits_to_open_again() { var breaker = NonOneFactorCb(); - Assert.True(await InterceptExceptionTypeAsync(async () => - await breaker.Instance.WithCircuitBreaker(async () => - await Task.Run(ThrowException)))); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); - var e1 = await InterceptExceptionAsync(async () => - await breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayTest()))); + var e1 = await InterceptException( + () => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException))); var shortRemainingDuration = e1.RemainingDuration; - await Task.Delay(1000); + await Task.Delay(Dilated(TimeSpan.FromMilliseconds(1000))); Assert.True(CheckLatch(breaker.HalfOpenLatch)); // transit to open again - Assert.True(await InterceptExceptionTypeAsync(async () => - await breaker.Instance.WithCircuitBreaker(async () => - await Task.Run(ThrowException)))); + breaker.OpenLatch.Reset(); + _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); - var e2 = await InterceptExceptionAsync(async () => - await breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayTest()))); + var e2 = await InterceptException(() => + breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayHi()))); var longRemainingDuration = e2.RemainingDuration; - Assert.True(shortRemainingDuration < longRemainingDuration); + shortRemainingDuration.ShouldBeLessThan(longRemainingDuration); } } public class CircuitBreakerSpecBase : AkkaSpec { - public TimeSpan AwaitTimeout { get; } = TimeSpan.FromSeconds(2); - - public bool CheckLatch( CountdownEvent latch ) - { - return latch.Wait( AwaitTimeout ); - } + public TimeSpan AwaitTimeout => Dilated(TimeSpan.FromSeconds(2)); - public Task Delay( TimeSpan toDelay, CancellationToken? token ) - { - return token.HasValue ? Task.Delay( toDelay, token.Value ) : Task.Delay( toDelay ); - } - - public async Task DelayedSayTest(TimeSpan delay) - { - await Task.Delay(delay); - return "Test"; - } + public bool CheckLatch(CountdownEvent latch) => latch.Wait(AwaitTimeout); [DebuggerStepThrough] - public void ThrowException() => throw new TestException("Test Exception"); + public static void ThrowException() => throw new TestException("Test Exception"); - public string SayTest( ) => "Test"; - - protected T InterceptException(Action actionThatThrows) where T : Exception - { - return Assert.Throws(() => - { - try - { - actionThatThrows(); - } - catch (AggregateException ex) - { - foreach (var e in ex.Flatten().InnerExceptions.Where(e => e is T).Select(e => e)) - throw e; - } - }); - } + public static string SayHi() => "hi"; - protected async Task InterceptExceptionAsync(Func actionThatThrows) where T : Exception - { - return await Assert.ThrowsAsync(async () => - { - try - { - await actionThatThrows(); - } - catch (AggregateException ex) - { - foreach (var e in ex.Flatten().InnerExceptions.Where(e => e is T).Select(e => e)) - throw e; - } - }); - } - - [SuppressMessage( "Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter" )] - public bool InterceptExceptionType( Action action ) where T : Exception - { - try - { - action.Invoke( ); - return false; - } - catch ( Exception ex ) - { - if (ex is AggregateException aggregate) - { - // ReSharper disable once UnusedVariable - foreach (var temp in aggregate - .InnerExceptions - .Where(t => !(t is T))) - { - throw; - } - } else if (!(ex is T)) - { - throw; - } - } - return true; - } + protected T InterceptException(Action actionThatThrows) where T : Exception => + Intercept(actionThatThrows); - public async Task InterceptExceptionTypeAsync(Func action) where T : Exception + protected static async Task InterceptException(Func actionThatThrows) + where T : Exception { try { - await action(); - return false; + await actionThatThrows(); } catch (Exception ex) { - if (ex is AggregateException aggregate) - { - // ReSharper disable once UnusedVariable - foreach (var temp in aggregate - .InnerExceptions - .Where(t => !(t is T))) - { - throw; - } - } - else if (!(ex is T)) - { - throw; - } + var exception = ex is AggregateException aggregateException + ? aggregateException.Flatten().InnerExceptions[0] + : ex; + + var exceptionType = typeof(T); + return exceptionType == exception.GetType() + ? (T)exception + : throw new ThrowsException(exceptionType, exception); } - return true; - } - public TestBreaker ShortCallTimeoutCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 50 ), TimeSpan.FromMilliseconds( 500 ) ) ); + throw new ThrowsException(typeof(T)); } - public TestBreaker ShortResetTimeoutCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 1000 ), TimeSpan.FromMilliseconds( 50 ) ) ); - } + public TestBreaker ShortCallTimeoutCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(50)), Dilated(TimeSpan.FromMilliseconds(500)))); - public TestBreaker LongCallTimeoutCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 5000 ), TimeSpan.FromMilliseconds( 500 ) ) ); - } + public TestBreaker ShortResetTimeoutCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromMilliseconds(50)))); - public TestBreaker LongResetTimeoutCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds( 100 ), TimeSpan.FromMilliseconds( 5000 ) ) ); - } + public TestBreaker LongCallTimeoutCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromSeconds(5), Dilated(TimeSpan.FromMilliseconds(500)))); - public TestBreaker MultiFailureCb( ) - { - return new TestBreaker( new CircuitBreaker(Sys.Scheduler, 5, TimeSpan.FromMilliseconds( 200 ), TimeSpan.FromMilliseconds( 500 ) ) ); - } - - public TestBreaker NonOneFactorCb() - { - return new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromMilliseconds(2000), TimeSpan.FromMilliseconds(1000), TimeSpan.FromDays(1), 5, 0)); - } - } + public TimeSpan LongResetTimeout = TimeSpan.FromSeconds(5); + public TestBreaker LongResetTimeoutCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(100)), Dilated(LongResetTimeout))); + + public TestBreaker MultiFailureCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 5, Dilated(TimeSpan.FromMilliseconds(200)), Dilated(TimeSpan.FromMilliseconds(500)))); + public TestBreaker NonOneFactorCb() => + new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(2000)), Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromDays(1)), 5, 0)); + } internal class TestException : Exception { - public TestException( ) + public TestException() { } - public TestException( string message ) - : base( message ) + public TestException(string message) + : base(message) { } - public TestException( string message, Exception innerException ) - : base( message, innerException ) + public TestException(string message, Exception innerException) + : base(message, innerException) { } - protected TestException( SerializationInfo info, StreamingContext context ) - : base( info, context ) + protected TestException(SerializationInfo info, StreamingContext context) + : base(info, context) { } } - } diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs index 244e47b75d3..e3a84dad7ac 100644 --- a/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerStressSpec.cs @@ -7,11 +7,11 @@ using System; using System.Linq; -using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Pattern; using Akka.TestKit; +using Akka.Util; using Xunit; using Xunit.Abstractions; @@ -73,11 +73,11 @@ protected override void OnReceive(object message) case JobDone _: _doneCount++; break; - case Status.Failure failure when failure.Cause is OpenCircuitException: + case Status.Failure { Cause: OpenCircuitException _ }: _circCount++; _breaker.WithCircuitBreaker(Job).PipeTo(Self); break; - case Status.Failure failure when failure.Cause is TimeoutException: + case Status.Failure { Cause: TimeoutException _ }: _timeoutCount++; _breaker.WithCircuitBreaker(Job).PipeTo(Self); break; @@ -94,9 +94,9 @@ protected override void OnReceive(object message) } } - private async Task Job() + private static async Task Job() { - await Task.Delay(TimeSpan.FromMilliseconds(300)); + await Task.Delay(TimeSpan.FromMilliseconds(ThreadLocalRandom.Current.Next(300))); return JobDone.Instance; } } @@ -106,7 +106,7 @@ public CircuitBreakerStressSpec(ITestOutputHelper output) { } [Fact] - public void A_CircuitBreaker_stress_test() + public async Task A_CircuitBreaker_stress_test() { var breaker = new CircuitBreaker(Sys.Scheduler, 5, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(200)); var stressActors = Enumerable.Range(0, 3).Select(i => Sys.ActorOf(Props.Create(breaker))).ToList(); @@ -118,13 +118,13 @@ public void A_CircuitBreaker_stress_test() } // let them work for a while - Thread.Sleep(3000); + await Task.Delay(3000); foreach (var stressActor in stressActors) { stressActor.Tell(GetResult.Instance); var result = ExpectMsg(); - result.DoneCount.ShouldBe(0); + result.FailCount.ShouldBe(0); Output.WriteLine("FailCount:{0}, DoneCount:{1}, CircCount:{2}, TimeoutCount:{3}", result.FailCount, result.DoneCount, result.CircCount, result.TimeoutCount); diff --git a/src/core/Akka/Pattern/CircuitBreakerState.cs b/src/core/Akka/Pattern/CircuitBreakerState.cs index 77994aed743..7f5ec107677 100644 --- a/src/core/Akka/Pattern/CircuitBreakerState.cs +++ b/src/core/Akka/Pattern/CircuitBreakerState.cs @@ -30,7 +30,7 @@ public Open(CircuitBreaker breaker) { _breaker = breaker; } - + /// /// Calculate remaining duration until reset to inform the caller in case a backoff algorithm is useful /// @@ -48,28 +48,23 @@ private TimeSpan RemainingDuration() /// N/A /// Implementation of the call that needs protected /// containing result of protected call - public override Task Invoke(Func> body) => + public override Task Invoke(Func> body) => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); - public override Task - InvokeState(TState state, Func body) => - Task.FromException( - new OpenCircuitException(_breaker.LastCaughtException, - RemainingDuration())); - - public override Task InvokeState(TState state, - Func> body) => Task.FromException( - new OpenCircuitException(_breaker.LastCaughtException, - RemainingDuration())); + public override Task InvokeState(TState state, Func body) => + Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); /// /// Fail-fast on any invocation /// /// Implementation of the call that needs protected /// containing result of protected call - public override Task Invoke(Func body) => + public override Task Invoke(Func body) => Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); + public override Task InvokeState(TState state, Func> body) => + Task.FromException(new OpenCircuitException(_breaker.LastCaughtException, RemainingDuration())); + /// /// No-op for open, calls are never executed so cannot succeed or fail /// @@ -152,11 +147,11 @@ public override async Task Invoke(Func> body) CheckState(); return await CallThrough(body); } - - public override async Task InvokeState(TState state, Func> body) + + public override async Task InvokeState(TState state, Func> body) { CheckState(); - return await CallThrough(state,body); + return await CallThrough(state, body); } /// @@ -171,11 +166,10 @@ public override async Task Invoke(Func body) await CallThrough(body); } - public override async Task InvokeState(TState state, - Func body) + public override async Task InvokeState(TState state, Func body) { CheckState(); - await CallThrough(state,body); + await CallThrough(state, body); } /// @@ -208,10 +202,8 @@ protected override void EnterInternal() /// Override for more descriptive toString /// /// TBD - public override string ToString() - { - return string.Format(CultureInfo.InvariantCulture, "Half-Open currently testing call for success = {0}", (_lock == true)); - } + public override string ToString() => + string.Format(CultureInfo.InvariantCulture, "Half-Open currently testing call for success = {0}", _lock == true); } /// @@ -237,30 +229,18 @@ public Closed(CircuitBreaker breaker) /// TBD /// Implementation of the call that needs protected /// containing result of protected call - public override Task Invoke(Func> body) - { - return CallThrough(body); - } + public override Task Invoke(Func> body) => CallThrough(body); - public override Task InvokeState(TState state, Func> body) - { - return CallThrough(state, body); - } + public override Task InvokeState(TState state, Func> body) => CallThrough(state, body); /// /// Implementation of invoke, which simply attempts the call /// /// Implementation of the call that needs protected /// containing result of protected call - public override Task Invoke(Func body) - { - return CallThrough(body); - } + public override Task Invoke(Func body) => CallThrough(body); - public override Task InvokeState(TState state, Func body) - { - return CallThrough(state, body); - } + public override Task InvokeState(TState state, Func body) => CallThrough(state, body); /// /// On failed call, the failure count is incremented. The count is checked against the configured maxFailures, and @@ -293,15 +273,6 @@ protected override void EnterInternal() _breaker.SwapStateResetTimeout(_breaker.CurrentResetTimeout, _breaker.ResetTimeout); } - /// - /// Returns a that represents this instance. - /// - /// - /// A that represents this instance. - /// - public override string ToString() - { - return $"Closed with failure count = {Current}"; - } + public override string ToString() => $"Closed with failure count = {Current}"; } } diff --git a/src/core/Akka/Util/Extensions/TaskExtensions.cs b/src/core/Akka/Util/Extensions/TaskExtensions.cs index 5a3aea9acf7..f134e76f87a 100644 --- a/src/core/Akka/Util/Extensions/TaskExtensions.cs +++ b/src/core/Akka/Util/Extensions/TaskExtensions.cs @@ -5,11 +5,7 @@ // //----------------------------------------------------------------------- -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Akka.Util.Extensions +namespace System.Threading.Tasks { internal static class TaskExtensions { diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs index ca5c8e8a6af..44172d5c262 100644 --- a/src/core/Akka/Util/Internal/AtomicState.cs +++ b/src/core/Akka/Util/Internal/AtomicState.cs @@ -9,7 +9,6 @@ using System.Collections.Concurrent; using System.Runtime.ExceptionServices; using System.Threading.Tasks; -using Akka.Util.Extensions; namespace Akka.Util.Internal { @@ -157,8 +156,7 @@ public async Task CallThrough(TState state, Func task) /// containing result of protected call public abstract Task Invoke(Func> body); - public abstract Task InvokeState(TState state, - Func> body); + public abstract Task InvokeState(TState state, Func> body); /// /// Abstract entry point for all states