chore: bump to akka 2.10.0-M1 (#83) #84

Merged (3 commits) on Oct 15, 2024
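Most of the diff consists of two mechanical migrations that accompany the Akka 2.10.0-M1 bump: calls to the akka.actor.typed.scaladsl.LoggerOps extension methods (infoN, info2, debugN, traceN) are replaced with plain SLF4J varargs calls, and akka.dispatch.ExecutionContexts.parasitic is replaced with the standard library's scala.concurrent.ExecutionContext.parasitic. Below is a minimal sketch of the two replacement patterns; the object and method names (MigrationSketch, logProgress, toDone) are illustrative and not part of the repository.

import scala.concurrent.{ ExecutionContext, Future }

import akka.Done
import org.slf4j.LoggerFactory

object MigrationSketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Previously log.infoN via the LoggerOps extension; SLF4J's own varargs overload now suffices.
  def logProgress(operationName: String, n: Int, size: Int): Unit =
    log.info("Cleanup {} [{}] of [{}].", operationName, n, size)

  // Previously akka.dispatch.ExecutionContexts.parasitic; the Scala 2.13+ parasitic context
  // runs cheap continuations like this one on the thread that completes the future.
  def toDone(result: Future[Int]): Future[Done] =
    result.map(_ => Done)(ExecutionContext.parasitic)
}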
.jvmopts: 5 changes (0 additions & 5 deletions)

This file was deleted.

@@ -15,7 +15,6 @@ import scala.util.Success
import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.actor.typed.ActorSystem
- import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.SnapshotSelectionCriteria
@@ -549,7 +548,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf
case pid :: tail =>
pidOperation(pid).flatMap { _ =>
if (n % settings.cleanupSettings.logProgressEvery == 0)
- log.infoN("Cleanup {} [{}] of [{}].", operationName, n, size)
+ log.info("Cleanup {} [{}] of [{}].", operationName, n, size)
loop(tail, n + 1)
}
}
@@ -559,7 +558,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf

result.onComplete {
case Success(_) =>
- log.info2("Cleanup completed {} of [{}] persistenceId.", operationName, size)
+ log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size)
case Failure(e) =>
log.error(s"Cleanup {$operationName} failed.", e)
}
@@ -11,7 +11,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

import akka.NotUsed
- import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.InternalApi
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.query.Offset
@@ -118,7 +117,7 @@ import org.slf4j.Logger
}

if (state.queryCount != 0 && log.isDebugEnabled())
- log.debugN(
+ log.debug(
"{} next query [{}] from slice [{}], between time [{} - {}]. Found [{}] items in previous query.",
logPrefix,
state.queryCount,
@@ -136,7 +135,7 @@ import org.slf4j.Logger
.via(deserializeAndAddOffset(state.latest)))
} else {
if (log.isDebugEnabled)
- log.debugN(
+ log.debug(
"{} query [{}] from slice [{}] completed. Found [{}] items in previous query.",
logPrefix,
state.queryCount,
@@ -149,7 +148,7 @@ import org.slf4j.Logger

val currentTimestamp = InstantFactory.now()
if (log.isDebugEnabled())
- log.debugN(
+ log.debug(
"{} query slice [{}], from time [{}] until now [{}].",
logPrefix,
slice,
@@ -174,7 +173,7 @@ import org.slf4j.Logger
val initialOffset = toTimestampOffset(offset)

if (log.isDebugEnabled())
- log.debugN("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)
+ log.debug("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
val offset = extractOffset(envelope)
@@ -198,7 +197,7 @@ import org.slf4j.Logger
if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
- log.debugN(
+ log.debug(
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
logPrefix,
state.latestBacktracking,
@@ -221,7 +220,7 @@ import org.slf4j.Logger

if (log.isDebugEnabled)
delay.foreach { d =>
- log.debugN(
+ log.debug(
"{} query [{}] from slice [{}] delay next [{}] ms.",
logPrefix,
state.queryCount,
@@ -308,7 +307,7 @@ import org.slf4j.Logger
" in backtracking mode,"
else
""
- log.debugN(
+ log.debug(
"{} next query [{}]{} from slice [{}], between time [{} - {}]. {}",
logPrefix,
newState.queryCount,
@@ -351,7 +350,7 @@ import org.slf4j.Logger
throw new IllegalStateException(
s"Too many events stored with the same timestamp [$currentTimestamp], buffer size [${settings.querySettings.bufferSize}]")
}
- log.traceN(
+ log.trace(
"filtering [{}] [{}] as db timestamp is the same as last offset and is in seen [{}]",
item.persistenceId,
item.seqNr,
@@ -4,15 +4,15 @@

package akka.persistence.dynamodb.internal

- import scala.concurrent.Future
import scala.concurrent.duration._
+ import scala.concurrent.ExecutionContext
+ import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.NotUsed
import akka.annotation.InternalApi
- import akka.dispatch.ExecutionContexts
import akka.stream.Attributes
import akka.stream.Outlet
import akka.stream.SourceShape
@@ -118,7 +117,7 @@ final private[dynamodb] class ContinuousQuery[S, T](
beforeQuery(state) match {
case None => runNextQuery()
case Some(beforeQueryFuture) =>
- beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContexts.parasitic)
+ beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContext.parasitic)
}
}
}
@@ -16,7 +16,6 @@ import scala.jdk.FutureConverters._
import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
- import akka.dispatch.ExecutionContexts
import akka.persistence.Persistence
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
@@ -115,7 +114,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
response.consumedCapacity.capacityUnits)
}
}
- result.map(_ => Done)(ExecutionContexts.parasitic)
+ result.map(_ => Done)(ExecutionContext.parasitic)
} else {
val writeItems =
events.map { item =>
@@ -143,10 +142,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}
}
result
- .map(_ => Done)(ExecutionContexts.parasitic)
+ .map(_ => Done)(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

}
@@ -190,7 +189,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
result
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

private def readLowestSequenceNr(persistenceId: String): Future[Long] = {
@@ -220,7 +219,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
result
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] = {
@@ -284,10 +283,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}
}
result
- .map(_ => ())(ExecutionContexts.parasitic)
+ .map(_ => ())(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

// TransactWriteItems has a limit of 100
@@ -317,7 +316,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
result
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

def updateEventExpiry(
@@ -376,10 +375,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}
}
result
- .map(_ => ())(ExecutionContexts.parasitic)
+ .map(_ => ())(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

// TransactWriteItems has a limit of 100
@@ -409,7 +408,7 @@ import software.amazon.awssdk.services.dynamodb.model.Update
result
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

}
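A pattern repeated throughout the DAO code above: results coming back from the AWS SDK's CompletableFuture-based async client are unwrapped from CompletionException so that the real cause is surfaced, with the trivial recovery step run on the parasitic context. A self-contained sketch, where callDynamoDb is a stand-in for such an async call rather than a real API:

import java.util.concurrent.{ CompletableFuture, CompletionException }

import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.FutureConverters._

object UnwrapSketch {
  // callDynamoDb stands in for an AWS SDK async call that returns a CompletableFuture.
  def unwrapped(callDynamoDb: () => CompletableFuture[String]): Future[String] =
    callDynamoDb().asScala.recoverWith { case c: CompletionException =>
      Future.failed(c.getCause) // surface the underlying error, not the CompletionException wrapper
    }(ExecutionContext.parasitic)
}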
@@ -8,14 +8,14 @@ import java.time.Instant
import java.util.concurrent.CompletionException
import java.util.{ Map => JMap }

+ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
- import akka.dispatch.ExecutionContexts
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import akka.stream.scaladsl.Source
@@ -303,7 +302,7 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
}
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

def loadEvent(persistenceId: String, seqNr: Long, includePayload: Boolean): Future[Option[SerializedJournalItem]] = {
@@ -353,7 +352,7 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
}
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)

}

@@ -17,7 +17,6 @@ import scala.jdk.FutureConverters._
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
- import akka.dispatch.ExecutionContexts
import akka.persistence.Persistence
import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.dynamodb.DynamoDBSettings
@@ -106,7 +105,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
}
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

private def itemHasExpired(item: JMap[String, AttributeValue]): Boolean = {
@@ -173,10 +172,10 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
}

result
- .map(_ => ())(ExecutionContexts.parasitic)
+ .map(_ => ())(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
@@ -212,7 +211,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
}

result
- .map(_ => ())(ExecutionContexts.parasitic)
+ .map(_ => ())(ExecutionContext.parasitic)
// ignore if the criteria conditional check failed
.recover {
case _: ConditionalCheckFailedException => ()
@@ -221,7 +220,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
case _: ConditionalCheckFailedException => ()
case cause => throw cause
}
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

def updateExpiry(
@@ -263,7 +262,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
}

result
- .map(_ => ())(ExecutionContexts.parasitic)
+ .map(_ => ())(ExecutionContext.parasitic)
// ignore if the criteria conditional check failed
.recover {
case _: ConditionalCheckFailedException => ()
@@ -272,7 +271,7 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
case _: ConditionalCheckFailedException => ()
case cause => throw cause
}
- }(ExecutionContexts.parasitic)
+ }(ExecutionContext.parasitic)
}

// Used from `BySliceQuery` (only if settings.querySettings.startFromSnapshotEnabled).
@@ -18,7 +18,6 @@ import akka.actor.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
- import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.persistence.AtomicWrite
import akka.persistence.PersistentRepr
@@ -226,7 +225,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String)
case Some(f) =>
log.debug("Write in progress for [{}], deferring replayMessages until write completed", persistenceId)
// we only want to make write - replay sequential, not fail if previous write failed
- f.recover { case _ => Done }(ExecutionContexts.parasitic)
+ f.recover { case _ => Done }(ExecutionContext.parasitic)
case None => FutureDone
}
pendingWrite.flatMap { _ =>
@@ -9,12 +9,12 @@ import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

+ import scala.concurrent.ExecutionContext
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters.RichOption

import akka.NotUsed
- import akka.dispatch.ExecutionContexts
import akka.japi.Pair
import akka.persistence.dynamodb.query.scaladsl
import akka.persistence.query.Offset
@@ -136,7 +135,7 @@ final class DynamoDBReadJournal(delegate: scaladsl.DynamoDBReadJournal)
delegate.eventsBySlicesStartingFromSnapshots(entityType, minSlice, maxSlice, offset, transformSnapshot(_)).asJava

override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] =
- delegate.timestampOf(persistenceId, sequenceNr).map(_.toJava)(ExecutionContexts.parasitic).asJava
+ delegate.timestampOf(persistenceId, sequenceNr).map(_.toJava)(ExecutionContext.parasitic).asJava

override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] =
delegate.loadEnvelope[Event](persistenceId, sequenceNr).asJava
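The javadsl change above follows the usual scaladsl-to-javadsl bridging: the delegate's Future[Option[Instant]] is mapped to java.util.Optional on the parasitic context and exposed as a CompletionStage. A sketch of the same conversion, where timestampOf is a stand-in parameter rather than the real delegate call:

import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters.RichOption

object JavadslBridgeSketch {
  def adapt(timestampOf: Future[Option[Instant]]): CompletionStage[Optional[Instant]] =
    // Option -> Optional via toJava, then Future -> CompletionStage via asJava
    timestampOf.map(_.toJava)(ExecutionContext.parasitic).asJava
}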