[FEA] Add filtered diagnostic output for GPU slowness in Profiler tool #1548

Open · cindyyuanjiang wants to merge 7 commits into dev
Conversation


@cindyyuanjiang (Collaborator) commented Feb 18, 2025

Contributes to #1374

Changes

  • Added a filtered diagnostic (operator and SQL metrics) view to the Profiler output: filtered_diagnostic_metrics.csv
    • Introduced the function generateFilteredDiagnosticAccums to compute filtered diagnostic metrics in core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
    • Introduced FilteredAccumDiagnosticMetrics to hold the selected filter-related metric names and helper methods (a rough sketch follows this list)
    • Added FilteredDiagnosticResult to represent a filtered diagnostic metrics result
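
For orientation, here is a rough sketch of the shape such an object might take, following the *_METRIC_NAMES pattern that IOAccumDiagnosticMetrics already uses. The metric names below are illustrative guesses based on the output columns shown later; the PR's actual sets and helpers may differ.

```scala
// Illustrative sketch only; the real object is added in this PR
// alongside IOAccumDiagnosticMetrics.
object FilteredAccumDiagnosticMetrics {
  // Accumulator names that the filtered diagnostic view selects on,
  // one set per output-column group.
  val OUTPUT_ROWS_METRIC_NAMES = Set("number of output rows")
  val SORT_TIME_METRIC_NAMES = Set("sort time")

  // All metric names relevant to the filtered diagnostic view.
  val allMetricNames: Set[String] =
    OUTPUT_ROWS_METRIC_NAMES ++ SORT_TIME_METRIC_NAMES

  // True if an accumulator name feeds filtered_diagnostic_metrics.csv.
  def isFilteredDiagnosticMetricName(name: String): Boolean =
    allMetricNames.contains(name)
}
```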

Testing

  • Added a unit test, "test filtered diagnostic metrics", in core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala (a rough sketch of its shape follows)
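
As a rough shape only (not the PR's actual suite code), such a test can assert that each emitted row carries its node context and plausible statistics. The row type and values below are stand-ins mirroring the example output in the next section; the PR's real result class is FilteredDiagnosticResult.

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical stand-in for one row of filtered_diagnostic_metrics.csv.
case class FilteredDiagnosticRow(sqlId: Long, stageId: Int, nodeId: Long,
    nodeName: String, outputRowsTotal: Long)

class FilteredDiagnosticSketchSuite extends AnyFunSuite {
  test("filtered diagnostic rows carry node context") {
    // Values mirror the first two rows of the example output below.
    val rows = Seq(
      FilteredDiagnosticRow(0, 0, 16, "GpuColumnarExchange", 10000000L),
      FilteredDiagnosticRow(0, 0, 21, "Scan", 10000000L))
    assert(rows.forall(_.nodeName.nonEmpty))
    assert(rows.forall(_.outputRowsTotal > 0))
  }
}
```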

Example Output

appIndex,appName,appId,sqlId,stageId,stageDurationMs,nodeId,nodeName,numFilesReadMin,numFilesReadMedian,numFilesReadMax,numFilesReadTotal,numPartitionsMin,numPartitionsMedian,numPartitionsMax,numPartitionsTotal,metadataTimeMin,metadataTimeMedian,metadataTimeMax,metadataTimeTotal,outputBatchesMin,outputBatchesMedian,outputBatchesMax,outputBatchesTotal,inputBatchesMin,inputBatchesMedian,inputBatchesMax,inputBatchesTotal,outputRowsMin,outputRowsMedian,outputRowsMax,outputRowsTotal,sortTimeMin,sortTimeMedian,sortTimeMax,sortTimeTotal,peakMemoryMin,peakMemoryMedian,peakMemoryMax,peakMemoryTotal,shuffleBytesWrittenMin,shuffleBytesWrittenMedian,shuffleBytesWrittenMax,shuffleBytesWrittenTotal,shuffleWriteTimeMin,shuffleWriteTimeMedian,shuffleWriteTimeMax,shuffleWriteTimeTotal
1,Spark shell,local-1622814619968,0,0,1743,16,"GpuColumnarExchange",0,0,0,0,0,0,0,0,0,0,0,0,200,200,200,1200,0,0,0,0,1666666,1666666,1666667,10000000,0,0,0,0,0,0,0,0,6688608,6688707,6688825,40132250,41434653,66714083,100858775,400284505
1,Spark shell,local-1622814619968,0,0,1743,21,"Scan",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1666666,1666666,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,1,1631,8,"GpuColumnarExchange",0,0,0,0,0,0,0,0,0,0,0,0,200,200,200,1200,0,0,0,0,1666666,1666666,1666667,10000000,0,0,0,0,0,0,0,0,6688602,6688708,6688833,40132258,37444140,84791745,108992798,508750471
1,Spark shell,local-1622814619968,0,1,1631,13,"Scan",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1666666,1666666,1666667,10000000,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,Spark shell,local-1622814619968,0,2,688,3,"GpuColumnarExchange",0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,77,77,77,15400,139875,465911,9747416,93193331
1,Spark shell,local-1622814619968,0,2,688,4,"GpuHashAggregate",0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,200,0,0,0,0,1,1,1,200,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
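
Each metric in the header expands into four columns (Min/Median/Max/Total), i.e. one statistics quadruple per metric per (node, stage) row. A minimal stand-in for that quadruple, populated from the first GpuColumnarExchange row above (the tool's existing StatisticsMetrics type plays this role):

```scala
// Stand-in for the per-metric statistics behind each group of four columns.
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)

// outputBatches columns of the first GpuColumnarExchange row:
val outputBatches = StatisticsMetrics(min = 200, med = 200, max = 200, total = 1200)
// shuffleWriteTime columns of the same row (Spark records shuffle write
// time in nanoseconds):
val shuffleWriteTime =
  StatisticsMetrics(41434653L, 66714083L, 100858775L, 400284505L)
```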

Signed-off-by: cindyyuanjiang <cindyj@nvidia.com>
@cindyyuanjiang cindyyuanjiang changed the title WIP: [FEA] Add filtered diagnostic output for GPU slowness in Profiler tool [FEA] Add filtered diagnostic output for GPU slowness in Profiler tool Feb 24, 2025
@cindyyuanjiang cindyyuanjiang self-assigned this Feb 24, 2025
@cindyyuanjiang cindyyuanjiang added the feature request New feature or request label Feb 24, 2025
@cindyyuanjiang cindyyuanjiang marked this pull request as ready for review February 24, 2025 06:10

@sayedbilalbari (Collaborator) left a comment


Just a few doubts

```diff
@@ -72,8 +72,7 @@ object IOAccumDiagnosticMetrics {
   )

   val SHUFFLE_WRITE_TIME_METRIC_NAMES = Set(
     "shuffle write time",    // common across all Spark eventlogs
     "rs. shuffle write time" // only in GPU eventlogs
```

Why has this been removed?

```diff
@@ -115,3 +114,100 @@ object IOAccumDiagnosticMetrics {
     metricNamesToKeyMap(metric)
   }
 }

 object FilteredAccumDiagnosticMetrics {
```

I am assuming this is named FilteredAccumDiagnosticMetrics because it represents the subset of metrics that is most relevant to the Profiler output, the candidates being the names mentioned. Is there a doc somewhere highlighting why these operators were chosen?

```diff
@@ -351,6 +375,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
       updateIODiagnosticMetricsMap(sqlAccumProileResult)
     }

     accumIdToNodeInfoMap(metric.accumulatorId) = (metric.sqlID, metric.nodeID, metric.nodeName)
```

Just a thought: we iterate over all the SQLMetricInfoCase objects and use them to maintain a mapping of accumId -> (sqlID, nodeID, nodeName).
Can an entry belonging to the same accumId be overwritten in a later iteration? That is, can one accumId belong to multiple combinations of sqlID, nodeID, and nodeName?
In my limited understanding, an accumId is agnostic of the SQL, and multiple SQLs and nodes should be able to update the same accumulable.
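
To make that concern concrete: a mutable HashMap keyed on accumId alone is last-write-wins, so if one accumulator were referenced from two plan nodes, only the last entry would survive. A self-contained sketch (all values hypothetical):

```scala
import scala.collection.mutable.HashMap

val accumIdToNodeInfoMap = HashMap.empty[Long, (Long, Long, String)]
// Two hypothetical SQLMetricInfoCase entries sharing accumulatorId 42:
accumIdToNodeInfoMap(42L) = (0L, 16L, "GpuColumnarExchange")
accumIdToNodeInfoMap(42L) = (1L, 8L, "GpuColumnarExchange") // overwrites the first

// If one accumId can legitimately map to several (sqlID, nodeID, nodeName)
// combinations, a multi-valued map would be needed instead:
val multiMap = HashMap.empty[Long, Set[(Long, Long, String)]]
multiMap(42L) = multiMap.getOrElse(42L, Set.empty) + ((0L, 16L, "GpuColumnarExchange"))
multiMap(42L) = multiMap.getOrElse(42L, Set.empty) + ((1L, 8L, "GpuColumnarExchange"))
```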

```scala
val nodeInfoToFilterMetricsMap =
  HashMap.empty[(Long, Long, String, Int), HashMap[String, StatisticsMetrics]]

for (stageAccumResult <- filteredStageAccumList) {
```

So, to be clear, the logic here is:

  1. We first get filteredStageAccumList from the stage-level aggregated accumulator metrics by doing a name match.
  2. Then we iterate over that list (granularity is accum metadata + stageID).
  3. For each accumId, we look up the sqlID, nodeID, and nodeName.
  4. We maintain a cache of (sqlID, nodeID, nodeName, stageID) -> (accumName, statistic).
  5. In this, can there be multiple accumulable updates coming in for the same key? The value distinction is correct (name + statistic), but at the key level there can be multiple accum updates.
  6. Also, a contextual doubt: for all the metrics being filtered, is generating these from a SQL metrics view relevant? As part of some previous discussion, the SQL metrics may sometimes not be that accurate.
  7. A better way could be getting them from the internal.metrics.* accums, but that is only relevant if the SQL view is not needed downstream (otherwise, ignore).

```scala
for (stageAccumResult <- filteredStageAccumList) {
  val accumId = stageAccumResult.accMetaRef.id
  // Retrieve node information if available
  accumIdToNodeInfoMap.get(accumId).foreach { case (sqlId, nodeId, nodeName) =>
```
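
For context, here is a self-contained sketch of how a loop like this might fill the (sqlId, nodeId, nodeName, stageId)-keyed cache. The result types are simplified stand-ins for the PR's actual classes, not its real signatures:

```scala
import scala.collection.mutable.HashMap

// Simplified stand-ins for the PR's types (illustrative only).
case class StatisticsMetrics(min: Long, med: Long, max: Long, total: Long)
case class StageAccumResult(accumId: Long, stageId: Int, name: String,
    stats: StatisticsMetrics)

val accumIdToNodeInfoMap = HashMap.empty[Long, (Long, Long, String)]
val nodeInfoToFilterMetricsMap =
  HashMap.empty[(Long, Long, String, Int), HashMap[String, StatisticsMetrics]]

// Populate the cache, silently skipping accumulators whose node
// information is unknown (the .get(...).foreach pattern quoted above).
def addToCache(filteredStageAccumList: Seq[StageAccumResult]): Unit = {
  for (stageAccumResult <- filteredStageAccumList) {
    accumIdToNodeInfoMap.get(stageAccumResult.accumId).foreach {
      case (sqlId, nodeId, nodeName) =>
        val key = (sqlId, nodeId, nodeName, stageAccumResult.stageId)
        val metrics =
          nodeInfoToFilterMetricsMap.getOrElseUpdate(key, HashMap.empty)
        metrics(stageAccumResult.name) = stageAccumResult.stats
    }
  }
}
```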

Is there a way to eliminate this new map creation? We have the accumId and the stageId, so we can get the accum + stage stats from the accumManager, and I see a sqlPlanNodeIdToStageIds map above. This implementation is cleaner, but it would be good to verify whether the extra map can be avoided while keeping things clean at the same time.
