diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index e6d1c5ec7..310b8b4cd 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -125,7 +125,7 @@ For example:
 sbt assembly
 
 $SPARK_HOME/bin/spark-shell \
---jars ./target/scala-2.12/qbeast-spark-assembly-0.3.0.jar \
+--jars ./target/scala-2.12/qbeast-spark-assembly-0.3.1.jar \
 --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
 --packages io.delta:delta-core_2.12:1.2.0
 ```
diff --git a/build.sbt b/build.sbt
index 656c4eda7..bc31925fe 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,7 +1,7 @@
 import Dependencies._
 import xerial.sbt.Sonatype._
 
-val mainVersion = "0.3.0"
+val mainVersion = "0.3.1"
 
 lazy val qbeastCore = (project in file("core"))
   .settings(
diff --git a/docs/CloudStorages.md b/docs/CloudStorages.md
index fa8ce3d8a..c60c5c8ba 100644
--- a/docs/CloudStorages.md
+++ b/docs/CloudStorages.md
@@ -32,8 +32,8 @@ Amazon Web Services S3 does not work with Hadoop 2.7. For this provider you'll n
 $SPARK_HOME/bin/spark-shell \
 --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
 --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
---packages io.qbeast:qbeast-spark_2.12:0.2.0,\
-io.delta:delta-core_2.12:1.0.0,\
+--packages io.qbeast:qbeast-spark_2.12:0.3.1,\
+io.delta:delta-core_2.12:1.2.0,\
 com.amazonaws:aws-java-sdk:1.12.20,\
 org.apache.hadoop:hadoop-common:3.2.0,\
 org.apache.hadoop:hadoop-client:3.2.0,\
@@ -46,8 +46,8 @@ $SPARK_HOME/bin/spark-shell \
 --conf spark.hadoop.fs.s3a.access.key=${AWS_ACCESS_KEY_ID} \
 --conf spark.hadoop.fs.s3a.secret.key=${AWS_SECRET_ACCESS_KEY} \
 --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
---packages io.qbeast:qbeast-spark_2.12:0.2.0,\
-io.delta:delta-core_2.12:1.0.0,\
+--packages io.qbeast:qbeast-spark_2.12:0.3.1,\
+io.delta:delta-core_2.12:1.2.0,\
 org.apache.hadoop:hadoop-common:3.2.0,\
 org.apache.hadoop:hadoop-client:3.2.0,\
 org.apache.hadoop:hadoop-aws:3.2.0
@@ -63,7 +63,7 @@ $SPARK_HOME/bin/spark-shell \
 --conf spark.hadoop.fs.azure.account.key.blobqsql.blob.core.windows.net="${AZURE_BLOB_STORAGE_KEY}" \
 --conf spark.hadoop.fs.AbstractFileSystem.wasb.impl=org.apache.hadoop.fs.azure.Wasb \
 --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
---packages io.qbeast:qbeast-spark_2.12:0.2.0,\
-io.delta:delta-core_2.12:1.0.0,\
+--packages io.qbeast:qbeast-spark_2.12:0.3.1,\
+io.delta:delta-core_2.12:1.2.0,\
 org.apache.hadoop:hadoop-azure:3.2.0
 ```
\ No newline at end of file
diff --git a/docs/Quickstart.md b/docs/Quickstart.md
index 73b3ea92a..b6de140fc 100644
--- a/docs/Quickstart.md
+++ b/docs/Quickstart.md
@@ -17,7 +17,7 @@ Inside the project folder, launch a spark-shell with the required **dependencies
 $SPARK_HOME/bin/spark-shell \
 --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
 --conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog \
