diff --git a/core/src/main/mima-filters/1.3.1.backwards.excludes/buckets.excludes b/core/src/main/mima-filters/1.3.1.backwards.excludes/buckets.excludes
new file mode 100644
index 00000000..84ce0d7f
--- /dev/null
+++ b/core/src/main/mima-filters/1.3.1.backwards.excludes/buckets.excludes
@@ -0,0 +1,2 @@
+# internal
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Buckets.this")
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala
index c522e357..779e906f 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala
@@ -132,11 +132,11 @@ import org.slf4j.Logger
   * @param countByBucket
   *   Key is the epoch seconds for the start of the bucket. Value is the number of entries in the bucket.
   */
-  class Buckets(countByBucket: immutable.SortedMap[Buckets.EpochSeconds, Buckets.Count]) {
+  class Buckets(
+      countByBucket: immutable.SortedMap[Buckets.EpochSeconds, Buckets.Count],
+      val createdAt: Instant = InstantFactory.now()) {
     import Buckets.{ Bucket, BucketDurationSeconds, Count, EpochSeconds }

-    val createdAt: Instant = InstantFactory.now()
-
     def findTimeForLimit(from: Instant, atLeastCounts: Int): Option[Instant] = {
       val fromEpochSeconds = from.toEpochMilli / 1000
       val iter = countByBucket.iterator.dropWhile { case (key, _) => fromEpochSeconds >= key }
@@ -169,12 +169,22 @@ import org.slf4j.Logger
       new Buckets(newCountByBucket)
     }

+    def clearUntil(time: Instant): Buckets = {
+      val epochSeconds = time.minusSeconds(BucketDurationSeconds).toEpochMilli / 1000
+      val newCountByBucket = countByBucket.dropWhile { case (key, _) => epochSeconds >= key }
+      if (newCountByBucket.size == countByBucket.size)
+        this
+      else if (newCountByBucket.isEmpty)
+        new Buckets(immutable.SortedMap(countByBucket.last), createdAt = this.createdAt) // keep last
+      else
+        new Buckets(newCountByBucket, createdAt = this.createdAt)
+    }
+
     def nextStartTime: Option[Instant] = {
-      // we only expect the last 2 buckets to change from previous bucket count query
-      if (size < 2)
+      if (isEmpty)
        None
      else {
-        val startSeconds = countByBucket.keysIterator.toVector(countByBucket.size - 2)
+        val startSeconds = countByBucket.last._1 - BucketDurationSeconds
        Some(Instant.ofEpochSecond(startSeconds))
      }
    }
@@ -222,6 +232,16 @@ import org.slf4j.Logger
        fromTimestamp: Instant,
        limit: Int): Future[Seq[Bucket]]

+    protected def appendEmptyBucketIfLastIsMissing(
+        buckets: IndexedSeq[Bucket],
+        toTimestamp: Instant): IndexedSeq[Bucket] = {
+      val startTimeOfLastBucket = (toTimestamp.getEpochSecond / 10) * 10
+      if (buckets.last.startTime != startTimeOfLastBucket)
+        buckets :+ Bucket(startTimeOfLastBucket, 0)
+      else
+        buckets
+    }
+
   }

 }
@@ -267,11 +287,15 @@ import org.slf4j.Logger
         // so continue until rowCount is 0. That means an extra query at the end to make sure there are no
         // more to fetch.
         if (state.queryCount == 0L || state.rowCount > 0) {
-          val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1, previous = state.latest)
-
           val fromTimestamp = state.latest.timestamp
           val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)

