diff --git a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/store/dao/JdbcLedgerDaoSuite.scala b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/store/dao/JdbcLedgerDaoSuite.scala index 88ce68f7991b..f3410c787ca8 100644 --- a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/store/dao/JdbcLedgerDaoSuite.scala +++ b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/store/dao/JdbcLedgerDaoSuite.scala @@ -443,6 +443,24 @@ private[dao] trait JdbcLedgerDaoSuite extends AkkaBeforeAndAfterAll with JdbcLed implicit ec: ExecutionContext): Future[(Offset, LedgerEntry.Transaction)] = store(divulgedContracts = Map.empty, offsetAndTx) + protected final def storeSync(commands: Vector[(Offset, LedgerEntry.Transaction)])( + implicit ec: ExecutionContext): Future[Vector[(Offset, LedgerEntry.Transaction)]] = { + + import scalaz.{Free, NaturalTransformation} + import scalaz.syntax.traverse._ + import scalaz.std.vector._ + import scalaz.std.scalaFuture._ + + val storeDelayed = (a: (Offset, LedgerEntry.Transaction)) => Free.suspend(Free.liftF(store(a))) + + // force synchronous future processing with Free monad + // to provide the guarantees that all transactions persisted in the specified order + val xs: Free[Future, Vector[(Offset, LedgerEntry.Transaction)]] = + commands.traverse(storeDelayed) + + xs.foldMap(NaturalTransformation.refl[Future]) + } + /** A transaction that creates the given key */ protected final def txCreateContractWithKey( party: Party, diff --git a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/store/dao/JdbcLedgerDaoTransactionsSpec.scala b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/store/dao/JdbcLedgerDaoTransactionsSpec.scala index 360491033011..019347fce161 100644 --- a/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/store/dao/JdbcLedgerDaoTransactionsSpec.scala +++ b/ledger/sandbox/src/test/lib/scala/com/digitalasset/platform/store/dao/JdbcLedgerDaoTransactionsSpec.scala @@ -19,10 +19,6 @@ import com.daml.platform.api.v1.event.EventOps.EventOps import com.daml.platform.store.entries.LedgerEntry import org.scalatest.{AsyncFlatSpec, Inside, LoneElement, Matchers, OptionValues} -import scalaz.syntax.traverse._ -import scalaz.std.vector._ -import scalaz.std.scalaFuture._ - import scala.concurrent.Future private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Inside with LoneElement { @@ -437,7 +433,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid val endOffsetFromTheFuture = nextOffset() for { - _ <- commands.traverse(x => store(x)) + _ <- storeSync(commands) result <- ledgerDao.transactionsReader .getFlatTransactions( @@ -453,8 +449,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid } // TODO(Leo): this should be converted to scalacheck test with random offset gaps and pageSize - // flaky, issue: #6760 - ignore should "return all transactions in the specified offset range when iterating with gaps in the offsets assigned to events and a page size that ensures a page ends in such a gap" in { + it should "return all transactions in the specified offset range when iterating with gaps in the offsets assigned to events and a page size that ensures a page ends in such a gap" in { // Simulates a gap in the offsets assigned to events, as they // can be assigned to party allocation, package uploads and // configuration updates as well @@ -466,7 +461,7 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid // the order of `nextOffset()` calls is important val beginOffset = nextOffset() - val commandWithOffsetGaps: Vector[(Offset, LedgerEntry.Transaction)] = + val commandsWithOffsetGaps: Vector[(Offset, LedgerEntry.Transaction)] = Vector(singleCreate) ++ offsetGap ++ Vector.fill(2)(singleCreate) ++ offsetGap ++ Vector.fill(3)(singleCreate) ++ offsetGap ++ offsetGap ++ @@ -474,10 +469,10 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid val endOffset = nextOffset() - commandWithOffsetGaps.size shouldBe 11 + commandsWithOffsetGaps should have length 11L for { - _ <- commandWithOffsetGaps.traverse(x => store(x)) + _ <- storeSync(commandsWithOffsetGaps) // `pageSize = 2` and the offset gaps in the `commandWithOffsetGaps` above are to make sure // that streaming works with event pages separated by offsets that don't have events in the store @@ -493,10 +488,9 @@ private[dao] trait JdbcLedgerDaoTransactionsSpec extends OptionValues with Insid readTxs = extractAllTransactions(response) } yield { - readTxs.size shouldBe commandWithOffsetGaps.size val readTxOffsets: Vector[String] = readTxs.map(_.offset) readTxOffsets shouldBe readTxOffsets.sorted - readTxOffsets shouldBe commandWithOffsetGaps.map(_._1.toHexString) + readTxOffsets shouldBe commandsWithOffsetGaps.map(_._1.toHexString) } }