Skip to content

Commit

Permalink
add comments and rename variables/functions
Browse files Browse the repository at this point in the history
Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
  • Loading branch information
cindyyuanjiang committed Dec 10, 2024
1 parent 45712e2 commit 3786fd9
Showing 1 changed file with 38 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
* @return Option containing a tuple of (min, median, max, sum)
* if there are task updates, or None if no updates exist.
*/
private def getAccumStatisticsInStage(accumInfo: AccumInfo, stageTaskIds: Set[Long]):
private def getAccumInfoStatisticsInStage(accumInfo: AccumInfo, stageTaskIds: Set[Long]):
Option[(Long, Long, Long, Long)] = {
// Filter task updates to only include those matching the stage's task IDs
val filteredTaskUpdates =
Expand Down Expand Up @@ -411,32 +411,47 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
}

/**
* Generate IO-related diagnostic metrics for the SQL plan:
* output rows, scan time, output batches, buffer time, shuffle write time, fetch wait time, GPU
* decode time.
* @return a sequence of IODiagnosticResult
* Generate IO-related diagnostic metrics for the SQL plan. Metrics include:
* - Output rows
* - Scan time
* - Output batches
* - Buffer time
* - Shuffle write time
* - Fetch wait time
* - GPU decode time
*
* @return A sequence of `IODiagnosticResult` objects containing diagnostic metrics.
*/
def generateIODiagnosticAccums(): Seq[IODiagnosticResult] = {
val zeroRecord = StatisticsMetrics.ZERO_RECORD

// Transform the diagnostic metrics map into a sequence of results
IODiagnosticMetricsMap.toSeq.flatMap { case ((sqlId, nodeId, stageIds), sqlAccums) =>
// Process each stage ID and compute diagnostic results
stageIds.split(",").filter(_.nonEmpty).map(_.toInt).flatMap { stageId =>
val nodeName = sqlAccums.head.nodeName
val stageDiagnosticInfo = HashMap.empty[String, StatisticsMetrics]
val stageTaskIds = getStageTaskIds(stageId)
// A mapping from metric name to its statistical results (min, median, max, sum)
val metricNameToStatistics = HashMap.empty[String, StatisticsMetrics]

// Iterate through each IO metric
sqlAccums.foreach { sqlAccum =>
val accumInfo = app.accumManager.accumInfoMap.getOrElse(sqlAccum.accumulatorId,
new AccumInfo(AccumMetaRef(0L, AccumNameRef(""))))
val accumStatistics = getAccumStatisticsInStage(accumInfo, stageTaskIds)

if (accumStatistics.nonEmpty) {
val (min, median, max, sum) = accumStatistics.get
val accumInfo = app.accumManager.accumInfoMap.getOrElse(
sqlAccum.accumulatorId,
new AccumInfo(AccumMetaRef(0L, AccumNameRef("")))
)
// Compute the metric's statistics (min, median, max, sum) for the given stage
val accumInfoStatistics = getAccumInfoStatisticsInStage(accumInfo, stageTaskIds)
// If statistics are available, store the results
if (accumInfoStatistics.nonEmpty) {
val (min, median, max, sum) = accumInfoStatistics.get
val metricName = normalizeToIODiagnosticMetric(sqlAccum.name)
stageDiagnosticInfo(metricName) = StatisticsMetrics(min, median, max, sum)
metricNameToStatistics(metricName) = StatisticsMetrics(min, median, max, sum)
}
}

if (stageDiagnosticInfo.isEmpty) {
if (metricNameToStatistics.isEmpty) {
// metricNameToStatistics was never updated - there are no IO metric results for this stage
None
} else {
Some(IODiagnosticResult(
Expand All @@ -448,13 +463,13 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
app.stageManager.getDurationById(stageId),
nodeId,
nodeName,
stageDiagnosticInfo.getOrElse(OUTPUT_ROWS_METRIC, zeroRecord),
stageDiagnosticInfo.getOrElse(SCAN_TIME_METRIC, zeroRecord),
stageDiagnosticInfo.getOrElse(OUTPUT_BATCHES_METRIC, zeroRecord),
stageDiagnosticInfo.getOrElse(BUFFER_TIME_METRIC, zeroRecord),
stageDiagnosticInfo.getOrElse(SHUFFLE_WRITE_TIME_METRIC, zeroRecord),
stageDiagnosticInfo.getOrElse(FETCH_WAIT_TIME_METRIC, zeroRecord),
stageDiagnosticInfo.getOrElse(GPU_DECODE_TIME_METRIC, zeroRecord)))
metricNameToStatistics.getOrElse(OUTPUT_ROWS_METRIC, zeroRecord),
metricNameToStatistics.getOrElse(SCAN_TIME_METRIC, zeroRecord),
metricNameToStatistics.getOrElse(OUTPUT_BATCHES_METRIC, zeroRecord),
metricNameToStatistics.getOrElse(BUFFER_TIME_METRIC, zeroRecord),
metricNameToStatistics.getOrElse(SHUFFLE_WRITE_TIME_METRIC, zeroRecord),
metricNameToStatistics.getOrElse(FETCH_WAIT_TIME_METRIC, zeroRecord),
metricNameToStatistics.getOrElse(GPU_DECODE_TIME_METRIC, zeroRecord)))
}
}
}.toSeq
Expand All @@ -474,7 +489,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
app.accumManager.accumInfoMap.flatMap { accumMapEntry =>
val accumInfo = accumMapEntry._2
accumInfo.stageValuesMap.keySet.flatMap( stageId => {
getAccumStatisticsInStage(accumInfo, getStageTaskIds(stageId)).map( stats => {
getAccumInfoStatisticsInStage(accumInfo, getStageTaskIds(stageId)).map( stats => {
val (min, median, max, sum) = stats
// create the AccumProfileResults object once and reuse it to avoid extra memory allocation
val accumProfileResults = AccumProfileResults(
Expand All @@ -485,6 +500,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
median,
max,
sum)
// update stageToDiagnosticMetrics mapping if accumInfo is a diagnostic metric
if (accumInfo.infoRef.name.isDiagnosticMetrics()) {
updateStageDiagnosticMetrics(accumProfileResults)
}
Expand Down

0 comments on commit 3786fd9

Please sign in to comment.