diff --git a/src/core/Akka.TestKit/TestKitBase_AwaitConditions.cs b/src/core/Akka.TestKit/TestKitBase_AwaitConditions.cs index 74db4a658d7..d90abe77c6b 100644 --- a/src/core/Akka.TestKit/TestKitBase_AwaitConditions.cs +++ b/src/core/Akka.TestKit/TestKitBase_AwaitConditions.cs @@ -38,7 +38,7 @@ public void AwaitCondition(Func conditionIsFulfilled, CancellationToken ca AwaitConditionAsync(async () => conditionIsFulfilled(), cancellationToken) .WaitAndUnwrapException(); } - + public async Task AwaitConditionAsync(Func> conditionIsFulfilled, CancellationToken cancellationToken = default) { var maxDur = RemainingOrDefault; @@ -71,7 +71,7 @@ public void AwaitCondition(Func conditionIsFulfilled, TimeSpan? max, Cance AwaitConditionAsync(async () => conditionIsFulfilled(), max, cancellationToken) .WaitAndUnwrapException(cancellationToken); } - + public async Task AwaitConditionAsync(Func> conditionIsFulfilled, TimeSpan? max, CancellationToken cancellationToken = default) { var maxDur = RemainingOrDilated(max); @@ -79,7 +79,7 @@ public async Task AwaitConditionAsync(Func> conditionIsFulfilled, Tim var logger = _testState.TestKitSettings.LogTestKitCalls ? _testState.Log : null; await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval, (format, args) => _assertions.Fail(format, args), logger, cancellationToken); } - + /// /// Await until the given condition evaluates to true or the timeout /// expires, whichever comes first. @@ -105,7 +105,7 @@ public void AwaitCondition(Func conditionIsFulfilled, TimeSpan? max, strin AwaitConditionAsync(async () => conditionIsFulfilled(), max, message, cancellationToken) .WaitAndUnwrapException(); } - + public async Task AwaitConditionAsync(Func> conditionIsFulfilled, TimeSpan? max, string message, CancellationToken cancellationToken = default) { var maxDur = RemainingOrDilated(max); @@ -143,16 +143,16 @@ public async Task AwaitConditionAsync(Func> conditionIsFulfilled, Tim /// The message used if the timeout expires. /// public void AwaitCondition(Func conditionIsFulfilled, TimeSpan? max, TimeSpan? interval, string message = null, CancellationToken cancellationToken = default) - { + { AwaitConditionAsync(async () => conditionIsFulfilled(), max, interval, message, cancellationToken) .WaitAndUnwrapException(cancellationToken); } - + public async Task AwaitConditionAsync(Func> conditionIsFulfilled, TimeSpan? max, TimeSpan? interval, string message = null, CancellationToken cancellationToken = default) { var maxDur = RemainingOrDilated(max); var logger = _testState.TestKitSettings.LogTestKitCalls ? _testState.Log : null; - await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval, + await InternalAwaitConditionAsync(conditionIsFulfilled, maxDur, interval, (format, args) => AssertionsFail(format, args, message), logger, cancellationToken); } @@ -179,7 +179,7 @@ public bool AwaitConditionNoThrow(Func conditionIsFulfilled, TimeSpan max, return AwaitConditionNoThrowAsync(async () => conditionIsFulfilled(), max, interval, cancellationToken) .WaitAndUnwrapException(cancellationToken); } - + public Task AwaitConditionNoThrowAsync(Func> conditionIsFulfilled, TimeSpan max, TimeSpan? interval = null, CancellationToken cancellationToken = default) { var intervalDur = interval.GetValueOrDefault(TimeSpan.FromMilliseconds(100)); @@ -218,7 +218,7 @@ protected static bool InternalAwaitCondition(Func conditionIsFulfilled, Ti { return InternalAwaitCondition(conditionIsFulfilled, max, interval, fail, null, cancellationToken); } - + protected static Task InternalAwaitConditionAsync(Func> conditionIsFulfilled, TimeSpan max, TimeSpan? interval, Action fail , CancellationToken cancellationToken = default) { @@ -258,7 +258,7 @@ protected static bool InternalAwaitCondition(Func conditionIsFulfilled, Ti { return InternalAwaitConditionAsync(async () => conditionIsFulfilled(), max, interval, fail, logger, cancellationToken) .WaitAndUnwrapException(cancellationToken); - + } protected static async Task InternalAwaitConditionAsync(Func> conditionIsFulfilled, TimeSpan max, TimeSpan? interval, Action fail, ILoggingAdapter logger, CancellationToken cancellationToken = default) @@ -293,10 +293,59 @@ protected static async Task InternalAwaitConditionAsync(Func> c return true; } - private static void ConditionalLog(ILoggingAdapter logger, string format, params object[] args) + protected void AwaitCond(Func p, TimeSpan? max = null, TimeSpan? interval = null, string message = "") { - if (logger != null) - logger.Debug(format, args); + if (interval == null) interval = TimeSpan.FromMilliseconds(100); + + var dilatedMax = RemainingOrDilated(max); + var stop = Now + dilatedMax; + + void Poll(TimeSpan t) + { + if (!p()) + { + _assertions.AssertTrue(Now < stop, $"timeout {dilatedMax} expired: {message}"); + Thread.Sleep(t); + Poll((stop - Now).Min(interval.Value)); + } + } + + Poll(dilatedMax.Min(interval.Value)); } + + protected void Within(TimeSpan max, Action f) => + Within(TimeSpan.Zero, max, f); + + protected void Within(TimeSpan min, TimeSpan max, Action f) + { + var dilatedMax = Dilated(max); + var start = Now; + var rem = _testState.End.HasValue ? _testState.End.Value - start : Timeout.InfiniteTimeSpan; + _assertions.AssertTrue(rem.IsInfiniteTimeout() || rem >= min, "Required min time {0} not possible, only {1} left.", min, rem); + + _testState.LastWasNoMsg = false; + + var maxDiff = dilatedMax.Min(rem); + var prevEnd = _testState.End; + _testState.End = start + maxDiff; + + try + { + f(); + } + finally + { + _testState.End = prevEnd; + } + + var diff = Now - start; + _assertions.AssertTrue(min <= diff, $"block took {diff}, should at least have been {min}"); + if (!_testState.LastWasNoMsg) + { + _assertions.AssertTrue(diff <= maxDiff, $"block took {diff}, exceeding {maxDiff}"); + } + } + + private static void ConditionalLog(ILoggingAdapter logger, string format, params object[] args) => logger?.Info(format, args); } } diff --git a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs index fe7608cbcde..034773f83a7 100644 --- a/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs +++ b/src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs @@ -84,7 +84,7 @@ public void Must_increment_failure_count_on_callTimeout_before_call_finishes() var breaker = ShortCallTimeoutCb(); Task.Run(() => breaker.Instance.WithSyncCircuitBreaker(() => Thread.Sleep(Dilated(TimeSpan.FromSeconds(1))))); Within(TimeSpan.FromMilliseconds(900), - () => AwaitCondition(() => breaker.Instance.CurrentFailureCount == 1, Dilated(TimeSpan.FromMilliseconds(100)))); + () => AwaitCond(() => breaker.Instance.CurrentFailureCount == 1, Dilated(TimeSpan.FromMilliseconds(100)))); } } @@ -194,7 +194,7 @@ public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase public async Task Must_allow_calls_through() { var breaker = LongCallTimeoutCb(); - var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout); + var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout); Assert.Equal("hi", result); } @@ -203,7 +203,7 @@ public async Task Must_increment_failure_count_on_exception() { var breaker = LongCallTimeoutCb(); await InterceptException(() => - breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout)); + breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync1(AwaitTimeout)); Assert.True(CheckLatch(breaker.OpenLatch)); breaker.Instance.CurrentFailureCount.ShouldBe(1); } @@ -244,7 +244,7 @@ public async Task Must_increment_failure_count_on_callTimeout() // Since the timeout should have happened before the inner code finishes // we expect a timeout, not TestException - await InterceptException(() => future.WaitAsync(AwaitTimeout)); + await InterceptException(() => future.WaitAsync1(AwaitTimeout)); } [Fact(DisplayName = "An asynchronous circuit breaker that is closed must invoke onOpen if call fails and breaker transits to open state")] @@ -264,7 +264,7 @@ public async Task Must_pass_through_next_call_and_close_on_success() var breaker = ShortResetTimeoutCb(); _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); Assert.True(CheckLatch(breaker.HalfOpenLatch)); - var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout); + var result = await breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout); Assert.Equal("hi", result); Assert.True(CheckLatch(breaker.ClosedLatch)); } @@ -277,7 +277,7 @@ public async Task Must_reopen_on_exception_in_call() Assert.True(CheckLatch(breaker.HalfOpenLatch)); breaker.OpenLatch.Reset(); await InterceptException(() => - breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync(AwaitTimeout)); + breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).WaitAsync1(AwaitTimeout)); Assert.True(CheckLatch(breaker.OpenLatch)); } @@ -305,7 +305,7 @@ public async Task Must_throw_exceptions_when_called_before_reset_timeout() Assert.True(CheckLatch(breaker.OpenLatch)); await InterceptException( - () => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync(AwaitTimeout)); + () => breaker.Instance.WithCircuitBreaker(() => Task.Run(SayHi)).WaitAsync1(AwaitTimeout)); } [Fact(DisplayName = "An asynchronous circuit breaker that is open must transition to half-open on reset timeout")] @@ -323,7 +323,7 @@ public async Task Must_increase_reset_timeout_after_it_transits_to_open_again() _ = breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)); Assert.True(CheckLatch(breaker.OpenLatch)); - var e1 = await InterceptException( + var e1 = await InterceptException( () => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException))); var shortRemainingDuration = e1.RemainingDuration; @@ -379,23 +379,23 @@ protected static async Task InterceptException(Func actionThatThrows throw new ThrowsException(typeof(T)); } - public TestBreaker ShortCallTimeoutCb() => + public TestBreaker ShortCallTimeoutCb() => new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(50)), Dilated(TimeSpan.FromMilliseconds(500)))); - public TestBreaker ShortResetTimeoutCb() => + public TestBreaker ShortResetTimeoutCb() => new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromMilliseconds(50)))); - public TestBreaker LongCallTimeoutCb() => + public TestBreaker LongCallTimeoutCb() => new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, TimeSpan.FromSeconds(5), Dilated(TimeSpan.FromMilliseconds(500)))); public TimeSpan LongResetTimeout = TimeSpan.FromSeconds(5); public TestBreaker LongResetTimeoutCb() => new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(100)), Dilated(LongResetTimeout))); - public TestBreaker MultiFailureCb() => + public TestBreaker MultiFailureCb() => new TestBreaker(new CircuitBreaker(Sys.Scheduler, 5, Dilated(TimeSpan.FromMilliseconds(200)), Dilated(TimeSpan.FromMilliseconds(500)))); - public TestBreaker NonOneFactorCb() => + public TestBreaker NonOneFactorCb() => new TestBreaker(new CircuitBreaker(Sys.Scheduler, 1, Dilated(TimeSpan.FromMilliseconds(2000)), Dilated(TimeSpan.FromMilliseconds(1000)), Dilated(TimeSpan.FromDays(1)), 5, 0)); } diff --git a/src/core/Akka/Util/Extensions/TaskExtensions.cs b/src/core/Akka/Util/Extensions/TaskExtensions.cs index f134e76f87a..c8a743fd423 100644 --- a/src/core/Akka/Util/Extensions/TaskExtensions.cs +++ b/src/core/Akka/Util/Extensions/TaskExtensions.cs @@ -9,8 +9,8 @@ namespace System.Threading.Tasks { internal static class TaskExtensions { -#if NETSTANDARD - public static async Task WaitAsync(this Task task, TimeSpan timeout) +//#if NETSTANDARD + public static async Task WaitAsync1(this Task task, TimeSpan timeout) { var cts = new CancellationTokenSource(); try @@ -29,7 +29,7 @@ public static async Task WaitAsync(this Task task, TimeSpan timeout) } } - public static async Task WaitAsync(this Task task, TimeSpan timeout) + public static async Task WaitAsync1(this Task task, TimeSpan timeout) { var cts = new CancellationTokenSource(); try @@ -46,6 +46,6 @@ public static async Task WaitAsync(this Task task, Ti cts.Dispose(); } } -#endif +//#endif } } diff --git a/src/core/Akka/Util/Internal/AtomicState.cs b/src/core/Akka/Util/Internal/AtomicState.cs index 44172d5c262..75a5301e453 100644 --- a/src/core/Akka/Util/Internal/AtomicState.cs +++ b/src/core/Akka/Util/Internal/AtomicState.cs @@ -81,7 +81,7 @@ public async Task CallThrough(Func> task) var result = default(T); try { - result = await task().WaitAsync(_callTimeout).ConfigureAwait(false); + result = await task().WaitAsync1(_callTimeout).ConfigureAwait(false); CallSucceeds(); } catch (Exception ex) @@ -99,7 +99,7 @@ public async Task CallThrough(TState state, Func> var result = default(T); try { - result = await task(state).WaitAsync(_callTimeout).ConfigureAwait(false); + result = await task(state).WaitAsync1(_callTimeout).ConfigureAwait(false); CallSucceeds(); } catch (Exception ex) @@ -122,7 +122,7 @@ public async Task CallThrough(Func task) { try { - await task().WaitAsync(_callTimeout).ConfigureAwait(false); + await task().WaitAsync1(_callTimeout).ConfigureAwait(false); CallSucceeds(); } catch (Exception ex) @@ -137,7 +137,7 @@ public async Task CallThrough(TState state, Func task) { try { - await task(state).WaitAsync(_callTimeout).ConfigureAwait(false); + await task(state).WaitAsync1(_callTimeout).ConfigureAwait(false); CallSucceeds(); } catch (Exception ex)