From ccf27a1888fb00de64f9fccf97a40371a86bead7 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Thu, 30 Sep 2021 09:59:05 +0200 Subject: [PATCH 1/2] reduce number of collect calls --- .../qbeast/spark/sql/files/OTreeIndex.scala | 7 +- .../spark/sql/qbeast/QbeastSnapshot.scala | 69 +++++++++---------- .../qbeast/spark/index/NewRevisionTest.scala | 4 +- .../spark/utils/QbeastSnapshotTest.scala | 10 ++- 4 files changed, 39 insertions(+), 51 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/sql/files/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/sql/files/OTreeIndex.scala index cdcf039e2..8ba4a487a 100644 --- a/src/main/scala/io/qbeast/spark/sql/files/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/sql/files/OTreeIndex.scala @@ -63,11 +63,9 @@ case class OTreeIndex(index: TahoeLogFileIndex, desiredCubeSize: Int) partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[AddFile] = { - val (qbeastDataFilters, tahoeDataFilters) = extractDataFilters(dataFilters) - val tahoeMatchingFiles = index.matchingFiles(partitionFilters, tahoeDataFilters) - + val (qbeastDataFilters, _) = extractDataFilters(dataFilters) val (minWeight, maxWeight) = extractWeightRange(qbeastDataFilters) - val files = sample(minWeight, maxWeight, tahoeMatchingFiles) + val files = sample(minWeight, maxWeight, qbeastSnapshot.allFiles) files } @@ -95,7 +93,6 @@ case class OTreeIndex(index: TahoeLogFileIndex, desiredCubeSize: Int) val filesVector = files.toVector qbeastSnapshot.spaceRevisions - .collect() .flatMap(spaceRevision => { val querySpace = QuerySpaceFromTo(originalFrom, originalTo, spaceRevision) diff --git a/src/main/scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala index 203d2679f..3b63d2e9c 100644 --- a/src/main/scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala +++ b/src/main/scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala @@ -14,7 +14,6 @@ import org.apache.spark.sql.delta.util.JsonUtils import org.apache.spark.sql.delta.{DeltaLogFileIndex, Snapshot} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{BinaryType, LongType, StructField, StructType} import org.apache.spark.sql.{Dataset, DatasetFactory, SparkSession} @@ -30,16 +29,15 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { def isInitial: Boolean = snapshot.version == -1 + lazy val allFiles = snapshot.allFiles.collect() + val indexedCols: Seq[String] = { - if (isInitial || snapshot.allFiles.isEmpty) Seq.empty - else ColumnsToIndex.decode(snapshot.allFiles.head.tags(indexedColsTag)) + if (isInitial || allFiles.isEmpty) Seq.empty + else ColumnsToIndex.decode(allFiles.head.tags(indexedColsTag)) } val dimensionCount: Int = indexedCols.length - private val spark = SparkSession.active - import spark.implicits._ - private val logSchema = StructType( Array( StructField(name = cubeColumnName, dataType = BinaryType, nullable = false), @@ -47,6 +45,9 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { private def fileToDataframe(fileStatus: Array[FileStatus]): Dataset[(Array[Byte], Long)] = { + val spark = SparkSession.active + import spark.implicits._ + val index = DeltaLogFileIndex(new ParquetFileFormat, fileStatus) val relation = HadoopFsRelation( @@ -67,9 +68,7 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { * 
@return a SpaceRevision with the corresponding timestamp if any */ def getRevisionAt(timestamp: Long): Option[SpaceRevision] = { - val spaceRevision = spaceRevisions.filter(_.timestamp.equals(timestamp)) - if (spaceRevision.isEmpty) None - else Some(spaceRevision.first()) + spaceRevisions.find(_.timestamp.equals(timestamp)) } /** @@ -79,7 +78,6 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { */ def cubeWeights(spaceRevision: SpaceRevision): Map[CubeId, Weight] = { indexState(spaceRevision) - .collect() .map(info => (CubeId(dimensionCount, info.cube), info.maxWeight)) .toMap } @@ -91,15 +89,12 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { * @return a map with key cube and value max weight */ def cubeNormalizedWeights(spaceRevision: SpaceRevision): Map[CubeId, Double] = { - indexState(spaceRevision) - .collect() - .map { - case CubeInfo(cube, Weight.MaxValue, size) => - (CubeId(dimensionCount, cube), NormalizedWeight(desiredCubeSize, size)) - case CubeInfo(cube, maxWeight, _) => - (CubeId(dimensionCount, cube), NormalizedWeight(maxWeight)) - } - .toMap + indexState(spaceRevision).map { + case CubeInfo(cube, Weight.MaxValue, size) => + (CubeId(dimensionCount, cube), NormalizedWeight(desiredCubeSize, size)) + case CubeInfo(cube, maxWeight, _) => + (CubeId(dimensionCount, cube), NormalizedWeight(maxWeight)) + }.toMap } /** @@ -111,7 +106,6 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { def overflowedSet(spaceRevision: SpaceRevision): Set[CubeId] = { indexState(spaceRevision) .filter(_.maxWeight != Weight.MaxValue) - .collect() .map(cubeInfo => CubeId(dimensionCount, cubeInfo.cube)) .toSet } @@ -123,6 +117,7 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { */ def replicatedSet(spaceRevision: SpaceRevision): Set[CubeId] = { + val spark = SparkSession.active val hadoopConf = spark.sessionState.newHadoopConf() snapshot.setTransactions.filter(_.appId.equals(indexId)) match { @@ -149,22 +144,20 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { * Returns available space revisions ordered by timestamp * @return a Dataset of SpaceRevision */ - def spaceRevisions: Dataset[SpaceRevision] = - snapshot.allFiles - .select(s"tags.$spaceTag") + lazy val spaceRevisions: Seq[SpaceRevision] = + allFiles + .map(_.tags(spaceTag)) .distinct - .map(a => JsonUtils.fromJson[SpaceRevision](a.getString(0))) - .orderBy(col("timestamp").desc) + .map(a => JsonUtils.fromJson[SpaceRevision](a)) + .sortBy(_.timestamp) /** * Returns the space revision with the higher timestamp * @return the space revision */ - def lastSpaceRevision: SpaceRevision = { + lazy val lastSpaceRevision: SpaceRevision = { // Dataset spaceRevisions is ordered by timestamp - spaceRevisions - .first() - + spaceRevisions.last } /** @@ -172,10 +165,7 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { * @param spaceRevision space revision * @return Dataset containing cube information */ - private def indexState(spaceRevision: SpaceRevision): Dataset[CubeInfo] = { - - val allFiles = snapshot.allFiles - val weightValueTag = weightMaxTag + ".value" + private def indexState(spaceRevision: SpaceRevision): Seq[CubeInfo] = { allFiles .filter(_.tags(spaceTag).equals(spaceRevision.toString)) @@ -186,9 +176,13 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { Weight(a.tags(weightMinTag).toInt), a.tags(stateTag), a.tags(elementCountTag).toLong)) - .groupBy(cubeTag) - .agg(min(weightValueTag), 
sum(elementCountTag)) - .map(row => CubeInfo(row.getAs[String](0), Weight(row.getAs[Int](1)), row.getAs[Long](2))) + .groupBy(_.cube) + .map { case (cube: String, blocks: Array[BlockStats]) => + val weightMax = blocks.map(_.maxWeight.value).min + val numElements = blocks.map(_.rowCount).sum + CubeInfo(cube, Weight(weightMax), numElements) + } + .toSeq } /** @@ -199,11 +193,10 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { */ def getCubeBlocks(cubes: Set[CubeId], spaceRevision: SpaceRevision): Seq[AddFile] = { val dimensionCount = this.dimensionCount - snapshot.allFiles + allFiles .filter(_.tags(spaceTag).equals(spaceRevision.toString)) .filter(_.tags(stateTag) != ANNOUNCED) .filter(a => cubes.contains(CubeId(dimensionCount, a.tags(cubeTag)))) - .collect() } } diff --git a/src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala b/src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala index eb21bfe4a..3e9fbc911 100644 --- a/src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala +++ b/src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala @@ -42,7 +42,7 @@ class NewRevisionTest val deltaLog = DeltaLog.forTable(spark, tmpDir) val qbeastSnapshot = QbeastSnapshot(deltaLog.snapshot, 10000) - qbeastSnapshot.spaceRevisions.count() shouldBe spaceMultipliers.length + qbeastSnapshot.spaceRevisions.size shouldBe spaceMultipliers.length } @@ -55,7 +55,7 @@ class NewRevisionTest val deltaLog = DeltaLog.forTable(spark, tmpDir) val qbeastSnapshot = QbeastSnapshot(deltaLog.snapshot, 10000) - val allWM = qbeastSnapshot.spaceRevisions.collect().map(qbeastSnapshot.cubeWeights) + val allWM = qbeastSnapshot.spaceRevisions.map(qbeastSnapshot.cubeWeights) allWM.foreach(wm => assert(wm.nonEmpty)) assert(allWM.distinct.length == allWM.length) diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSnapshotTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSnapshotTest.scala index b52441327..11bb2d42f 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastSnapshotTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastSnapshotTest.scala @@ -8,7 +8,7 @@ import io.qbeast.spark.index.OTreeAlgorithmTest.Client3 import io.qbeast.spark.index.{Weight} import io.qbeast.spark.model.CubeInfo import io.qbeast.spark.sql.qbeast.QbeastSnapshot -import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.delta.DeltaLog import org.scalatest.PrivateMethodTester import org.scalatest.flatspec.AnyFlatSpec @@ -25,9 +25,8 @@ class QbeastSnapshotTest val rdd = spark.sparkContext.parallelize( - Seq(Client3(size * size, s"student-$size", 20 + 1, 1000 + 123 + 1, 2567.3432143 + 1)) ++ - 1.until(size) - .map(i => Client3(i * i, s"student-$i", 20, 1000 + 123, 2567.3432143))) + 1.to(size) + .map(i => Client3(i * i, s"student-$i", i, i * 1000 + 123, i * 2567.3432143))) assert(rdd.count() == size) spark.createDataFrame(rdd) @@ -145,7 +144,7 @@ class QbeastSnapshotTest val deltaLog = DeltaLog.forTable(spark, tmpDir) val qbeastSnapshot = QbeastSnapshot(deltaLog.snapshot, oTreeAlgorithm.desiredCubeSize) - val indexStateMethod = PrivateMethod[Dataset[CubeInfo]]('indexState) + val indexStateMethod = PrivateMethod[Seq[CubeInfo]]('indexState) val indexState = qbeastSnapshot invokePrivate indexStateMethod(qbeastSnapshot.lastSpaceRevision) val overflowed = @@ -153,7 +152,6 @@ class QbeastSnapshotTest indexState .filter(cubeInfo => overflowed.contains(cubeInfo.cube)) - .collect() .foreach(cubeInfo => assert( cubeInfo.size > 
oTreeAlgorithm.desiredCubeSize * 0.9, From 523293d15f5c32af18981d7480f0cb6edf2dce89 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Thu, 21 Oct 2021 17:40:27 +0200 Subject: [PATCH 2/2] Change lazy vals to def --- .../scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala index 3b63d2e9c..d0c0901c8 100644 --- a/src/main/scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala +++ b/src/main/scala/io/qbeast/spark/sql/qbeast/QbeastSnapshot.scala @@ -144,7 +144,7 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { * Returns available space revisions ordered by timestamp * @return a Dataset of SpaceRevision */ - lazy val spaceRevisions: Seq[SpaceRevision] = + def spaceRevisions: Seq[SpaceRevision] = allFiles .map(_.tags(spaceTag)) .distinct @@ -155,7 +155,7 @@ case class QbeastSnapshot(snapshot: Snapshot, desiredCubeSize: Int) { * Returns the space revision with the higher timestamp * @return the space revision */ - lazy val lastSpaceRevision: SpaceRevision = { + def lastSpaceRevision: SpaceRevision = { // Dataset spaceRevisions is ordered by timestamp spaceRevisions.last }
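
For context, the pattern both patches converge on can be sketched outside of Spark and Delta: materialise the file log once with a single collect-style call, keep it as a local Scala collection, and derive every other view (revisions, per-cube state) from that copy with ordinary collection operations instead of repeated Dataset actions. The sketch below is only an illustration of that idea under simplified, assumed types — FileEntry, Revision, CubeState and the tag names are hypothetical stand-ins, not the project's actual AddFile, SpaceRevision or CubeInfo API.

// Minimal sketch of the "collect once, derive everything locally" pattern.
// FileEntry, Revision and CubeState are hypothetical stand-ins for the real
// AddFile / SpaceRevision / CubeInfo types touched by the patch.
final case class FileEntry(path: String, tags: Map[String, String])
final case class Revision(timestamp: Long)
final case class CubeState(cube: String, maxWeight: Int, elementCount: Long)

class SnapshotView(loadFiles: () => Seq[FileEntry]) {

  // Materialise the file log exactly once, analogous to the cached
  // `snapshot.allFiles.collect()` introduced in the first patch.
  lazy val allFiles: Seq[FileEntry] = loadFiles()

  // Derived views are plain defs over the local copy, so calling them never
  // triggers another distributed action -- the point of removing the extra
  // .collect() calls.
  def revisions: Seq[Revision] =
    allFiles
      .map(_.tags("revision").toLong)
      .distinct
      .sorted
      .map(Revision(_))

  def lastRevision: Revision = revisions.last

  // Local groupBy/min/sum replaces the former Dataset groupBy + agg + collect.
  def indexState: Seq[CubeState] =
    allFiles
      .groupBy(_.tags("cube"))
      .map { case (cube, files) =>
        CubeState(
          cube,
          files.map(_.tags("maxWeight").toInt).min,
          files.map(_.tags("elementCount").toLong).sum)
      }
      .toSeq
}

object SnapshotViewExample extends App {
  val view = new SnapshotView(() =>
    Seq(
      FileEntry("a.parquet", Map("revision" -> "1", "cube" -> "root", "maxWeight" -> "10", "elementCount" -> "100")),
      FileEntry("b.parquet", Map("revision" -> "2", "cube" -> "root", "maxWeight" -> "5", "elementCount" -> "50"))))

  println(view.lastRevision) // Revision(2)
  println(view.indexState)   // List(CubeState(root,5,150))
}

The second patch mirrors the `def` style used here: only `allFiles` stays cached as a `lazy val`, while `spaceRevisions` and `lastSpaceRevision` are recomputed from that already-collected copy on each call, so neither form triggers an additional Spark action.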