Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reject events on retryable errors #75

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package akka.persistence.dynamodb.journal

import java.time.Instant
import java.util.concurrent.CompletionException

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
Expand Down Expand Up @@ -41,6 +41,7 @@ import akka.serialization.SerializationExtension
import akka.serialization.Serializers
import akka.stream.scaladsl.Sink
import com.typesafe.config.Config
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException

/**
* INTERNAL API
Expand Down Expand Up @@ -104,14 +105,14 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)

// if there are pending writes when an actor restarts we must wait for
// them to complete before we can read the highest sequence number or we will miss it
private val writesInProgress = new java.util.HashMap[String, Future[Done]]()
private val writesInProgress = new java.util.HashMap[String, Future[Seq[Try[Unit]]]]()

override def receivePluginInternal: Receive = { case WriteFinished(pid, f) =>
writesInProgress.remove(pid, f)
}

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
def atomicWrite(atomicWrite: AtomicWrite): Future[Done] = {
override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
def atomicWrite(atomicWrite: AtomicWrite): Future[Seq[Try[Unit]]] = {
val serialized: Try[Seq[SerializedJournalItem]] = Try {
atomicWrite.payload.map { pr =>
val (event, tags) = pr.payload match {
Expand Down Expand Up @@ -166,7 +167,15 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
ps.publish(pr, serialized.writeTimestamp)
}
}
Done
Nil // successful writes
}
.recoverWith { case e: CompletionException =>
e.getCause match {
case error: ProvisionedThroughputExceededException => // reject retryable errors
Future.successful(atomicWrite.payload.map(_ => Failure(error)))
case error => // otherwise journal failure
Future.failed(error)
}
}

case Failure(exc) =>
Expand All @@ -175,7 +184,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
}

val persistenceId = messages.head.persistenceId
val writeResult: Future[Done] =
val writeResult: Future[Seq[Try[Unit]]] =
if (messages.size == 1)
atomicWrite(messages.head)
else {
Expand All @@ -189,7 +198,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
writeResult.onComplete { _ =>
self ! WriteFinished(persistenceId, writeResult)
}
writeResult.map(_ => Nil)(ExecutionContexts.parasitic)
writeResult
}

override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
Expand Down
7 changes: 7 additions & 0 deletions docs/src/main/paradox/journal.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ The events are serialized with @extref:[Akka Serialization](akka:serialization.h
is stored in the `event_payload` column together with information about what serializer that was used in the
`event_ser_id` and `event_ser_manifest` columns.

## Retryable errors

When persisting events, any DynamoDB errors that are considered retryable, such as when provisioned throughput capacity
is exceeded, will cause events to be @extref:[rejected](akka:typed/persistence.html#journal-rejections) rather than
marked as a journal failure. A supervision strategy for `EventRejectedException` failures can then be added to
EventSourcedBehaviors, so that entities can be resumed on these retryable errors rather than stopped or restarted.

## Deletes

The journal supports deletes through hard deletes, which means that journal entries are actually deleted from the
Expand Down