-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for optional snapshots #7444
Changes from 4 commits
243a494
51d7d18
4da3bce
0db3ecf
6afdd48
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -292,4 +292,41 @@ public void PersistentActor_with_a_failing_snapshot_should_receive_failure_messa | |
m.Cause.Message.Contains("Failed to delete")); | ||
} | ||
} | ||
|
||
public class SnapshotIsOptionalSpec : PersistenceSpec | ||
{ | ||
public SnapshotIsOptionalSpec() : base(Configuration("SnapshotIsOptionalSpec", serialization: "off", | ||
extraConfig: @" | ||
akka.persistence.snapshot-store.local.snapshot-is-optional = true | ||
akka.persistence.snapshot-store.local.class = ""Akka.Persistence.Tests.SnapshotFailureRobustnessSpec+FailingLocalSnapshotStore, Akka.Persistence.Tests"" | ||
")) | ||
{ | ||
} | ||
|
||
[Fact] | ||
public void PersistentActor_with_a_failing_snapshot_with_snapshot_is_optional_true_falls_back_to_events() | ||
{ | ||
var spref = Sys.ActorOf(Props.Create(() => new SnapshotFailureRobustnessSpec.SaveSnapshotTestActor(Name, TestActor))); | ||
|
||
ExpectMsg<RecoveryCompleted>(); | ||
spref.Tell(new SnapshotFailureRobustnessSpec.Cmd("boom")); | ||
ExpectMsg(1L); | ||
|
||
Sys.EventStream.Subscribe(TestActor, typeof(Error)); | ||
try | ||
{ | ||
|
||
var lpref = Sys.ActorOf(Props.Create(() => new SnapshotFailureRobustnessSpec.LoadSnapshotTestActor(Name, TestActor))); | ||
ExpectMsg<Error>(m => m.Message.ToString().StartsWith("Error loading snapshot")); | ||
ExpectMsg("boom-1"); | ||
ExpectMsg<RecoveryCompleted>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
|
||
} | ||
finally | ||
{ | ||
Sys.EventStream.Unsubscribe(TestActor, typeof(Error)); | ||
} | ||
|
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
|
||
using System; | ||
using Akka.Actor; | ||
using Akka.Event; | ||
using Akka.Persistence.Internal; | ||
|
||
namespace Akka.Persistence | ||
|
@@ -61,7 +62,9 @@ private EventsourcedState RecoveryStarted(long maxReplays) | |
// protect against snapshot stalling forever because journal overloaded and such | ||
var timeout = Extension.JournalConfigFor(JournalPluginId).GetTimeSpan("recovery-event-timeout", null, false); | ||
var timeoutCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable(timeout, Self, new RecoveryTick(true), Self); | ||
|
||
|
||
var snapshotIsOptional = Extension.SnapshotStoreConfigFor(SnapshotPluginId).GetBoolean("snapshot-is-optional", false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
|
||
bool RecoveryBehavior(object message) | ||
{ | ||
Receive receiveRecover = ReceiveRecover; | ||
|
@@ -120,15 +123,24 @@ bool RecoveryBehavior(object message) | |
} | ||
case LoadSnapshotFailed failed: | ||
timeoutCancelable.Cancel(); | ||
try | ||
if (snapshotIsOptional) | ||
{ | ||
OnRecoveryFailure(failed.Cause); | ||
Log.Info("Snapshot load error for persistenceId [{0}]. Replaying all events since snapshot-is-optional=true", PersistenceId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM - might consider making this a |
||
ChangeState(Recovering(RecoveryBehavior, timeout)); | ||
Journal.Tell(new ReplayMessages(LastSequenceNr +1L, long.MaxValue, maxReplays, PersistenceId, Self)); | ||
} | ||
finally | ||
else | ||
{ | ||
Context.Stop(Self); | ||
try | ||
{ | ||
OnRecoveryFailure(failed.Cause); | ||
} | ||
finally | ||
{ | ||
Context.Stop(Self); | ||
} | ||
ReturnRecoveryPermit(); | ||
} | ||
ReturnRecoveryPermit(); | ||
break; | ||
case RecoveryTick { Snapshot: true }: | ||
try | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -181,6 +181,14 @@ akka.persistence { | |
call-timeout = 20s | ||
reset-timeout = 60s | ||
} | ||
|
||
# Set this to true if successful loading of snapshot is not necessary. | ||
# This can be useful when it is alright to ignore snapshot in case of | ||
# for example deserialization errors. When snapshot loading fails it will instead | ||
# recover by replaying all events. | ||
# Don't set to true if events are deleted because that would | ||
# result in wrong recovered state if snapshot load fails. | ||
snapshot-is-optional = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LGTM |
||
} | ||
|
||
fsm { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM