Skip to content

Commit

Permalink
Performance regression for events without tags (#586)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Aug 30, 2021
1 parent bef97bf commit 6f613b3
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# internals
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("akka.persistence.jdbc.journal.dao.BaseDao")
ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.queueWriteJournalRows")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.writeQueue")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalQueries.insertAndReturn")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.JournalQueries.writeJournalRows")
ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao")
ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.queueWriteJournalRows")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.writeQueue")
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,9 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal {

override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
// add timestamp to all payloads in all AtomicWrite messages
val now = System.currentTimeMillis()
val timedMessages =
messages.map { atomWrt =>
// since they are all persisted atomically,
// all PersistentRepr on the same atomic batch gets the same timestamp
val now = System.currentTimeMillis()
atomWrt.copy(payload = atomWrt.payload.map(pr => pr.withTimestamp(now)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import scala.collection.immutable.{ Seq, Vector }
import scala.concurrent.{ ExecutionContext, Future, Promise }

// Shared with the legacy DAO
trait BaseDao[T] {
abstract class BaseDao[T] {
implicit val mat: Materializer
implicit val ec: ExecutionContext

def baseDaoConfig: BaseDaoConfig

lazy val writeQueue: SourceQueueWithComplete[(Promise[Unit], Seq[T])] = Source
val writeQueue: SourceQueueWithComplete[(Promise[Unit], Seq[T])] = Source
.queue[(Promise[Unit], Seq[T])](baseDaoConfig.bufferSize, OverflowStrategy.dropNew)
.batchWeighted[(Seq[Promise[Unit]], Seq[T])](baseDaoConfig.batchSize, _._2.size, tup => Vector(tup._1) -> tup._2) {
case ((promises, rows), (newPromise, newRows)) => (promises :+ newPromise) -> (rows ++ newRows)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package akka.persistence.jdbc.journal.dao

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow
Expand Down Expand Up @@ -41,7 +42,7 @@ class DefaultJournalDao(
override def baseDaoConfig: BaseDaoConfig = journalConfig.daoConfig

override def writeJournalRows(xs: immutable.Seq[(JournalAkkaSerializationRow, Set[String])]): Future[Unit] = {
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContexts.parasitic)
}

val queries =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,25 @@ class JournalQueries(

import profile.api._

val insertAndReturn =
JournalTable.returning(JournalTable.map(_.ordering))
private val JournalTableC = Compiled(JournalTable)
private val insertAndReturn = JournalTable.returning(JournalTable.map(_.ordering))
private val TagTableC = Compiled(TagTable)

def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(implicit ec: ExecutionContext) = {
val sorted = xs.sortBy((event => event._1.sequenceNumber))
val (events, tags) = sorted.unzip
for {
ids <- insertAndReturn ++= events
tagInserts = ids.zip(tags).flatMap { case (id, tags) => tags.map(tag => TagRow(id, tag)) }
_ <- TagTableC ++= tagInserts
} yield ()
if (sorted.exists(_._2.nonEmpty)) {
// only if there are any tags
val (events, tags) = sorted.unzip
for {
ids <- insertAndReturn ++= events
tagInserts = ids.zip(tags).flatMap { case (id, tags) => tags.map(tag => TagRow(id, tag)) }
_ <- TagTableC ++= tagInserts
} yield ()
} else {
// optimization avoid some work when not using tags
val events = sorted.map(_._1)
JournalTableC ++= events
}
}

private def selectAllJournalForPersistenceIdDesc(persistenceId: Rep[String]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ class ByteArrayJournalDao(
* The DefaultJournalDao contains all the knowledge to persist and load serialized journal entries
*/
trait BaseByteArrayJournalDao
extends JournalDaoWithUpdates
extends BaseDao[JournalRow]
with JournalDaoWithUpdates
with BaseJournalDaoWithReadMessages
with BaseDao[JournalRow]
with H2Compat {
val db: Database
val profile: JdbcProfile
val queries: JournalQueries
val journalConfig: JournalConfig
val baseDaoConfig: BaseDaoConfig = journalConfig.daoConfig
override def baseDaoConfig: BaseDaoConfig = journalConfig.daoConfig
val serializer: FlowPersistentReprSerializer[JournalRow]
implicit val ec: ExecutionContext
implicit val mat: Materializer
Expand Down

0 comments on commit 6f613b3

Please sign in to comment.