
Merge remote-tracking branch 'nv/dev' into issue-815-taskUpdate-bilal
sayedbilalbari committed Feb 20, 2025
2 parents 77ccac6 + 78cab00 commit c3537e8
Showing 21 changed files with 947 additions and 59 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@ package com.nvidia.spark.rapids.tool.planparser
import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.store.{WriteOperationMetaBuilder, WriteOperationMetadataTrait}
import org.apache.spark.sql.rapids.tool.util.StringUtils

case class DataWritingCommandExecParser(
node: SparkPlanGraphNode,
@@ -106,13 +108,13 @@ object DataWritingCommandExecParser {
// This is used for expressions that do not show the format as part of the description.
private val specialWriteFormatMap = Map[String, String](
// if appendDataExecV1 is not deltaLakeProvider, then we want to mark it as unsupported
appendDataExecV1 -> "unknown",
appendDataExecV1 -> StringUtils.UNKNOWN_EXTRACT,
// if overwriteByExprExecV1 is not deltaLakeProvider, then we want to mark it as unsupported
overwriteByExprExecV1 -> "unknown",
overwriteByExprExecV1 -> StringUtils.UNKNOWN_EXTRACT,
// if atomicReplaceTableExec is not deltaLakeProvider, then we want to mark it as unsupported
atomicReplaceTableExec -> "unknown",
atomicReplaceTableExec -> StringUtils.UNKNOWN_EXTRACT,
// if atomicCreateTableExec is not deltaLakeProvider, then we want to mark it as unsupported
atomicCreateTableExec -> "unknown"
atomicCreateTableExec -> StringUtils.UNKNOWN_EXTRACT
)

// Checks whether a node is a write CMD Exec
@@ -175,4 +177,180 @@ object DataWritingCommandExecParser {
parsedString.split(",")(0) // return third parameter from the input string
}
}

/**
* Extracts metadata information from a write operation node description.
* This method is specifically designed to parse the description of
* `InsertIntoHadoopFsRelationCommand` nodes and extract relevant details
* such as the output path, data format, write mode, catalog information,
* and output columns.
* An example of the pattern is:
* Execute InsertIntoHadoopFsRelationCommand /path/to/warehouse/database/table, false, format,
* [key1=value1, key2=value2], Append, `SparkCatalog`.`database`.`table`, ClassName,
* [outputColumns]
*
* The method performs the following steps:
* - Extracts the output path and data format from the node description.
* - Determines the write mode (e.g., Append, Overwrite) based on specific keywords in the
* description.
* - Extracts catalog information (database and table name) from the output path.
* - Extracts the output columns if available in the description.
* - Builds and returns a `WriteOperationMetadataTrait` object encapsulating the extracted
* metadata.
*
* This method includes error handling to ensure graceful fallback to default values
* (e.g., `UNKNOWN_EXTRACT`) in case of unexpected input or parsing errors.
*
* @param execName The name of the execution command (e.g., `InsertIntoHadoopFsRelationCommand`).
* @param nodeDescr The description of the node, typically containing details about the write
* operation.
* @return A `WriteOperationMetadataTrait` object containing the extracted metadata.
*/
private def extractWriteOpRecord(
execName: String, nodeDescr: String): WriteOperationMetadataTrait = {
// Helper function to extract catalog information (database and table name) from the output
// path.
def extractCatalog(path: String): (String, String) = {
try {
// The location path contains the database and the table as the last 2 entries.
// Example: gs:///path/to/warehouse/database/table
// Split the URI into parts by "/"
val pathParts = path.split("/").filter(_.nonEmpty)
if (pathParts.length >= 2) {
// Extract the last two parts as database and table name
val database = pathParts(pathParts.length - 2)
val tableName = pathParts.last
(database, tableName)
} else {
// If not enough parts, return UNKNOWN_EXTRACT
(StringUtils.UNKNOWN_EXTRACT, StringUtils.UNKNOWN_EXTRACT)
}
} catch {
// Handle any unexpected errors gracefully
case _: Exception => (StringUtils.UNKNOWN_EXTRACT, StringUtils.UNKNOWN_EXTRACT)
}
}

// Helper function to extract the output path and data format from the node description.
def extractPathAndFormat(args: Array[String]): (String, String) = {
// This method expects the arguments to be nodeDescr.split(",", 3)
// `Execute cmd path/to/warehouse/db/table, false, parquet, [write options],.*`.
// - 1st arg is always the cmd followed by the path.
// - 2nd arg is a boolean argument that we do not care about.
// - 3rd arg is either the format, or the list of write options.

// Extract the path from the first argument
val path =
args.headOption.map(_.split("\\s+").last.trim).getOrElse(StringUtils.UNKNOWN_EXTRACT)
// Extract the data format from the third argument
val thirdArg = args.lift(2).getOrElse("").trim
val format = if (thirdArg.startsWith("[")) {
// The optional write-options parameter is present in the event log. Get the fourth parameter
// by skipping the optional parameter string.
thirdArg.split("(?<=],)")
.map(_.trim).lift(1).getOrElse("").split(",").headOption.getOrElse("").trim
} else {
thirdArg.split(",").headOption.getOrElse("").trim
}
(path, format)
}

// Helper function to determine the write mode (e.g., Append, Overwrite) from the description.
def extractWriteMode(description: String): String = {
val modes = Map(
", Append," -> "Append",
", Overwrite," -> "Overwrite",
", ErrorIfExists," -> "ErrorIfExists",
", Ignore," -> "Ignore"
)
// Match the description against known write modes
modes.collectFirst { case (key, mode) if description.contains(key) => mode }
.getOrElse(StringUtils.UNKNOWN_EXTRACT)
}

// Helper function to extract output columns from the node description.
def extractOutputColumns(description: String): Option[String] = {
// The output columns are found as the last bracketed sequence in the description. This method
// uses a regex to match string values inside brackets and picks the last one.
// Use a regular expression to find column definitions enclosed in square brackets.
val columnsRegex = """\[(.*?)\]""".r
columnsRegex.findAllMatchIn(description).map(_.group(1)).toList.lastOption
.map(_.replaceAll(",\\s+", ";")) // Replace commas with semicolons for better readability
}

// Parse the node description into arguments
val splitArgs = nodeDescr.split(",", 3)

// Extract the output path and data format
val (path, format) = extractPathAndFormat(splitArgs)

// Extract the write mode (e.g., Append, Overwrite)
val writeMode = extractWriteMode(nodeDescr)

// Extract catalog information (database and table name) from the output path
val (catalogDB, catalogTable) = extractCatalog(path)

// Extract the output columns, if available
val outColumns = extractOutputColumns(nodeDescr)

// Build and return the metadata object encapsulating all extracted information
WriteOperationMetaBuilder.build(
execName = execName,
dataFormat = format,
outputPath = Option(path),
outputColumns = outColumns,
writeMode = writeMode,
tableName = catalogTable,
dataBaseName = catalogDB,
fullDescr = Some(nodeDescr)
)
}

/**
* Extracts metadata information from a given SparkPlanGraphNode representing a write operation.
*
* This method determines the type of write operation (e.g., Delta Lake or other supported
* commands) and extracts relevant metadata such as execution name, data format, output path,
* write mode, and catalog information. It uses helper methods to parse the node description
* and build a metadata object encapsulating the extracted details.
*
* The method performs the following steps:
* 1. Retrieves the node description from the provided SparkPlanGraphNode.
* 2. Checks if the node is a Delta Lake write operation using DeltaLakeHelper.
* 3. Retrieves the appropriate command wrapper (logical or physical) for the node.
* 4. If the command is `InsertIntoHadoopFsRelationCommand`, it invokes a specialized method
* `extractWriteOpRecord` to extract detailed metadata.
* 5. For other commands, it builds a metadata object using the command wrapper's information.
* 6. If no command wrapper is found, it falls back to building a metadata object with minimal
* information.
*
* @param node The SparkPlanGraphNode representing the write operation.
* @return A WriteOperationMetadataTrait object containing the extracted metadata.
*/
def getWriteOpMetaFromNode(node: SparkPlanGraphNode): WriteOperationMetadataTrait = {
// Determine the appropriate command wrapper based on whether the node is a Delta Lake write
// operation.
val cmdWrapper = if (DeltaLakeHelper.acceptsWriteOp(node)) {
DeltaLakeHelper.getWriteCMDWrapper(node)
} else {
getWriteCMDWrapper(node)
}
// Process the command wrapper to extract metadata
cmdWrapper match {
case Some(cmdWrapper) =>
// If the command is InsertIntoHadoopFsRelationCommand, extract detailed metadata.
if (cmdWrapper.execName == DataWritingCommandExecParser.insertIntoHadoopCMD) {
extractWriteOpRecord(cmdWrapper.execName, node.desc)
} else {
// For other commands, build metadata using the command wrapper information.
WriteOperationMetaBuilder.build(
execName = cmdWrapper.execName,
dataFormat = cmdWrapper.dataFormat,
fullDescr = Some(node.desc))
}
case _ =>
// If no command wrapper is found, build metadata with minimal information.
WriteOperationMetaBuilder.buildNoMeta(Some(node.desc))
}
}
}
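As an aside for readers following the parsing logic above (not part of this commit), the sketch below mirrors the steps that `extractWriteOpRecord` applies to an `InsertIntoHadoopFsRelationCommand` description: split it into at most three chunks, pull the path from the first chunk, skip an optional bracketed write-options list to find the format, scan for a write-mode keyword, take the last two path segments as database and table, and keep the last bracketed group as the output columns. The `ParsedWriteOp` case class and the sample description string are hypothetical illustrations, not types or data from the tools codebase.

// Illustrative sketch (hypothetical types, not part of this commit): decompose a node
// description of the documented shape into path, format, write mode, catalog and columns.
object WriteOpParseSketch {
  case class ParsedWriteOp(
      path: String, format: String, mode: String,
      database: String, table: String, columns: Option[String])

  private val Unknown = "unknown"

  def parse(desc: String): ParsedWriteOp = {
    // Split into at most 3 chunks, as extractWriteOpRecord does, so the third chunk
    // starts with either the format or the bracketed write-options list.
    val args = desc.split(",", 3)
    val path = args.headOption.map(_.split("\\s+").last.trim).getOrElse(Unknown)
    val thirdArg = args.lift(2).getOrElse("").trim
    val format = if (thirdArg.startsWith("[")) {
      // Bracketed write options come first; skip them and take the next token.
      thirdArg.split("(?<=],)").map(_.trim)
        .lift(1).getOrElse("").split(",").headOption.getOrElse("").trim
    } else {
      thirdArg.split(",").headOption.getOrElse("").trim
    }
    val mode = Seq("Append", "Overwrite", "ErrorIfExists", "Ignore")
      .find(m => desc.contains(s", $m,")).getOrElse(Unknown)
    // The last two path segments are treated as database and table.
    val parts = path.split("/").filter(_.nonEmpty)
    val (db, table) =
      if (parts.length >= 2) (parts(parts.length - 2), parts.last) else (Unknown, Unknown)
    // The last bracketed group holds the output columns; commas become semicolons.
    val columns = """\[(.*?)\]""".r.findAllMatchIn(desc)
      .map(_.group(1)).toList.lastOption.map(_.replaceAll(",\\s+", ";"))
    ParsedWriteOp(path, format, mode, db, table, columns)
  }

  def main(args: Array[String]): Unit = {
    val desc = "Execute InsertIntoHadoopFsRelationCommand gs://bucket/warehouse/db1/tbl1, " +
      "false, [compression=snappy], parquet, Append, `spark_catalog`.`db1`.`tbl1`, " +
      "SomeClass, [a, b, c]"
    println(parse(desc))
    // ParsedWriteOp(gs://bucket/warehouse/db1/tbl1,parquet,Append,db1,tbl1,Some(a;b;c))
  }
}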
@@ -20,6 +20,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.{SparkPlanGraphCluster, SparkPlanGraphNode}
import org.apache.spark.sql.rapids.tool.SqlPlanInfoGraphEntry
import org.apache.spark.sql.rapids.tool.util.StringUtils

// A class used to handle the DL writeOps such as:
// - AppendDataExecV1
@@ -135,8 +136,6 @@ object DeltaLakeHelper {
def parseNode(node: SparkPlanGraphNode,
checker: PluginTypeChecker,
sqlID: Long): ExecInfo = {
val opExec = new DLWriteWithFormatAndSchemaParser(node, checker, sqlID)
opExec.parse
node match {
case n if acceptsWriteOp(n) =>
val opExec = new DLWriteWithFormatAndSchemaParser(node, checker, sqlID)
@@ -145,6 +144,25 @@ object DeltaLakeHelper {
}
}

/**
* Get the write command wrapper for the given deltaLake write exec node.
* This method should be called only if the node passes the `acceptsWriteOp` check.
* @param node the deltaLake write exec
* @return the write command wrapper
*/
def getWriteCMDWrapper(node: SparkPlanGraphNode): Option[DataWritingCmdWrapper] = {
val wcmd = exclusiveDeltaExecs.find(node.name.contains(_)) match {
case Some(cmd) => cmd
case _ =>
deltaExecsFromSpark.find(node.name.contains(_)) match {
case Some(cmd) => cmd
case _ => StringUtils.UNKNOWN_EXTRACT
}
}
// The format must be delta
Some(DataWritingCmdWrapper(wcmd, DataWritingCommandExecParser.dataWriteCMD, getWriteFormat))
}

// Kept for future use if we find that SerDe library can be used to deduce any information to
// reflect on the support of the Op
def getSerdeLibrary(nodeDesc: String): Option[String] = {
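A small illustration of the lookup order in `getWriteCMDWrapper` above (not part of this commit): try the Delta-exclusive exec names first, then the Spark write execs that Delta Lake reuses, and fall back to an unknown sentinel. The exec-name lists below are hypothetical placeholders; the real values live in `exclusiveDeltaExecs` and `deltaExecsFromSpark`. The nested match in the committed code is expressed here with `orElse`, which has the same effect.

// Sketch of the two-tier exec-name lookup (hypothetical lists, not part of this commit).
object DeltaWriteLookupSketch {
  private val unknown = "unknown"
  private val exclusiveDeltaExecs = Seq("SaveIntoDataSourceCommand")
  private val deltaExecsFromSpark = Seq("AppendDataExecV1", "OverwriteByExpressionExecV1")

  def lookupWriteCmd(nodeName: String): String =
    exclusiveDeltaExecs.find(nodeName.contains(_))
      .orElse(deltaExecsFromSpark.find(nodeName.contains(_)))
      .getOrElse(unknown) // mirrors the StringUtils.UNKNOWN_EXTRACT fallback

  def main(args: Array[String]): Unit = {
    println(lookupWriteCmd("AppendDataExecV1 (1)")) // AppendDataExecV1
    println(lookupWriteCmd("SomeOtherExec"))        // unknown
  }
}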
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.tool.planparser

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.util.EventUtils
import org.apache.spark.sql.rapids.tool.util.{EventUtils, StringUtils}

// A wrapper class to map between
case class HiveScanSerdeClasses(className: String, format: String) extends Logging {
@@ -68,7 +68,7 @@ object HiveParseHelper extends Logging {
}

def getHiveFormatFromSimpleStr(str: String): String = {
LOADED_SERDE_CLASSES.find(_.accepts(str)).map(_.format).getOrElse("unknown")
LOADED_SERDE_CLASSES.find(_.accepts(str)).map(_.format).getOrElse(StringUtils.UNKNOWN_EXTRACT)
}

// Given a "scan hive" NodeGraph, construct the MetaData based on the SerDe class.
@@ -22,6 +22,7 @@ import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.util.StringUtils

case class ReadMetaData(schema: String, location: String, format: String,
tags: Map[String, String] = ReadParser.DEFAULT_METAFIELD_MAP) {
@@ -60,7 +61,7 @@ object ReadParser extends Logging {
val METAFIELD_TAG_FORMAT = "Format"
val METAFIELD_TAG_LOCATION = "Location"

val UNKNOWN_METAFIELD: String = "unknown"
val UNKNOWN_METAFIELD: String = StringUtils.UNKNOWN_EXTRACT
val DEFAULT_METAFIELD_MAP: Map[String, String] = collection.immutable.Map(
METAFIELD_TAG_DATA_FILTERS -> UNKNOWN_METAFIELD,
METAFIELD_TAG_PUSHED_FILTERS -> UNKNOWN_METAFIELD,
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,8 +31,8 @@ import org.apache.spark.sql.rapids.tool.util.StringUtils

class OperatorRefBase(val value: String, val opType: OpTypes.OpType) extends OperatorRefTrait {
// Preformatted values for CSV output to avoid reformatting multiple times.
val csvValue: String = StringUtils.reformatCSVString(value)
val csvOpType: String = StringUtils.reformatCSVString(opType.toString)
lazy val csvValue: String = StringUtils.reformatCSVString(value)
lazy val csvOpType: String = StringUtils.reformatCSVString(opType.toString)

override def getOpName: String = value
override def getOpNameCSV: String = csvValue
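The switch from `val` to `lazy val` above defers the CSV reformatting until a CSV name is actually requested, and still computes it at most once per instance. The sketch below (not part of this commit) demonstrates that behavior; `reformatCSV` and `OperatorRef` are simplified stand-ins for `StringUtils.reformatCSVString` and `OperatorRefBase`.

// Sketch of lazy, memoized CSV formatting (simplified stand-ins, not part of this commit).
object LazyCsvSketch {
  def reformatCSV(value: String): String = {
    println(s"reformatting: $value") // visible side effect to show when the work happens
    "\"" + value.replace("\"", "\"\"") + "\""
  }

  class OperatorRef(val value: String) {
    lazy val csvValue: String = reformatCSV(value) // computed on first access only
    def getOpName: String = value
    def getOpNameCSV: String = csvValue
  }

  def main(args: Array[String]): Unit = {
    val ref = new OperatorRef("HashAggregate")
    println(ref.getOpName)    // no reformatting triggered
    println(ref.getOpNameCSV) // triggers reformatting once
    println(ref.getOpNameCSV) // cached, no second reformatting
  }
}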
@@ -18,6 +18,7 @@ package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.SparkRapidsBuildInfoEvent
import com.nvidia.spark.rapids.tool.AppSummaryInfoBaseProvider
import com.nvidia.spark.rapids.tool.views.WriteOpProfileResult

case class ApplicationSummaryInfo(
appInfo: Seq[AppInfoProfileResults],
@@ -47,7 +48,8 @@ case class ApplicationSummaryInfo(
ioMetrics: Seq[IOAnalysisProfileResult],
sysProps: Seq[RapidsPropertyProfileResult],
sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult],
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent])
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent],
writeOpsInfo: Seq[WriteOpProfileResult])

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)
@@ -56,6 +56,11 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
ProfDataSourceView.getRawView(apps, cachedSqlAccum)
}

// get the write records information
def getWriteOperationInfo: Seq[WriteOpProfileResult] = {
ProfWriteOpsView.getRawView(apps)
}

// get executor related information
def getExecutorInfo: Seq[ExecutorInfoProfileResult] = {
ProfExecutorView.getRawView(apps)
@@ -399,13 +399,15 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
val endTime = System.currentTimeMillis()
logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
(ApplicationSummaryInfo(appInfo, dsInfo,
collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo),
compareRes, DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics))
collect.getExecutorInfo, collect.getJobInfo, rapidsProps,
rapidsJar, sqlMetrics, stageMetrics, analysis.jobAggs, analysis.stageAggs,
analysis.sqlAggs, analysis.sqlDurAggs, analysis.taskShuffleSkew,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, collect.getSQLToStage, wholeStage, maxTaskInputInfo,
appLogPath, analysis.ioAggs, systemProps, sqlIdAlign, sparkRapidsBuildInfo,
collect.getWriteOperationInfo),
compareRes,
DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics))
}

/**
@@ -502,7 +504,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
appsSum.flatMap(_.ioMetrics).sortBy(_.appIndex),
combineProps("system", appsSum).sortBy(_.key),
appsSum.flatMap(_.sqlCleanedAlignedIds).sortBy(_.appIndex),
appsSum.flatMap(_.sparkRapidsBuildInfo)
appsSum.flatMap(_.sparkRapidsBuildInfo),
appsSum.flatMap(_.writeOpsInfo).sortBy(_.appIndex)
)
Seq(reduced)
} else {
@@ -546,6 +549,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea
Some(AGG_DESCRIPTION(SQL_AGG_LABEL)))
profileOutputWriter.write(IO_LABEL, app.ioMetrics)
profileOutputWriter.write(SQL_DUR_LABEL, app.durAndCpuMet)
// writeOps are generated only in CSV format
profileOutputWriter.writeCSVTable(ProfWriteOpsView.getLabel, app.writeOpsInfo)
val skewHeader = TASK_SHUFFLE_SKEW
val skewTableDesc = AGG_DESCRIPTION(TASK_SHUFFLE_SKEW)
profileOutputWriter.write(skewHeader, app.skewInfo, tableDesc = Some(skewTableDesc))
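The write-ops table is emitted only as CSV via `writeCSVTable`, and in the combined-summary path the per-app rows are flattened and sorted by `appIndex` (as shown above). The sketch below (not part of this commit) illustrates that flatten-sort-emit flow; `AppWriteOpRow`, its fields, and the header names are hypothetical stand-ins for `WriteOpProfileResult` and the profiler's CSV formatting.

// Sketch of folding per-app write-op rows into one CSV table (hypothetical row type).
object WriteOpsCsvSketch {
  case class AppWriteOpRow(appIndex: Int, sqlId: Long, execName: String, format: String)

  private def toCsvLine(r: AppWriteOpRow): String =
    Seq(r.appIndex, r.sqlId, r.execName, r.format).mkString(",")

  def main(args: Array[String]): Unit = {
    // Two applications, each contributing its own sequence of write-op rows.
    val perApp: Seq[Seq[AppWriteOpRow]] = Seq(
      Seq(AppWriteOpRow(2, 1L, "InsertIntoHadoopFsRelationCommand", "parquet")),
      Seq(AppWriteOpRow(1, 4L, "AppendDataExecV1", "delta")))
    // Mirrors appsSum.flatMap(_.writeOpsInfo).sortBy(_.appIndex) in the combined summary.
    val combined = perApp.flatten.sortBy(_.appIndex)
    val header = "appIndex,sqlID,execName,format"
    println((header +: combined.map(toCsvLine)).mkString("\n"))
  }
}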