Skip to content

Commit

Permalink
akkadotnet#7068 - Examining CircuitBreaker timings
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Feb 1, 2024
1 parent a7da6a6 commit 5157e05
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions src/core/Akka.Persistence/Snapshot/SnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.Pattern;

namespace Akka.Persistence.Snapshot
Expand All @@ -20,6 +21,8 @@ public abstract class SnapshotStore : ActorBase
private readonly TaskContinuationOptions _continuationOptions = TaskContinuationOptions.ExecuteSynchronously;
private readonly bool _publish;
private readonly CircuitBreaker _breaker;
private readonly ILoggingAdapter _log;
private readonly bool _debugEnabled;

/// <summary>
/// Initializes a new instance of the <see cref="SnapshotStore"/> class.
Expand All @@ -43,6 +46,9 @@ protected SnapshotStore()
config.GetInt("circuit-breaker.max-failures", 10),
config.GetTimeSpan("circuit-breaker.call-timeout", TimeSpan.FromSeconds(10)),
config.GetTimeSpan("circuit-breaker.reset-timeout", TimeSpan.FromSeconds(30)));

_debugEnabled = config.GetBoolean("debug");
_log = Context.GetLogger();
}

/// <inheritdoc/>
Expand All @@ -59,6 +65,8 @@ private bool ReceiveSnapshotStore(object message)
switch (message)
{
case LoadSnapshot loadSnapshot:
if(_debugEnabled)
_log.Info($"{nameof(LoadSnapshot)} message received.");

LoadSnapshotAsync(loadSnapshot, self, senderPersistentActor);
break;
Expand Down Expand Up @@ -214,9 +222,20 @@ private async Task LoadSnapshotAsync(LoadSnapshot loadSnapshot, IActorRef self,
{
try
{
if(_debugEnabled)
_log.Info($"Starting {nameof(LoadSnapshotAsync)} circuit breaker.");

var result = await _breaker.WithCircuitBreaker((msg: loadSnapshot, ss: this),
state => state.ss.LoadAsync(state.msg.PersistenceId,
state.msg.Criteria.Limit(state.msg.ToSequenceNr)));
state =>
{
if(_debugEnabled)
_log.Info($"Invoking {nameof(LoadAsync)} inside circuit breaker.");

return state.ss.LoadAsync(state.msg.PersistenceId, state.msg.Criteria.Limit(state.msg.ToSequenceNr));
});

if(_debugEnabled)
_log.Info($"{nameof(LoadSnapshotAsync)} circuit breaker completed.");

senderPersistentActor.Tell(new LoadSnapshotResult(result, loadSnapshot.ToSequenceNr), self);
}
Expand Down

0 comments on commit 5157e05

Please sign in to comment.