
Merge pull request #139 from osopardo1/137-operation-metrics
Add Operation and File Metrics [Delta]
osopardo1 authored Jan 16, 2023
2 parents ff91e4f + c34259d commit 99a0fb3
Showing 12 changed files with 389 additions and 61 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -125,7 +125,7 @@ For example:
sbt assembly

$SPARK_HOME/bin/spark-shell \
- --jars ./target/scala-2.12/qbeast-spark-assembly-0.3.0.jar \
+ --jars ./target/scala-2.12/qbeast-spark-assembly-0.3.1.jar \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--packages io.delta:delta-core_2.12:1.2.0
```
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,7 +1,7 @@
import Dependencies._
import xerial.sbt.Sonatype._

- val mainVersion = "0.3.0"
+ val mainVersion = "0.3.1"

lazy val qbeastCore = (project in file("core"))
.settings(
12 changes: 6 additions & 6 deletions docs/CloudStorages.md
@@ -32,8 +32,8 @@ Amazon Web Services S3 does not work with Hadoop 2.7. For this provider you'll n
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider \
- --packages io.qbeast:qbeast-spark_2.12:0.2.0,\
- io.delta:delta-core_2.12:1.0.0,\
+ --packages io.qbeast:qbeast-spark_2.12:0.3.1,\
+ io.delta:delta-core_2.12:1.2.0,\
com.amazonaws:aws-java-sdk:1.12.20,\
org.apache.hadoop:hadoop-common:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0,\
@@ -46,8 +46,8 @@ $SPARK_HOME/bin/spark-shell \
--conf spark.hadoop.fs.s3a.access.key=${AWS_ACCESS_KEY_ID} \
--conf spark.hadoop.fs.s3a.secret.key=${AWS_SECRET_ACCESS_KEY} \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
- --packages io.qbeast:qbeast-spark_2.12:0.2.0,\
- io.delta:delta-core_2.12:1.0.0,\
+ --packages io.qbeast:qbeast-spark_2.12:0.3.1,\
+ io.delta:delta-core_2.12:1.2.0,\
org.apache.hadoop:hadoop-common:3.2.0,\
org.apache.hadoop:hadoop-client:3.2.0,\
org.apache.hadoop:hadoop-aws:3.2.0
@@ -63,7 +63,7 @@ $SPARK_HOME/bin/spark-shell \
--conf spark.hadoop.fs.azure.account.key.blobqsql.blob.core.windows.net="${AZURE_BLOB_STORAGE_KEY}" \
--conf spark.hadoop.fs.AbstractFileSystem.wasb.impl=org.apache.hadoop.fs.azure.Wasb \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
- --packages io.qbeast:qbeast-spark_2.12:0.2.0,\
- io.delta:delta-core_2.12:1.0.0,\
+ --packages io.qbeast:qbeast-spark_2.12:0.3.1,\
+ io.delta:delta-core_2.12:1.2.0,\
org.apache.hadoop:hadoop-azure:3.2.0
```
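
Once the shell is up with one of the configurations above, writing a qbeast table to the bucket works like any other Spark data source. A minimal sketch, assuming a hypothetical bucket path and indexed columns of your own:

```scala
// Hypothetical example: the destination path and the indexed columns are placeholders.
val df = spark.range(100000).selectExpr("id", "id % 10 AS category")

df.write
  .format("qbeast")
  .option("columnsToIndex", "id,category") // columns used to build the index
  .save("s3a://my-bucket/tmp/qbeast-table")
```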
31 changes: 31 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -4,6 +4,7 @@
package io.qbeast.spark.delta

import io.qbeast.core.model.{QTableID, RevisionID, TableChanges}
import io.qbeast.spark.delta.writer.StatsTracker.registerStatsTrackers
import io.qbeast.spark.utils.TagColumns
import org.apache.spark.sql.delta.actions.{
Action,
@@ -14,9 +15,16 @@ import org.apache.spark.sql.delta.actions.{
}
import org.apache.spark.sql.delta.commands.DeltaCommand
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, DeltaOptions, OptimisticTransaction}
import org.apache.spark.sql.execution.datasources.{
BasicWriteJobStatsTracker,
WriteJobStatsTracker
}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisExceptionFactory, SaveMode, SparkSession}
import org.apache.spark.util.SerializableConfiguration

import scala.collection.mutable.ListBuffer

/**
* DeltaMetadataWriter is in charge of writing data to a table
@@ -45,10 +53,33 @@ private[delta] case class DeltaMetadataWriter(
DeltaOperations.Write(mode, None, options.replaceWhere, options.userMetadata)
}

/**
* Creates the basic write job stats trackers and registers their driver-side SQL metrics
* on the given transaction
* @param txn the optimistic transaction to register the metrics on
* @return the sequence of WriteJobStatsTracker to use during the write
*/
private def createStatsTrackers(txn: OptimisticTransaction): Seq[WriteJobStatsTracker] = {
val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
// Create basic stats trackers to add metrics on the Write Operation
val hadoopConf = sparkSession.sessionState.newHadoopConf() // TODO check conf
val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
new SerializableConfiguration(hadoopConf),
BasicWriteJobStatsTracker.metrics)
txn.registerSQLMetrics(sparkSession, basicWriteJobStatsTracker.driverSideMetrics)
statsTrackers.append(basicWriteJobStatsTracker)
statsTrackers
}

def writeWithTransaction(writer: => (TableChanges, Seq[FileAction])): Unit = {
deltaLog.withNewTransaction { txn =>
// Register metrics to use in the Commit Info
val statsTrackers = createStatsTrackers(txn)
registerStatsTrackers(statsTrackers)
// Execute write
val (changes, newFiles) = writer
// Update Qbeast Metadata (replicated set, revision..)
val finalActions = updateMetadata(txn, changes, newFiles)
// Commit the information to the DeltaLog
txn.commit(finalActions, deltaOperation)
}
}
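
With the trackers registered on the transaction before the write, the metrics collected by `BasicWriteJobStatsTracker` are reported as part of the commit information. One way to check them after a write is through the Delta table history; the path below is a placeholder and the exact metric names depend on the Spark and Delta versions:

```scala
import io.delta.tables.DeltaTable

// Placeholder path; any table written through the qbeast format should work.
val history = DeltaTable.forPath(spark, "/tmp/qbeast-table").history(1)

// operationMetrics is expected to include entries such as numFiles,
// numOutputRows and numOutputBytes, populated from the registered trackers.
history.select("version", "operation", "operationMetrics").show(false)
```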
63 changes: 47 additions & 16 deletions src/main/scala/io/qbeast/spark/delta/writer/BlockWriter.scala
@@ -11,7 +11,12 @@ import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
import org.apache.hadoop.mapreduce.TaskType
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.{
OutputWriter,
OutputWriterFactory,
WriteJobStatsTracker,
WriteTaskStatsTracker
}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

@@ -34,6 +39,7 @@ case class BlockWriter(
schemaIndex: StructType,
factory: OutputWriterFactory,
serConf: SerializableConfiguration,
statsTrackers: Seq[WriteJobStatsTracker],
qbeastColumns: QbeastColumns,
tableChanges: TableChanges)
extends Serializable {
@@ -44,7 +50,7 @@
* @param iter iterator of rows
* @return the sequence of files added
*/
def writeRow(iter: Iterator[InternalRow]): Iterator[AddFile] = {
def writeRow(iter: Iterator[InternalRow]): Iterator[(AddFile, TaskStats)] = {
if (!iter.hasNext) {
return Iterator.empty
}
@@ -74,17 +80,23 @@ case class BlockWriter(
val rowWeight = Weight(row.getInt(qbeastColumns.weightColumnIndex))

// Writing the data in a single file.
blockCtx.writer.write(InternalRow.fromSeq(cleanRow.result()))
val internalRow = InternalRow.fromSeq(cleanRow.result())
blockCtx.writer.write(internalRow)
blockCtx.blockStatsTracker.foreach(
_.newRow(blockCtx.path.toString, internalRow)
) // Update statsTrackers
blocks.updated(cubeId, blockCtx.update(rowWeight))

}
.values
.flatMap {
case BlockContext(blockStats, _, _) if blockStats.elementCount == 0 =>
case BlockContext(blockStats, _, _, _) if blockStats.elementCount == 0 =>
Iterator.empty // Do nothing, this is an empty partition
case BlockContext(
BlockStats(cube, maxWeight, minWeight, state, rowCount),
writer,
path) =>
path,
blockStatsTracker) =>
val tags = Map(
TagUtils.cube -> cube,
TagUtils.minWeight -> minWeight.value.toString,
@@ -95,19 +107,27 @@

writer.close()

// Process final stats
blockStatsTracker.foreach(_.closeFile(path.toString))
val endTime = System.currentTimeMillis()
val finalStats = blockStatsTracker.map(_.getFinalStats(endTime))
val taskStats = TaskStats(finalStats, endTime)

// Process file status
val fileStatus = path
.getFileSystem(serConf.value)
.getFileStatus(path)

Iterator(
AddFile(
path = path.getName(),
partitionValues = Map(),
size = fileStatus.getLen,
modificationTime = fileStatus.getModificationTime,
dataChange = true,
stats = "",
tags = tags))
val addFile = AddFile(
path = path.getName(),
partitionValues = Map(),
size = fileStatus.getLen,
modificationTime = fileStatus.getModificationTime,
dataChange = true,
stats = "",
tags = tags)

Iterator((addFile, taskStats))

}
}.toIterator
@@ -119,14 +139,20 @@
* @return
*/
private def buildWriter(cubeId: CubeId, state: String, maxWeight: Weight): BlockContext = {
val blockStatsTracker = statsTrackers.map(_.newTaskInstance())
val writtenPath = new Path(dataPath, s"${UUID.randomUUID()}.parquet")
val writer: OutputWriter = factory.newInstance(
writtenPath.toString,
schema,
new TaskAttemptContextImpl(
new JobConf(serConf.value),
new TaskAttemptID("", 0, TaskType.REDUCE, 0, 0)))
BlockContext(BlockStats(cubeId.string, state, maxWeight), writer, writtenPath)
blockStatsTracker.foreach(_.newFile(writtenPath.toString)) // Update stats trackers
BlockContext(
BlockStats(cubeId.string, state, maxWeight),
writer,
writtenPath,
blockStatsTracker)
}

/*
@@ -136,7 +162,12 @@
* @param writer an instance of the file writer
* @param path the path of the written file
*/
private case class BlockContext(stats: BlockStats, writer: OutputWriter, path: Path) {
private case class BlockContext(
stats: BlockStats,
writer: OutputWriter,
path: Path,
blockStatsTracker: Seq[WriteTaskStatsTracker])
extends Serializable {

def update(minWeight: Weight): BlockContext =
this.copy(stats = stats.update(minWeight))
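
The tracker calls threaded through `writeRow` and `buildWriter` follow Spark's `WriteTaskStatsTracker` lifecycle: one task-level instance per registered job tracker, then `newFile`, `newRow` per record, `closeFile` and finally `getFinalStats`. A condensed sketch of that contract, with the tracker, path and rows standing in for the values BlockWriter actually supplies:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats}

// Illustrative helper: how one block's rows are reported to a single tracker.
def trackOneBlock(
    jobTracker: WriteJobStatsTracker,
    filePath: String,
    rows: Iterator[InternalRow]): WriteTaskStats = {
  val taskTracker = jobTracker.newTaskInstance() // one task-level tracker per job tracker
  taskTracker.newFile(filePath) // the cube's parquet file is opened
  rows.foreach(row => taskTracker.newRow(filePath, row)) // one call per written row
  taskTracker.closeFile(filePath) // the file is finished
  taskTracker.getFinalStats(System.currentTimeMillis()) // stats later wrapped in TaskStats
}
```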
src/main/scala/io/qbeast/spark/delta/writer/SparkDeltaDataWriter.scala
@@ -10,19 +10,25 @@ import io.qbeast.spark.index.QbeastColumns.cubeColumnName
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.qbeast.config.{MAX_FILE_SIZE_COMPACTION, MIN_FILE_SIZE_COMPACTION}
import org.apache.spark.sql.delta.DeltaStatsCollectionUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.delta.stats.DeltaFileStatistics
import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, WriteTaskStats}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

import java.net.URI
import scala.collection.parallel.immutable.ParVector

/**
* Spark implementation of the DataWriter interface.
*/
object SparkDeltaDataWriter extends DataWriter[DataFrame, StructType, FileAction] {
object SparkDeltaDataWriter
extends DataWriter[DataFrame, StructType, FileAction]
with DeltaStatsCollectionUtils {

override def write(
tableID: QTableID,
@@ -35,19 +41,26 @@ object SparkDeltaDataWriter extends DataWriter[DataFrame, StructType, FileAction
val job = Job.getInstance()
val factory = new ParquetFileFormat().prepareWrite(sparkSession, job, Map.empty, schema)
val serConf = new SerializableConfiguration(job.getConfiguration)
val statsTrackers = StatsTracker.getStatsTrackers()

// Get Stats Trackers for each file
val qbeastColumns = QbeastColumns(qbeastData)

val blockWriter =
BlockWriter(
dataPath = tableID.id,
schema = schema,
schemaIndex = qbeastData.schema,
factory = factory,
serConf = serConf,
qbeastColumns = qbeastColumns,
tableChanges = tableChanges)
qbeastData
val dataColumns = qbeastData.schema.map(_.name).filterNot(QbeastColumns.contains)
val cleanedData = qbeastData.selectExpr(dataColumns: _*)
val fileStatsTrackers = getDeltaOptionalTrackers(cleanedData, sparkSession, tableID)

val blockWriter = BlockWriter(
dataPath = tableID.id,
schema = schema,
schemaIndex = qbeastData.schema,
factory = factory,
serConf = serConf,
statsTrackers = statsTrackers ++ fileStatsTrackers,
qbeastColumns = qbeastColumns,
tableChanges = tableChanges)

val finalActionsAndStats = qbeastData
.repartition(col(cubeColumnName))
.queryExecution
.executedPlan
@@ -56,6 +69,34 @@ object SparkDeltaDataWriter extends DataWriter[DataFrame, StructType, FileAction
.collect()
.toIndexedSeq

val fileActions = finalActionsAndStats.map(_._1)
val stats = finalActionsAndStats.map(_._2)

// Process BasicWriteJobStatsTrackers
var fileWriteTaskStats = Seq.empty[WriteTaskStats]
var basicWriteTaskStats = Seq.empty[WriteTaskStats]
var endTime = 0L
stats.foreach(taskStats => {
fileWriteTaskStats =
fileWriteTaskStats ++ taskStats.writeTaskStats.filter(_.isInstanceOf[DeltaFileStatistics])
basicWriteTaskStats = (basicWriteTaskStats ++
taskStats.writeTaskStats.filter(_.isInstanceOf[BasicWriteTaskStats]))

endTime = math.max(endTime, taskStats.endTime)
})
statsTrackers.foreach(_.processStats(basicWriteTaskStats, endTime))
fileStatsTrackers.foreach(_.processStats(fileWriteTaskStats, endTime))

// Process DeltaWriteStats
val resultFiles = fileActions.map { a =>
a.copy(stats = fileStatsTrackers
.map(_.recordedStats(new Path(new URI(a.path)).toString))
.getOrElse(a.stats))
}

// Return FileAction
resultFiles

}

/**
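
Besides the job-level metrics, the per-file statistics gathered by the Delta trackers are copied into each `AddFile.stats` JSON, which Delta can use for data skipping. A quick way to see what was recorded is to read a commit file straight from the `_delta_log`; the table path and commit version below are placeholders:

```scala
// Placeholder path and version; point this at an existing qbeast/delta table.
val commit = spark.read.json("/tmp/qbeast-table/_delta_log/00000000000000000000.json")

commit
  .where("add IS NOT NULL")
  // stats is a JSON string with fields such as numRecords, minValues, maxValues and nullCount
  .select("add.path", "add.stats")
  .show(false)
```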
20 changes: 20 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/writer/StatsTracker.scala
@@ -0,0 +1,20 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*/
package io.qbeast.spark.delta.writer

import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, WriteTaskStats}

case class TaskStats(writeTaskStats: Seq[WriteTaskStats], endTime: Long)

object StatsTracker {

private var statsTrackers: Seq[WriteJobStatsTracker] = Seq.empty

def registerStatsTrackers(newStatsTrackers: Seq[WriteJobStatsTracker]): Unit = {
statsTrackers = newStatsTrackers
}

def getStatsTrackers(): Seq[WriteJobStatsTracker] = statsTrackers

}
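
The object acts as a small hand-off point: `DeltaMetadataWriter` publishes the job-level trackers before the write starts, and `SparkDeltaDataWriter` retrieves them when building the `BlockWriter`. A minimal sketch of that round trip, using Spark's `BasicWriteJobStatsTracker` as in this change (session handling simplified for illustration):

```scala
import io.qbeast.spark.delta.writer.StatsTracker
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.util.SerializableConfiguration

val spark = SparkSession.active
val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())

// Driver side (DeltaMetadataWriter): build the trackers and publish them.
val basicTracker =
  new BasicWriteJobStatsTracker(serializableConf, BasicWriteJobStatsTracker.metrics)
StatsTracker.registerStatsTrackers(Seq(basicTracker))

// Writer side (SparkDeltaDataWriter): pick them up and pass them on to BlockWriter.
val trackers = StatsTracker.getStatsTrackers()
```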