From 1332bc97dd460a0a70ca3ca3c010cf2ceeea9a5c Mon Sep 17 00:00:00 2001
From: Jiawei
Date: Fri, 27 Jan 2023 13:26:14 +0100
Subject: [PATCH] Update metadata in MetadataManager

---
 .../qbeast/core/model/MetadataManager.scala   | 11 ++++++
 .../spark/delta/DeltaMetadataWriter.scala     | 27 ++++++++++----
 .../delta/SparkDeltaMetadataManager.scala     | 18 +++++++--
 .../commands/ConvertToQbeastCommand.scala     | 37 ++++++-------------
 4 files changed, 58 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/io/qbeast/core/model/MetadataManager.scala b/core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
index 4dc394925..897461113 100644
--- a/core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
+++ b/core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
@@ -8,6 +8,7 @@ import io.qbeast.IISeq
  * @tparam FileDescriptor type of file descriptor
  */
 trait MetadataManager[DataSchema, FileDescriptor] {
+  type Configuration = Map[String, String]

   /**
    * Gets the Snapshot for a given table
@@ -33,6 +34,16 @@ trait MetadataManager[DataSchema, FileDescriptor] {
   def updateWithTransaction(tableID: QTableID, schema: DataSchema, append: Boolean)(
       writer: => (TableChanges, IISeq[FileDescriptor])): Unit

+  /**
+   * Updates the table metadata by overwriting the metadata configurations
+   * with the provided key-value pairs.
+   * @param tableID QTableID
+   * @param schema table schema
+   * @param update configurations used to overwrite the existing metadata
+   */
+  def updateMetadataWithTransaction(tableID: QTableID, schema: DataSchema)(
+      update: => Configuration): Unit
+
   /**
    * Updates the Revision with the given RevisionChanges
    * @param tableID the QTableID
diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala b/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
index 1501f51db..af76b47f2 100644
--- a/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
+++ b/src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -5,14 +5,9 @@ package io.qbeast.spark.delta

 import io.qbeast.core.model.{QTableID, RevisionID, TableChanges}
 import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
+import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
 import io.qbeast.spark.utils.TagColumns
-import org.apache.spark.sql.delta.actions.{
-  Action,
-  AddFile,
-  FileAction,
-  RemoveFile,
-  SetTransaction
-}
+import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.commands.DeltaCommand
 import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaOptions, OptimisticTransaction}
 import org.apache.spark.sql.execution.datasources.{
@@ -84,6 +79,24 @@ private[delta] case class DeltaMetadataWriter(
     }
   }

+  def updateMetadataWithTransaction(update: => Configuration): Unit = {
+    deltaLog.withNewTransaction { txn =>
+      if (txn.metadata.partitionColumns.nonEmpty) {
+        throw AnalysisExceptionFactory.create(partitionedTableExceptionMsg)
+      }
+
+      val config = update
+      val updatedConfig = config.foldLeft(txn.metadata.configuration) { case (accConf, (k, v)) =>
+        accConf.updated(k, v)
+      }
+      val updatedMetadata = txn.metadata.copy(configuration = updatedConfig)
+
+      val op = DeltaOperations.SetTableProperties(config)
+      txn.updateMetadata(updatedMetadata)
+      txn.commit(Seq.empty, op)
+    }
+  }
+
   private def updateReplicatedFiles(tableChanges: TableChanges): Seq[Action] = {
     val revision = tableChanges.updatedRevision
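Note on the merge step in updateMetadataWithTransaction above: folding the
incoming configuration into txn.metadata.configuration with updated() overwrites
matching keys and preserves all others, which is equivalent to concatenating the
two maps. A minimal standalone sketch of that behavior (plain Scala; the object
name and values are illustrative, not part of the patch):

    object ConfigMergeSketch extends App {
      type Configuration = Map[String, String]

      // Stands in for the existing txn.metadata.configuration
      val existing: Configuration = Map("a" -> "1", "b" -> "2")

      // Incoming update: overwrites "b", introduces "c"
      val update: Configuration = Map("b" -> "20", "c" -> "3")

      // Same folding strategy as the patch
      val merged = update.foldLeft(existing) { case (acc, (k, v)) => acc.updated(k, v) }

      assert(merged == Map("a" -> "1", "b" -> "20", "c" -> "3"))
      assert(merged == existing ++ update) // fold + updated == map concatenation
    }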
diff --git a/src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala b/src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala
index 2c70c3395..fe4e36307 100644
--- a/src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala
+++ b/src/main/scala/io/qbeast/spark/delta/SparkDeltaMetadataManager.scala
@@ -4,11 +4,11 @@
 package io.qbeast.spark.delta

 import io.qbeast.IISeq
-import io.qbeast.core.model.{MetadataManager, _}
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
+import io.qbeast.core.model._
 import org.apache.spark.sql.delta.actions.FileAction
+import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{SaveMode, SparkSession}

 /**
  * Spark+Delta implementation of the MetadataManager interface
@@ -26,6 +26,18 @@ object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction]
     metadataWriter.writeWithTransaction(writer)
   }

+  override def updateMetadataWithTransaction(tableID: QTableID, schema: StructType)(
+      update: => Configuration): Unit = {
+    val deltaLog = loadDeltaQbeastLog(tableID).deltaLog
+    val options =
+      new DeltaOptions(Map("path" -> tableID.id), SparkSession.active.sessionState.conf)
+
+    val metadataWriter =
+      DeltaMetadataWriter(tableID, mode = SaveMode.Append, deltaLog, options, schema)
+
+    metadataWriter.updateMetadataWithTransaction(update)
+  }
+
   override def loadSnapshot(tableID: QTableID): DeltaQbeastSnapshot = {
     DeltaQbeastSnapshot(loadDeltaQbeastLog(tableID).deltaLog.update())
   }
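For reference, a caller-side sketch of the new SparkDeltaMetadataManager entry
point added above. The table path, schema, and property key are placeholders,
and an active SparkSession is assumed at call time because the implementation
resolves SparkSession.active internally:

    import io.qbeast.core.model.QTableID
    import io.qbeast.spark.delta.SparkDeltaMetadataManager
    import org.apache.spark.sql.types.StructType

    object UpdateMetadataSketch {
      // tablePath and the property key below are hypothetical examples
      def setExampleProperty(tablePath: String, schema: StructType): Unit = {
        val tableID = QTableID(tablePath)
        SparkDeltaMetadataManager.updateMetadataWithTransaction(tableID, schema) {
          // Evaluated lazily, inside the Delta transaction opened by the writer
          Map("qbeast.example.property" -> "value")
        }
      }
    }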
diff --git a/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala b/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala
index 222cacaee..48dfeae5e 100644
--- a/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala
+++ b/src/main/scala/io/qbeast/spark/internal/commands/ConvertToQbeastCommand.scala
@@ -4,7 +4,7 @@
 package io.qbeast.spark.internal.commands

 import io.qbeast.core.model._
-import io.qbeast.spark.delta.DeltaQbeastSnapshot
+import io.qbeast.spark.delta.{DeltaQbeastSnapshot, SparkDeltaMetadataManager}
 import io.qbeast.spark.utils.MetadataConfig.{lastRevisionID, revision}
 import io.qbeast.spark.utils.QbeastExceptionMessages.{
   incorrectIdentifierFormat,
@@ -16,7 +16,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.qbeast.config.DEFAULT_CUBE_SIZE
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.delta.DeltaLog
-import org.apache.spark.sql.delta.DeltaOperations.Convert
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.{AnalysisException, AnalysisExceptionFactory, Row, SparkSession}

@@ -76,33 +75,21 @@ case class ConvertToQbeastCommand(
       case _ => throw AnalysisExceptionFactory.create(unsupportedFormatExceptionMsg(fileFormat))
     }

-    // Convert delta to qbeast
-    deltaLog.update()
+    // Convert delta to qbeast through metadata modification
+    val tableID = QTableID(tableId.table)
+    val schema = deltaLog.snapshot.schema

-    val txn = deltaLog.startTransaction()
+    SparkDeltaMetadataManager.updateMetadataWithTransaction(tableID, schema) {
+      val convRevision = Revision.emptyRevision(tableID, cubeSize, columnsToIndex)
+      val revisionID = convRevision.revisionID

-    // Converting a partitioned delta table is not supported, for qbeast files
-    // are not partitioned.
-    val isPartitionedDelta = txn.metadata.partitionColumns.nonEmpty
-    if (isPartitionedDelta) {
-      throw AnalysisExceptionFactory.create(partitionedTableExceptionMsg)
+      // Add staging revision to Revision Map, set it as the latestRevision
+      Map(
+        lastRevisionID -> revisionID.toString,
+        s"$revision.$revisionID" -> mapper.writeValueAsString(convRevision))
     }
-
-    val convRevision = Revision.emptyRevision(QTableID(tableId.table), cubeSize, columnsToIndex)
-    val revisionID = convRevision.revisionID
-
-    // Update revision map
-    val updatedConf =
-      txn.metadata.configuration
-        .updated(lastRevisionID, revisionID.toString)
-        .updated(s"$revision.$revisionID", mapper.writeValueAsString(convRevision))
-
-    val newMetadata =
-      txn.metadata.copy(configuration = updatedConf)
-
-    txn.updateMetadata(newMetadata)
-    txn.commit(Seq.empty, Convert(0, Seq.empty, collectStats = false, None))
+
     Seq.empty[Row]
   }
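End to end, the conversion is still triggered the same way after this change. A
hypothetical invocation for completeness, assuming the command keeps its existing
(identifier, columnsToIndex, cubeSize) constructor, which this patch does not
touch; the path and column name are placeholders:

    import io.qbeast.spark.internal.commands.ConvertToQbeastCommand
    import org.apache.spark.sql.SparkSession

    object ConvertSketch extends App {
      val spark = SparkSession.builder().master("local[*]").getOrCreate()
      // Identifier format is "<provider>.`<table path>`"
      ConvertToQbeastCommand("delta.`/tmp/example_table`", Seq("id"), 50000).run(spark)
      spark.stop()
    }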