Add Operation and File Metrics [Delta] #139

Merged: 13 commits, Jan 16, 2023
31 changes: 31 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -4,6 +4,7 @@
package io.qbeast.spark.delta

import io.qbeast.core.model.{QTableID, RevisionID, TableChanges}
import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
import io.qbeast.spark.utils.TagColumns
import org.apache.spark.sql.delta.actions.{
Action,
@@ -14,9 +15,16 @@ import org.apache.spark.sql.delta.actions.{
}
import org.apache.spark.sql.delta.commands.DeltaCommand
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaOptions, OptimisticTransaction}
import org.apache.spark.sql.execution.datasources.{
BasicWriteJobStatsTracker,
WriteJobStatsTracker
}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisExceptionFactory, SaveMode, SparkSession}
import org.apache.spark.util.SerializableConfiguration

import scala.collection.mutable.ListBuffer

/**
* DeltaMetadataWriter is in charge of writing data to a table
@@ -45,10 +53,33 @@ private[delta] case class DeltaMetadataWriter(
DeltaOperations.Write(mode, None, options.replaceWhere, options.userMetadata)
}

/**
* Creates the basic write job stats trackers and registers their driver-side metrics on the given transaction
* @param txn the optimistic transaction to register the SQL metrics on
* @return the sequence of write job stats trackers to attach to the write
*/
private def createStatsTrackers(txn: OptimisticTransaction): Seq[WriteJobStatsTracker] = {
val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
// Create basic stats trackers to add metrics on the Write Operation
val hadoopConf = sparkSession.sessionState.newHadoopConf() // TODO check conf
val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
new SerializableConfiguration(hadoopConf),
BasicWriteJobStatsTracker.metrics)
txn.registerSQLMetrics(sparkSession, basicWriteJobStatsTracker.driverSideMetrics)
statsTrackers.append(basicWriteJobStatsTracker)
statsTrackers
}

def writeWithTransaction(writer: => (TableChanges, Seq[FileAction])): Unit = {
deltaLog.withNewTransaction { txn =>
// Register metrics to use in the Commit Info
val statsTrackers = createStatsTrackers(txn)
registerStatsTrackers(statsTrackers)
// Execute write
val (changes, newFiles) = writer
// Update Qbeast Metadata (replicated set, revision..)
val finalActions = updateMetadata(txn, changes, newFiles)
// Commit the information to the DeltaLog
txn.commit(finalActions, deltaOperation)
}
}
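With the trackers registered on the transaction, the write operation's metrics end up in the Delta commit info. A minimal sketch of how to inspect them after a write, assuming a `spark` session and a `tablePath` pointing at the written table (not part of this PR); the metric names are the ones reported by Spark's BasicWriteJobStatsTracker:

```scala
import io.delta.tables.DeltaTable

// Read back the latest commit; operationMetrics should now carry the
// counters registered above (e.g. numFiles, numOutputBytes, numOutputRows).
val lastCommit = DeltaTable.forPath(spark, tablePath).history(1)
lastCommit.select("operation", "operationMetrics").show(truncate = false)
```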
63 changes: 47 additions & 16 deletions src/main/scala/io/qbeast/spark/delta/writer/BlockWriter.scala
@@ -11,7 +11,12 @@ import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
import org.apache.hadoop.mapreduce.TaskType
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.{
OutputWriter,
OutputWriterFactory,
WriteJobStatsTracker,
WriteTaskStatsTracker
}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

@@ -34,6 +39,7 @@ case class BlockWriter(
schemaIndex: StructType,
factory: OutputWriterFactory,
serConf: SerializableConfiguration,
statsTrackers: Seq[WriteJobStatsTracker],
qbeastColumns: QbeastColumns,
tableChanges: TableChanges)
extends Serializable {
@@ -44,7 +50,7 @@
* @param iter iterator of rows
* @return the sequence of files added
*/
def writeRow(iter: Iterator[InternalRow]): Iterator[AddFile] = {
def writeRow(iter: Iterator[InternalRow]): Iterator[(AddFile, TaskStats)] = {
if (!iter.hasNext) {
return Iterator.empty
}
@@ -74,17 +80,23 @@
val rowWeight = Weight(row.getInt(qbeastColumns.weightColumnIndex))

// Writing the data in a single file.
blockCtx.writer.write(InternalRow.fromSeq(cleanRow.result()))
val internalRow = InternalRow.fromSeq(cleanRow.result())
blockCtx.writer.write(internalRow)
blockCtx.blockStatsTracker.foreach(
_.newRow(blockCtx.path.toString, internalRow)
) // Update statsTrackers
blocks.updated(cubeId, blockCtx.update(rowWeight))

}
.values
.flatMap {
case BlockContext(blockStats, _, _) if blockStats.elementCount == 0 =>
case BlockContext(blockStats, _, _, _) if blockStats.elementCount == 0 =>
Iterator.empty // Do nothing, this is an empty partition
case BlockContext(
BlockStats(cube, maxWeight, minWeight, state, rowCount),
writer,
path) =>
path,
blockStatsTracker) =>
val tags = Map(
TagUtils.cube -> cube,
TagUtils.minWeight -> minWeight.value.toString,
@@ -95,19 +107,27 @@

writer.close()

// Process final stats
blockStatsTracker.foreach(_.closeFile(path.toString))
val endTime = System.currentTimeMillis()
val finalStats = blockStatsTracker.map(_.getFinalStats(endTime))
val taskStats = TaskStats(finalStats, endTime)

// Process file status
val fileStatus = path
.getFileSystem(serConf.value)
.getFileStatus(path)

Iterator(
AddFile(
path = path.getName(),
partitionValues = Map(),
size = fileStatus.getLen,
modificationTime = fileStatus.getModificationTime,
dataChange = true,
stats = "",
tags = tags))
val addFile = AddFile(
path = path.getName(),
partitionValues = Map(),
size = fileStatus.getLen,
modificationTime = fileStatus.getModificationTime,
dataChange = true,
stats = "",
tags = tags)

Iterator((addFile, taskStats))

}
}.toIterator
@@ -119,14 +139,20 @@
* @return
*/
private def buildWriter(cubeId: CubeId, state: String, maxWeight: Weight): BlockContext = {
val blockStatsTracker = statsTrackers.map(_.newTaskInstance())
val writtenPath = new Path(dataPath, s"${UUID.randomUUID()}.parquet")
val writer: OutputWriter = factory.newInstance(
writtenPath.toString,
schema,
new TaskAttemptContextImpl(
new JobConf(serConf.value),
new TaskAttemptID("", 0, TaskType.REDUCE, 0, 0)))
BlockContext(BlockStats(cubeId.string, state, maxWeight), writer, writtenPath)
blockStatsTracker.foreach(_.newFile(writtenPath.toString)) // Update stats trackers
BlockContext(
BlockStats(cubeId.string, state, maxWeight),
writer,
writtenPath,
blockStatsTracker)
}

/*
@@ -136,7 +162,12 @@
* @param writer an instance of the file writer
* @param path the path of the written file
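* @param blockStatsTracker the task-side write stats trackers updated while the block is written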
*/
private case class BlockContext(stats: BlockStats, writer: OutputWriter, path: Path) {
private case class BlockContext(
stats: BlockStats,
writer: OutputWriter,
path: Path,
blockStatsTracker: Seq[WriteTaskStatsTracker])
extends Serializable {

def update(minWeight: Weight): BlockContext =
this.copy(stats = stats.update(minWeight))
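For context, the tracker calls that BlockWriter now threads through writeRow and buildWriter follow Spark's standard WriteTaskStatsTracker lifecycle. A condensed, standalone sketch of that lifecycle for a single block file (names here are illustrative, not part of the PR):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats}

// One task-side tracker per job tracker; the final stats are what
// TaskStats carries back to the driver together with the end time.
def trackOneFile(
    jobTrackers: Seq[WriteJobStatsTracker],
    path: String,
    rows: Iterator[InternalRow]): (Seq[WriteTaskStats], Long) = {
  val taskTrackers = jobTrackers.map(_.newTaskInstance())
  taskTrackers.foreach(_.newFile(path))                          // done in buildWriter
  rows.foreach(row => taskTrackers.foreach(_.newRow(path, row))) // done per row in writeRow
  taskTrackers.foreach(_.closeFile(path))                        // after writer.close()
  val endTime = System.currentTimeMillis()
  (taskTrackers.map(_.getFinalStats(endTime)), endTime)          // becomes TaskStats(finalStats, endTime)
}
```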
@@ -10,19 +10,25 @@ import io.qbeast.spark.index.QbeastColumns.cubeColumnName
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.qbeast.config.{MAX_FILE_SIZE_COMPACTION, MIN_FILE_SIZE_COMPACTION}
import org.apache.spark.sql.delta.DeltaStatsCollectionUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.delta.stats.DeltaFileStatistics
import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, WriteTaskStats}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import java.net.URI
import scala.collection.parallel.immutable.ParVector

/**
* Spark implementation of the DataWriter interface.
*/
object SparkDeltaDataWriter extends DataWriter[DataFrame, StructType, FileAction] {
object SparkDeltaDataWriter
extends DataWriter[DataFrame, StructType, FileAction]
with DeltaStatsCollectionUtils {

override def write(
tableID: QTableID,
@@ -35,19 +41,26 @@ object SparkDeltaDataWriter extends DataWriter[DataFrame, StructType, FileAction
val job = Job.getInstance()
val factory = new ParquetFileFormat().prepareWrite(sparkSession, job, Map.empty, schema)
val serConf = new SerializableConfiguration(job.getConfiguration)
val statsTrackers = StatsTracker.getStatsTrackers()

// Get Stats Trackers for each file
val qbeastColumns = QbeastColumns(qbeastData)

val blockWriter =
BlockWriter(
dataPath = tableID.id,
schema = schema,
schemaIndex = qbeastData.schema,
factory = factory,
serConf = serConf,
qbeastColumns = qbeastColumns,
tableChanges = tableChanges)
qbeastData
val dataColumns = qbeastData.schema.map(_.name).filterNot(QbeastColumns.contains)
val cleanedData = qbeastData.selectExpr(dataColumns: _*)
val fileStatsTrackers = getDeltaOptionalTrackers(cleanedData, sparkSession, tableID)

val blockWriter = BlockWriter(
dataPath = tableID.id,
schema = schema,
schemaIndex = qbeastData.schema,
factory = factory,
serConf = serConf,
statsTrackers = statsTrackers ++ fileStatsTrackers,
qbeastColumns = qbeastColumns,
tableChanges = tableChanges)

val finalActionsAndStats = qbeastData
.repartition(col(cubeColumnName))
.queryExecution
.executedPlan
@@ -56,6 +69,34 @@ object SparkDeltaDataWriter extends DataWriter[DataFrame, StructType, FileAction
.collect()
.toIndexedSeq

val fileActions = finalActionsAndStats.map(_._1)
val stats = finalActionsAndStats.map(_._2)

// Process BasicWriteJobStatsTrackers
var fileWriteTaskStats = Seq.empty[WriteTaskStats]
var basicWriteTaskStats = Seq.empty[WriteTaskStats]
var endTime = 0L
stats.foreach(taskStats => {
fileWriteTaskStats =
fileWriteTaskStats ++ taskStats.writeTaskStats.filter(_.isInstanceOf[DeltaFileStatistics])
basicWriteTaskStats = (basicWriteTaskStats ++
taskStats.writeTaskStats.filter(_.isInstanceOf[BasicWriteTaskStats]))

endTime = math.max(endTime, taskStats.endTime)
})
statsTrackers.foreach(_.processStats(basicWriteTaskStats, endTime))
fileStatsTrackers.foreach(_.processStats(fileWriteTaskStats, endTime))

// Process DeltaWriteStats
val resultFiles = fileActions.map { a =>
a.copy(stats = fileStatsTrackers
.map(_.recordedStats(new Path(new URI(a.path)).toString))
.getOrElse(a.stats))
}

// Return FileAction
resultFiles

}
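As a side note, the splitting of the collected task stats in write above could equivalently be expressed with a single partition call; a sketch under the assumption (true for the two tracker kinds wired here) that only DeltaFileStatistics and BasicWriteTaskStats instances are produced, with `stats` being the collected IndexedSeq[TaskStats]:

```scala
// DeltaFileStatistics go to the Delta file-stats tracker, everything else
// (BasicWriteTaskStats) to the basic write job stats trackers.
val (fileWriteTaskStats, basicWriteTaskStats) =
  stats.flatMap(_.writeTaskStats).partition(_.isInstanceOf[DeltaFileStatistics])
val endTime = stats.map(_.endTime).foldLeft(0L)(math.max(_, _))
```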

/**
20 changes: 20 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/writer/StatsTracker.scala
@@ -0,0 +1,20 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.spark.delta.writer

import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats}

case class TaskStats(writeTaskStats: Seq[WriteTaskStats], endTime: Long)

object StatsTracker {

private var statsTrackers: Seq[WriteJobStatsTracker] = Seq.empty

def registerStatsTrackers(newStatsTrackers: Seq[WriteJobStatsTracker]): Unit = {
statsTrackers = newStatsTrackers
}

def getStatsTrackers(): Seq[WriteJobStatsTracker] = statsTrackers

}
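The object above is a driver-side hand-off point: DeltaMetadataWriter registers the job trackers just before triggering the physical write, and SparkDeltaDataWriter reads them back during the write, so they never have to travel through the DataWriter interface. A minimal sketch of that contract (the wrapper object is illustrative only):

```scala
import io.qbeast.spark.delta.writer.StatsTracker
import org.apache.spark.sql.execution.datasources.WriteJobStatsTracker

object StatsTrackerHandOffSketch {
  // Register-then-retrieve within the same write path; concurrent writes in
  // the same JVM would share (and overwrite) this mutable state.
  def handOff(trackers: Seq[WriteJobStatsTracker]): Seq[WriteJobStatsTracker] = {
    StatsTracker.registerStatsTrackers(trackers) // DeltaMetadataWriter.writeWithTransaction
    StatsTracker.getStatsTrackers()              // SparkDeltaDataWriter.write
  }
}
```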
@@ -0,0 +1,60 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package org.apache.spark.sql.delta

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.QTableID
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.{DeltaJobStatisticsTracker, StatisticsCollection}
import org.apache.spark.sql.functions.to_json

trait DeltaStatsCollectionUtils {

protected def getDeltaOptionalTrackers(
data: DataFrame,
sparkSession: SparkSession,
tableID: QTableID): Option[DeltaJobStatisticsTracker] = {

if (QbeastContext.config.get(DeltaSQLConf.DELTA_COLLECT_STATS)) {
val output = data.queryExecution.analyzed.output
val statsDataSchema = output

val deltaLog = DeltaLog.forTable(sparkSession, tableID.id)
val metadata = deltaLog.snapshot.metadata
val outputPath = deltaLog.dataPath

val indexedCols = DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(metadata)

val statsCollection = new StatisticsCollection {
override def dataSchema = statsDataSchema.toStructType

override val spark: SparkSession = data.sparkSession
override val numIndexedCols = indexedCols
}

val statsColExpr: Expression = {
val dummyDF = Dataset.ofRows(sparkSession, LocalRelation(statsDataSchema))
dummyDF
.select(to_json(statsCollection.statsCollector))
.queryExecution
.analyzed
.expressions
.head
}

Some(
new DeltaJobStatisticsTracker(
sparkSession.sessionState.newHadoopConf(),
outputPath,
statsDataSchema,
statsColExpr))
} else {
None
}
}

}
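The tracker above is only built when Delta's stats-collection flag is enabled. A hedged sketch of toggling it from a session, assuming DeltaSQLConf.DELTA_COLLECT_STATS resolves to the usual spark.databricks.delta.stats.collect key; note that the code above reads the flag through QbeastContext.config, so whether a runtime session setting is picked up depends on how that config is sourced:

```scala
import org.apache.spark.sql.SparkSession

// When disabled, getDeltaOptionalTrackers returns None and AddFile.stats
// is left as produced by the writer.
def setDeltaStatsCollection(spark: SparkSession, enabled: Boolean): Unit =
  spark.conf.set("spark.databricks.delta.stats.collect", enabled.toString)
```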
@@ -52,7 +52,7 @@ class BlockWriterTest extends AnyFlatSpec with Matchers with QbeastIntegrationTe
}
.toSet

files.map(_.tags(TagUtils.cube)).forall(cube => cubes.contains(cube)) shouldBe true
files.map(_._1.tags(TagUtils.cube)).forall(cube => cubes.contains(cube)) shouldBe true
}
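Since writeRow now also yields TaskStats, the test could additionally assert on them; a small hedged sketch reusing the names above (`files` is the collected set of (AddFile, TaskStats) pairs; what writeTaskStats contains depends on which trackers the test wires into BlockWriter):

```scala
// endTime is always set when a block is closed.
files.foreach { case (_, taskStats) =>
  taskStats.endTime should be > 0L
}
```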

it should "work with empty partitions" in withSparkAndTmpDir { (spark, tmpDir) =>