+          val newState = state.copy(
+            rowCount = 0,
+            queryCount = state.queryCount + 1,
+            buckets = state.buckets.clearUntil(fromTimestamp),
+            previous = state.latest)
+
           val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match {
             case Some(t) =>
               if (t.isBefore(endTimestamp)) t else endTimestamp
@@ -456,6 +480,7 @@ import org.slf4j.Logger
             backtrackingCount = 1,
             latestBacktracking = fromOffset,
             backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
+            buckets = state.buckets.clearUntil(fromOffset.timestamp),
             currentQueryWallClock = newQueryWallClock,
             previousQueryWallClock = state.currentQueryWallClock,
             idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala
index d1acbd24..e695aa51 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala
@@ -863,8 +863,8 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProvider
       fromTimestamp: Instant,
       limit: Int): Future[Seq[Bucket]] = {

+    val now = InstantFactory.now() // not important to use database time
     val toTimestamp = {
-      val now = InstantFactory.now() // not important to use database time
       if (fromTimestamp == Instant.EPOCH)
         now
       else {
@@ -889,8 +889,10 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProvider
     if (log.isDebugEnabled)
       result.foreach(rows =>
         log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))
-    result
-
+    if (toTimestamp == now)
+      result
+    else
+      result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))
   }

   private def additionalBindings(
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala
index bf6080c7..da9d536b 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala
@@ -23,6 +23,7 @@ import akka.persistence.Persistence
 import akka.persistence.r2dbc.R2dbcSettings
 import akka.persistence.r2dbc.internal.BySliceQuery.Buckets
 import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
+import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.BucketDurationSeconds
 import akka.persistence.r2dbc.internal.InstantFactory
 import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
 import akka.persistence.r2dbc.internal.QueryDao
@@ -324,8 +325,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
       limit: Int): Future[Seq[Bucket]] = {
     val executor = executorProvider.executorFor(minSlice)

+    val now = InstantFactory.now() // not important to use database time
     val toTimestamp = {
-      val now = InstantFactory.now() // not important to use database time
       if (fromTimestamp == Instant.EPOCH)
         now
       else {
@@ -349,7 +350,10 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
     if (log.isDebugEnabled)
       result.foreach(rows =>
         log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))
-    result
+    if (toTimestamp == now)
+      result
+    else
+      result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))
   }

   /**
diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala
index e9ef57df..615b86e7 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala
@@ -438,8 +438,8 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider
       fromTimestamp: Instant,
       limit: Int): Future[Seq[Bucket]] = {

+    val now = InstantFactory.now() // not important to use database time
     val toTimestamp = {
-      val now = InstantFactory.now() // not important to use database time
       if (fromTimestamp == Instant.EPOCH)
         now
       else {
@@ -465,7 +465,10 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider
     if (log.isDebugEnabled)
       result.foreach(rows =>
         log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))
-    result
+    if (toTimestamp == now)
+      result
+    else
+      result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))
   }

diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala
index 4b5cf99e..431692a8 100644
--- a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala
+++ b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala
@@ -13,16 +13,19 @@ import akka.persistence.r2dbc.internal.R2dbcExecutor
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.Suite
 import org.slf4j.LoggerFactory
-
-import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
-import akka.persistence.r2dbc.internal.h2.H2Dialect
 import java.time.Instant

 import scala.util.control.NonFatal

 import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
+import akka.persistence.r2dbc.internal.codec.TimestampCodec
+import akka.persistence.r2dbc.internal.codec.PayloadCodec
+import akka.persistence.r2dbc.internal.codec.QueryAdapter
+import akka.persistence.typed.PersistenceId
+import akka.serialization.SerializationExtension

 trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
+  private val log = LoggerFactory.getLogger(getClass)

   def typedSystem: ActorSystem[_]
@@ -74,4 +77,35 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
     super.beforeAll()
   }

+  // to be able to store events with specific timestamps
+  def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = {
+    implicit val timestampCodec: TimestampCodec = settings.codecSettings.JournalImplicits.timestampCodec
+    implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec
+    implicit val queryAdapter: QueryAdapter = settings.codecSettings.JournalImplicits.queryAdapter
+    import TimestampCodec.TimestampCodecRichStatement
+    import PayloadCodec.RichStatement
+    import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
+    val stringSerializer = SerializationExtension(typedSystem).serializerFor(classOf[String])
+
+    log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp)
+    val insertEventSql = sql"""
+      INSERT INTO ${settings.journalTableWithSchema(slice)}
+      (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
+      VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)"""
+    val entityType = PersistenceId.extractEntityType(persistenceId)
+
+    val result = r2dbcExecutor(slice).updateOne("test writeEvent") { connection =>
+      connection
+        .createStatement(insertEventSql)
+        .bind(0, slice)
+        .bind(1, entityType)
+        .bind(2, persistenceId)
+        .bind(3, seqNr)
+        .bindTimestamp(4, timestamp)
+        .bind(5, stringSerializer.identifier)
+        .bindPayload(6, stringSerializer.toBinary(event))
+    }
+    Await.result(result, 5.seconds)
+  }
+
 }
diff --git a/core/src/test/scala/akka/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
index 384a4771..f4664c8f 100644
--- a/core/src/test/scala/akka/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
+++ b/core/src/test/scala/akka/persistence/r2dbc/internal/BySliceQueryBucketsSpec.scala
@@ -18,7 +18,7 @@ class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers {

   private val startTime = InstantFactory.now()
   private val firstBucketStartTime = startTime.plusSeconds(60)
-  private val firstBucketStartEpochSeconds = firstBucketStartTime.toEpochMilli / 1000
+  private val firstBucketStartEpochSeconds = firstBucketStartTime.getEpochSecond

   private def bucketStartEpochSeconds(bucketIndex: Int): Long =
     firstBucketStartEpochSeconds + BucketDurationSeconds * bucketIndex
@@ -71,6 +71,22 @@ class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers {
       moreBuckets.size shouldBe Buckets.Limit
     }

+    "clear until time" in {
+      buckets.clearUntil(startTime).size shouldBe buckets.size
+      buckets.clearUntil(firstBucketStartTime).size shouldBe buckets.size
+      buckets.clearUntil(firstBucketStartTime.plusSeconds(9)).size shouldBe buckets.size
+
+      buckets.clearUntil(firstBucketStartTime.plusSeconds(10)).size shouldBe buckets.size - 1
+      buckets.clearUntil(firstBucketStartTime.plusSeconds(11)).size shouldBe buckets.size - 1
+      buckets.clearUntil(firstBucketStartTime.plusSeconds(19)).size shouldBe buckets.size - 1
+
+      buckets.clearUntil(firstBucketStartTime.plusSeconds(31)).size shouldBe buckets.size - 3
+      buckets.clearUntil(firstBucketStartTime.plusSeconds(100)).size shouldBe 1 // keep last
+
+      // don't change createdAt
+      buckets.clearUntil(firstBucketStartTime.plusSeconds(31)).createdAt shouldBe buckets.createdAt
+    }
+
     "provide start time for next query" in {
       Buckets.empty
         .add(List(Bucket(bucketStartEpochSeconds(0), 101), Bucket(bucketStartEpochSeconds(1), 202)))
@@ -83,7 +99,8 @@ class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers {

       Buckets.empty
         .add(List(Bucket(bucketStartEpochSeconds(0), 101)))
-        .nextStartTime shouldBe None
+        .nextStartTime shouldBe Some(
+        firstBucketStartTime.minusSeconds(Buckets.BucketDurationSeconds).truncatedTo(ChronoUnit.SECONDS))
     }
   }
diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/BucketCountSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/BucketCountSpec.scala
new file mode 100644
index 00000000..4b315c90
--- /dev/null
+++ b/core/src/test/scala/akka/persistence/r2dbc/query/BucketCountSpec.scala
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2022 - 2024 Lightbend Inc.
+ */
+
+package akka.persistence.r2dbc.query
+
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import akka.actor.testkit.typed.scaladsl.LogCapturing
+import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import akka.actor.typed.ActorSystem
+import akka.persistence.r2dbc.TestConfig
+import akka.persistence.r2dbc.TestData
+import akka.persistence.r2dbc.TestDbLifecycle
+import akka.persistence.r2dbc.internal.BySliceQuery.Buckets
+import akka.persistence.r2dbc.internal.InstantFactory
+
+class BucketCountSpec
+    extends ScalaTestWithActorTestKit(TestConfig.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with LogCapturing {
+
+  override def typedSystem: ActorSystem[_] = system
+
+  private val dao = settings.connectionFactorySettings.dialect.createQueryDao(r2dbcExecutorProvider)
+
+  "BySliceQuery.Dao" should {
+
+    "count events in 10 second buckets" in {
+      pendingIfMoreThanOneDataPartition()
+
+      val entityType = nextEntityType()
+      val pid1 = nextPid(entityType)
+      val pid2 = nextPid(entityType)
+      val slice1 = persistenceExt.sliceForPersistenceId(pid1)
+      val slice2 = persistenceExt.sliceForPersistenceId(pid2)
+
+      val startTime = InstantFactory.now().minusSeconds(3600)
+      val bucketStartTime = (startTime.getEpochSecond / 10) * 10
+
+      (0 until 10).foreach { i =>
+        writeEvent(slice1, pid1, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
+        writeEvent(slice2, pid2, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
+      }
+
+      val buckets =
+        dao
+          .countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, Buckets.Limit)
+          .futureValue
+      withClue(s"startTime $startTime ($bucketStartTime): ") {
+        buckets.size shouldBe 10
+        buckets.head.startTime shouldBe bucketStartTime
+        buckets.last.startTime shouldBe (bucketStartTime + 9 * Buckets.BucketDurationSeconds)
+        buckets.map(_.count).toSet shouldBe Set(2)
+        buckets.map(_.count).sum shouldBe (2 * 10)
+      }
+    }
+
+    "append empty bucket if no events in the last bucket, limit before now" in {
+      pendingIfMoreThanOneDataPartition()
+
+      val entityType = nextEntityType()
+      val pid1 = nextPid(entityType)
+      val pid2 = nextPid(entityType)
+      val slice1 = persistenceExt.sliceForPersistenceId(pid1)
+      val slice2 = persistenceExt.sliceForPersistenceId(pid2)
+
+      val limit = 100
+      val startTime = InstantFactory.now().minusSeconds(3600)
+      val bucketStartTime = (startTime.getEpochSecond / 10) * 10
+
+      (0 until 10).foreach { i =>
+        writeEvent(slice1, pid1, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
+        writeEvent(slice2, pid2, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
+      }
+
+      val buckets =
+        dao
+          .countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, limit)
+          .futureValue
+      withClue(s"startTime $startTime ($bucketStartTime): ") {
+        buckets.size shouldBe 11
+        buckets.head.startTime shouldBe bucketStartTime
+        // the toTimestamp of the sql query is one bucket more than fromTimestamp + (limit * BucketDurationSeconds)
+        buckets.last.startTime shouldBe (bucketStartTime + (limit + 1) * Buckets.BucketDurationSeconds)
+        buckets.last.count shouldBe 0
+        buckets.dropRight(1).map(_.count).toSet shouldBe Set(2)
+        buckets.map(_.count).sum shouldBe (2 * 10)
+      }
+    }
+
+  }
+
+}
diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index 45794d98..ba1ae14b 100644
--- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -4,14 +4,12 @@

 package akka.persistence.r2dbc.query

-import java.time.Instant
 import java.time.temporal.ChronoUnit

 import scala.concurrent.duration._

 import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
-import org.slf4j.LoggerFactory

 import akka.actor.testkit.typed.scaladsl.LogCapturing
 import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@@ -26,15 +24,7 @@ import akka.persistence.r2dbc.TestData
 import akka.persistence.r2dbc.TestDbLifecycle
 import akka.persistence.r2dbc.internal.EnvelopeOrigin
 import akka.persistence.r2dbc.internal.InstantFactory
-import akka.persistence.r2dbc.internal.codec.PayloadCodec
-import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
-import akka.persistence.r2dbc.internal.codec.TimestampCodec
-import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
-import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
-import akka.persistence.r2dbc.internal.codec.QueryAdapter
 import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
-import akka.persistence.typed.PersistenceId
-import akka.serialization.SerializationExtension
 import akka.stream.testkit.TestSubscriber
 import akka.stream.testkit.scaladsl.TestSink
 import scala.jdk.DurationConverters._
@@ -58,37 +48,9 @@ class EventsBySliceBacktrackingSpec
     with LogCapturing {

   override def typedSystem: ActorSystem[_] = system
-  implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec
-  implicit val timestampCodec: TimestampCodec = settings.codecSettings.JournalImplicits.timestampCodec
-  implicit val queryAdapter: QueryAdapter = settings.codecSettings.JournalImplicits.queryAdapter

   private val query = PersistenceQuery(testKit.system)
     .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)

-  private val stringSerializer = SerializationExtension(system).serializerFor(classOf[String])
-
-  private val log = LoggerFactory.getLogger(getClass)
-
-  // to be able to store events with specific timestamps
-  private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = {
-    log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp)
-    val insertEventSql = sql"""
-      INSERT INTO ${settings.journalTableWithSchema(slice)}
-      (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
-      VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)"""
-    val entityType = PersistenceId.extractEntityType(persistenceId)
-
-    val result = r2dbcExecutor(slice).updateOne("test writeEvent") { connection =>
-      connection
-        .createStatement(insertEventSql)
-        .bind(0, slice)
-        .bind(1, entityType)
-        .bind(2, persistenceId)
-        .bind(3, seqNr)
-        .bindTimestamp(4, timestamp)
-        .bind(5, stringSerializer.identifier)
-        .bindPayload(6, stringSerializer.toBinary(event))
-    }
-    result.futureValue shouldBe 1
-  }

   "eventsBySlices backtracking" should {
diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala
index 8b2478b2..1471c6be 100644
--- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala
+++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala
@@ -4,8 +4,6 @@
 package akka.persistence.r2dbc.query

-import java.time.Instant
-
 import scala.concurrent.Await

 import akka.Done
@@ -30,16 +28,9 @@ import akka.persistence.r2dbc.TestConfig
 import akka.persistence.r2dbc.TestData
 import akka.persistence.r2dbc.TestDbLifecycle
 import akka.persistence.r2dbc.internal.InstantFactory
-import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
-import akka.persistence.r2dbc.internal.codec.PayloadCodec
-import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement
-import akka.persistence.r2dbc.internal.codec.QueryAdapter
-import akka.persistence.r2dbc.internal.codec.TimestampCodec
-import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement
 import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import akka.persistence.typed.PersistenceId
 import akka.persistence.typed.internal.ReplicatedEventMetadata
-import akka.serialization.SerializationExtension
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
 import akka.stream.testkit.TestSubscriber
@@ -47,7 +38,6 @@ import akka.stream.testkit.scaladsl.TestSink
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
-import org.slf4j.LoggerFactory

 object EventsBySliceSpec {
   sealed trait QueryType
@@ -87,40 +77,6 @@ class EventsBySliceSpec

   private val query = PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)

-  implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec
-  implicit val timestampCodec: TimestampCodec = settings.codecSettings.JournalImplicits.timestampCodec
-  implicit val queryAdapter: QueryAdapter = settings.codecSettings.JournalImplicits.queryAdapter
-
-  private val stringSerializer = SerializationExtension(system).serializerFor(classOf[String])
-
-  private val log = LoggerFactory.getLogger(getClass)
-
-  // to be able to store events with specific timestamps
-  private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = {
-    log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp)
-
-    val insertEventSql = sql"""
-      INSERT INTO ${settings.journalTableWithSchema(slice)}
-      (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
-      VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)"""
-
-    val entityType = PersistenceId.extractEntityType(persistenceId)
-
-    val result = r2dbcExecutor(slice).updateOne("test writeEvent") { connection =>
-      connection
-        .createStatement(insertEventSql)
-        .bind(0, slice)
-        .bind(1, entityType)
-        .bind(2, persistenceId)
-        .bind(3, seqNr)
-        .bindTimestamp(4, timestamp)
-        .bind(5, stringSerializer.identifier)
-        .bindPayload(6, stringSerializer.toBinary(event))
-    }
-
-    result.futureValue shouldBe 1
-  }
-
   private class Setup {
     val entityType = nextEntityType()
     val persistenceId = nextPid(entityType)