diff --git a/core/src/main/scala/io/qbeast/core/model/MetadataManager.scala b/core/src/main/scala/io/qbeast/core/model/MetadataManager.scala index 4dc394925..897461113 100644 --- a/core/src/main/scala/io/qbeast/core/model/MetadataManager.scala +++ b/core/src/main/scala/io/qbeast/core/model/MetadataManager.scala @@ -8,6 +8,7 @@ import io.qbeast.IISeq * @tparam FileDescriptor type of file descriptor */ trait MetadataManager[DataSchema, FileDescriptor] { + type Configuration = Map[String, String] /** * Gets the Snapshot for a given table @@ -33,6 +34,16 @@ trait MetadataManager[DataSchema, FileDescriptor] { def updateWithTransaction(tableID: QTableID, schema: DataSchema, append: Boolean)( writer: => (TableChanges, IISeq[FileDescriptor])): Unit + /** + * Updates the table metadata by overwriting the metadata configurations + * with the provided key-value pairs. + * @param tableID QTableID + * @param schema table schema + * @param update configurations used to overwrite the existing metadata + */ + def updateMetadataWithTransaction(tableID: QTableID, schema: DataSchema)( + update: => Configuration): Unit + /** * Updates the Revision with the given RevisionChanges * @param tableID the QTableID diff --git a/core/src/main/scala/io/qbeast/core/model/RevisionClasses.scala b/core/src/main/scala/io/qbeast/core/model/RevisionClasses.scala index 565ba6e2c..8f978ec03 100644 --- a/core/src/main/scala/io/qbeast/core/model/RevisionClasses.scala +++ b/core/src/main/scala/io/qbeast/core/model/RevisionClasses.scala @@ -4,7 +4,8 @@ import com.fasterxml.jackson.annotation.{JsonCreator, JsonValue} import com.fasterxml.jackson.databind.annotation.JsonSerialize import com.fasterxml.jackson.databind.annotation.JsonSerialize.Typing import io.qbeast.IISeq -import io.qbeast.core.transform.{Transformation, Transformer} +import io.qbeast.core.model.RevisionUtils.stagingID +import io.qbeast.core.transform.{EmptyTransformer, Transformation, Transformer} import scala.collection.immutable.SortedMap @@ -60,11 +61,46 @@ object Revision { desiredCubeSize, columnTransformers, Vector.empty) + } + + /** + * Initialize Revision for table conversion. The RevisionID for a converted table is 0. + * EmptyTransformers and EmptyTransformations are used. This Revision should always be + * superseded. + */ + def emptyRevision( + tableID: QTableID, + desiredCubeSize: Int, + columnsToIndex: Seq[String]): Revision = { + val emptyTransformers = columnsToIndex.map(s => EmptyTransformer(s)).toIndexedSeq + val emptyTransformations = emptyTransformers.map(_.makeTransformation(r => r)) + Revision( + stagingID, + System.currentTimeMillis(), + tableID, + desiredCubeSize, + emptyTransformers, + emptyTransformations) } } +object RevisionUtils { + val stagingID: RevisionID = 0 + + def isStaging(revisionID: RevisionID): Boolean = + revisionID == stagingID + + def isStaging(revision: Revision): Boolean = + isStaging(revision.revisionID) && + revision.columnTransformers.forall { + case _: EmptyTransformer => true + case _ => false + } + +} + /** * A revision of a QTable. * @param revisionID the identifier of the revision @@ -89,8 +125,7 @@ final case class Revision( assert(columnTransformers != null || transformations != null) /** - * * - * Controls that the this revision indexes all and only the provided columns. + * Controls that this revision indexes all and only the provided columns. * * @param columnsToIndex the column names to check. * @return true if the revision indexes all and only the provided columns. 
@@ -117,7 +152,7 @@ final case class Revision( /** * returns the normalized values - * @param values + * @param values row values for the indexing columns * @return the normalized values */ def transform(values: IISeq[_]): IISeq[Double] = { @@ -193,8 +228,9 @@ case class IndexStatus( cubesStatuses: SortedMap[CubeId, CubeStatus] = SortedMap.empty) extends Serializable { - def addAnnouncements(newAnnouncedSet: Set[CubeId]): IndexStatus = + def addAnnouncements(newAnnouncedSet: Set[CubeId]): IndexStatus = { copy(announcedSet = announcedSet ++ newAnnouncedSet) + } def cubesToOptimize: Set[CubeId] = announcedSet.diff(replicatedSet) diff --git a/core/src/main/scala/io/qbeast/core/transform/EmptyTransformation.scala b/core/src/main/scala/io/qbeast/core/transform/EmptyTransformation.scala new file mode 100644 index 000000000..61d61d8dd --- /dev/null +++ b/core/src/main/scala/io/qbeast/core/transform/EmptyTransformation.scala @@ -0,0 +1,13 @@ +package io.qbeast.core.transform + +/** + * An empty Transformation meant for empty revisions + */ +case class EmptyTransformation() extends Transformation { + + override def transform(value: Any): Double = 0d + + override def isSupersededBy(newTransformation: Transformation): Boolean = true + + override def merge(other: Transformation): Transformation = other +} diff --git a/core/src/main/scala/io/qbeast/core/transform/EmptyTransformer.scala b/core/src/main/scala/io/qbeast/core/transform/EmptyTransformer.scala new file mode 100644 index 000000000..a9b2631ac --- /dev/null +++ b/core/src/main/scala/io/qbeast/core/transform/EmptyTransformer.scala @@ -0,0 +1,22 @@ +package io.qbeast.core.transform + +import io.qbeast.core.model.QDataType + +object EmptyTransformer extends TransformerType { + override def transformerSimpleName: String = "empty" + + override def apply(columnName: String, dataType: QDataType): Transformer = + EmptyTransformer(columnName) + +} + +/** + * An empty Transformer meant for empty revisions + */ +case class EmptyTransformer(columnName: String) extends Transformer { + override protected def transformerType: TransformerType = EmptyTransformer + + override def stats: ColumnStats = NoColumnStats + + override def makeTransformation(row: String => Any): Transformation = EmptyTransformation() +} diff --git a/core/src/test/scala/io/qbeast/core/transform/EmptyTransformationTest.scala b/core/src/test/scala/io/qbeast/core/transform/EmptyTransformationTest.scala new file mode 100644 index 000000000..fa172af3e --- /dev/null +++ b/core/src/test/scala/io/qbeast/core/transform/EmptyTransformationTest.scala @@ -0,0 +1,35 @@ +package io.qbeast.core.transform + +import io.qbeast.core.model.DoubleDataType +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class EmptyTransformationTest extends AnyFlatSpec with Matchers { + it should "always map to the same value" in { + val t = EmptyTransformation() + + (1 to 100).foreach { i => + t.transform(i) shouldBe 0d + } + + t.transform(null) shouldBe 0d + } + + it should "be superseded by another Transformation" in { + val et = EmptyTransformation() + val ht = HashTransformation() + val lt = LinearTransformation(1d, 1.1, DoubleDataType) + + et.isSupersededBy(ht) shouldBe true + et.isSupersededBy(lt) shouldBe true + } + + it should "return the other Transformation when merging" in { + val et = EmptyTransformation() + val ht = HashTransformation() + val lt = LinearTransformation(1d, 1.1, DoubleDataType) + + et.merge(ht) shouldBe ht + et.merge(lt) shouldBe lt + } +} diff --git 
a/core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala b/core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala index fcfbf17fc..2ac139524 100644 --- a/core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala +++ b/core/src/test/scala/io/qbeast/core/transform/TransformerTest.scala @@ -1,8 +1,9 @@ package io.qbeast.core.transform -import io.qbeast.core.model.{DateDataType, IntegerDataType, TimestampDataType} +import io.qbeast.core.model.{DateDataType, IntegerDataType, StringDataType, TimestampDataType} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers + import java.sql.{Date, Timestamp} class TransformerTest extends AnyFlatSpec with Matchers { @@ -94,4 +95,14 @@ class TransformerTest extends AnyFlatSpec with Matchers { transformer.maybeUpdateTransformation(currentTransformation, transformation) shouldBe None } + + "An EmptyTransformer" should "create an EmptyTransformation without stats" in { + EmptyTransformer.transformerSimpleName shouldBe "empty" + + val colName = "a" + val transformer = EmptyTransformer(colName, StringDataType) + + val transformation = transformer.makeTransformation(r => r) + transformation shouldBe a[EmptyTransformation] + } } diff --git a/docs/QbeastFormat.md b/docs/QbeastFormat.md index d57f53652..cff83895a 100644 --- a/docs/QbeastFormat.md +++ b/docs/QbeastFormat.md @@ -153,6 +153,36 @@ In Revision, you can find different information about the tree status and config In this case, we index columns `user_id` and `product_id` (which are both `Integers`) with a linear transformation. This means that they will not suffer any transformation besides the normalization. +### Staging Revision and ConvertToQbeastCommand +The introduction of the staging revision enables reading tables in a hybrid `qbeast + delta` state. +The non-qbeast `AddFile`s are considered as part of this staging revision, all belonging to the root. + +Its RevisionID is fixed to `stagingID = 0`, and it has `EmptyTransformer`s and `EmptyTransformation`s. +It is automatically created during the first write or when overwriting a table using qbeast. +For a table that is entirely written in `delta` or `parquet`, we can use the `ConvertToQbeastCommand` to create this revision: +```scala +import io.qbeast.spark.internal.commands.ConvertToQbeastCommand + +val path = "/pathToTable/" +val tableIdentifier = s"parquet.`$path`" +val columnsToIndex = Seq("col1", "col2", "col3") +val desiredCubeSize = 50000 + +ConvertToQbeastCommand(tableIdentifier, columnsToIndex, desiredCubeSize).run(spark) + +val qTable = spark.read.format("qbeast").load(path) +``` +By doing so, we also enable subsequent appends using either delta or qbeast. +Conversion on a partitioned table is not supported. + +`Compaction` can be performed on the staging revision to group small delta files: +```scala +import io.qbeast.spark.QbeastTable + +val table = QbeastTable.forPath(spark, "/pathToTable/") +table.compact(0) +``` + ### State changes in MetaData **Data de-normalization** is a crucial component behind our multi-dimensional index. Instead of storing an index in a separate tree-like data structure, we reorganize the data and their replications in an `OTree`, whose **hierarchical structure** is the actual index. 
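To complement the staging-revision description in the docs hunk above, here is a minimal sketch, not taken from the patch itself, of how the helpers introduced in `RevisionClasses.scala` (`Revision.emptyRevision` and `RevisionUtils.isStaging`) fit together; the table path, cube size, and column names are placeholder values.

```scala
import io.qbeast.core.model.RevisionUtils.isStaging
import io.qbeast.core.model.{QTableID, Revision}

object StagingRevisionSketch extends App {
  // A staging revision as produced by Revision.emptyRevision:
  // RevisionID 0 and an EmptyTransformer/EmptyTransformation per indexed column.
  val staging: Revision =
    Revision.emptyRevision(QTableID("/tmp/example"), 5000, Seq("col1", "col2"))

  // isStaging checks both the id (== stagingID) and that every transformer is empty
  assert(isStaging(staging.revisionID))
  assert(isStaging(staging))
}
```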
diff --git a/src/main/scala/io/qbeast/spark/QbeastTable.scala b/src/main/scala/io/qbeast/spark/QbeastTable.scala index 2d7b22962..caaa24691 100644 --- a/src/main/scala/io/qbeast/spark/QbeastTable.scala +++ b/src/main/scala/io/qbeast/spark/QbeastTable.scala @@ -4,6 +4,7 @@ package io.qbeast.spark import io.qbeast.context.QbeastContext +import io.qbeast.core.model.RevisionUtils.isStaging import io.qbeast.core.model.{CubeId, CubeStatus, QTableID, RevisionID} import io.qbeast.spark.delta.DeltaQbeastSnapshot import io.qbeast.spark.internal.commands.{ @@ -56,13 +57,17 @@ class QbeastTable private ( * If doesn't exist or none is specified, would be the last available */ def optimize(revisionID: RevisionID): Unit = { - checkRevisionAvailable(revisionID) - OptimizeTableCommand(revisionID, indexedTable) - .run(sparkSession) + if (!isStaging(revisionID)) { + checkRevisionAvailable(revisionID) + OptimizeTableCommand(revisionID, indexedTable) + .run(sparkSession) + } } def optimize(): Unit = { - optimize(latestRevisionAvailableID) + if (!isStaging(latestRevisionAvailableID)) { + optimize(latestRevisionAvailableID) + } } /** @@ -73,14 +78,18 @@ class QbeastTable private ( * @return the sequence of cubes to optimize in string representation */ def analyze(revisionID: RevisionID): Seq[String] = { - checkRevisionAvailable(revisionID) - AnalyzeTableCommand(revisionID, indexedTable) - .run(sparkSession) - .map(_.getString(0)) + if (isStaging(revisionID)) Seq.empty + else { + checkRevisionAvailable(revisionID) + AnalyzeTableCommand(revisionID, indexedTable) + .run(sparkSession) + .map(_.getString(0)) + } } def analyze(): Seq[String] = { - analyze(latestRevisionAvailableID) + if (isStaging(latestRevisionAvailableID)) Seq.empty + else analyze(latestRevisionAvailableID) } /** @@ -103,7 +112,7 @@ class QbeastTable private ( val allCubeStatuses = qbeastSnapshot.loadLatestIndexStatus.cubesStatuses val cubeCount = allCubeStatuses.size - val depth = allCubeStatuses.map(_._1.depth).max + val depth = if (cubeCount == 0) 0 else allCubeStatuses.map(_._1.depth).max val rowCount = allCubeStatuses.flatMap(_._2.files.map(_.elementCount)).sum val dimensionCount = indexedColumns().size diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala b/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala index a9fa6b6c5..af76b47f2 100644 --- a/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala +++ b/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala @@ -5,14 +5,9 @@ package io.qbeast.spark.delta import io.qbeast.core.model.{QTableID, RevisionID, TableChanges} import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers +import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg import io.qbeast.spark.utils.TagColumns -import org.apache.spark.sql.delta.actions.{ - Action, - AddFile, - FileAction, - RemoveFile, - SetTransaction -} +import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.commands.DeltaCommand import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaOptions, OptimisticTransaction} import org.apache.spark.sql.execution.datasources.{ @@ -84,6 +79,24 @@ private[delta] case class DeltaMetadataWriter( } } + def updateMetadataWithTransaction(update: => Configuration): Unit = { + deltaLog.withNewTransaction { txn => + if (txn.metadata.partitionColumns.nonEmpty) { + throw AnalysisExceptionFactory.create(partitionedTableExceptionMsg) + } + + val config = update + val updatedConfig = 
config.foldLeft(txn.metadata.configuration) { case (accConf, (k, v)) => + accConf.updated(k, v) + } + val updatedMetadata = txn.metadata.copy(configuration = updatedConfig) + + val op = DeltaOperations.SetTableProperties(config) + txn.updateMetadata(updatedMetadata) + txn.commit(Seq.empty, op) + } + } + private def updateReplicatedFiles(tableChanges: TableChanges): Seq[Action] = { val revision = tableChanges.updatedRevision @@ -168,7 +181,7 @@ private[delta] case class DeltaMetadataWriter( addFiles.map(_.copy(dataChange = !rearrangeOnly)) ++ deletedFiles.map(_.copy(dataChange = !rearrangeOnly)) } else { - newFiles ++ deletedFiles + addFiles ++ deletedFiles } if (isOptimizeOperation) { diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala index abc4e31b4..cad5a847f 100644 --- a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala +++ b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala @@ -4,6 +4,7 @@ package io.qbeast.spark.delta import io.qbeast.IISeq +import io.qbeast.core.model.RevisionUtils.isStaging import io.qbeast.core.model._ import io.qbeast.spark.utils.{MetadataConfig, TagColumns} import org.apache.spark.sql.delta.Snapshot @@ -20,6 +21,8 @@ case class DeltaQbeastSnapshot(private val snapshot: Snapshot) extends QbeastSna def isInitial: Boolean = snapshot.version == -1 + private val isStagingFile = "tags IS NULL" + private val metadataMap: Map[String, String] = snapshot.metadata.configuration /** @@ -122,7 +125,8 @@ case class DeltaQbeastSnapshot(private val snapshot: Snapshot) extends QbeastSna * * @return an immutable Seq of Revision for qtable */ - override def loadAllRevisions: IISeq[Revision] = revisionsMap.values.toVector + override def loadAllRevisions: IISeq[Revision] = + revisionsMap.values.toVector /** * Obtain the last Revisions @@ -150,8 +154,11 @@ case class DeltaQbeastSnapshot(private val snapshot: Snapshot) extends QbeastSna * @return the latest Revision at a concrete timestamp */ override def loadRevisionAt(timestamp: Long): Revision = { - revisionsMap.values.find(_.timestamp <= timestamp).getOrElse { - throw AnalysisExceptionFactory.create(s"No space revision available before $timestamp") + val candidateRevisions = revisionsMap.values.filter(_.timestamp <= timestamp) + if (candidateRevisions.nonEmpty) candidateRevisions.maxBy(_.timestamp) + else { + throw AnalysisExceptionFactory + .create(s"No space revision available before $timestamp") } } @@ -161,8 +168,8 @@ case class DeltaQbeastSnapshot(private val snapshot: Snapshot) extends QbeastSna * @return the Dataset of QbeastBlocks */ def loadRevisionBlocks(revisionID: RevisionID): Dataset[AddFile] = { - snapshot.allFiles - .where(TagColumns.revision === lit(revisionID.toString)) + if (isStaging(revisionID)) snapshot.allFiles.where(isStagingFile) + else snapshot.allFiles.where(TagColumns.revision === lit(revisionID.toString)) } } diff --git a/src/main/scala/io/qbeast/spark/delta/IndexStatusBuilder.scala b/src/main/scala/io/qbeast/spark/delta/IndexStatusBuilder.scala index bfde5b047..524af78e0 100644 --- a/src/main/scala/io/qbeast/spark/delta/IndexStatusBuilder.scala +++ b/src/main/scala/io/qbeast/spark/delta/IndexStatusBuilder.scala @@ -3,8 +3,10 @@ */ package io.qbeast.spark.delta +import io.qbeast.core.model.RevisionUtils.isStaging import io.qbeast.core.model._ import io.qbeast.spark.delta.QbeastMetadataSQL._ +import io.qbeast.spark.utils.State.FLOODED import io.qbeast.spark.utils.TagColumns import 
org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.functions.{col, collect_list, lit, min, sum} @@ -35,11 +37,35 @@ private[delta] class IndexStatusBuilder( qbeastSnapshot.loadRevisionBlocks(revision.revisionID) def build(): IndexStatus = { + val cubeStatus = + if (isStaging(revision)) stagingCubeStatuses + else buildCubesStatuses + IndexStatus( revision = revision, replicatedSet = replicatedSet, announcedSet = announcedSet, - cubesStatuses = buildCubesStatuses) + cubesStatuses = cubeStatus) + } + + def stagingCubeStatuses: SortedMap[CubeId, CubeStatus] = { + val root = revision.createCubeIdRoot() + val maxWeight = Weight.MaxValue + val blocks = revisionFiles + .collect() + .map(addFile => + QbeastBlock( + addFile.path, + revision.revisionID, + Weight.MinValue, + maxWeight, + FLOODED, + 0, + addFile.size, + addFile.modificationTime)) + .toIndexedSeq + + SortedMap(root -> CubeStatus(root, maxWeight, maxWeight.fraction, blocks)) } /** diff --git a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala index d6fe79c79..9a83a7775 100644 --- a/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala +++ b/src/main/scala/io/qbeast/spark/delta/OTreeIndex.scala @@ -4,17 +4,18 @@ package io.qbeast.spark.delta import io.qbeast.core.model.QbeastBlock +import io.qbeast.core.model.RevisionUtils.stagingID import io.qbeast.spark.index.query.{QueryExecutor, QuerySpecBuilder} import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow} -import org.apache.spark.sql.delta.Snapshot +import org.apache.spark.sql.delta.{DeltaLog, Snapshot} +import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.SparkSession import java.net.URI -import org.apache.spark.sql.delta.DeltaLog /** * FileIndex to prune files @@ -49,11 +50,28 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex { queryExecutor.execute() } + /** + * Collect Staging AddFiles from _delta_log and convert them into FileStatuses. + * The output is merged with those built from QbeastBlocks. 
+ * @return + */ + private def stagingFiles: Seq[FileStatus] = { + qbeastSnapshot.loadRevisionBlocks(stagingID).collect().map { a: AddFile => + new FileStatus( + /* length */ a.size, + /* isDir */ false, + /* blockReplication */ 0, + /* blockSize */ 1, + /* modificationTime */ a.modificationTime, + absolutePath(a.path)) + } + } + override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - val fileStats = matchingBlocks(partitionFilters, dataFilters).map { qbeastBlock => + val qbeastFileStats = matchingBlocks(partitionFilters, dataFilters).map { qbeastBlock => new FileStatus( /* length */ qbeastBlock.size, /* isDir */ false, @@ -62,6 +80,8 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex { /* modificationTime */ qbeastBlock.modificationTime, absolutePath(qbeastBlock.path)) }.toArray + val stagingStats = stagingFiles + val fileStats = qbeastFileStats ++ stagingStats Seq(PartitionDirectory(new GenericInternalRow(Array.empty[Any]), fileStats)) @@ -80,12 +100,6 @@ case class OTreeIndex(index: TahoeLogFileIndex) extends FileIndex { override def partitionSchema: StructType = index.partitionSchema } -/** - * Object OTreeIndex to create a new OTreeIndex - * @param sparkSession the spark session - * @param path the path to the delta log - * @return the OTreeIndex - */ object OTreeIndex { def apply(spark: SparkSession, path: Path): OTreeIndex = { diff --git a/src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala b/src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala index b7bbeb156..6a17d4e78 100644 --- a/src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala +++ b/src/main/scala/io/qbeast/spark/delta/QbeastMetadataOperation.scala @@ -3,8 +3,10 @@ */ package io.qbeast.spark.delta +import io.qbeast.core.model.RevisionUtils.stagingID import io.qbeast.core.model.{ReplicatedSet, Revision, TableChanges, mapper} import io.qbeast.spark.utils.MetadataConfig +import io.qbeast.spark.utils.MetadataConfig.{lastRevisionID, revision} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaMergingUtils} import org.apache.spark.sql.delta.{ @@ -78,16 +80,35 @@ private[delta] class QbeastMetadataOperation extends ImplicitMetadataOperation { private def updateQbeastRevision( baseConfiguration: Configuration, newRevision: Revision): Configuration = { - val newRevisionID = newRevision.revisionID - // Qbeast configuration metadata - baseConfiguration - .updated(MetadataConfig.lastRevisionID, newRevisionID.toString) - .updated( - s"${MetadataConfig.revision}.$newRevisionID", - mapper.writeValueAsString(newRevision)) + // Add staging revision, if necessary. The qbeast metadata configuration + // should always have a revision with RevisionID = stagingID. + val stagingRevisionKey = s"$revision.$stagingID" + val addStagingRevision = + newRevisionID == 1 && !baseConfiguration.contains(stagingRevisionKey) + val configuration = + if (!addStagingRevision) baseConfiguration + else { + // Create staging revision with EmptyTransformers (and EmptyTransformations). 
+ // We modify its timestamp to secure loadRevisionAt + val stagingRevision = Revision + .emptyRevision( + newRevision.tableID, + newRevision.desiredCubeSize, + newRevision.columnTransformers.map(_.columnName)) + .copy(timestamp = newRevision.timestamp - 1) + + // Add the staging revision to the revisionMap without overwriting + // the latestRevisionID + baseConfiguration + .updated(stagingRevisionKey, mapper.writeValueAsString(stagingRevision)) + } + // Update latest revision id and add new revision to metadata + configuration + .updated(lastRevisionID, newRevisionID.toString) + .updated(s"$revision.$newRevisionID", mapper.writeValueAsString(newRevision)) } def updateQbeastMetadata( @@ -103,13 +124,11 @@ private[delta] class QbeastMetadataOperation extends ImplicitMetadataOperation { val dataSchema = StructType(schema.fields.map(field => field.copy(nullable = true, dataType = asNullable(field.dataType)))) - val mergedSchema = if (isOverwriteMode && canOverwriteSchema) { - dataSchema - } else { - SchemaMergingUtils.mergeSchemas(txn.metadata.schema, dataSchema) - } - val normalizedPartitionCols = - Seq.empty + val mergedSchema = + if (isOverwriteMode && canOverwriteSchema) dataSchema + else SchemaMergingUtils.mergeSchemas(txn.metadata.schema, dataSchema) + + val normalizedPartitionCols = Seq.empty // Merged schema will contain additional columns at the end val isNewSchema: Boolean = txn.metadata.schema != mergedSchema @@ -120,15 +139,17 @@ private[delta] class QbeastMetadataOperation extends ImplicitMetadataOperation { if (txn.readVersion == -1) Map.empty else if (isOverwriteMode) overwriteQbeastConfiguration(txn.metadata.configuration) else txn.metadata.configuration + // Qbeast configuration metadata - val configuration = if (isNewRevision || isOverwriteMode) { - updateQbeastRevision(baseConfiguration, latestRevision) - } else if (isOptimizeOperation) { - updateQbeastReplicatedSet( - baseConfiguration, - latestRevision, - tableChanges.announcedOrReplicatedSet) - } else { baseConfiguration } + val configuration = + if (isNewRevision || isOverwriteMode) { + updateQbeastRevision(baseConfiguration, latestRevision) + } else if (isOptimizeOperation) { + updateQbeastReplicatedSet( + baseConfiguration, + latestRevision, + tableChanges.announcedOrReplicatedSet) + } else baseConfiguration if (txn.readVersion == -1) { super.updateMetadata( @@ -163,15 +184,12 @@ private[delta] class QbeastMetadataOperation extends ImplicitMetadataOperation { } else if (isNewSchema) { recordDeltaEvent(txn.deltaLog, "delta.schemaValidation.failure") val errorBuilder = new MetadataMismatchErrorBuilder - if (isOverwriteMode) { - errorBuilder.addOverwriteBit() - } else { + if (isOverwriteMode) errorBuilder.addOverwriteBit() + else { errorBuilder.addSchemaMismatch(txn.metadata.schema, dataSchema, txn.metadata.id) } errorBuilder.finalizeAndThrow(spark.sessionState.conf) - } else { - txn.updateMetadata(txn.metadata.copy(configuration = configuration)) - } + } else txn.updateMetadata(txn.metadata.copy(configuration = configuration)) } override protected val canMergeSchema: Boolean = true diff --git a/src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala b/src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala index 2c70c3395..fe4e36307 100644 --- a/src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala +++ b/src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala @@ -4,11 +4,11 @@ package io.qbeast.spark.delta import io.qbeast.IISeq -import 
io.qbeast.core.model.{MetadataManager, _} -import org.apache.spark.sql.{SaveMode, SparkSession} -import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions} +import io.qbeast.core.model._ import org.apache.spark.sql.delta.actions.FileAction +import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{SaveMode, SparkSession} /** * Spark+Delta implementation of the MetadataManager interface @@ -26,6 +26,18 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction] metadataWriter.writeWithTransaction(writer) } + override def updateMetadataWithTransaction(tableID: QTableID, schema: StructType)( + update: => Configuration): Unit = { + val deltaLog = loadDeltaQbeastLog(tableID).deltaLog + val options = + new DeltaOptions(Map("path" -> tableID.id), SparkSession.active.sessionState.conf) + + val metadataWriter = + DeltaMetadataWriter(tableID, mode = SaveMode.Append, deltaLog, options, schema) + + metadataWriter.updateMetadataWithTransaction(update) + } + override def loadSnapshot(tableID: QTableID): DeltaQbeastSnapshot = { DeltaQbeastSnapshot(loadDeltaQbeastLog(tableID).deltaLog.update()) } diff --git a/src/main/scala/io/qbeast/spark/delta/writer/Compactor.scala b/src/main/scala/io/qbeast/spark/delta/writer/Compactor.scala index 5fb3ba04a..194f4970c 100644 --- a/src/main/scala/io/qbeast/spark/delta/writer/Compactor.scala +++ b/src/main/scala/io/qbeast/spark/delta/writer/Compactor.scala @@ -4,6 +4,7 @@ package io.qbeast.spark.delta.writer import io.qbeast.IISeq +import io.qbeast.core.model.RevisionUtils.isStaging import io.qbeast.core.model.{CubeId, QTableID, QbeastBlock, TableChanges, Weight} import io.qbeast.spark.utils.{State, TagUtils} import org.apache.hadoop.fs.Path @@ -59,13 +60,17 @@ case class Compactor( val revision = tableChanges.updatedRevision // Update the tags of the block with the information of the cubeBlocks - val tags: Map[String, String] = Map( - TagUtils.cube -> cubeId.string, - TagUtils.minWeight -> minWeight.value.toString, - TagUtils.maxWeight -> maxWeight.value.toString, - TagUtils.state -> state, - TagUtils.revision -> revision.revisionID.toString, - TagUtils.elementCount -> elementCount.toString) + val tags: Map[String, String] = + if (isStaging(revision)) null + else { + Map( + TagUtils.cube -> cubeId.string, + TagUtils.minWeight -> minWeight.value.toString, + TagUtils.maxWeight -> maxWeight.value.toString, + TagUtils.state -> state, + TagUtils.revision -> revision.revisionID.toString, + TagUtils.elementCount -> elementCount.toString) + } val writtenPath = new Path(tableID.id, s"${UUID.randomUUID()}.parquet") val writer: OutputWriter = factory.newInstance( @@ -93,7 +98,7 @@ case class Compactor( partitionValues = Map(), size = fileStatus.getLen, modificationTime = fileStatus.getModificationTime, - dataChange = true, + dataChange = false, stats = "", tags = tags) diff --git a/src/main/scala/io/qbeast/spark/index/query/QueryExecutor.scala b/src/main/scala/io/qbeast/spark/index/query/QueryExecutor.scala index 4c0ab31eb..91fa4d88c 100644 --- a/src/main/scala/io/qbeast/spark/index/query/QueryExecutor.scala +++ b/src/main/scala/io/qbeast/spark/index/query/QueryExecutor.scala @@ -16,7 +16,7 @@ import scala.collection.mutable class QueryExecutor(querySpecBuilder: QuerySpecBuilder, qbeastSnapshot: QbeastSnapshot) { /** - * Executes the query + * Executes the query on each revision according to their QuerySpec * @return the final sequence of blocks that match the query */ def 
execute(): Seq[QbeastBlock] = { diff --git a/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala b/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala index 01cd4b7d8..8076eedb0 100644 --- a/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala +++ b/src/main/scala/io/qbeast/spark/index/query/QuerySpecBuilder.scala @@ -3,8 +3,8 @@ */ package io.qbeast.spark.index.query +import io.qbeast.core.model.RevisionUtils.isStaging import io.qbeast.core.model._ -import io.qbeast.spark.index.query import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.Resolver @@ -90,38 +90,42 @@ private[spark] class QuerySpecBuilder(sparkFilters: Seq[Expression]) extends Ser */ def extractQuerySpace(dataFilters: Seq[Expression], revision: Revision): QuerySpace = { - // Split conjunctive predicates val filters = dataFilters.flatMap(filter => splitConjunctivePredicates(filter)) - val indexedColumns = revision.columnTransformers.map(_.columnName) - - val (from, to) = - indexedColumns.map { columnName => - // Get the filters related to the column - val columnFilters = filters.filter(hasColumnReference(_, columnName)) - - // Get the coordinates of the column in the filters, - // if not found, use the overall coordinates - val columnFrom = columnFilters - .collectFirst { - case GreaterThan(_, Literal(value, _)) => sparkTypeToCoreType(value) - case GreaterThanOrEqual(_, Literal(value, _)) => sparkTypeToCoreType(value) - case EqualTo(_, Literal(value, _)) => sparkTypeToCoreType(value) - case IsNull(_) => null - } - - val columnTo = columnFilters - .collectFirst { - case LessThan(_, Literal(value, _)) => sparkTypeToCoreType(value) - case LessThanOrEqual(_, Literal(value, _)) => sparkTypeToCoreType(value) - case EqualTo(_, Literal(value, _)) => sparkTypeToCoreType(value) - case IsNull(_) => null - } - - (columnFrom, columnTo) - }.unzip - - QuerySpace(from, to, revision.transformations) + + // Include all revision space when no filter is applied on its indexing columns + if (filters.isEmpty) AllSpace() + else { + val indexedColumns = revision.columnTransformers.map(_.columnName) + + val (from, to) = + indexedColumns.map { columnName => + // Get the filters related to the column + val columnFilters = filters.filter(hasColumnReference(_, columnName)) + + // Get the coordinates of the column in the filters, + // if not found, use the overall coordinates + val columnFrom = columnFilters + .collectFirst { + case GreaterThan(_, Literal(value, _)) => sparkTypeToCoreType(value) + case GreaterThanOrEqual(_, Literal(value, _)) => sparkTypeToCoreType(value) + case EqualTo(_, Literal(value, _)) => sparkTypeToCoreType(value) + case IsNull(_) => null + } + + val columnTo = columnFilters + .collectFirst { + case LessThan(_, Literal(value, _)) => sparkTypeToCoreType(value) + case LessThanOrEqual(_, Literal(value, _)) => sparkTypeToCoreType(value) + case EqualTo(_, Literal(value, _)) => sparkTypeToCoreType(value) + case IsNull(_) => null + } + + (columnFrom, columnTo) + }.unzip + + QuerySpace(from, to, revision.transformations) + } } /** @@ -157,8 +161,14 @@ private[spark] class QuerySpecBuilder(sparkFilters: Seq[Expression]) extends Ser */ def build(revision: Revision): QuerySpec = { - val qbeastFilters = extractDataFilters(sparkFilters, revision) - query.QuerySpec(extractWeightRange(qbeastFilters), extractQuerySpace(qbeastFilters, revision)) + val (weightRange, querySpace) = + if (isStaging(revision)) { + 
(WeightRange(Weight(Int.MinValue), Weight(Int.MaxValue)), EmptySpace()) + } else { + val qbeastFilters = extractDataFilters(sparkFilters, revision) + (extractWeightRange(qbeastFilters), extractQuerySpace(qbeastFilters, revision)) + } + QuerySpec(weightRange, querySpace) } } diff --git a/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala b/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala new file mode 100644 index 000000000..48dfeae5e --- /dev/null +++ b/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2021 Qbeast Analytics, S.L. + */ +package io.qbeast.spark.internal.commands + +import io.qbeast.core.model._ +import io.qbeast.spark.delta.{DeltaQbeastSnapshot, SparkDeltaMetadataManager} +import io.qbeast.spark.utils.MetadataConfig.{lastRevisionID, revision} +import io.qbeast.spark.utils.QbeastExceptionMessages.{ + incorrectIdentifierFormat, + partitionedTableExceptionMsg, + unsupportedFormatExceptionMsg +} +import org.apache.http.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.qbeast.config.DEFAULT_CUBE_SIZE +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.execution.command.LeafRunnableCommand +import org.apache.spark.sql.{AnalysisException, AnalysisExceptionFactory, Row, SparkSession} + +import java.util.Locale + +/** + * Command to convert a parquet or a delta table into a qbeast table. + * The command creates an empty revision for the metadata; the qbeast options provided + * should be those with which the user wants to index the table. Partitioned tables are not + * supported. + * @param identifier STRING, table identifier consisting of "format.`tablePath`" + * e.g. parquet.`/tmp/test/` + * @param columnsToIndex Seq[STRING], the columns on which the index is built + * e.g. Seq("col1", "col2") + * @param cubeSize INT, the desired cube size for the index + * e.g. 
5000 + */ +@Experimental +case class ConvertToQbeastCommand( + identifier: String, + columnsToIndex: Seq[String], + cubeSize: Int = DEFAULT_CUBE_SIZE) + extends LeafRunnableCommand + with Logging { + + private def resolveTableFormat(spark: SparkSession): (String, TableIdentifier) = + identifier.split("\\.") match { + case Array(f, p) if f.nonEmpty && p.nonEmpty => + (f.toLowerCase(Locale.ROOT), spark.sessionState.sqlParser.parseTableIdentifier(p)) + case _ => + throw AnalysisExceptionFactory.create(incorrectIdentifierFormat(identifier)) + } + + override def run(spark: SparkSession): Seq[Row] = { + val (fileFormat, tableId) = resolveTableFormat(spark) + + val deltaLog = DeltaLog.forTable(spark, tableId.table) + val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.snapshot) + val isQbeast = qbeastSnapshot.loadAllRevisions.nonEmpty + + if (isQbeast) { + logInfo("The table you are trying to convert is already a qbeast table") + } else { + fileFormat match { + // Convert parquet to delta + case "parquet" => + try { + spark.sql(s"CONVERT TO DELTA parquet.${tableId.quotedString}") + } catch { + case e: AnalysisException => + val deltaMsg = e.getMessage() + throw AnalysisExceptionFactory.create( + partitionedTableExceptionMsg + + s"Failed to convert the parquet table into delta: $deltaMsg") + } + case "delta" => + case _ => throw AnalysisExceptionFactory.create(unsupportedFormatExceptionMsg(fileFormat)) + } + + // Convert delta to qbeast through metadata modification + val tableID = QTableID(tableId.table) + val schema = deltaLog.snapshot.schema + + SparkDeltaMetadataManager.updateMetadataWithTransaction(tableID, schema) { + val convRevision = Revision.emptyRevision(tableID, cubeSize, columnsToIndex) + val revisionID = convRevision.revisionID + + // Add staging revision to Revision Map, set it as the latestRevision + Map( + lastRevisionID -> revisionID.toString, + s"$revision.$revisionID" -> mapper.writeValueAsString(convRevision)) + } + } + + Seq.empty[Row] + } + +} diff --git a/src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala b/src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala index 37a7f638a..659b423ae 100644 --- a/src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala +++ b/src/main/scala/io/qbeast/spark/internal/rules/SampleRule.scala @@ -3,7 +3,7 @@ */ package io.qbeast.spark.internal.rules -import io.qbeast.core.model.Weight +import io.qbeast.core.model.{Weight, WeightRange} import io.qbeast.spark.internal.expressions.QbeastMurmur3Hash import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThan, Literal} @@ -11,7 +11,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.SparkSession -import io.qbeast.core.model.WeightRange import io.qbeast.IndexedColumns import org.apache.spark.sql.execution.datasources.HadoopFsRelation import io.qbeast.spark.delta.OTreeIndex diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala index 8e21e9c6d..34344a06a 100644 --- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala +++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala @@ -4,6 +4,7 @@ package io.qbeast.spark.table import io.qbeast.core.keeper.Keeper +import io.qbeast.core.model.RevisionUtils.isStaging import io.qbeast.core.model._ import 
io.qbeast.spark.delta.CubeDataLoader import io.qbeast.spark.index.QbeastColumns @@ -153,7 +154,7 @@ private[table] class IndexedTableImpl( * Add the required indexing parameters when the SaveMode is Append. * The user-provided parameters are respected. * @param latestRevision the latest revision - * @param params the parameters required for indexing + * @param parameters the parameters required for indexing */ private def addRequiredParams( latestRevision: Revision, @@ -175,15 +176,20 @@ private[table] class IndexedTableImpl( append: Boolean): BaseRelation = { val indexStatus = if (exists && append) { - val latestIndexStatus = snapshot.loadLatestIndexStatus - val updatedParameters = addRequiredParams(latestIndexStatus.revision, parameters) - if (checkRevisionParameters(QbeastOptions(updatedParameters), latestIndexStatus)) { - latestIndexStatus + val latestRevision = snapshot.loadLatestRevision + val updatedParameters = addRequiredParams(latestRevision, parameters) + if (isStaging(latestRevision)) { + IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedParameters)) } else { - val oldRevisionID = latestIndexStatus.revision.revisionID - val newRevision = revisionBuilder - .createNextRevision(tableID, data.schema, updatedParameters, oldRevisionID) - IndexStatus(newRevision) + val latestIndexStatus = snapshot.loadIndexStatus(latestRevision.revisionID) + if (checkRevisionParameters(QbeastOptions(updatedParameters), latestIndexStatus)) { + latestIndexStatus + } else { + val oldRevisionID = latestIndexStatus.revision.revisionID + val newRevision = revisionBuilder + .createNextRevision(tableID, data.schema, updatedParameters, oldRevisionID) + IndexStatus(newRevision) + } } } else { IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, parameters)) @@ -219,7 +225,6 @@ private[table] class IndexedTableImpl( /** * Creates a QbeastBaseRelation for the given table. - * @param tableID the table identifier * @return the QbeastBaseRelation */ private def createQbeastBaseRelation(): BaseRelation = { @@ -332,7 +337,7 @@ private[table] class IndexedTableImpl( val schema = metadataManager.loadCurrentSchema(tableID) val currentIndexStatus = snapshot.loadIndexStatus(revisionID) - metadataManager.updateWithTransaction(tableID, schema, true) { + metadataManager.updateWithTransaction(tableID, schema, append = true) { // There's no affected table changes on compaction, so we send an empty object val tableChanges = BroadcastedTableChanges(None, currentIndexStatus, Map.empty) val fileActions = diff --git a/src/main/scala/io/qbeast/spark/utils/QbeastExceptionMessages.scala b/src/main/scala/io/qbeast/spark/utils/QbeastExceptionMessages.scala new file mode 100644 index 000000000..16a4bc42e --- /dev/null +++ b/src/main/scala/io/qbeast/spark/utils/QbeastExceptionMessages.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2021 Qbeast Analytics, S.L. + */ +package io.qbeast.spark.utils + +object QbeastExceptionMessages { + + /** + * Conversion error for attempting to convert a partitioned table + */ + val partitionedTableExceptionMsg: String = + """Converting a partitioned table into qbeast is not supported. 
+ |Consider overwriting the entire data using qbeast.""".stripMargin.replaceAll("\n", " ") + + /** + * Conversion error for unsupported file format + * @return Exception message with the input file format + */ + def unsupportedFormatExceptionMsg: String => String = (fileFormat: String) => + s"Unsupported file format: $fileFormat" + + /** + * Conversion error for incorrect identifier format + * @return Exception message with the received identifier + */ + def incorrectIdentifierFormat: String => String = (identifier: String) => + "Required table identifier format: fileFormat.`<tablePath>` " + + s"identifier received: $identifier" + +} diff --git a/src/test/scala/io/qbeast/spark/delta/QbeastSnapshotTest.scala b/src/test/scala/io/qbeast/spark/delta/QbeastSnapshotTest.scala index 3ab1754e5..980b01710 100644 --- a/src/test/scala/io/qbeast/spark/delta/QbeastSnapshotTest.scala +++ b/src/test/scala/io/qbeast/spark/delta/QbeastSnapshotTest.scala @@ -8,7 +8,7 @@ import io.qbeast.core.model.{CubeStatus, QTableID} import io.qbeast.spark.index.SparkRevisionFactory import io.qbeast.spark.QbeastIntegrationTestSpec import org.apache.spark.sql.delta.DeltaLog -import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession} import org.scalatest.AppendedClues.convertToClueful class QbeastSnapshotTest extends QbeastIntegrationTestSpec { @@ -99,6 +99,31 @@ class QbeastSnapshotTest extends QbeastIntegrationTestSpec { } } + it should "throw an exception when no revision satisfies the timestamp requirement" in + withQbeastContextSparkAndTmpDir { (spark, tmpDir) => + { + val invalidRevisionTimestamp = System.currentTimeMillis() + + val df = createDF(1000) + val names = List("age", "val2") + val cubeSize = 10 + val options = + Map("columnsToIndex" -> names.mkString(","), "cubeSize" -> cubeSize.toString) + + df.write + .format("qbeast") + .mode("overwrite") + .options(options) + .save(tmpDir) + + val deltaLog = DeltaLog.forTable(spark, tmpDir) + val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.snapshot) + an[AnalysisException] shouldBe thrownBy( + qbeastSnapshot.loadRevisionAt(invalidRevisionTimestamp)) + + } + } + "Overflowed set" should "contain only cubes that surpass desiredCubeSize" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) => diff --git a/src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala b/src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala index 844aed9c4..4eaa118bb 100644 --- a/src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala +++ b/src/test/scala/io/qbeast/spark/index/NewRevisionTest.scala @@ -3,13 +3,13 @@ */ package io.qbeast.spark.index +import io.qbeast.TestClasses._ import io.qbeast.spark.{QbeastIntegrationTestSpec, delta} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaLog import org.scalatest.PrivateMethodTester import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import io.qbeast.TestClasses._ class NewRevisionTest extends AnyFlatSpec @@ -42,7 +42,8 @@ class NewRevisionTest val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.snapshot) val spaceRevisions = qbeastSnapshot.loadAllRevisions - spaceRevisions.size shouldBe spaceMultipliers.length + // Including the staging revision + spaceRevisions.size shouldBe spaceMultipliers.length + 1 } @@ -121,7 +122,8 @@ class NewRevisionTest val deltaLog = DeltaLog.forTable(spark, tmpDir) val qbeastSnapshot = delta.DeltaQbeastSnapshot(deltaLog.snapshot) - qbeastSnapshot.loadAllRevisions.length shouldBe 2 + // Including the staging revision + 
qbeastSnapshot.loadAllRevisions.size shouldBe 3 qbeastSnapshot.loadLatestRevision.desiredCubeSize shouldBe cubeSize2 }) diff --git a/src/test/scala/io/qbeast/spark/index/query/QueryExecutorTest.scala b/src/test/scala/io/qbeast/spark/index/query/QueryExecutorTest.scala index 8b3ad317a..c60318d4e 100644 --- a/src/test/scala/io/qbeast/spark/index/query/QueryExecutorTest.scala +++ b/src/test/scala/io/qbeast/spark/index/query/QueryExecutorTest.scala @@ -121,7 +121,8 @@ class QueryExecutorTest extends QbeastIntegrationTestSpec { val deltaLog = DeltaLog.forTable(spark, tmpdir) val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.snapshot) - qbeastSnapshot.loadAllRevisions.size shouldBe 2 + // Including the staging revision + qbeastSnapshot.loadAllRevisions.size shouldBe 3 val filters = Seq.empty @@ -180,7 +181,7 @@ class QueryExecutorTest extends QbeastIntegrationTestSpec { val deltaLog = DeltaLog.forTable(spark, tmpdir) val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.snapshot) - val revision = qbeastSnapshot.loadAllRevisions.head + val revision = qbeastSnapshot.loadLatestRevision val indexStatus = qbeastSnapshot.loadIndexStatus(revision.revisionID) val innerCubesLevel1 = diff --git a/src/test/scala/io/qbeast/spark/index/query/QuerySpecBuilderTest.scala b/src/test/scala/io/qbeast/spark/index/query/QuerySpecBuilderTest.scala index 863035a64..6f40af862 100644 --- a/src/test/scala/io/qbeast/spark/index/query/QuerySpecBuilderTest.scala +++ b/src/test/scala/io/qbeast/spark/index/query/QuerySpecBuilderTest.scala @@ -30,7 +30,7 @@ class QuerySpecBuilderTest val columnTransformers = Seq(Transformer("linear", "id", IntegerDataType)).toIndexedSeq Revision( - 0, + 1, System.currentTimeMillis(), QTableID("test"), 100, @@ -96,7 +96,7 @@ class QuerySpecBuilderTest val columnTransformers = Seq(Transformer("hashing", "name", StringDataType)).toIndexedSeq val revision = Revision( - 0, + 1, System.currentTimeMillis(), QTableID("test"), 100, @@ -155,7 +155,8 @@ class QuerySpecBuilderTest "extractQuerySpace" should "filter Revision properly" in withSpark(_ => { // Revision space ranges: [0, 10], [10, 20], [20, 30], [30, 40] - val revisions = (0 to 3).map(i => createRevision(10 * i, 10 * (i + 1)).copy(revisionID = i)) + val revisions = + (0 to 3).map(i => createRevision(10 * i, 10 * (i + 1)).copy(revisionID = i + 1)) val expressions = Seq( ("id < -1", 0), @@ -179,7 +180,8 @@ class QuerySpecBuilderTest it should "retrieve all revisions with no filter expressions" in withSpark(_ => { // Revision space ranges: [0, 10], [10, 20], [20, 30], [30, 40] - val revisions = (0 to 3).map(i => createRevision(10 * i, 10 * (i + 1)).copy(revisionID = i)) + val revisions = + (0 to 3).map(i => createRevision(10 * i, 10 * (i + 1)).copy(revisionID = i + 1)) revisions.count { revision => val querySpec = new QuerySpecBuilder(Seq.empty).build(revision) diff --git a/src/test/scala/io/qbeast/spark/utils/ConvertToQbeastTest.scala b/src/test/scala/io/qbeast/spark/utils/ConvertToQbeastTest.scala new file mode 100644 index 000000000..6013b68a6 --- /dev/null +++ b/src/test/scala/io/qbeast/spark/utils/ConvertToQbeastTest.scala @@ -0,0 +1,239 @@ +package io.qbeast.spark.utils + +import io.qbeast.core.model.RevisionUtils.{isStaging, stagingID} +import io.qbeast.spark.delta.DeltaQbeastSnapshot +import io.qbeast.spark.internal.commands.ConvertToQbeastCommand +import io.qbeast.spark.utils.QbeastExceptionMessages.{ + incorrectIdentifierFormat, + partitionedTableExceptionMsg, + unsupportedFormatExceptionMsg +} +import 
io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable} +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.scalatest.PrivateMethodTester + +class ConvertToQbeastTest extends QbeastIntegrationTestSpec with PrivateMethodTester { + val dataSize = 50000 + val numSparkPartitions = 20 + + val columnsToIndex: Seq[String] = Seq("user_id", "price", "event_type") + val dcs = 5000 + + val partitionedParquetExceptionMsg: String = + partitionedTableExceptionMsg + "Failed to convert the parquet table into delta: " + + def convertFromFormat( + spark: SparkSession, + format: String, + tablePath: String, + isPartitioned: Boolean = false): Unit = { + val data = loadTestData(spark).limit(dataSize).repartition(numSparkPartitions) + + // Write source data + if (isPartitioned) { + data.write + .mode("overwrite") + .partitionBy("event_type") + .format(format) + .save(tablePath) + } else { + data.write.mode("overwrite").format(format).save(tablePath) + } + + // Convert source data to qbeast + val tableIdentifier = s"$format.`$tablePath`" + ConvertToQbeastCommand(tableIdentifier, columnsToIndex, dcs).run(spark) + } + + def getQbeastSnapshot(spark: SparkSession, dir: String): DeltaQbeastSnapshot = { + val deltaLog = DeltaLog.forTable(spark, dir) + DeltaQbeastSnapshot(deltaLog.snapshot) + } + + behavior of "ConvertToQbeastCommand" + + it should "convert a delta table" in withSparkAndTmpDir((spark, tmpDir) => { + val fileFormat = "delta" + convertFromFormat(spark, fileFormat, tmpDir) + + val sourceDf = spark.read.format(fileFormat).load(tmpDir) + val qbeastDf = spark.read.format("qbeast").load(tmpDir) + + assertLargeDatasetEquality(qbeastDf, sourceDf) + + // All non-qbeast files are considered staging files and are placed + // directly into the staging revision(RevisionID = 0) + val indexStatus = getQbeastSnapshot(spark, tmpDir).loadIndexStatus(stagingID) + indexStatus.cubesStatuses.size shouldBe 1 + indexStatus.cubesStatuses.head._2.files.size shouldBe numSparkPartitions + }) + + it should "convert a parquet table" in withSparkAndTmpDir((spark, tmpDir) => { + val fileFormat = "parquet" + convertFromFormat(spark, fileFormat, tmpDir) + + val sourceDf = spark.read.format(fileFormat).load(tmpDir) + val qbeastDf = spark.read.format("qbeast").load(tmpDir) + + assertLargeDatasetEquality(qbeastDf, sourceDf) + + // All non-qbeast files are considered staging files and are placed + // directly into the staging revision(RevisionID = 0) + val indexStatus = getQbeastSnapshot(spark, tmpDir).loadIndexStatus(stagingID) + indexStatus.cubesStatuses.size shouldBe 1 + indexStatus.cubesStatuses.head._2.files.size shouldBe numSparkPartitions + }) + + it should "fail to convert a PARTITIONED delta table" in withSparkAndTmpDir((spark, tmpDir) => { + val fileFormat = "delta" + + val thrown = + the[AnalysisException] thrownBy + convertFromFormat(spark, fileFormat, tmpDir, isPartitioned = true) + + thrown.getMessage() should startWith(partitionedTableExceptionMsg) + }) + + it should "fail to convert a PARTITIONED parquet table" in withSparkAndTmpDir( + (spark, tmpDir) => { + val fileFormat = "parquet" + + val thrown = + the[AnalysisException] thrownBy + convertFromFormat(spark, fileFormat, tmpDir, isPartitioned = true) + + thrown.getMessage() should startWith(partitionedParquetExceptionMsg) + }) + + it should "fail to convert an unsupported format" in withSparkAndTmpDir((spark, tmpDir) => { + val fileFormat = "json" + + val thrown = + the[AnalysisException] thrownBy 
convertFromFormat(spark, fileFormat, tmpDir) + + // json not supported + thrown.getMessage() should startWith(unsupportedFormatExceptionMsg("json")) + }) + + it should "not create new revisions for a qbeast table" in withSparkAndTmpDir( + (spark, tmpDir) => { + loadTestData(spark) + .limit(dataSize) + .write + .format("qbeast") + .option("columnsToIndex", columnsToIndex.mkString(",")) + .option("cubeSize", dcs) + .save(tmpDir) + + val revisionsBefore = getQbeastSnapshot(spark, tmpDir).loadAllRevisions + ConvertToQbeastCommand(s"qbeast.`$tmpDir`", columnsToIndex, dcs).run(spark) + val revisionsAfter = getQbeastSnapshot(spark, tmpDir).loadAllRevisions + + // Revisions should not modify + revisionsAfter shouldBe revisionsBefore + }) + + it should "fail to convert when the identifier format is not correct" in withSparkAndTmpDir( + (spark, tmpDir) => { + val identifier = s"parquet`$tmpDir`" + val thrown = the[AnalysisException] thrownBy + ConvertToQbeastCommand(identifier, columnsToIndex, dcs).run(spark) + + thrown.getMessage shouldBe incorrectIdentifierFormat(identifier) + }) + + it should "preserve sampling accuracy" in withSparkAndTmpDir((spark, tmpDir) => { + convertFromFormat(spark, "parquet", tmpDir) + + val convertedTable = spark.read.format("qbeast").load(tmpDir) + val tolerance = 0.01 + + List(0.1, 0.2, 0.5, 0.7, 0.99).foreach(f => { + val sampleSize = convertedTable + .sample(withReplacement = false, f) + .count() + .toDouble + + val margin = dataSize * f * tolerance + sampleSize shouldBe (dataSize * f) +- margin + }) + }) + + "Appending to a converted table" should "create a new, non-staging revision" in + withSparkAndTmpDir((spark, tmpDir) => { + convertFromFormat(spark, "parquet", tmpDir) + + // Append qbeast data + loadTestData(spark) + .limit(dataSize) + .write + .mode("append") + .format("qbeast") + .save(tmpDir) + + // Should add new revision + val qs = getQbeastSnapshot(spark, tmpDir) + val allRevisions = qs.loadAllRevisions + val rev = qs.loadLatestRevision + + allRevisions.size shouldBe 2 + isStaging(rev) shouldBe false + }) + + "Analyzing the staging revision" should "not change the ANNOUNCED set" in + withSparkAndTmpDir((spark, tmpDir) => { + convertFromFormat(spark, "parquet", tmpDir) + + // Analyze the staging revision + val qbeastTable = QbeastTable.forPath(spark, tmpDir) + qbeastTable.analyze() + + // Preserve empty ANNOUNCED set + val qs = getQbeastSnapshot(spark, tmpDir) + qs.loadLatestIndexStatus.announcedSet.isEmpty shouldBe true + }) + + "Optimizing the staging revision" should "not replicate any data" in + withSparkAndTmpDir((spark, tmpDir) => { + val fileFormat = "parquet" + convertFromFormat(spark, fileFormat, tmpDir) + + // Analyze and optimize + val qbeastTable = QbeastTable.forPath(spark, tmpDir) + qbeastTable.analyze() + qbeastTable.optimize() + + // Compare DataFrames + val sourceDf = spark.read.format(fileFormat).load(tmpDir) + val qbeastDf = spark.read.format("qbeast").load(tmpDir) + assertLargeDatasetEquality(qbeastDf, sourceDf) + }) + + "Compacting the staging revision" should "reduce the number of delta AddFiles" in + withExtendedSparkAndTmpDir( + sparkConfWithSqlAndCatalog + .set("spark.qbeast.compact.minFileSize", "1") + .set("spark.qbeast.compact.maxFileSize", "2000000")) { (spark, tmpDir) => + { + val fileFormat = "delta" + convertFromFormat(spark, fileFormat, tmpDir) + + // Perform compaction + val qbeastTable = QbeastTable.forPath(spark, tmpDir) + qbeastTable.compact() + + // Compare DataFrames + val sourceDf = 
spark.read.format(fileFormat).load(tmpDir) + val qbeastDf = spark.read.format("qbeast").load(tmpDir) + assertLargeDatasetEquality(qbeastDf, sourceDf) + + // Standard staging revision behavior + val qs = getQbeastSnapshot(spark, tmpDir) + val stagingCs = qs.loadLatestIndexStatus.cubesStatuses + + stagingCs.size shouldBe 1 + stagingCs.head._2.files.size shouldBe <(numSparkPartitions) + } + } +} diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastCompactionIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastCompactionIntegrationTest.scala index 23f851c76..b48a5edf2 100644 --- a/src/test/scala/io/qbeast/spark/utils/QbeastCompactionIntegrationTest.scala +++ b/src/test/scala/io/qbeast/spark/utils/QbeastCompactionIntegrationTest.scala @@ -153,7 +153,8 @@ class QbeastCompactionIntegrationTest extends QbeastIntegrationTestSpec { writeTestDataInBatches(newData, tmpDir, 4) val tableId = QTableID(tmpDir) - SparkDeltaMetadataManager.loadSnapshot(tableId).loadAllRevisions.size shouldBe 2 + // Including the staging revision + SparkDeltaMetadataManager.loadSnapshot(tableId).loadAllRevisions.size shouldBe 3 // Count files written for each revision val allFiles = DeltaLog.forTable(spark, tmpDir).snapshot.allFiles @@ -192,7 +193,8 @@ class QbeastCompactionIntegrationTest extends QbeastIntegrationTestSpec { writeTestDataInBatches(newData, tmpDir, 4) val tableId = QTableID(tmpDir) - SparkDeltaMetadataManager.loadSnapshot(tableId).loadAllRevisions.size shouldBe 2 + // Including the staging revision + SparkDeltaMetadataManager.loadSnapshot(tableId).loadAllRevisions.size shouldBe 3 // Count files written for each revision val allFiles = DeltaLog.forTable(spark, tmpDir).snapshot.allFiles diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala new file mode 100644 index 000000000..8f268b776 --- /dev/null +++ b/src/test/scala/io/qbeast/spark/utils/QbeastDeltaStagingTest.scala @@ -0,0 +1,117 @@ +package io.qbeast.spark.utils + +import io.qbeast.TestClasses.T2 +import io.qbeast.core.model.RevisionUtils.stagingID +import io.qbeast.spark.delta.DeltaQbeastSnapshot +import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.delta.DeltaLog + +class QbeastDeltaStagingTest extends QbeastIntegrationTestSpec { + val columnsToIndex: Seq[String] = Seq("a", "b") + val qDataSize = 10000 + val dDataSize = 10000 + val totalSize: Long = qDataSize + dDataSize + val numSparkPartitions = 20 + + def writeHybridTable(spark: SparkSession, dir: String): Unit = { + import spark.implicits._ + + // Write qbeast data + val qdf = (0 until qDataSize).map(i => T2(i, i)).toDF("a", "b") + qdf.write + .format("qbeast") + .option("columnsToIndex", columnsToIndex.mkString(",")) + .option("cubeSize", "5000") + .save(dir) + + // Create hybrid table by appending delta data + val ddf = (qDataSize until qDataSize + dDataSize).map(i => T2(i, i)).toDF("a", "b") + ddf + .repartition(numSparkPartitions) + .write + .mode("append") + .format("delta") + .save(dir) + } + + "A qbeast + delta hybrid table" should "be read correctly" in withSparkAndTmpDir( + (spark, tmpDir) => { + writeHybridTable(spark, tmpDir) + + val qbeastDf = spark.read.format("qbeast").load(tmpDir) + val deltaDf = spark.read.format("delta").load(tmpDir) + assertLargeDatasetEquality(qbeastDf, deltaDf) + + // Should have the staging revision and the first revision + val snapshot = DeltaLog.forTable(spark, 
+      val qs = DeltaQbeastSnapshot(snapshot)
+      qs.loadAllRevisions.size shouldBe 2
+      qs.existsRevision(stagingID) shouldBe true
+    })
+
+  it should "be readable using both formats after Analyze and Optimize" in withSparkAndTmpDir(
+    (spark, tmpDir) => {
+      writeHybridTable(spark, tmpDir)
+
+      // Analyze and Optimize the staging revision
+      val table = QbeastTable.forPath(spark, tmpDir)
+      table.analyze(stagingID)
+      table.optimize(stagingID)
+
+      // The DataFrame should not change when optimizing the staging revision
+      val qbeastDf = spark.read.format("qbeast").load(tmpDir)
+      val deltaDf = spark.read.format("delta").load(tmpDir)
+      qbeastDf.count() shouldBe deltaDf.count()
+
+      assertLargeDatasetEquality(qbeastDf, deltaDf)
+
+      // Should preserve standard staging revision behavior
+      val snapshot = DeltaLog.forTable(spark, tmpDir).snapshot
+      val qbeastSnapshot = DeltaQbeastSnapshot(snapshot)
+      val stagingIndexStatus = qbeastSnapshot.loadIndexStatus(stagingID)
+      stagingIndexStatus.cubesStatuses.size shouldBe 1
+      stagingIndexStatus.replicatedOrAnnouncedSet.isEmpty shouldBe true
+    })
+
+  it should "correctly compact the staging revision" in withExtendedSparkAndTmpDir(
+    sparkConfWithSqlAndCatalog.set("spark.qbeast.compact.minFileSize", "1")) { (spark, tmpDir) =>
+    {
+      writeHybridTable(spark, tmpDir)
+
+      // Number of delta files before compaction
+      val deltaLog = DeltaLog.forTable(spark, tmpDir)
+      val qsBefore = DeltaQbeastSnapshot(deltaLog.snapshot)
+      val numFilesBefore = qsBefore.loadIndexStatus(stagingID).cubesStatuses.head._2.files.size
+
+      // Perform compaction
+      val table = QbeastTable.forPath(spark, tmpDir)
+      table.compact(stagingID)
+
+      // Number of delta files after compaction
+      val qsAfter = DeltaQbeastSnapshot(deltaLog.update())
+      val numFilesAfter = qsAfter.loadIndexStatus(stagingID).cubesStatuses.head._2.files.size
+
+      numFilesAfter shouldBe <(numFilesBefore)
+
+      val deltaDf = spark.read.format("delta").load(tmpDir)
+      val qbeastDf = spark.read.format("qbeast").load(tmpDir)
+
+      assertLargeDatasetEquality(qbeastDf, deltaDf)
+    }
+  }
+
+  it should "sample correctly" in withSparkAndTmpDir((spark, tmpDir) => {
+    writeHybridTable(spark, tmpDir)
+    val qdf = spark.read.format("qbeast").load(tmpDir)
+
+    val tolerance = 0.05
+    List(0.1, 0.2, 0.5, 0.7, 0.99).foreach(f => {
+      val sampleSize = qdf.sample(withReplacement = false, f).count().toDouble
+      val margin = totalSize * f * tolerance
+
+      sampleSize shouldBe (totalSize * f) +- margin
+    })
+  })
+
+}
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSparkCorrectnessTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSparkCorrectnessTest.scala
index acce7fb03..87d5cd69a 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSparkCorrectnessTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSparkCorrectnessTest.scala
@@ -84,7 +84,8 @@ class QbeastSparkCorrectnessTest extends QbeastIntegrationTestSpec {
         val deltaLog = DeltaLog.forTable(spark, tmpDir)
         val qbeastSnapshot = DeltaQbeastSnapshot(deltaLog.snapshot)
 
-        qbeastSnapshot.loadAllRevisions.size shouldBe 1
+        // Include the staging revision
+        qbeastSnapshot.loadAllRevisions.size shouldBe 2
         qbeastSnapshot.loadLatestRevision.revisionID shouldBe 1L
 
       }
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastTableTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastTableTest.scala
index 90af3270f..fe8989ab3 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastTableTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastTableTest.scala
@@ -82,7 +82,7 @@ class QbeastTableTest extends QbeastIntegrationTestSpec {
       val revision1 = createDF(spark)
       val columnsToIndex = Seq("age", "val2")
       val cubeSize = 100
-      // WRITE SOME DATA
+      // WRITE SOME DATA, adds revisionIDs 0 and 1
       writeTestData(revision1, columnsToIndex, cubeSize, tmpDir)
 
       val revision2 = revision1.withColumn("age", col("age") * 2)
@@ -92,8 +92,9 @@ class QbeastTableTest extends QbeastIntegrationTestSpec {
       writeTestData(revision3, columnsToIndex, cubeSize, tmpDir, "append")
 
       val qbeastTable = QbeastTable.forPath(spark, tmpDir)
-      qbeastTable.revisionsIDs().size shouldBe 3
-      qbeastTable.revisionsIDs() shouldBe Seq(1L, 2L, 3L)
+      // Including the staging revision
+      qbeastTable.revisionsIDs().size shouldBe 4
+      qbeastTable.revisionsIDs() shouldBe Seq(0L, 1L, 2L, 3L)
 
     }
   }