Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support taking snapshots of aggregates (ported from 1.1 PR) #24

Merged
merged 1 commit into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package no.nextgentel.oss.akkatools.aggregate

import no.nextgentel.oss.akkatools.persistence.InternalCommand

case class SaveSnapshotOfCurrentState(dispatchId:Option[String],deleteEvents : Boolean) extends AggregateCmd with InternalCommand {
override def id(): String = dispatchId.getOrElse(throw new RuntimeException("This SaveSnapshotOfCurrentState does not have a dispatch-id"))
}

case class AggregateRejectedSnapshotRequest(persistenceId: String, sequenceNr: Long, stateWhenRejected: Any)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nextgentel.oss.akkatools.aggregate

import akka.actor.ActorPath
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning
import akka.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer}
import no.nextgentel.oss.akkatools.persistence.{EnhancedPersistentShardingActor, GetState, SendAsDM}

import scala.reflect.ClassTag
Expand Down Expand Up @@ -48,6 +49,16 @@ abstract class GeneralAggregateBase[E:ClassTag, S <: AggregateStateBase[E, S]:Cl
private val defaultErrorHandler = (errorMsg:String) => log.debug("No cmdFailed-handler executed")


protected override def onSnapshotOffer(offer : SnapshotOffer) : Unit = {
state = offer.snapshot.asInstanceOf[S]
}

//Override to handle aggregate specific restriction on snapshots, accepts all by default
protected def acceptSnapshotRequest(request : SaveSnapshotOfCurrentState) : Boolean = {
true
}


def cmdToEvent:PartialFunction[AggregateCmd, ResultingEvent[E]]

override protected def stateInfo(): String = state.toString
Expand All @@ -73,11 +84,23 @@ abstract class GeneralAggregateBase[E:ClassTag, S <: AggregateStateBase[E, S]:Cl
}

final def tryCommand = {
case x:AggregateCmd =>
case x: AggregateCmd =>
// Can't get pattern-matching to work with generics..
if (x.isInstanceOf[GetState]) {
sender ! state
} else {
}
else if (x.isInstanceOf[SaveSnapshotOfCurrentState]) {
val msg = x.asInstanceOf[SaveSnapshotOfCurrentState]
val accepted = acceptSnapshotRequest(msg)
if (accepted && this.isInSnapshottableState()) {
saveSnapshot(state,msg.deleteEvents)
} else {
log.warning(s"Rejected snapshot request $msg when in state $state")
sender ! AggregateRejectedSnapshotRequest(this.persistenceId, lastSequenceNr, state)
}

}
else {
val cmd = x
val defaultCmdToEvent:(AggregateCmd) => ResultingEvent[E] = {(q) => throw new AggregateError("Do not know how to process cmd of type " + q.getClass)}
val eventResult:ResultingEvent[E] = cmdToEvent.applyOrElse(cmd, defaultCmdToEvent)
Expand Down Expand Up @@ -182,6 +205,7 @@ abstract class GeneralAggregateBase[E:ClassTag, S <: AggregateStateBase[E, S]:Cl
tmpStateWhileProcessingUnconfirmedWarning = null.asInstanceOf[S]
}


/**
* If doUnconfirmedWarningProcessing is turned on, then override this method
* to try to do something useful before we give up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.actor._
import akka.event.Logging.MDC
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
import akka.persistence.query.{EventEnvelope, PersistenceQuery, Sequence, scaladsl}
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, RecoveryCompleted}
import akka.persistence.{AtLeastOnceDelivery, DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor, RecoveryCompleted, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer}
import akka.stream.ActorMaterializer
import com.fasterxml.jackson.annotation.JsonTypeInfo
import no.nextgentel.oss.akkatools.persistence.jdbcjournal._
Expand All @@ -18,6 +18,33 @@ import scala.util.{Failure, Success}

case class SendAsDM(payload: AnyRef, destinationActor: ActorPath, confirmationRoutingInfo: AnyRef = null)

//Event that marks the point where messages have been deleted up to
case class MoveDeletedMessageMarkEvent(minimumMark : Long) extends JacksonJsonSerializable

//Contains state from the user and the state of this actor that needs to be stored in a snapshot
case class FullSnapshotState(@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include= JsonTypeInfo.As.PROPERTY, property="@snapshot_data") userData : Any,
localState : InternalDurableState) extends JacksonJsonSerializable

//Contains state that must be stored with a snapshot, and recreated when recovering from a snapshots
case class InternalDurableState(
var deleteMessagesUpTo : Option[Long],
var deletedMessages : Long,
var dmGeneratingVersionFixedDeliveryIds: Set[Long],
var currentDmGeneratingVersion: Int,
var recoveredEventsCount_sinceLast_dmGeneratingVersion: Int,
// Info about all already processed (successfully) inbound DMs.
// If we see one of these again, we know that it is a resending caused by the sender not
// getting the DM-confirm.
// So instead of trying to process them - and fail since we've already processed them,
// we can 'ignore' them...
// But we cannot just ignore them:
// If the sender never got our DMReceived, we must send it again..
// And since the impl might not have just sent confirm, but used the DM with a new payload to send
// a cmd back to the sender via the DM-confirm, we must facilitate for that to also work again..
// You know.... idempotent cmds :)
var processedDMs: Set[ProcessedDMEvent] // DMs without payload
)

object EnhancedPersistentActor {
// Before we calculated the timeout based on redeliverInterval and warnAfterNumberOfUnconfirmedAttempts,
// This timeout used to be 240 seconds.
Expand All @@ -41,6 +68,20 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]

implicit val ec = context.dispatcher

def initialState(): InternalDurableState = {
InternalDurableState(
deleteMessagesUpTo = None,
deletedMessages = 0,
dmGeneratingVersionFixedDeliveryIds = Set[Long](),
currentDmGeneratingVersion = 0,
recoveredEventsCount_sinceLast_dmGeneratingVersion = 0,
processedDMs = Set[ProcessedDMEvent]()
)
}

//State that will be stored alongside a snapshot, and recovered when starting from a snapshot
private var internalDurableState = initialState()

private var isProcessingEvent = false
private var pendingDurableMessage:Option[DurableMessage] = None
private var timeoutTimer:Option[Cancellable] = None
Expand Down Expand Up @@ -69,9 +110,6 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
timeout
}

private var dmGeneratingVersionFixedDeliveryIds = Set[Long]()
private var currentDmGeneratingVersion = 0

// Override this in your code to set the dmGeneratingVersion your code is currently using.
// You can bump this version if you have changed the code in such a way that it now sends
// more DMs than before based on the same events.
Expand All @@ -82,17 +120,10 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
// we will not perform the fix described above again
protected def getDMGeneratingVersion = 0

// Info about all already processed (successfully) inbound DMs.
// If we see one of these again, we know that it is a resending caused by the sender not
// getting the DM-confirm.
// So instead of trying to process them - and fail since we've already processed them,
// we can 'ignore' them...
// But we cannot just ignore them:
// If the sender never got our DMReceived, we must send it again..
// And since the impl might not have just sent confirm, but used the DM with a new payload to send
// a cmd back to the sender via the DM-confirm, we must facilitate for that to also work again..
// You know.... idempotent cmds :)
private var processedDMs = Set[ProcessedDMEvent]() // DMs without payload
//If this returns false, a request to make a snapshot will be rejected
protected def isInSnapshottableState(): Boolean = {
!isProcessingEvent
}

/**
* @param eventLogLevelInfo Used when processing events live - not recovering
Expand Down Expand Up @@ -143,6 +174,10 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
log.mdc( log.mdc - "akkaPersistenceRecovering" )
}

def handleMoveDeletedMessageMarkEvent(ev : MoveDeletedMessageMarkEvent): Unit = {
internalDurableState.deletedMessages = Math.max(internalDurableState.deletedMessages,ev.minimumMark)
}

override def receiveRecover: Receive = {
case r: DurableMessageReceived =>
processingRecoveringMessageStarted
Expand All @@ -153,44 +188,99 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
}
case e:ProcessedDMEvent =>
onProcessedDMEvent(e)
case e:MoveDeletedMessageMarkEvent => {
handleMoveDeletedMessageMarkEvent(e)
}
case e:NewDMGeneratingVersionEvent =>
onNewDMGeneratingVersionEvent(e)
case c:RecoveryCompleted =>
onRecoveryCompleted()
case offer: SnapshotOffer =>
offer.snapshot match {
case snapshotData: FullSnapshotState =>
internalDurableState = snapshotData.localState
onSnapshotOffer(SnapshotOffer(offer.metadata, snapshotData.userData))
case _ => throw new RuntimeException(s"Snapshot is not of the expected type, expected ${FullSnapshotState.getClass.getName}) but was ${offer.getClass.getName}" )
}
case event:AnyRef =>
onReceiveRecover(event.asInstanceOf[E])
}

def saveSnapshot(snapshot: Any, deleteMessagesUpTo :Boolean): Unit = {
log.info("Saving snapshot")
if(deleteMessagesUpTo) {
internalDurableState.deleteMessagesUpTo = Some(lastSequenceNr)
}
super.saveSnapshot(FullSnapshotState(snapshot,internalDurableState))
}

override def saveSnapshot(snapshot: Any): Unit = {
saveSnapshot(snapshot,deleteMessagesUpTo = false)
}

protected def onSnapshotOffer(offer : SnapshotOffer): Unit = {
throw new Exception(s"Can not recover from snapshot $offer, handling not defined")
}


protected def onSnapshotSuccess(success : SaveSnapshotSuccess) : Unit = {
log.info(s"Saved snapshot $success")
}

protected def onSnapshotFailure(failure : SaveSnapshotFailure) : Unit = {
log.warning(s"Failed to save snapshot $failure")
}

protected def onDeleteMessagesSuccess(success : DeleteMessagesSuccess) : Unit = {
log.info(s"Delete messages succeed for: $success")
}

protected def onDeleteMessagesFailure(failure : DeleteMessagesFailure) : Unit = {
log.warning(s"Deleting messages failed: $failure")
}

protected def onRecoveryCompleted(): Unit = {
log.debug("Recover complete")

if ( currentDmGeneratingVersion < getDMGeneratingVersion ) {
if ( internalDurableState.currentDmGeneratingVersion < getDMGeneratingVersion ) {
fixDMGeneratingVersionProblem()
}

deletePotentialMessagesMarkedForDeletion()
}

private def deletePotentialMessagesMarkedForDeletion(): Unit = {
internalDurableState.deleteMessagesUpTo match {
case Some(deleteMark) =>
if(deleteMark > internalDurableState.deletedMessages) {
log.info(s"Deleting messages up to sequenceNr $deleteMark")
deleteMessages(deleteMark)
}
case None =>
}
}

private def isFixingDMGeneratingVersionProblem():Boolean = {
currentDmGeneratingVersion < getDMGeneratingVersion && recoveryRunning
internalDurableState.currentDmGeneratingVersion < getDMGeneratingVersion && recoveryRunning
}

private def fixDMGeneratingVersionProblem(): Unit = {
val listOfReceivedDMs = if ( dmGeneratingVersionFixedDeliveryIds.nonEmpty ) {
val listOfReceivedDMs = if ( internalDurableState.dmGeneratingVersionFixedDeliveryIds.nonEmpty ) {

log.warning(s"Found and fixing unconfirmed DMs $dmGeneratingVersionFixedDeliveryIds when going to new dmGeneratingVersion")
log.warning(s"Found and fixing unconfirmed DMs ${internalDurableState.dmGeneratingVersionFixedDeliveryIds} when going to new dmGeneratingVersion")

// We must save that we're done with these DMs
dmGeneratingVersionFixedDeliveryIds.map {
internalDurableState.dmGeneratingVersionFixedDeliveryIds.map {
deliveryId =>
// This is the event we're going to save
DurableMessageReceived(deliveryId, "Added by fixDMGeneratingVersionProblem")
}.toList

} else List()

dmGeneratingVersionFixedDeliveryIds = Set() // Clear it
internalDurableState.dmGeneratingVersionFixedDeliveryIds = Set() // Clear it

log.warning(s"Saving new dmGeneratingVersion=$getDMGeneratingVersion (old: dmGeneratingVersion=$currentDmGeneratingVersion)")
log.info(s"Saving new dmGeneratingVersion=$getDMGeneratingVersion (old: dmGeneratingVersion=${internalDurableState.currentDmGeneratingVersion})")

// Must also save that we are now using new DMGeneratingVersion
val eventList = listOfReceivedDMs :+ NewDMGeneratingVersionEvent(getDMGeneratingVersion)
Expand Down Expand Up @@ -282,8 +372,8 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
log.debug(s"Remembering DurableMessageReceived with DeliveryId=${msg.deliveryId}, wasRemovedFromUnconfirmedList=$wasRemovedFromUnconfirmedList")

// Since we might be fixing DMGeneratingVersion issues, we must remove this id from dmGeneratingVersionFixedDeliveryIds - if it exists there..
if ( dmGeneratingVersionFixedDeliveryIds.contains(msg.deliveryId)) {
dmGeneratingVersionFixedDeliveryIds = dmGeneratingVersionFixedDeliveryIds - msg.deliveryId
if ( internalDurableState.dmGeneratingVersionFixedDeliveryIds.contains(msg.deliveryId)) {
internalDurableState.dmGeneratingVersionFixedDeliveryIds = internalDurableState.dmGeneratingVersionFixedDeliveryIds - msg.deliveryId
}
}

Expand Down Expand Up @@ -331,11 +421,11 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
}

private def onNewDMGeneratingVersionEvent(settingDMGeneratingVersionEvent: NewDMGeneratingVersionEvent): Unit = {
this.currentDmGeneratingVersion = settingDMGeneratingVersionEvent.version
this.internalDurableState.currentDmGeneratingVersion = settingDMGeneratingVersionEvent.version
}

private def onProcessedDMEvent(processedDMEvent: ProcessedDMEvent): Unit = {
processedDMs = processedDMs + processedDMEvent
internalDurableState.processedDMs = internalDurableState.processedDMs + processedDMEvent
}

/**
Expand Down Expand Up @@ -396,10 +486,21 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
try {
if (doUnconfirmedWarningProcessing && (command.isInstanceOf[AtLeastOnceDelivery.UnconfirmedWarning])) {
internalProcessUnconfirmedWarning(command.asInstanceOf[AtLeastOnceDelivery.UnconfirmedWarning])
} else if (pendingDurableMessage.isDefined && processedDMs.contains( ProcessedDMEvent.createFromDM(pendingDurableMessage.get) )) {
} else if (pendingDurableMessage.isDefined && internalDurableState.processedDMs.contains( ProcessedDMEvent.createFromDM(pendingDurableMessage.get) )) {
onAlreadyProcessedCmdViaDMReceivedAgain(command)
} else {
tryCommand.apply(command)
command match {
case snapFailure: SaveSnapshotFailure =>
onSnapshotFailure(snapFailure)
case snapSuccess: SaveSnapshotSuccess =>
deletePotentialMessagesMarkedForDeletion()
onSnapshotSuccess(snapSuccess)
case deleteMessagesSuccess: DeleteMessagesSuccess =>
moveDeletedMessageMark(deleteMessagesSuccess)
case deleteMessagesFailure: DeleteMessagesFailure =>
onDeleteMessagesFailure(deleteMessagesFailure)
case _ => tryCommand.apply(command)
}
}

if (!persistAndApplyEventHasBeenCalled) {
Expand All @@ -422,6 +523,16 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
startTimeoutTimer
}

//Try to persist the deleted messages mark, and if ok run update to state and inform user
private def moveDeletedMessageMark(deleteMessagesSuccess: DeleteMessagesSuccess): Unit = {
persist(MoveDeletedMessageMarkEvent(deleteMessagesSuccess.toSequenceNr)) {
e => {
handleMoveDeletedMessageMarkEvent(e)
onDeleteMessagesSuccess(deleteMessagesSuccess)
}
}
}

protected def onAlreadyProcessedCmdViaDMReceivedAgain(cmd:AnyRef): Unit ={
log.warning(s"Received already processed DM again: $cmd")
}
Expand Down Expand Up @@ -521,7 +632,7 @@ abstract class EnhancedPersistentActor[E:ClassTag, Ex <: Exception : ClassTag]
// We need to add all ids to a list, and remove it from the list if we (later) get an event telling us that we knew that this dms was confirmed.
// then we end up knowing which deliveryIds we actually need to save that now is confirmed.

dmGeneratingVersionFixedDeliveryIds = dmGeneratingVersionFixedDeliveryIds + usedDeliveryId
internalDurableState.dmGeneratingVersionFixedDeliveryIds = internalDurableState.dmGeneratingVersionFixedDeliveryIds + usedDeliveryId

log.debug(s"Since we're isFixingDMGeneratingVersionProblem, we're confirming deliveryId=$usedDeliveryId right away")
confirmDelivery(usedDeliveryId)
Expand Down
Loading