---packages io.qbeast:qbeast-spark_2.12:0.3.0,io.delta:delta-core_2.12:1.2.0
+--packages io.qbeast:qbeast-spark_2.12:0.3.1,io.delta:delta-core_2.12:1.2.0
 ```
 
 As an **_extra configuration_**, you can also change two global parameters of the index:
diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala b/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
index e442c2ce4..a9fa6b6c5 100644
--- a/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
+++ b/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -4,6 +4,7 @@
 package io.qbeast.spark.delta
 
 import io.qbeast.core.model.{QTableID, RevisionID, TableChanges}
+import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
 import io.qbeast.spark.utils.TagColumns
 import org.apache.spark.sql.delta.actions.{
   Action,
@@ -14,9 +15,16 @@ import org.apache.spark.sql.delta.actions.{
 }
 import org.apache.spark.sql.delta.commands.DeltaCommand
 import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaOptions, OptimisticTransaction}
+import org.apache.spark.sql.execution.datasources.{
+  BasicWriteJobStatsTracker,
+  WriteJobStatsTracker
+}
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisExceptionFactory, SaveMode, SparkSession}
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.mutable.ListBuffer
 
 /**
  * DeltaMetadataWriter is in charge of writing data to a table
@@ -45,10 +53,33 @@ private[delta] case class DeltaMetadataWriter(
     DeltaOperations.Write(mode, None, options.replaceWhere, options.userMetadata)
   }
 
+  /**
+   * Creates an instance of basic stats tracker on the desired transaction
+   * @param txn
+   * @return
+   */
+  private def createStatsTrackers(txn: OptimisticTransaction): Seq[WriteJobStatsTracker] = {
+    val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
+    // Create basic stats trackers to add metrics on the Write Operation
+    val hadoopConf = sparkSession.sessionState.newHadoopConf() // TODO check conf
+    val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
+      new SerializableConfiguration(hadoopConf),
+      BasicWriteJobStatsTracker.metrics)
+    txn.registerSQLMetrics(sparkSession, basicWriteJobStatsTracker.driverSideMetrics)
+    statsTrackers.append(basicWriteJobStatsTracker)
+    statsTrackers
+  }
+
   def writeWithTransaction(writer: => (TableChanges, Seq[FileAction])): Unit = {
     deltaLog.withNewTransaction { txn =>
+      // Register metrics to use in the Commit Info
+      val statsTrackers = createStatsTrackers(txn)
+      registerStatsTrackers(statsTrackers)
+      // Execute write
       val (changes, newFiles) = writer
+      // Update Qbeast Metadata (replicated set, revision..)
       val finalActions = updateMetadata(txn, changes, newFiles)
+      // Commit the information to the DeltaLog
       txn.commit(finalActions, deltaOperation)
     }
   }
diff --git a/src/main/scala/io/qbeast/spark/delta/writer/BlockWriter.scala b/src/main/scala/io/qbeast/spark/delta/writer/BlockWriter.scala
index e73a9a374..57bb2731f 100644
--- a/src/main/scala/io/qbeast/spark/delta/writer/BlockWriter.scala
+++ b/src/main/scala/io/qbeast/spark/delta/writer/BlockWriter.scala
@@ -11,7 +11,12 @@ import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
 import org.apache.hadoop.mapreduce.TaskType
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.delta.actions.AddFile
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.{
+  OutputWriter,
+  OutputWriterFactory,
+  WriteJobStatsTracker,
+  WriteTaskStatsTracker
+}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
@@ -34,6 +39,7 @@ case class BlockWriter(
     schemaIndex: StructType,
     factory: OutputWriterFactory,
     serConf: SerializableConfiguration,
+    statsTrackers: Seq[WriteJobStatsTracker],
     qbeastColumns: QbeastColumns,
     tableChanges: TableChanges)
     extends Serializable {
@@ -44,7 +50,7 @@
    * @param iter iterator of rows
    * @return the sequence of files added
    */
-  def writeRow(iter: Iterator[InternalRow]): Iterator[AddFile] = {
+  def writeRow(iter: Iterator[InternalRow]): Iterator[(AddFile, TaskStats)] = {
     if (!iter.hasNext) {
       return Iterator.empty
     }
@@ -74,17 +80,23 @@
         val rowWeight = Weight(row.getInt(qbeastColumns.weightColumnIndex))
 
         // Writing the data in a single file.
-        blockCtx.writer.write(InternalRow.fromSeq(cleanRow.result()))
+        val internalRow = InternalRow.fromSeq(cleanRow.result())
+        blockCtx.writer.write(internalRow)
+        blockCtx.blockStatsTracker.foreach(
+          _.newRow(blockCtx.path.toString, internalRow)
+        ) // Update statsTrackers
         blocks.updated(cubeId, blockCtx.update(rowWeight))
+
       }
       .values
       .flatMap {
-        case BlockContext(blockStats, _, _) if blockStats.elementCount == 0 =>
+        case BlockContext(blockStats, _, _, _) if blockStats.elementCount == 0 =>
           Iterator.empty // Do nothing, this is a empty partition
         case BlockContext(
               BlockStats(cube, maxWeight, minWeight, state, rowCount),
               writer,
-              path) =>
+              path,
+              blockStatsTracker) =>
           val tags = Map(
             TagUtils.cube -> cube,
             TagUtils.minWeight -> minWeight.value.toString,
@@ -95,19 +107,27 @@
 
           writer.close()
 
+          // Process final stats
+          blockStatsTracker.foreach(_.closeFile(path.toString))
+          val endTime = System.currentTimeMillis()
+          val finalStats = blockStatsTracker.map(_.getFinalStats(endTime))
+          val taskStats = TaskStats(finalStats, endTime)
+
+          // Process file status
           val fileStatus = path
             .getFileSystem(serConf.value)
            .getFileStatus(path)
 
-          Iterator(
-            AddFile(
-              path = path.getName(),
-              partitionValues = Map(),
-              size = fileStatus.getLen,
-              modificationTime = fileStatus.getModificationTime,
-              dataChange = true,
-              stats = "",
-              tags = tags))
+          val addFile = AddFile(
+            path = path.getName(),
+            partitionValues = Map(),
+            size = fileStatus.getLen,
+            modificationTime = fileStatus.getModificationTime,
+            dataChange = true,
+            stats = "",
+            tags = tags)
+
+          Iterator((addFile, taskStats))
 
       }
   }.toIterator
@@ -119,6 +139,7 @@
    * @return
    */
   private def buildWriter(cubeId: CubeId, state: String, maxWeight: Weight): BlockContext = {
+    val blockStatsTracker = statsTrackers.map(_.newTaskInstance())
     val writtenPath = new Path(dataPath, s"${UUID.randomUUID()}.parquet")
     val writer: OutputWriter = factory.newInstance(
       writtenPath.toString,
@@ -126,7 +147,12 @@
       new TaskAttemptContextImpl(
         new JobConf(serConf.value),
         new TaskAttemptID("", 0, TaskType.REDUCE, 0, 0)))
-    BlockContext(BlockStats(cubeId.string, state, maxWeight), writer, writtenPath)
+    blockStatsTracker.foreach(_.newFile(writtenPath.toString)) // Update stats trackers
+    BlockContext(
+      BlockStats(cubeId.string, state, maxWeight),
+      writer,
+      writtenPath,
+      blockStatsTracker)
   }
 
   /*
@@ -136,7 +162,12 @@
    * @param writer an instance of the file writer
    * @param path the path of the written file
    */
-  private case class BlockContext(stats: BlockStats, writer: OutputWriter, path: Path) {
+  private case class BlockContext(
+      stats: BlockStats,
+      writer: OutputWriter,
+      path: Path,
+      blockStatsTracker: Seq[WriteTaskStatsTracker])
+      extends Serializable {
 
     def update(minWeight: Weight): BlockContext =
       this.copy(stats = stats.update(minWeight))
diff --git a/src/main/scala/io/qbeast/spark/delta/writer/SparkDeltaDataWriter.scala b/src/main/scala/io/qbeast/spark/delta/writer/SparkDeltaDataWriter.scala
index 2bc7be869..f8f91384f 100644
--- a/src/main/scala/io/qbeast/spark/delta/writer/SparkDeltaDataWriter.scala
+++ b/src/main/scala/io/qbeast/spark/delta/writer/SparkDeltaDataWriter.scala
@@ -10,19 +10,25 @@ import io.qbeast.spark.index.QbeastColumns.cubeColumnName
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.qbeast.config.{MAX_FILE_SIZE_COMPACTION, MIN_FILE_SIZE_COMPACTION}
+import org.apache.spark.sql.delta.DeltaStatsCollectionUtils
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.delta.actions.FileAction
+import org.apache.spark.sql.delta.stats.DeltaFileStatistics
+import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, WriteTaskStats}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+import java.net.URI
 import scala.collection.parallel.immutable.ParVector
 
 /**
  * Spark implementation of the DataWriter interface.
  */
-object SparkDeltaDataWriter extends DataWriter[DataFrame, StructType, FileAction] {
+object SparkDeltaDataWriter
+    extends DataWriter[DataFrame, StructType, FileAction]
+    with DeltaStatsCollectionUtils {
 
   override def write(
       tableID: QTableID,
@@ -35,19 +41,26 @@
     val job = Job.getInstance()
     val factory = new ParquetFileFormat().prepareWrite(sparkSession, job, Map.empty, schema)
     val serConf = new SerializableConfiguration(job.getConfiguration)
+    val statsTrackers = StatsTracker.getStatsTrackers()
+    // Get Stats Trackers for each file
 
     val qbeastColumns = QbeastColumns(qbeastData)
 
-    val blockWriter =
-      BlockWriter(
-        dataPath = tableID.id,
-        schema = schema,
-        schemaIndex = qbeastData.schema,
-        factory = factory,
-        serConf = serConf,
-        qbeastColumns = qbeastColumns,
-        tableChanges = tableChanges)
-    qbeastData
+    val dataColumns = qbeastData.schema.map(_.name).filterNot(QbeastColumns.contains)
+    val cleanedData = qbeastData.selectExpr(dataColumns: _*)
+    val fileStatsTrackers = getDeltaOptionalTrackers(cleanedData, sparkSession, tableID)
+
+    val blockWriter = BlockWriter(
+      dataPath = tableID.id,
+      schema = schema,
+      schemaIndex = qbeastData.schema,
+      factory = factory,
+      serConf = serConf,
+      statsTrackers = statsTrackers ++ fileStatsTrackers,
+      qbeastColumns = qbeastColumns,
+      tableChanges = tableChanges)
+
+    val finalActionsAndStats = qbeastData
       .repartition(col(cubeColumnName))
       .queryExecution
       .executedPlan
@@ -56,6 +69,34 @@
       .collect()
       .toIndexedSeq
 
+    val fileActions = finalActionsAndStats.map(_._1)
+    val stats = finalActionsAndStats.map(_._2)
+
+    // Process BasicWriteJobStatsTrackers
+    var fileWriteTaskStats = Seq.empty[WriteTaskStats]
+    var basicWriteTaskStats = Seq.empty[WriteTaskStats]
+    var endTime = 0L
+    stats.foreach(taskStats => {
+      fileWriteTaskStats =
+        fileWriteTaskStats ++ taskStats.writeTaskStats.filter(_.isInstanceOf[DeltaFileStatistics])
+      basicWriteTaskStats = (basicWriteTaskStats ++
+        taskStats.writeTaskStats.filter(_.isInstanceOf[BasicWriteTaskStats]))
+
+      endTime = math.max(endTime, taskStats.endTime)
+    })
+    statsTrackers.foreach(_.processStats(basicWriteTaskStats, endTime))
+    fileStatsTrackers.foreach(_.processStats(fileWriteTaskStats, endTime))
+
+    // Process DeltaWriteStats
+    val resultFiles = fileActions.map { a =>
+      a.copy(stats = fileStatsTrackers
+        .map(_.recordedStats(new Path(new URI(a.path)).toString))
+        .getOrElse(a.stats))
+    }
+
+    // Return FileAction
+    resultFiles
+
   }
 
   /**
diff --git a/src/main/scala/io/qbeast/spark/delta/writer/StatsTracker.scala b/src/main/scala/io/qbeast/spark/delta/writer/StatsTracker.scala
new file mode 100644
index 000000000..fa31c2fb4
--- /dev/null
+++ b/src/main/scala/io/qbeast/spark/delta/writer/StatsTracker.scala
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021 Qbeast Analytics, S.L.
+ */
+package io.qbeast.spark.delta.writer
+
+import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats}
+
+case class TaskStats(writeTaskStats: Seq[WriteTaskStats], endTime: Long)
+
+object StatsTracker {
+
+  private var statsTrackers: Seq[WriteJobStatsTracker] = Seq.empty
+
+  def registerStatsTrackers(newStatsTrackers: Seq[WriteJobStatsTracker]): Unit = {
+    statsTrackers = newStatsTrackers
+  }
+
+  def getStatsTrackers(): Seq[WriteJobStatsTracker] = statsTrackers
+
+}
diff --git a/src/main/scala/org/apache/spark/sql/delta/DeltaStatsCollectionUtils.scala b/src/main/scala/org/apache/spark/sql/delta/DeltaStatsCollectionUtils.scala
new file mode 100644
index 000000000..143ab3382
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sql/delta/DeltaStatsCollectionUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2021 Qbeast Analytics, S.L.
+ */
+package org.apache.spark.sql.delta
+
+import io.qbeast.context.QbeastContext
+import io.qbeast.core.model.QTableID
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.{DeltaJobStatisticsTracker, StatisticsCollection}
+import org.apache.spark.sql.functions.to_json
+
+trait DeltaStatsCollectionUtils {
+
+  protected def getDeltaOptionalTrackers(
+      data: DataFrame,
+      sparkSession: SparkSession,
+      tableID: QTableID): Option[DeltaJobStatisticsTracker] = {
+
+    if (QbeastContext.config.get(DeltaSQLConf.DELTA_COLLECT_STATS)) {
+      val output = data.queryExecution.analyzed.output
+      val statsDataSchema = output
+
+      val deltaLog = DeltaLog.forTable(sparkSession, tableID.id)
+      val metadata = deltaLog.snapshot.metadata
+      val outputPath = deltaLog.dataPath
+
+      val indexedCols = DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(metadata)
+
+      val statsCollection = new StatisticsCollection {
+        override def dataSchema = statsDataSchema.toStructType
+
+        override val spark: SparkSession = data.sparkSession
+        override val numIndexedCols = indexedCols
+      }
+
+      val statsColExpr: Expression = {
+        val dummyDF = Dataset.ofRows(sparkSession, LocalRelation(statsDataSchema))
+        dummyDF
+          .select(to_json(statsCollection.statsCollector))
+          .queryExecution
+          .analyzed
+          .expressions
+          .head
+      }
+
+      Some(
+        new DeltaJobStatisticsTracker(
+          sparkSession.sessionState.newHadoopConf(),
+          outputPath,
+          statsDataSchema,
+          statsColExpr))
+    } else {
+      None
+    }
+  }
+
+}
diff --git a/src/test/scala/io/qbeast/spark/delta/writer/BlockWriterTest.scala b/src/test/scala/io/qbeast/spark/delta/writer/BlockWriterTest.scala
index 7bc5b5f5c..252477967 100644
--- a/src/test/scala/io/qbeast/spark/delta/writer/BlockWriterTest.scala
+++ b/src/test/scala/io/qbeast/spark/delta/writer/BlockWriterTest.scala
@@ -52,7 +52,7 @@ class BlockWriterTest extends AnyFlatSpec with Matchers with QbeastIntegrationTe
       }
       .toSet
 
-    files.map(_.tags(TagUtils.cube)).forall(cube => cubes.contains(cube)) shouldBe true
+    files.map(_._1.tags(TagUtils.cube)).forall(cube => cubes.contains(cube)) shouldBe true
   }
 
   it should "work with empty partitions" in withSparkAndTmpDir { (spark, tmpDir) =>
diff --git a/src/test/scala/io/qbeast/spark/delta/writer/WriteTestSpec.scala b/src/test/scala/io/qbeast/spark/delta/writer/WriteTestSpec.scala
index 6853c9b48..fbe2d21d1 100644
--- a/src/test/scala/io/qbeast/spark/delta/writer/WriteTestSpec.scala
+++ b/src/test/scala/io/qbeast/spark/delta/writer/WriteTestSpec.scala
@@ -6,7 +6,7 @@ import io.qbeast.spark.index.QbeastColumns._
 import io.qbeast.spark.index.{NormalizedWeight, QbeastColumns, SparkRevisionFactory}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.execution.datasources.{OutputWriterFactory}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.util.SerializableConfiguration
@@ -91,12 +91,13 @@ case class WriteTestSpec(numDistinctCubes: Int, spark: SparkSession, tmpDir: Str
 
   val tableChanges: TableChanges =
     BroadcastedTableChanges(None, IndexStatus(rev), deltaNormalizedCubeWeights = weightMap)
 
-  val writer: BlockWriter = BlockWriter(
+  val writer: BlockWriter = new BlockWriter(
     dataPath = tmpDir,
     schema = data.schema,
     schemaIndex = indexed.schema,
     factory = factory,
    serConf = serConf,
+    statsTrackers = Seq.empty,
     qbeastColumns = qbeastColumns,
     tableChanges = tableChanges)
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastInsertToTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastInsertToTest.scala
index ae1817338..b08cff0c7 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastInsertToTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastInsertToTest.scala
@@ -27,9 +27,12 @@ class QbeastInsertToTest extends QbeastIntegrationTestSpec {
 
         // Insert using a SELECT statement
         spark.sql("insert into table t select * from t_lower")
-        spark.sql("select * from t").collect() shouldBe initialData
-          .union(insertDataLower)
-          .collect()
+        val dataInserted = spark.sql("SELECT * FROM t")
+        assertSmallDatasetEquality(
+          dataInserted,
+          initialData.union(insertDataLower),
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -56,9 +59,12 @@
 
         // Insert using a FROM statement
         spark.sql("insert into table t from t_lower select *")
-        spark.sql("select * from t").collect() shouldBe initialData
-          .union(insertDataLower)
-          .collect()
+        val dataInserted = spark.sql("SELECT * FROM t")
+        assertSmallDatasetEquality(
+          dataInserted,
+          initialData.union(insertDataLower),
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -82,9 +88,12 @@
 
         // Multi-Row Insert Using a VALUES Clause
         spark.sql("insert into table t (value) values (4),(5)")
-        spark.sql("select * from t").collect() shouldBe initialData
-          .union(Seq(4, 5).toDF())
-          .collect()
+        val dataInserted = spark.sql("SELECT * FROM t")
+        assertSmallDatasetEquality(
+          dataInserted,
+          initialData.union(Seq(4, 5).toDF()),
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -108,9 +117,12 @@
 
         // Single Row Insert Using a VALUES Clause
        spark.sql("insert into table t (value) values (4)")
-        spark.sql("select * from t").collect() shouldBe initialData
-          .union(Seq(4).toDF())
-          .collect()
+        val dataInserted = spark.sql("SELECT * FROM t")
+        assertSmallDatasetEquality(
+          dataInserted,
+          initialData.union(Seq(4).toDF()),
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -136,9 +148,12 @@
 
         // Insert using a TABLE statement
         spark.sql("INSERT INTO initial TABLE toInsert")
-        spark.sql("SELECT * FROM initial").collect() shouldBe initialData
-          .union(dataToInsert)
-          .collect()
+        val dataInserted = spark.sql("SELECT * FROM initial")
+        assertSmallDatasetEquality(
+          dataInserted,
+          initialData.union(dataToInsert),
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -163,9 +178,12 @@
 
         // Insert using a COLUMN LIST
         spark.sql("INSERT INTO initial (a, b) VALUES ('5', 5), ('6', 6)")
-        spark.sql("SELECT * FROM initial").collect() shouldBe initialData
-          .union(dataToInsert)
-          .collect()
+        val dataInserted = spark.sql("SELECT * FROM initial")
+        assertSmallDatasetEquality(
+          dataInserted,
+          initialData.union(dataToInsert),
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -190,7 +208,12 @@
 
         // Overwrite using a VALUE clause
         spark.sql("INSERT OVERWRITE initial VALUES ('5', 5), ('6', 6)")
-        spark.sql("SELECT * FROM initial").collect() shouldBe dataToInsert.collect()
+        val dataInserted = spark.sql("SELECT * FROM initial")
+        assertSmallDatasetEquality(
+          dataInserted,
+          dataToInsert,
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -216,7 +239,12 @@
 
         // Overwrite using a SELECT statement
         spark.sql("INSERT OVERWRITE initial SELECT a, b FROM toInsert")
-        spark.sql("SELECT * FROM initial").collect() shouldBe dataToInsert.collect()
+        val dataInserted = spark.sql("SELECT * FROM initial")
+        assertSmallDatasetEquality(
+          dataInserted,
+          dataToInsert,
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -242,7 +270,12 @@
 
         // Overwrite using a TABLE statement
         spark.sql("INSERT OVERWRITE initial TABLE toInsert")
-        spark.sql("SELECT * FROM initial").collect() shouldBe dataToInsert.collect()
+        val dataInserted = spark.sql("SELECT * FROM initial")
+        assertSmallDatasetEquality(
+          dataInserted,
+          dataToInsert,
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -268,7 +301,12 @@
 
         // Overwrite using a FROM statement
         spark.sql("INSERT OVERWRITE initial FROM toInsert SELECT *")
-        spark.sql("SELECT * FROM initial").collect() shouldBe dataToInsert.collect()
+        val dataInserted = spark.sql("SELECT * FROM initial")
+        assertSmallDatasetEquality(
+          dataInserted,
+          dataToInsert,
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
@@ -330,7 +368,12 @@
 
         // Overwrite using a TABLE statement on real data
         spark.sql("INSERT OVERWRITE initial TABLE toInsert")
-        spark.sql("SELECT * FROM initial").collect() shouldBe dataToInsert.collect()
+        val dataInserted = spark.sql("SELECT * FROM initial")
+        assertSmallDatasetEquality(
+          dataInserted,
+          dataToInsert,
+          orderedComparison = false,
+          ignoreNullable = true)
       }
   }
 
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSparkCorrectnessTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSparkCorrectnessTest.scala
index ac488a74a..acce7fb03 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSparkCorrectnessTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSparkCorrectnessTest.scala
@@ -5,6 +5,7 @@ package io.qbeast.spark.utils
 
 import io.qbeast.spark.QbeastIntegrationTestSpec
 import io.qbeast.spark.delta.DeltaQbeastSnapshot
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.functions.{col, lit}
 
@@ -160,4 +161,104 @@ class QbeastSparkCorrectnessTest extends QbeastIntegrationTestSpec {
 
       }
   }
+
+  it should "work without specifying columnsToIndex " +
+    "while cause revision change by using a different cubeSize" in
+    withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
+      {
+        val original = loadTestData(spark)
+        original.write
+          .format("qbeast")
+          .option("cubeSize", 10000)
+          .option("columnsToIndex", "user_id,product_id")
+          .save(tmpDir)
+
+        original.write
+          .mode("append")
+          .format("qbeast")
+          .option("cubeSize", 5000)
+          .save(tmpDir)
+        val qDf = spark.read.format("qbeast").load(tmpDir)
+
+        qDf.count shouldBe original.count * 2
+      }
+    }
+
+  it should "append to an existing qbeast table without specifying cubeSize" in
+    withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
+      {
+        val original = loadTestData(spark)
+        original.write
+          .format("qbeast")
+          .option("cubeSize", 10000)
+          .option("columnsToIndex", "user_id,product_id")
+          .save(tmpDir)
+
+        original.write
+          .mode("append")
+          .format("qbeast")
+          .option("columnsToIndex", "user_id,product_id")
+          .save(tmpDir)
+        val qDf = spark.read.format("qbeast").load(tmpDir)
+
+        qDf.count shouldBe original.count * 2
+      }
+    }
+
+  "Appending to an non-existing table" should
+    "throw an exception if 'columnsToIndex' is not provided" in withQbeastContextSparkAndTmpDir {
+      (spark, tmpDir) =>
+        {
+          val original = loadTestData(spark)
+          a[AnalysisException] shouldBe thrownBy {
+            original.write
+              .format("qbeast")
+              .option("cubeSize", 10000)
+              .save(tmpDir)
+          }
+        }
+    }
+
+  def createSimpleTestData(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    Seq(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b")
+  }
+
+  "Qbeast" should "output correctly Operation Metrics in Delta History" in
+    withQbeastContextSparkAndTmpDir((spark, tmpDir) => {
+
+      val data = createSimpleTestData(spark)
+      data.write
+        .format("qbeast")
+        .option("columnsToIndex", "a,b")
+        .save(tmpDir + "/qbeast")
+
+      val qbeastHistory =
+        spark.sql(s"DESCRIBE HISTORY '$tmpDir/qbeast'").select("operationMetrics")
+
+      val historyMap = qbeastHistory.first().get(0).asInstanceOf[Map[String, String]]
+      historyMap.size should be > 0
+      historyMap.get("numFiles") shouldBe Some("1")
+      historyMap.get("numOutputRows") shouldBe Some("3")
+      historyMap.get("numOutputBytes") shouldBe Some("660")
+
+    })
+
+  it should "output correctly File Metrics in Commit Log" in withQbeastContextSparkAndTmpDir(
+    (spark, tmpDir) => {
+
+      val data = createSimpleTestData(spark)
+      data.write
+        .format("qbeast")
+        .option("columnsToIndex", "a,b")
+        .save(tmpDir)
+
+      val stats = DeltaLog.forTable(spark, tmpDir).snapshot.allFiles.collect().map(_.stats)
+      stats.length shouldBe >(0)
+      stats.head shouldBe "{\"numRecords\":3,\"minValues\":{\"a\":\"A\",\"b\":1}," +
+        "\"maxValues\":{\"a\":\"C\",\"b\":3}," +
+        "\"nullCount\":{\"a\":0,\"b\":0}}"
+
+    })
+
 }