
Adds filter for failed and non-completed stages #1558

Open
wants to merge 9 commits into base: dev

Conversation

sayedbilalbari
Collaborator

Fixes #1552

Currently we store the stage information using the StageModelManager class, where we map incoming stage information during the following events -

  1. doSparkListenerStageCompleted
  2. doSparkListenerStageSubmitted

So a stage's information is updated once when the stage is submitted and once when it completes. A StageCompleted event also arrives for a failed attempt (e.g., there will be two StageSubmitted and two StageCompleted events for a stage that fails on its first attempt and succeeds on the second).
Both of those StageInfo objects are updated in the map.
This PR adds extra filters so that aggregate metrics are calculated only for stages that have not failed and have completed successfully.

Changes -

  1. Add a new method, getAllCompletedStages, for filtering StageInfo objects down to those that are successful and completed
  2. Change previous usages of getAllStages to getAllCompletedStages

Effects -

This change ensures that all stage-level aggregate metrics take into account only the successful (completed and not failed) stages.
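For illustration, a minimal sketch of what the new filter could look like, assuming StageModelManager exposes per-attempt StageModel objects via getAllStages; the hasFailed helper here is hypothetical (the PR's hasCompleted wraps StageInfo.completionTime, and a failure check would wrap StageInfo.failureReason):

// Hedged sketch only, not the exact tools code: keep stage attempts that
// completed (completionTime defined) and did not fail (no failureReason).
def getAllCompletedStages: Iterable[StageModel] =
  getAllStages.filter(sm => sm.hasCompleted && !sm.hasFailed)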

Collaborator

@amahussein left a comment

Did you see any change in the output as an impact of that change? If you do a diff between the outputs from before/after, is there any change in the metrics?

@@ -321,7 +321,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
   val emptyNodeNames = Seq.empty[String]
   val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
   // TODO: this has stage attempts. we should handle different attempts
-  app.stageManager.getAllStages.map { sm =>
+  app.stageManager.getAllCompletedStages.map { sm =>
Collaborator

Do we still need the TODO? If not, we can replace it with an explanation of why we are using getAllCompletedStages.

Collaborator Author

Have removed the TODO

@@ -321,7 +321,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
   val emptyNodeNames = Seq.empty[String]
   val emptyDiagnosticMetrics = HashMap.empty[String, AccumProfileResults]
   // TODO: this has stage attempts. we should handle different attempts
-  app.stageManager.getAllStages.map { sm =>
+  app.stageManager.getAllCompletedStages.map { sm =>
Collaborator

There is a tricky part in this: inside the loop, we retrieve the tasks belonging to a certain stage. If a stage is still incomplete, some of its tasks could already be there, right?
I am comparing this to the other behavior (BehaviorB), which is to get the most representative attempt for a stageID, vs. getting only the successful attempts.
For example, in BehaviorB we will have the tasks of an incomplete stage, given that it did not fail and there is no other attempt that has succeeded.
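To make the two policies concrete, here is a hedged sketch of the difference; StageAttempt and its fields are invented for illustration and are not the actual tools API:

// Invented types, for illustration only.
case class StageAttempt(attemptId: Int, completed: Boolean, failed: Boolean)

// BehaviorB: pick one most-representative attempt per stage; this may be an
// incomplete attempt when no attempt has succeeded and it has not failed.
def mostRepresentative(attempts: Seq[StageAttempt]): Option[StageAttempt] =
  attempts.find(a => a.completed && !a.failed)
    .orElse(attempts.filterNot(_.failed).sortBy(_.attemptId).lastOption)

// This PR's policy: only attempts that completed without failing contribute.
def successfulOnly(attempts: Seq[StageAttempt]): Seq[StageAttempt] =
  attempts.filter(a => a.completed && !a.failed)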

Collaborator Author

  • So, as per my understanding, the question is whether we want only the successful attempts of a stage in the diagnostics, vs. attempts that were running and not completed when there is no successful attempt of that stage.
  • The above behavior can be discussed, as it warrants understanding whether qualX or other downstream consumers want this in the output. If yes, it can be included.
  • Another point I came across while debugging: even for successful stages, there can be tasks that were killed midway (speculative execution).
  • These tasks come with accumulables and update the accumulable values.
  • Currently these killed-task updates are included in the rolling metrics and are part of the associated diagnostic metrics.
  • As per my understanding, a killed task will have a corresponding successful attempt coming in, but for a true representation of a stage, killed tasks should be included in the metric. The question is whether or not they should be put in a separate category.

Collaborator Author

@amahussein Have updated the logic -

  • Updated the current code to accommodate all successful attempts for a stage and aggregate them.
  • As for failed/killed tasks, we do account for the accumulables coming in with them:
    • Failed tasks do not come in with accumulables.
    • Killed tasks do come in with accumulables.
  • The logic we follow (see the sketch below) ->
    • Get all tasks associated with a particular stage attempt (in this case, only the successful attempts).
    • These can include tasks that failed or were killed.
    • Failed tasks do not contribute to the stage-level metrics.
    • As for killed tasks, the major metrics I have seen them contribute are -
      • sw_writeTime
      • executorRuntime
      • jvmGCTime
      • input_bytesRead
      • sw_bytesWritten
      • sw_recordsWritten
      • input_RecordsRead
      • peakExecutionMemory
  • In my understanding, these should ideally be included in the stage-aggregated metrics, and that is what happens in the current logic.
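A hedged sketch of that aggregation rule; TaskRecord, its fields, and the status values are invented for illustration (the real code aggregates many more metrics from task accumulables):

// Invented record type; real task models carry many more metric fields.
case class TaskRecord(status: String, executorRunTime: Long, peakExecutionMemory: Long)

// Aggregate task metrics for the successful attempt(s) of a stage.
// Failed tasks report no accumulables, so excluding them loses nothing;
// killed tasks (e.g. speculative duplicates) do report partial accumulables
// and are kept so the stage totals reflect the work actually performed.
def aggregateStage(tasks: Seq[TaskRecord]): (Long, Long) = {
  val contributing = tasks.filterNot(_.status == "FAILED")
  val totalRunTime = contributing.map(_.executorRunTime).sum
  val peakMemory = contributing.map(_.peakExecutionMemory).foldLeft(0L)((a, b) => math.max(a, b))
  (totalRunTime, peakMemory)
}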

def hasCompleted: Boolean = {
  stageInfo.completionTime.isDefined
}

Collaborator

Is this similar to the logic in the Spark code?
We could benefit from having a getState or getStatus similar to what Spark does; then this would also be a column in the final output, which will be helpful for us to troubleshoot the behavior.

Collaborator Author

private[spark] def getStatusString: String = {
  if (completionTime.isDefined) {
    if (failureReason.isDefined) {
      "failed"
    } else {
      "succeeded"
    }
  } else {
    "running"
  }
}

This is the logic in Spark. A succeeded stage is one for which completionTime is defined and which does not have a failureReason. I have used similar reasoning for filtering completed stages.
There are just three statuses - failed, succeeded, and running.
If a stage has a completion time and has not failed, it has succeeded; otherwise it is running.
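If we wanted to surface this as a column per the reviewer's suggestion, one possible shape - a hedged sketch, assuming StageModel wraps the Spark StageInfo in a field named stageInfo (the method name getStatus is illustrative):

// Hedged sketch: derive a Spark-like status string from the wrapped
// StageInfo so it could be emitted as a column in the final output.
def getStatus: String = {
  if (stageInfo.completionTime.isDefined) {
    if (stageInfo.failureReason.isDefined) "failed" else "succeeded"
  } else {
    "running"
  }
}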

@amahussein added the bug (Something isn't working) and core_tools (Scope the core module (scala)) labels on Feb 21, 2025
@eordentlich
Collaborator

@leewyang can you take a look at this wrt qualx?

Development

Successfully merging this pull request may close these issues.

[BUG] Aggregate metric per stage is missing filter for stage attempts