fix: Progress bucket count query if empty #645

Merged · 5 commits · Jan 16, 2025
MiMa filter file (new)
@@ -0,0 +1,2 @@
# internal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Buckets.this")
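(This filter is needed because the PR adds a `createdAt` parameter to the constructor of the internal `Buckets` class, visible in the BySliceQuery.scala diff below; without the exclusion, MiMa would report the changed constructor, `Buckets.this`, as a binary incompatibility.)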
BySliceQuery.scala
@@ -132,11 +132,11 @@ import org.slf4j.Logger
* @param countByBucket
* Key is the epoch seconds for the start of the bucket. Value is the number of entries in the bucket.
*/
class Buckets(countByBucket: immutable.SortedMap[Buckets.EpochSeconds, Buckets.Count]) {
class Buckets(
countByBucket: immutable.SortedMap[Buckets.EpochSeconds, Buckets.Count],
val createdAt: Instant = InstantFactory.now()) {
import Buckets.{ Bucket, BucketDurationSeconds, Count, EpochSeconds }

val createdAt: Instant = InstantFactory.now()

def findTimeForLimit(from: Instant, atLeastCounts: Int): Option[Instant] = {
val fromEpochSeconds = from.toEpochMilli / 1000
val iter = countByBucket.iterator.dropWhile { case (key, _) => fromEpochSeconds >= key }
@@ -169,12 +169,22 @@ import org.slf4j.Logger
new Buckets(newCountByBucket)
}

def clearUntil(time: Instant): Buckets = {
val epochSeconds = time.minusSeconds(BucketDurationSeconds).toEpochMilli / 1000
val newCountByBucket = countByBucket.dropWhile { case (key, _) => epochSeconds >= key }
if (newCountByBucket.size == countByBucket.size)
this
else if (newCountByBucket.isEmpty)
new Buckets(immutable.SortedMap(countByBucket.last), createdAt = this.createdAt) // keep last
else
new Buckets(newCountByBucket, createdAt = this.createdAt)
}

def nextStartTime: Option[Instant] = {
// we only expect the last 2 buckets to change from previous bucket count query
if (size < 2)
if (isEmpty)
None
else {
val startSeconds = countByBucket.keysIterator.toVector(countByBucket.size - 2)
val startSeconds = countByBucket.last._1 - BucketDurationSeconds
Some(Instant.ofEpochSecond(startSeconds))
}
}
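To see what the `nextStartTime` change buys, here is a minimal standalone sketch (a hypothetical `NextStartTimeSketch` object, not the internal `Buckets` API; `BucketDurationSeconds = 10` as in `Buckets`): with a single bucket, the old second-to-last lookup yielded `None` and the bucket query could not advance, while the new form always derives a start time from the last bucket.

```scala
import java.time.Instant
import scala.collection.immutable.SortedMap

object NextStartTimeSketch extends App {
  val BucketDurationSeconds = 10L

  // epoch-second bucket start -> count, same shape as Buckets.countByBucket
  val countByBucket: SortedMap[Long, Long] = SortedMap(1700000000L -> 101L)

  // old logic: required at least 2 buckets, so a single bucket stalled the query
  val oldNext: Option[Instant] =
    if (countByBucket.size < 2) None
    else Some(Instant.ofEpochSecond(countByBucket.keysIterator.toVector(countByBucket.size - 2)))

  // new logic: one bucket duration before the last bucket; only an empty map blocks progress
  val newNext: Option[Instant] =
    if (countByBucket.isEmpty) None
    else Some(Instant.ofEpochSecond(countByBucket.last._1 - BucketDurationSeconds))

  println(s"old: $oldNext") // old: None
  println(s"new: $newNext") // new: Some(2023-11-14T22:13:10Z)
}
```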
@@ -222,6 +232,16 @@ import org.slf4j.Logger
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]]

protected def appendEmptyBucketIfLastIsMissing(
buckets: IndexedSeq[Bucket],
toTimestamp: Instant): IndexedSeq[Bucket] = {
val startTimeOfLastBucket = (toTimestamp.getEpochSecond / 10) * 10
if (buckets.isEmpty || buckets.last.startTime != startTimeOfLastBucket)
buckets :+ Bucket(startTimeOfLastBucket, 0)
else
buckets
}

}
}
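The bucket alignment is integer truncation of epoch seconds. A self-contained sketch (hypothetical names, mirroring the method above with its empty guard; 10-second buckets as in `Buckets.BucketDurationSeconds`) shows how even an empty query result yields a bucket, so the query can advance:

```scala
import java.time.Instant

object BucketRoundingSketch extends App {
  final case class Bucket(startTime: Long, count: Long)

  // truncate an instant down to the start of its 10-second bucket
  def startOfBucket(t: Instant): Long = (t.getEpochSecond / 10) * 10

  // same shape as appendEmptyBucketIfLastIsMissing: ensure the window's last bucket is present
  def appendIfMissing(buckets: IndexedSeq[Bucket], toTimestamp: Instant): IndexedSeq[Bucket] = {
    val lastStart = startOfBucket(toTimestamp)
    if (buckets.isEmpty || buckets.last.startTime != lastStart) buckets :+ Bucket(lastStart, 0)
    else buckets
  }

  val to = Instant.ofEpochSecond(1700000017)
  println(startOfBucket(to))                 // 1700000010
  println(appendIfMissing(Vector.empty, to)) // Vector(Bucket(1700000010,0)): empty result still advances
}
```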

@@ -267,11 +287,15 @@ import org.slf4j.Logger
// so continue until rowCount is 0. That means an extra query at the end to make sure there are no
// more to fetch.
if (state.queryCount == 0L || state.rowCount > 0) {
val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1, previous = state.latest)

val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)

val newState = state.copy(
rowCount = 0,
queryCount = state.queryCount + 1,
buckets = state.buckets.clearUntil(fromTimestamp),
previous = state.latest)

val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
@@ -456,6 +480,7 @@ import org.slf4j.Logger
backtrackingCount = 1,
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
buckets = state.buckets.clearUntil(fromOffset.timestamp),
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
PostgresDurableStateDao.scala
@@ -863,8 +863,8 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProvider)
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]] = {

val now = InstantFactory.now() // not important to use database time
val toTimestamp = {
val now = InstantFactory.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
@@ -889,8 +889,10 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProvider)
if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))

result

if (toTimestamp == now)
result
else
result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))
}
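(Note the guard: when the computed window extends to the present, `toTimestamp == now`, the last bucket is still filling, so no zero-count bucket is appended; only when the window was capped in the past does the appended empty bucket mark that stretch as counted, letting the next bucket query start after it. The same pattern is repeated in PostgresQueryDao and PostgresSnapshotDao below.)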

private def additionalBindings(
PostgresQueryDao.scala
@@ -23,6 +23,7 @@ import akka.persistence.Persistence
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.BucketDurationSeconds
import akka.persistence.r2dbc.internal.InstantFactory
import akka.persistence.r2dbc.internal.JournalDao.SerializedJournalRow
import akka.persistence.r2dbc.internal.QueryDao
@@ -324,8 +325,8 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider)
limit: Int): Future[Seq[Bucket]] = {
val executor = executorProvider.executorFor(minSlice)

val now = InstantFactory.now() // not important to use database time
val toTimestamp = {
val now = InstantFactory.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
@@ -349,7 +350,10 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider)
if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))

result
if (toTimestamp == now)
result
else
result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))
}

/**
PostgresSnapshotDao.scala
@@ -438,8 +438,8 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider)
fromTimestamp: Instant,
limit: Int): Future[Seq[Bucket]] = {

val now = InstantFactory.now() // not important to use database time
val toTimestamp = {
val now = InstantFactory.now() // not important to use database time
if (fromTimestamp == Instant.EPOCH)
now
else {
@@ -465,7 +465,10 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider)
if (log.isDebugEnabled)
result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice))

result
if (toTimestamp == now)
result
else
result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp))

}

40 changes: 37 additions & 3 deletions core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala
@@ -13,16 +13,19 @@ import akka.persistence.r2dbc.internal.R2dbcExecutor
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Suite
import org.slf4j.LoggerFactory

import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.h2.H2Dialect
import java.time.Instant

import scala.util.control.NonFatal

import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.PayloadCodec
import akka.persistence.r2dbc.internal.codec.QueryAdapter
import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension

trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
private val log = LoggerFactory.getLogger(getClass)

def typedSystem: ActorSystem[_]

@@ -74,4 +77,35 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
super.beforeAll()
}

// to be able to store events with specific timestamps
def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = {
// review comment (member author): moved this here since it was used in several tests
implicit val timestampCodec: TimestampCodec = settings.codecSettings.JournalImplicits.timestampCodec
implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec
implicit val queryAdapter: QueryAdapter = settings.codecSettings.JournalImplicits.queryAdapter
import TimestampCodec.TimestampCodecRichStatement
import PayloadCodec.RichStatement
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
val stringSerializer = SerializationExtension(typedSystem).serializerFor(classOf[String])

log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp)
val insertEventSql = sql"""
INSERT INTO ${settings.journalTableWithSchema(slice)}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)"""
val entityType = PersistenceId.extractEntityType(persistenceId)

val result = r2dbcExecutor(slice).updateOne("test writeEvent") { connection =>
connection
.createStatement(insertEventSql)
.bind(0, slice)
.bind(1, entityType)
.bind(2, persistenceId)
.bind(3, seqNr)
.bindTimestamp(4, timestamp)
.bind(5, stringSerializer.identifier)
.bindPayload(6, stringSerializer.toBinary(event))
}
Await.result(result, 5.seconds)
}

}
BySliceQueryBucketsSpec.scala
@@ -18,7 +18,7 @@ class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers {

private val startTime = InstantFactory.now()
private val firstBucketStartTime = startTime.plusSeconds(60)
private val firstBucketStartEpochSeconds = firstBucketStartTime.toEpochMilli / 1000
private val firstBucketStartEpochSeconds = firstBucketStartTime.getEpochSecond

private def bucketStartEpochSeconds(bucketIndex: Int): Long =
firstBucketStartEpochSeconds + BucketDurationSeconds * bucketIndex
@@ -71,6 +71,22 @@ class BySliceQueryBucketsSpec extends AnyWordSpec with TestSuite with Matchers {
moreBuckets.size shouldBe Buckets.Limit
}

"clear until time" in {
buckets.clearUntil(startTime).size shouldBe buckets.size
buckets.clearUntil(firstBucketStartTime).size shouldBe buckets.size
buckets.clearUntil(firstBucketStartTime.plusSeconds(9)).size shouldBe buckets.size

buckets.clearUntil(firstBucketStartTime.plusSeconds(10)).size shouldBe buckets.size - 1
buckets.clearUntil(firstBucketStartTime.plusSeconds(11)).size shouldBe buckets.size - 1
buckets.clearUntil(firstBucketStartTime.plusSeconds(19)).size shouldBe buckets.size - 1

buckets.clearUntil(firstBucketStartTime.plusSeconds(31)).size shouldBe buckets.size - 3
buckets.clearUntil(firstBucketStartTime.plusSeconds(100)).size shouldBe 1 // keep last

// don't change createdAt
buckets.clearUntil(firstBucketStartTime.plusSeconds(31)).createdAt shouldBe buckets.createdAt
}

"provide start time for next query" in {
Buckets.empty
.add(List(Bucket(bucketStartEpochSeconds(0), 101), Bucket(bucketStartEpochSeconds(1), 202)))
@@ -83,7 +99,8 @@

Buckets.empty
.add(List(Bucket(bucketStartEpochSeconds(0), 101)))
.nextStartTime shouldBe None
.nextStartTime shouldBe Some(
firstBucketStartTime.minusSeconds(Buckets.BucketDurationSeconds).truncatedTo(ChronoUnit.SECONDS))
}

}
BucketCountSpec.scala (new file)
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2022 - 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.r2dbc.query

import org.scalatest.wordspec.AnyWordSpecLike

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.r2dbc.internal.BySliceQuery.Buckets
import akka.persistence.r2dbc.internal.InstantFactory

class BucketCountSpec
extends ScalaTestWithActorTestKit(TestConfig.config)
with AnyWordSpecLike
with TestDbLifecycle
with TestData
with LogCapturing {

override def typedSystem: ActorSystem[_] = system

private val dao = settings.connectionFactorySettings.dialect.createQueryDao(r2dbcExecutorProvider)

"BySliceQuery.Dao" should {

"count events in 10 second buckets" in {
pendingIfMoreThanOneDataPartition()

val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val slice1 = persistenceExt.sliceForPersistenceId(pid1)
val slice2 = persistenceExt.sliceForPersistenceId(pid2)

val startTime = InstantFactory.now().minusSeconds(3600)
val bucketStartTime = (startTime.getEpochSecond / 10) * 10

(0 until 10).foreach { i =>
writeEvent(slice1, pid1, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
writeEvent(slice2, pid2, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e2-$i")
}

val buckets =
dao
.countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, Buckets.Limit)
.futureValue
withClue(s"startTime $startTime ($bucketStartTime): ") {
buckets.size shouldBe 10
buckets.head.startTime shouldBe bucketStartTime
buckets.last.startTime shouldBe (bucketStartTime + 9 * Buckets.BucketDurationSeconds)
buckets.map(_.count).toSet shouldBe Set(2)
buckets.map(_.count).sum shouldBe (2 * 10)
}
}

"append empty bucket if no events in the last bucket, limit before now" in {
pendingIfMoreThanOneDataPartition()

val entityType = nextEntityType()
val pid1 = nextPid(entityType)
val pid2 = nextPid(entityType)
val slice1 = persistenceExt.sliceForPersistenceId(pid1)
val slice2 = persistenceExt.sliceForPersistenceId(pid2)

val limit = 100
val startTime = InstantFactory.now().minusSeconds(3600)
val bucketStartTime = (startTime.getEpochSecond / 10) * 10

(0 until 10).foreach { i =>
writeEvent(slice1, pid1, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e1-$i")
writeEvent(slice2, pid2, 1 + i, startTime.plusSeconds(Buckets.BucketDurationSeconds * i), s"e2-$i")
}

val buckets =
dao
.countBuckets(entityType, 0, persistenceExt.numberOfSlices - 1, startTime, limit)
.futureValue
withClue(s"startTime $startTime ($bucketStartTime): ") {
buckets.size shouldBe 11
buckets.head.startTime shouldBe bucketStartTime
// the toTimestamp of the sql query is one bucket more than fromTimestamp + (limit * BucketDurationSeconds)
buckets.last.startTime shouldBe (bucketStartTime + (limit + 1) * Buckets.BucketDurationSeconds)
buckets.last.count shouldBe 0
buckets.dropRight(1).map(_.count).toSet shouldBe Set(2)
buckets.map(_.count).sum shouldBe (2 * 10)
}
}

}

}