[SPARK-14649][CORE] DagScheduler should not run duplicate tasks on fe… #17297
Conversation
Test build #74558 has finished for PR 17297 at commit
cc @kayousterhout - Addressed your earlier comment about #12436 ignoring fetch failures from stale map output. I have addressed this by recording an epoch for each registered map output; that way, if the task's epoch is smaller than the epoch of the map output, we can ignore the fetch failure. This also takes care of the epoch changes triggered by executor loss, i.e. when the executor that ran a shuffle task's map tasks is gone, as pointed out by @mridulm. Let me know what you think of the approach.
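For illustration, a minimal sketch of the epoch bookkeeping described above (illustrative names only, not the exact PR code; the real change keeps the epoch per map status in MapOutputTrackerMaster, as the diff excerpts later in this thread show):

import scala.collection.mutable

// Illustrative sketch only: record the epoch at which each map output was registered.
// A FetchFailed from a task whose epoch is smaller than the output's registration
// epoch refers to output that has since been regenerated, so it can be ignored.
object EpochSketch {
  private val epochOfMapOutput = mutable.HashMap.empty[(Int, Int), Long]  // (shuffleId, mapId) -> epoch

  def registerMapOutput(shuffleId: Int, mapId: Int, currentEpoch: Long): Unit =
    epochOfMapOutput((shuffleId, mapId)) = currentEpoch

  // Handle the failure only if the registered output is at least as old as the task;
  // otherwise the output was regenerated after the task launched and the failure is stale.
  def shouldHandleFetchFailure(shuffleId: Int, mapId: Int, taskEpoch: Long): Boolean =
    epochOfMapOutput.get((shuffleId, mapId)).exists(_ <= taskEpoch)
}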
Force-pushed from e5429d3 to 279b09a.
Test build #74560 has finished for PR 17297 at commit
Force-pushed from f127150 to 0bcc69a.
Test build #74562 has finished for PR 17297 at commit
Test build #74566 has finished for PR 17297 at commit
@sitalkedia I won't have time to review this in detail for at least a few weeks, just so you know (although others may have time to review / merge it).

At a very high level, I'm concerned about the amount of complexity that this adds to the scheduler code. We've recently had to deal with a number of subtle bugs with jobs hanging or Spark crashing as a result of trying to handle map output from old tasks. As a result, I'm hesitant to add more complexity -- and the associated risk of bugs that cause job failures, plus the expense of maintaining the code -- to improve performance.

At this point I'd lean towards cancelling outstanding map tasks when a fetch failure occurs (there's currently a TODO in the code to do this) to simplify these issues. This would improve performance in some ways, by freeing up slots that could be used for something else, at the expense of wasted work if the tasks have already made significant progress. But it would significantly simplify the scheduler code, which, given the debugging and reviewer time that has gone into fixing subtle issues with this code path, I think is worthwhile.

Curious what other folks think here.
@kayousterhout - I understand your concern and I agree that canceling the running tasks is definitely a simpler approach, but it is very inefficient for large jobs where tasks can run for hours. In our environment, where fetch failures are common, this change not only improves the performance of jobs that hit fetch failures, it also helps reliability. If we cancel all running reducers, we might end up in a state where jobs make no progress at all under frequent fetch failures, because they just flip-flop between two stages.

Comparing this approach to how Hadoop handles fetch failures: Hadoop does not fail any reducer when it detects a missing map output. The reducers just continue processing output from other mappers while the missing output is recomputed concurrently. This gives Hadoop a big edge over Spark for long-running jobs with multiple fetch failures. This change is one step towards making Spark robust against fetch failures; we would eventually want the Hadoop model, where we do not fail any task when a map output goes missing.

Regarding the approach, please let me know if you can think of some way to reduce the complexity of this change. cc @markhamstra, @rxin, @sameeragarwal
@@ -193,13 +193,6 @@ private[spark] class TaskSchedulerImpl private[scheduler](
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
Please note that this check is no longer needed, because the DAGScheduler already keeps track of running tasks and does not submit duplicate tasks.
actually, that is not really the point of this check. It's just checking whether one stage has two task sets (aka stage attempts) that are both in the "non-zombie" state. It doesn't do any checks at all on what tasks are actually in those task sets.

This is just checking an invariant which we believe to always be true, but we figure it's better to fail fast if we hit this condition, rather than proceed with some inconsistent state. This check was added because behavior gets really confusing when the invariant is violated, and though we think it should always be true, we've still hit cases where it happens.
@squito - That's correct, this is checking that we should not have more than one non-zombie attempt of a stage running. But in scenario (d) you described below, we will end up having multiple non-zombie attempts.

However, my point is that there is no reason we should not allow multiple concurrent attempts of a stage to run; the only thing we need to guarantee is that those attempts run mutually exclusive sets of tasks. With this change, since the DAGScheduler already keeps track of submitted/running tasks, it can guarantee that it will not resubmit duplicate tasks for a stage.
Test build #74631 has finished for PR 17297 at commit
I'm a bit confused by the description:

This is already true: when there is a fetch failure, the TaskSetManager is marked as zombie and the DAGScheduler resubmits stages, but nothing actively kills running tasks.

I don't think it's true that it relaunches all tasks that hadn't completed when the fetch failure occurred. It relaunches all the tasks that haven't completed by the time the stage gets resubmitted; more tasks can complete between the time of the first failure and the time the stage is resubmitted.

But there are several other potential issues you may be trying to address. Say there is stage 0 and stage 1, each with 10 tasks. Stage 0 completes fine on the first attempt, then stage 1 starts. Tasks 0 & 1 in stage 1 complete, but then there is a fetch failure in task 2. Let's also say we have an abundance of cluster resources, so tasks 3 - 9 from stage 1, attempt 0 are still running. Stage 0 gets resubmitted as attempt 1, just to regenerate the map output for whatever executor had the data for the fetch failure -- perhaps it's just one task from stage 0 that needs to be resubmitted. Now, lots of different scenarios are possible:

(a) Tasks 3 - 9 from stage 1 attempt 0 all finish successfully while stage 0 attempt 1 is running. So when stage 0 attempt 1 finishes, stage 1 attempt 1 is submitted with just task 2. If it completes successfully, we're done (no wasted work).

(b) Stage 0 attempt 1 finishes before tasks 3 - 9 from stage 1 attempt 0 have finished. So stage 1 gets submitted again as stage 1 attempt 1, with tasks 2 - 9. There are now two copies running for tasks 3 - 9. Maybe all the tasks from attempt 0 actually finish shortly after attempt 1 starts. In this case, the stage is complete as soon as there is one complete attempt for each task. But even after the stage completes successfully, all the other tasks keep running anyway. (Plenty of wasted work.)

(c) Like (b), but shortly after stage 1 attempt 1 is submitted, we get another fetch failure in one of the old "zombie" tasks from stage 1 attempt 0. But the DAGScheduler realizes it already has a more recent attempt for this stage, so it ignores the fetch failure. All the other tasks keep running as usual. If there aren't any other issues, the stage completes when there is one completed attempt for each task. (Same amount of wasted work as (b).)

(d) While stage 0 attempt 1 is running, we get another fetch failure from stage 1 attempt 0, say in task 3, which has a failure from a different executor. Maybe it's from a completely different host (just by chance, or there may be cluster maintenance where multiple hosts are serviced at once); or maybe it's from another executor on the same host (at least, until we do something about your other PR on unregistering all shuffle files on a host). To be honest, I don't understand how things work in this scenario. We mark stage 0 as failed, we unregister some shuffle output, and we resubmit stage 0. But stage 0 attempt 1 is still running, so I would have expected us to end up with conflicting task sets. Whatever the real behavior is here, it seems we're at risk of even more duplicated work from yet another attempt for stage 1. Etc.

So I think in (b) and (c), you are trying to avoid resubmitting tasks 3 - 9 in stage 1 attempt 1. The thing is, there is a strong reason to believe that the original versions of those tasks will fail: most likely, those tasks need map output from the same executor that caused the first fetch failure. So Kay is suggesting that we take the opposite approach, and instead actively kill the tasks from stage 1 attempt 0. OTOH, it's possible that (i) the issue was transient, or (ii) the tasks already finished fetching that data before the error occurred. We really have no idea.
Thanks a lot @squito for taking a look at it and for your feedback.
That is true, but currently the DAGScheduler has no idea which tasks are running and which are being aborted. With this change, the TaskSetManager informs the DAGScheduler about currently running/aborted tasks so that the DAGScheduler can avoid resubmitting duplicates.
Yes that's true. I will update the PR description.
In our case, we are observing that any transient issue on the shuffle service may cause a few tasks to fail, while other reducers do not see the fetch failure because they either already fetched the data from that shuffle service or are yet to fetch it. Killing all the reducers in those cases wastes a lot of work, and as I mentioned above, we might end up in a state where jobs make no progress at all under frequent fetch failures, because they just flip-flop between two stages.
Actually, I realized that this is not true. If you look at the code (https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1419), when the stage fails because of a fetch failure, we remove the stage from the output committer. So any task that completes between the time of the first fetch failure and the time the stage is resubmitted will be denied permission to commit its output, and so the scheduler re-launches all tasks in the stage that hadn't completed when the fetch failure occurred.
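For context, a simplified and entirely hypothetical sketch of that commit-denial behavior (this is not Spark's actual OutputCommitCoordinator API; the names are made up for illustration):

import scala.collection.mutable

// Hypothetical sketch: once a stage attempt is failed due to a fetch failure, commit
// requests from its still-running tasks are refused, so their partitions stay missing
// and get re-launched with the new stage attempt.
class CommitCoordinatorSketch {
  private val attemptAllowedToCommit = mutable.HashMap.empty[Int, Int]  // stageId -> attempt

  def stageStart(stageId: Int, attempt: Int): Unit = attemptAllowedToCommit(stageId) = attempt

  def stageFailed(stageId: Int): Unit = attemptAllowedToCommit -= stageId

  def canCommit(stageId: Int, attempt: Int): Boolean =
    attemptAllowedToCommit.get(stageId).contains(attempt)
}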
Oh, that is a great point. I was mostly thinking of another shuffle map stage, where that wouldn't matter, but if it's a result stage which needs to commit its output, you are right.
// It is possible that the map output was regenerated by rerun of the stage and the
// fetch failure is being reported for stale map output. In that case, we should just
// ignore the fetch failure and relaunch the task with latest map output info.
if (epochForMapOutput.nonEmpty && epochForMapOutput.get <= task.epoch) {
I'd be inclined to do this without the extra binding and get:
for (epochForMapOutput <- mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
     if epochForMapOutput <= task.epoch) {
  // Mark the map whose fetch failed as broken in the map stage
  if (mapId != -1) {
    mapStage.removeOutputLoc(mapId, bmAddress)
    mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
  }
  // TODO: mark the executor as failed only if there were lots of fetch failures on it
  if (bmAddress != null) {
    handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
  }
}
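(For readers less familiar with Scala for-comprehensions over Option: the guarded for above runs its body only when the Option is non-empty and the guard holds; it desugars to roughly the following withFilter/foreach chain, shown here with made-up toy values.)

// Toy illustration of the desugaring; the values are made up.
val taskEpoch = 5L
val epochForMapOutputOpt: Option[Long] = Some(3L)
epochForMapOutputOpt.withFilter(_ <= taskEpoch).foreach { epochForMapOutput =>
  println(s"handling fetch failure against output registered at epoch $epochForMapOutput")
}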
if (changeEpoch) {
  incrementEpoch()
}
mapStatuses.put(shuffleId, statuses.clone())
What was the point of moving this?
@@ -378,15 +382,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
    val array = mapStatuses(shuffleId)
    array.synchronized {
      array(mapId) = status
      val epochs = epochForMapStatus.get(shuffleId).get
val epochs = epochForMapStatus(shuffleId)
    return Some(epochForMapStatus.get(shuffleId).get(mapId))
  }
  None
}
First, arrayOpt.get != null isn't necessary since we don't put null values into mapStatuses. Second, epochForMapStatus.get(shuffleId).get is the same as epochForMapStatus(shuffleId). Third, I don't like all the explicit gets, null checks and the unnecessary non-local return. To my mind, this is better:
def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
for {
mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray =>
Option(mapStatusArray(mapId))
}
} yield epochForMapStatus(shuffleId)(mapId)
}
for (task <- tasks) {
  stage.pendingPartitions -= task.partitionId
}
} |
for {
stage <- stageIdToStage.get(stageId)
task <- tasks
} stage.pendingPartitions -= task.partitionId
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
val missingPartitions = stage.findMissingPartitions()
val partitionsToCompute =
missingPartitions.filter(id => !stage.pendingPartitions.contains(id)) |
missingPartitions.filterNot(stage.pendingPartitions)
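(This works because a Scala Set[Int] is itself a Function1[Int, Boolean], so it can be passed directly as the predicate to filterNot; a toy example with made-up values:)

// Toy illustration: a Set can be passed where a predicate is expected.
val missingPartitions = Seq(0, 1, 2, 3)
val pendingPartitions = scala.collection.mutable.HashSet(1, 3)
val partitionsToCompute = missingPartitions.filterNot(pendingPartitions)  // Seq(0, 2)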
Force-pushed from 901c9bf to 99b4069.
Test build #75029 has finished for PR 17297 at commit
Thanks @markhamstra for the review comments, addressed. I also found an issue with my previous implementation: we did not allow task commits from old stage attempts. I fixed that issue as well.
Test build #75030 has started for PR 17297 at commit
Test build #75126 has finished for PR 17297 at commit
Test build #75124 has finished for PR 17297 at commit
Test build #75127 has finished for PR 17297 at commit
Force-pushed from b179439 to 1e6e88a.
To recap the issue that Imran and I discussed here, I think it can be summarized as follows:
If my description above is correct, then this PR is assuming that scenario A is more likely than scenario B, but it seems to me that these two scenarios are equally likely (in which case this PR provides no net benefit). @sitalkedia what are your thoughts here / did I miss something in my description above?
@squito - I am not able to reproduce this issue locally. The tests fail with some other issue -
@sitalkedia how are you trying to run the test? Works fine for me on my laptop on master. Note that the test is referencing a var which is only defined if "spark.testing" is a system property: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L199 which it is in the sbt and maven build. (Maybe it doesn't work inside an IDE? I'd strongly suggest just using sbt.)
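(If running inside IntelliJ, one workaround -- an assumption about the local setup, not something from the PR itself -- is to pass the system property through the run configuration's VM options, e.g.:)

-Dspark.testing=true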
Test build #75287 has finished for PR 17297 at commit
@squito - I am able to reproduce the issue by running . Also, one weird thing is that after adding the spark.testing system property to my IntelliJ, all test cases succeed without being stuck :/ .
@sitalkedia they're in core/target/unit-tests.log. Sometimes it's easier to move the logs to the tests (so they show up in-line), which you can do by changing core/src/test/resources/log4j.properties to log to the console instead of to a file.
@kayousterhout - Both scenario A and scenario B that you described above are likely (it totally depends on the nature of the job and the available cluster resources), and you are right that in scenario B this PR will not provide any benefit. I am planning a follow-up PR to make the fetch-failure handling logic better by not failing a task at all. In that case, the reducers can just inform the scheduler of the lost map output and continue processing other available map outputs while the scheduler concurrently recomputes the lost output. But that will be a bigger change in the scheduler.
btw I filed https://issues.apache.org/jira/browse/SPARK-20128 for the test timeout -- fwiw I don't think it's a problem w/ the test but a potential real issue with the metrics system, though I don't really understand how it can happen.
@sitalkedia This change is pretty contentious; there are a lot of questions about whether or not this is a good change. I don't think discussing this here in GitHub comments on a PR is the best forum. I think of PR comments as being more about code details -- clarity, tests, whether the implementation is correct, etc. But here we're discussing whether the behavior is even desirable, as well as trying to discuss this in relation to other changes. I think a better format would be for you to open a jira and submit a design document (maybe a shared google doc at first), where we can focus more on the desired behavior and consider all the changes, even if the PRs are smaller to make them easier to review.

I'm explicitly not making a judgement on whether or not this is a good change. Also, I do appreciate you having the code changes ready, as a POC, as that can help folks consider the complexity of the change. But it seems clear to me that first we need to come to a decision about the end goal.

Also, assuming we do decide this is desirable behavior, there is also a question about how we can get changes like this in without risking breaking things -- I have started a thread on dev@ related to that topic in general, but we should figure that out for these changes in particular as well.

@kayousterhout @tgravescs @markhamstra makes sense?
Sounds good to me.
Agree, sounds good!
@squito - Sounds good to me; let me compile the list of pain points related to fetch failures that we are seeing, and also a design doc for better handling of these issues.
Agreed. Let's establish what we want to do before trying to discuss the details of how we are going to do it.
Test build #75332 has finished for PR 17297 at commit
Test build #75339 has finished for PR 17297 at commit
@kayousterhout, @squito - Since we need more discussion on this change over a design doc, I have put out a temporary change (#17485) to kill the running tasks in case of fetch failure. Although this is not ideal, it would be better than the current situation.
Should we temporarily close the PR and wait for the design doc to be finalized? @sitalkedia
Okay, closing the PR.
What changes were proposed in this pull request?
When a fetch failure occurs, the DAGScheduler re-launches the previous stage (to regenerate output that was missing), and then re-launches all tasks in the failed stage that haven't completed by the time the stage gets resubmitted (the DAGScheduler re-launches all of the tasks whose output data is not available, which is equivalent to the set of tasks that hadn't yet completed). This sometimes leads to wasteful duplicate task runs for jobs with long-running tasks.
To address the issue, the following change has been made:
The DAGScheduler maintains a pending task list, i.e. the tasks that have been submitted to the lower-level scheduler and should not be resubmitted when the stage is rerun (see the sketch below).
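Roughly, the mechanism looks like the following sketch (a simplified illustration with made-up names, not the actual DAGScheduler/TaskSetManager code in this PR):

import scala.collection.mutable

// Simplified, illustrative sketch of the duplicate-avoidance idea: a stage tracks the
// partitions already handed to the TaskScheduler, and a re-submission only computes
// partitions that are missing AND not still pending in an earlier attempt.
object DuplicateTaskAvoidanceSketch {
  class StageState(val numPartitions: Int) {
    val pendingPartitions = mutable.HashSet.empty[Int]  // submitted, not yet finished or aborted
    val outputAvailable = mutable.HashSet.empty[Int]    // partitions whose output is registered

    def findMissingPartitions(): Seq[Int] =
      (0 until numPartitions).filterNot(outputAvailable)
  }

  def partitionsToCompute(stage: StageState): Seq[Int] =
    stage.findMissingPartitions().filterNot(stage.pendingPartitions)
}

When the TaskSetManager reports a task as finished or aborted, the corresponding partition would be removed from pendingPartitions, so a later stage attempt can resubmit it if its output is still missing.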
How was this patch tested?
Added new tests.