Skip to content

Commit

Permalink
bump: update akka to 2.10.0-M1 (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-alfers authored Oct 15, 2024
1 parent fcb55ac commit 0537a73
Show file tree
Hide file tree
Showing 23 changed files with 94 additions and 113 deletions.
5 changes: 0 additions & 5 deletions .jvmopts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import scala.util.Success
import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.SnapshotSelectionCriteria
Expand Down Expand Up @@ -549,7 +548,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf
case pid :: tail =>
pidOperation(pid).flatMap { _ =>
if (n % settings.cleanupSettings.logProgressEvery == 0)
log.infoN("Cleanup {} [{}] of [{}].", operationName, n, size)
log.info("Cleanup {} [{}] of [{}].", operationName, n, size)
loop(tail, n + 1)
}
}
Expand All @@ -559,7 +558,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf

result.onComplete {
case Success(_) =>
log.info2("Cleanup completed {} of [{}] persistenceId.", operationName, size)
log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size)
case Failure(e) =>
log.error(s"Cleanup {$operationName} failed.", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

import akka.NotUsed
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.InternalApi
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.query.Offset
Expand Down Expand Up @@ -118,7 +117,7 @@ import org.slf4j.Logger
}

if (state.queryCount != 0 && log.isDebugEnabled())
log.debugN(
log.debug(
"{} next query [{}] from slice [{}], between time [{} - {}]. Found [{}] items in previous query.",
logPrefix,
state.queryCount,
Expand All @@ -136,7 +135,7 @@ import org.slf4j.Logger
.via(deserializeAndAddOffset(state.latest)))
} else {
if (log.isDebugEnabled)
log.debugN(
log.debug(
"{} query [{}] from slice [{}] completed. Found [{}] items in previous query.",
logPrefix,
state.queryCount,
Expand All @@ -149,7 +148,7 @@ import org.slf4j.Logger

val currentTimestamp = InstantFactory.now()
if (log.isDebugEnabled())
log.debugN(
log.debug(
"{} query slice [{}], from time [{}] until now [{}].",
logPrefix,
slice,
Expand All @@ -174,7 +173,7 @@ import org.slf4j.Logger
val initialOffset = toTimestampOffset(offset)

if (log.isDebugEnabled())
log.debugN("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)
log.debug("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
val offset = extractOffset(envelope)
Expand All @@ -198,7 +197,7 @@ import org.slf4j.Logger
if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
log.debugN(
log.debug(
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
logPrefix,
state.latestBacktracking,
Expand All @@ -221,7 +220,7 @@ import org.slf4j.Logger

if (log.isDebugEnabled)
delay.foreach { d =>
log.debugN(
log.debug(
"{} query [{}] from slice [{}] delay next [{}] ms.",
logPrefix,
state.queryCount,
Expand Down Expand Up @@ -308,7 +307,7 @@ import org.slf4j.Logger
" in backtracking mode,"
else
""
log.debugN(
log.debug(
"{} next query [{}]{} from slice [{}], between time [{} - {}]. {}",
logPrefix,
newState.queryCount,
Expand Down Expand Up @@ -351,7 +350,7 @@ import org.slf4j.Logger
throw new IllegalStateException(
s"Too many events stored with the same timestamp [$currentTimestamp], buffer size [${settings.querySettings.bufferSize}]")
}
log.traceN(
log.trace(
"filtering [{}] [{}] as db timestamp is the same as last offset and is in seen [{}]",
item.persistenceId,
item.seqNr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

package akka.persistence.dynamodb.internal

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.NotUsed
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.stream.Attributes
import akka.stream.Outlet
import akka.stream.SourceShape
Expand Down Expand Up @@ -118,7 +118,7 @@ final private[dynamodb] class ContinuousQuery[S, T](
beforeQuery(state) match {
case None => runNextQuery()
case Some(beforeQueryFuture) =>
beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContexts.parasitic)
beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContext.parasitic)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import scala.jdk.FutureConverters._
import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.Persistence
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
Expand Down Expand Up @@ -115,7 +114,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
response.consumedCapacity.capacityUnits)
}
}
result.map(_ => Done)(ExecutionContexts.parasitic)
result.map(_ => Done)(ExecutionContext.parasitic)
} else {
val writeItems =
events.map { item =>
Expand Down Expand Up @@ -143,10 +142,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}
}
result
.map(_ => Done)(ExecutionContexts.parasitic)
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

}
Expand Down Expand Up @@ -190,7 +189,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
result
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

private def readLowestSequenceNr(persistenceId: String): Future[Long] = {
Expand Down Expand Up @@ -220,7 +219,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
result
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = {
Expand Down Expand Up @@ -284,10 +283,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}
}
result
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

// TransactWriteItems has a limit of 100
Expand Down Expand Up @@ -317,7 +316,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
result
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

def updateEventExpiry(
Expand Down Expand Up @@ -376,10 +375,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}
}
result
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

// TransactWriteItems has a limit of 100
Expand Down Expand Up @@ -409,7 +408,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
result
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import java.time.Instant
import java.util.concurrent.CompletionException
import java.util.{ Map => JMap }

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import akka.stream.scaladsl.Source
Expand Down Expand Up @@ -303,7 +303,7 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
}
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

def loadEvent(persistenceId: String, seqNr: Long, includePayload: Boolean): Future[Option[SerializedJournalItem]] = {
Expand Down Expand Up @@ -353,7 +353,7 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
}
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import scala.jdk.FutureConverters._
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.Persistence
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.dynamodb.DynamoDBSettings
Expand Down Expand Up @@ -106,7 +105,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
}
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

private def itemHasExpired(item: JMap[String, AttributeValue]): Boolean = {
Expand Down Expand Up @@ -173,10 +172,10 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
}

result
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
Expand Down Expand Up @@ -212,7 +211,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
}

result
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)
// ignore if the criteria conditional check failed
.recover {
case _: ConditionalCheckFailedException => ()
Expand All @@ -221,7 +220,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
case _: ConditionalCheckFailedException => ()
case cause => throw cause
}
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

def updateExpiry(
Expand Down Expand Up @@ -263,7 +262,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
}

result
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)
// ignore if the criteria conditional check failed
.recover {
case _: ConditionalCheckFailedException => ()
Expand All @@ -272,7 +271,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
case _: ConditionalCheckFailedException => ()
case cause => throw cause
}
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

// Used from `BySliceQuery` (only if settings.querySettings.startFromSnapshotEnabled).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import akka.actor.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.persistence.AtomicWrite
import akka.persistence.PersistentRepr
Expand Down Expand Up @@ -226,7 +225,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
case Some(f) =>
log.debug("Write in progress for [{}], deferring replayMessages until write completed", persistenceId)
// we only want to make write - replay sequential, not fail if previous write failed
f.recover { case _ => Done }(ExecutionContexts.parasitic)
f.recover { case _ => Done }(ExecutionContext.parasitic)
case None => FutureDone
}
pendingWrite.flatMap { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters.RichOption

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.japi.Pair
import akka.persistence.dynamodb.query.scaladsl
import akka.persistence.query.Offset
Expand Down Expand Up @@ -136,7 +136,7 @@ final class DynamoDBReadJournal(delegate: scaladsl.DynamoDBReadJournal)
delegate.eventsBySlicesStartingFromSnapshots(entityType, minSlice, maxSlice, offset, transformSnapshot(_)).asJava

override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] =
delegate.timestampOf(persistenceId, sequenceNr).map(_.toJava)(ExecutionContexts.parasitic).asJava
delegate.timestampOf(persistenceId, sequenceNr).map(_.toJava)(ExecutionContext.parasitic).asJava

override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] =
delegate.loadEnvelope[Event](persistenceId, sequenceNr).asJava
Expand Down
Loading

0 comments on commit 0537a73

Please sign in to comment.