fix: Progress bucket count query if empty (#645)
* the bucket queries continue from the end of the previous query
* the limit of 10000 buckets times 10 seconds per bucket is around 28 hours
  (10000 × 10 s = 100000 s ≈ 27.8 h)
* meaning that if there were no new events for 28 hours the queries
  would not progress, since no more buckets would be found
* the solution here is to append an empty bucket at the end in that case
  (see the sketch below)
* use the last bucket when computing the next start time
* reintroduce clearUntil to reduce memory use
patriknw authored Jan 16, 2025
1 parent b3920cd commit e8d6ff0
Showing 10 changed files with 203 additions and 102 deletions.
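
The core of the change is the new appendEmptyBucketIfLastIsMissing helper and the reintroduced clearUntil, both visible in the diff below. As a rough self-contained sketch of the append idea (the EmptyBucketSketch object, its Bucket case class and the sample timestamps are made up for illustration and stand in for BySliceQuery.Buckets):

import java.time.Instant

object EmptyBucketSketch {
  // stand-in for BySliceQuery.Buckets.Bucket: start time as epoch seconds
  // rounded down to a 10 second boundary, plus the number of events counted
  final case class Bucket(startTime: Long, count: Long)

  val BucketDurationSeconds = 10

  // If the counted buckets do not already end with the bucket covering
  // toTimestamp, append an empty bucket so the next query can continue
  // from there instead of getting stuck behind an idle period.
  def appendEmptyBucketIfLastIsMissing(buckets: IndexedSeq[Bucket], toTimestamp: Instant): IndexedSeq[Bucket] = {
    val startTimeOfLastBucket = (toTimestamp.getEpochSecond / BucketDurationSeconds) * BucketDurationSeconds
    if (buckets.nonEmpty && buckets.last.startTime != startTimeOfLastBucket)
      buckets :+ Bucket(startTimeOfLastBucket, 0)
    else
      buckets
  }

  def main(args: Array[String]): Unit = {
    // hypothetical values, only to show the appended empty bucket
    val toTimestamp = Instant.parse("2025-01-16T12:00:05Z") // epoch second 1737028805
    val counted = Vector(Bucket(1737028700L, 2))            // last count is well before toTimestamp
    println(appendEmptyBucketIfLastIsMissing(counted, toTimestamp))
    // Vector(Bucket(1737028700,2), Bucket(1737028800,0))
  }
}

In the real DAOs the helper is only applied when toTimestamp is before now, so that a bucket that is still being filled is never reported as empty.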
@@ -0,0 +1,2 @@
# internal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Buckets.this")
@@ -132,11 +132,11 @@ import org.slf4j.Logger
* @param countByBucket
* Key is the epoch seconds for the start of the bucket. Value is the number of entries in the bucket.
*/
class Buckets(countByBucket: immutable.SortedMap[Buckets.EpochSeconds, Buckets.Count]) {
class Buckets(
countByBucket: immutable.SortedMap[Buckets.EpochSeconds, Buckets.Count],
val createdAt: Instant = InstantFactory.now()) {
import Buckets.{ Bucket, BucketDurationSeconds, Count, EpochSeconds }

val createdAt: Instant = InstantFactory.now()

def findTimeForLimit(from: Instant, atLeastCounts: Int): Option[Instant] = {
val fromEpochSeconds = from.toEpochMilli / 1000
val iter = countByBucket.iterator.dropWhile { case (key, _) => fromEpochSeconds >= key }
@@ -169,12 +169,22 @@ import org.slf4j.Logger
new Buckets(newCountByBucket)
}

def clearUntil(time: Instant): Buckets = {
val epochSeconds = time.minusSeconds(BucketDurationSeconds).toEpochMilli / 1000
val newCountByBucket = countByBucket.dropWhile { case (key, _) => epochSeconds >= key }
if (newCountByBucket.size == countByBucket.size)
this
else if (newCountByBucket.isEmpty)
new Buckets(immutable.SortedMap(countByBucket.last), createdAt = this.createdAt) // keep last
else
new Buckets(newCountByBucket, createdAt = this.createdAt)
}

def nextStartTime: Option[Instant] = {
// we only expect the last 2 buckets to change from previous bucket count query
if (size < 2)
if (isEmpty)
None
else {
val startSeconds = countByBucket.keysIterator.toVector(countByBucket.size - 2)
val startSeconds = countByBucket.last._1 - BucketDurationSeconds
Some(Instant.ofEpochSecond(startSeconds))
}
}
@@ -222,6 +232,16 @@ import org.slf4j.Logger
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]]

protected def appendEmptyBucketIfLastIsMissing(
buckets: IndexedSeq[Bucket],
toTimestamp: Instant): IndexedSeq[Bucket] = {
val startTimeOfLastBucket = (toTimestamp.getEpochSecond / 10) * 10
if (buckets.last.startTime != startTimeOfLastBucket)
buckets :+ Bucket(startTimeOfLastBucket, 0)
else
buckets
}

}
}

@@ -267,11 +287,15 @@ import org.slf4j.Logger
// so continue until rowCount is 0. That means an extra query at the end to make sure there are no
// more to fetch.
if (state.queryCount == 0L || state.rowCount > 0) {
val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1, previous = state.latest)

val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)

val newState = state.copy(
rowCount = 0,
queryCount = state.queryCount + 1,
buckets = state.buckets.clearUntil(fromTimestamp),
previous = state.latest)

val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
@@ -456,6 +480,7 @@ import org.slf4j.Logger
backtrackingCount = 1,
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
buckets = state.buckets.clearUntil(fromOffset.timestamp),
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
@@ -863,8 +863,8 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]] = {

val now = InstantFactory.now() // not important to use database time
val toTimestamp = {
val now = InstantFactory.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
@@ -889,8 +889,10 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))

result

if (toTimestamp == now)
result
else
result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))
}

private def additionalBindings(
@@ -23,6 +23,7 @@ import akka.persistence.Persistence
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.BucketDurationSeconds
import akka.persistence.r2dbc.internal.InstantFactory
import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
import akka.persistence.r2dbc.internal.QueryDao
@@ -324,8 +325,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
limit: Int): Future[Seq[Bucket]] = {
val executor = executorProvider.executorFor(minSlice)

val now = InstantFactory.now() // not important to use database time
val toTimestamp = {
val now = InstantFactory.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
@@ -349,7 +350,10 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))

result
if (toTimestamp == now)
result
else
result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))
}

/**
@@ -438,8 +438,8 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]] = {

val now = InstantFactory.now() // not important to use database time
val toTimestamp = {
val now = InstantFactory.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
@@ -465,7 +465,10 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider
if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))

result
if (toTimestamp == now)
result
else
result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))

}

40 changes: 37 additions & 3 deletions core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala
@@ -13,16 +13,19 @@ import akka.persistence.r2dbc.internal.R2dbcExecutor
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Suite
import org.slf4j.LoggerFactory

import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.h2.H2Dialect
import java.time.Instant

import scala.util.control.NonFatal

import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.PayloadCodec
import akka.persistence.r2dbc.internal.codec.QueryAdapter
import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension

trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
private val log = LoggerFactory.getLogger(getClass)

def typedSystem: ActorSystem[_]

@@ -74,4 +77,35 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
super.beforeAll()
}

// to be able to store events with specific timestamps
def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = {
implicit val timestampCodec: TimestampCodec = settings.codecSettings.JournalImplicits.timestampCodec
implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec
implicit val queryAdapter: QueryAdapter = settings.codecSettings.JournalImplicits.queryAdapter
import TimestampCodec.TimestampCodecRichStatement
import PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
val stringSerializer = SerializationExtension(typedSystem).serializerFor(classOf[String])

log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp)
val insertEventSql = sql"""
INSERT INTO ${settings.journalTableWithSchema(slice)}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)"""
val entityType = PersistenceId.extractEntityType(persistenceId)

val result = r2dbcExecutor(slice).updateOne("test writeEvent") { connection =>
connection
.createStatement(insertEventSql)
.bind(0, slice)
.bind(1, entityType)
.bind(2, persistenceId)
.bind(3, seqNr)
.bindTimestamp(4, timestamp)
.bind(5, stringSerializer.identifier)
.bindPayload(6, stringSerializer.toBinary(event))
}
Await.result(result, 5.seconds)
}

}
@@ -18,7 +18,7 @@ class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers {

private val startTime = InstantFactory.now()
private val firstBucketStartTime = startTime.plusSeconds(60)
private val firstBucketStartEpochSeconds = firstBucketStartTime.toEpochMilli / 1000
private val firstBucketStartEpochSeconds = firstBucketStartTime.getEpochSecond

private def bucketStartEpochSeconds(bucketIndex: Int): Long =
firstBucketStartEpochSeconds + BucketDurationSeconds * bucketIndex
@@ -71,6 +71,22 @@ class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers {
moreBuckets.size shouldBe Buckets.Limit
}

"clear until time" in {
buckets.clearUntil(startTime).size shouldBe buckets.size
buckets.clearUntil(firstBucketStartTime).size shouldBe buckets.size
buckets.clearUntil(firstBucketStartTime.plusSeconds(9)).size shouldBe buckets.size

buckets.clearUntil(firstBucketStartTime.plusSeconds(10)).size shouldBe buckets.size - 1
buckets.clearUntil(firstBucketStartTime.plusSeconds(11)).size shouldBe buckets.size - 1
buckets.clearUntil(firstBucketStartTime.plusSeconds(19)).size shouldBe buckets.size - 1

buckets.clearUntil(firstBucketStartTime.plusSeconds(31)).size shouldBe buckets.size - 3
buckets.clearUntil(firstBucketStartTime.plusSeconds(100)).size shouldBe 1 // keep last

// don't change createdAt
buckets.clearUntil(firstBucketStartTime.plusSeconds(31)).createdAt shouldBe buckets.createdAt
}

"provide start time for next query" in {
Buckets.empty
.add(List(Bucket(bucketStartEpochSeconds(0), 101), Bucket(bucketStartEpochSeconds(1), 202)))
@@ -83,7 +99,8 @@

Buckets.empty
.add(List(Bucket(bucketStartEpochSeconds(0), 101)))
.nextStartTime shouldBe None
.nextStartTime shouldBe Some(
firstBucketStartTime.minusSeconds(Buckets.BucketDurationSeconds).truncatedTo(ChronoUnit.SECONDS))
}

}
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2022 - 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.r2dbc.query

import org.scalatest.wordspec.AnyWordSpecLike

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets
import akka.persistence.r2dbc.internal.InstantFactory

class BucketCountSpec
extends ScalaTestWithActorTestKit(TestConfig.config)
with AnyWordSpecLike
with TestDbLifecycle
with TestData
with LogCapturing {

override def typedSystem: ActorSystem[_] = system

private val dao = settings.connectionFactorySettings.dialect.createQueryDao(r2dbcExecutorProvider)

"BySliceQuery.Dao" should {

"count events in 10 second buckets" in {
pendingIfMoreThanOneDataPartition()

val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val slice1 = persistenceExt.sliceForPersistenceId(pid1)
val slice2 = persistenceExt.sliceForPersistenceId(pid2)

val startTime = InstantFactory.now().minusSeconds(3600)
val bucketStartTime = (startTime.getEpochSecond / 10) * 10

(0 until 10).foreach { i =>
writeEvent(slice1, pid1, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
writeEvent(slice2, pid2, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
}

val buckets =
dao
.countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, Buckets.Limit)
.futureValue
withClue(s"startTime $startTime ($bucketStartTime): ") {
buckets.size shouldBe 10
buckets.head.startTime shouldBe bucketStartTime
buckets.last.startTime shouldBe (bucketStartTime + 9 * Buckets.BucketDurationSeconds)
buckets.map(_.count).toSet shouldBe Set(2)
buckets.map(_.count).sum shouldBe (2 * 10)
}
}

"append empty bucket if no events in the last bucket, limit before now" in {
pendingIfMoreThanOneDataPartition()

val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val slice1 = persistenceExt.sliceForPersistenceId(pid1)
val slice2 = persistenceExt.sliceForPersistenceId(pid2)

val limit = 100
val startTime = InstantFactory.now().minusSeconds(3600)
val bucketStartTime = (startTime.getEpochSecond / 10) * 10

(0 until 10).foreach { i =>
writeEvent(slice1, pid1, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
writeEvent(slice2, pid2, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
}

val buckets =
dao
.countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, limit)
.futureValue
withClue(s"startTime $startTime ($bucketStartTime): ") {
buckets.size shouldBe 11
buckets.head.startTime shouldBe bucketStartTime
// the toTimestamp of the sql query is one bucket more than fromTimestamp + (limit * BucketDurationSeconds)
buckets.last.startTime shouldBe (bucketStartTime + (limit + 1) * Buckets.BucketDurationSeconds)
buckets.last.count shouldBe 0
buckets.dropRight(1).map(_.count).toSet shouldBe Set(2)
buckets.map(_.count).sum shouldBe (2 * 10)
}
}

}

}