Skip to content

Commit

Permalink
feat: reject events on retryable errors
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Aug 20, 2024
1 parent a0080d2 commit ad918de
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package akka.persistence.dynamodb.journal

import java.time.Instant
import java.util.concurrent.CompletionException

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
Expand Down Expand Up @@ -42,6 +42,7 @@ import akka.serialization.SerializationExtension
import akka.serialization.Serializers
import akka.stream.scaladsl.Sink
import com.typesafe.config.Config
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException

/**
* INTERNAL API
Expand Down Expand Up @@ -109,12 +110,19 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
// them to complete before we can read the highest sequence number or we will miss it
private val writesInProgress = new java.util.HashMap[String, Future[_]]()

// reject retryable errors rather than fail, to be able to resume rather than stop or restart
// TODO: API call timeout exceptions not considered retryable as they may have persisted?
private def isRetryableError(error: Throwable): Boolean =
  // PartialFunction.cond returns false for any throwable not matched by the partial function
  PartialFunction.cond(error) { case _: ProvisionedThroughputExceededException => true }

// drop the completed write future for this persistence id from the in-progress tracking map
override def receivePluginInternal: Receive = {
  case WriteFinished(pid, future) =>
    writesInProgress.remove(pid, future)
}

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
def atomicWrite(atomicWrite: AtomicWrite): Future[Done] = {
override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
def atomicWrite(atomicWrite: AtomicWrite): Future[Seq[Try[Unit]]] = {
val serialized: Try[Seq[SerializedJournalItem]] = Try {
atomicWrite.payload.map { pr =>
val (event, tags) = pr.payload match {
Expand Down Expand Up @@ -169,7 +177,15 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
ps.publish(pr, serialized.writeTimestamp)
}
}
Done
Nil // successful writes
}
.recoverWith { case e: CompletionException =>
e.getCause match {
case error if isRetryableError(error) =>
Future.successful(Seq(Failure(error))) // will be rejected
case error =>
Future.failed(error)
}
}

case Failure(exc) =>
Expand All @@ -178,7 +194,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
}

val persistenceId = messages.head.persistenceId
val writeResult: Future[Done] =
val writeResult: Future[Seq[Try[Unit]]] =
if (messages.size == 1)
atomicWrite(messages.head)
else {
Expand All @@ -192,7 +208,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
writeResult.onComplete { _ =>
self ! WriteFinished(persistenceId, writeResult)
}
writeResult.map(_ => Nil)(ExecutionContexts.parasitic)
writeResult
}

override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
Expand Down
7 changes: 7 additions & 0 deletions docs/src/main/paradox/journal.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ The events are serialized with @extref:[Akka Serialization](akka:serialization.h
is stored in the `event_payload` column together with information about what serializer that was used in the
`event_ser_id` and `event_ser_manifest` columns.

## Retryable errors

When persisting events, any DynamoDB errors that are considered retryable, such as when provisioned throughput capacity
is exceeded, will cause events to be @extref:[rejected](akka:typed/persistence.html#journal-rejections) rather than
marked as a journal failure. A supervision strategy for `EventRejectedException` failures can then be added to
`EventSourcedBehavior`s, so that entities can be resumed on these retryable errors rather than stopped or restarted.

## Deletes

The journal supports deletes through hard deletes, which means that journal entries are actually deleted from the
Expand Down

0 comments on commit ad918de

Please sign in to comment.