[SPARK-14649][CORE] DagScheduler should not run duplicate tasks on fe… #12436

Closed

Conversation

sitalkedia

@sitalkedia sitalkedia commented Apr 15, 2016

What changes were proposed in this pull request?

When a fetch failure occurs, the DAGScheduler re-launches the previous stage (to re-generate output that was missing), and then re-launches all tasks in the stage with the fetch failure that hadn't completed when the fetch failure occurred (the DAGScheduler re-launches all of the tasks whose output data is not available -- which is equivalent to the set of tasks that hadn't yet completed). This sometimes leads to wasteful duplicate task runs for jobs with long-running tasks.

To address the issue, the following changes have been made (see the sketch after the list):

  1. When a fetch failure happens, the TaskSetManager asks the DAGScheduler to abort all of the non-running tasks. The running tasks in the task set, however, are not killed.
  2. When a task is aborted, the DAGScheduler adds the task to the stage's pending task list.
  3. When the stage is resubmitted, the DAGScheduler resubmits only the tasks that are in the pending list.
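
Below is a minimal, self-contained Scala sketch of what this bookkeeping amounts to; `StageState`, its fields, and its methods are illustrative stand-ins, not Spark's actual DAGScheduler/TaskSetManager classes.

```scala
import scala.collection.mutable

// Illustrative model of per-stage bookkeeping: which partitions still need a
// task launched, and which are currently running.
class StageState(numPartitions: Int) {
  // Partitions whose tasks still need to be (re)launched.
  val pendingPartitions: mutable.Set[Int] = mutable.Set((0 until numPartitions): _*)
  // Partitions with tasks currently running; these are left alone on a fetch failure.
  val runningPartitions: mutable.Set[Int] = mutable.Set.empty[Int]

  def onTaskLaunched(partition: Int): Unit = {
    pendingPartitions -= partition
    runningPartitions += partition
  }

  def onTaskFinished(partition: Int): Unit =
    runningPartitions -= partition

  // A fetch failure puts the failed task back into the pending set; tasks that
  // were never launched were still pending anyway (the task set only "aborts"
  // them, it does not forget them).
  def onFetchFailure(failedPartition: Int): Unit = {
    runningPartitions -= failedPartition
    pendingPartitions += failedPartition
  }

  // On resubmission, only pending partitions get new tasks, so tasks that kept
  // running through the fetch failure are not duplicated.
  def tasksToResubmit: Set[Int] = pendingPartitions.toSet
}
```

The key property of this model is that partitions in `runningPartitions` never re-enter the resubmitted stage unless their task later fails.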

How was this patch tested?

Added a new test case for it and made sure the test case failed without the change.

@AmplabJenkins

Can one of the admins verify this patch?

@sitalkedia
Author

sitalkedia commented Apr 18, 2016

@kayousterhout, @markhamstra

@sitalkedia
Author

I found a bug in my change that caused the job to get stuck. I am going to fix the issue (with an updated test case covering the scenario) and update the PR.

@@ -1259,11 +1282,6 @@ class DAGScheduler(
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)

if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
Author


Please note that after this change, we cannot ignore fetch failures from the previous attempt, because we no longer have duplicate tasks for fetch-failed tasks.
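
To make this concrete, here is a hypothetical, simplified version of the handler; the event class and field names are stand-ins, not the real DAGScheduler internals.

```scala
// Hypothetical simplified fetch-failure handling; FetchFailedEvent is a stand-in
// for the information the scheduler receives when a reduce task reports a failure.
case class FetchFailedEvent(stageId: Int, stageAttemptId: Int, partition: Int)

class FetchFailureHandler(latestStageAttemptId: Int) {
  val partitionsToResubmit = scala.collection.mutable.Set.empty[Int]

  def handle(event: FetchFailedEvent): Unit = {
    // Before this change (when failed tasks were re-launched as duplicates),
    // reports from an older stage attempt could simply be dropped:
    //   if (event.stageAttemptId != latestStageAttemptId) return
    // After this change the newer attempt holds no duplicate of that task, so
    // the failed partition still has to be recorded for resubmission.
    partitionsToResubmit += event.partition
  }
}
```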

@sitalkedia sitalkedia force-pushed the avoid_duplicate_tasks branch from 272244f to 548212b on April 30, 2016 00:46
@sitalkedia
Author

Can someone take a look?
cc - @srowen

@srowen
Member

srowen commented May 5, 2016

This would need a rebase; is it the same as https://issues.apache.org/jira/browse/SPARK-14915 ?

@sitalkedia sitalkedia force-pushed the avoid_duplicate_tasks branch from 548212b to b8d4e05 on May 6, 2016 02:18
@sitalkedia
Author

No, this is not the same issue. SPARK-14915 deals with duplicate tasks in the case of speculation, but this change has nothing to do with speculation. It fixes the issue of running duplicate tasks in the case of a fetch failure.

@sitalkedia sitalkedia force-pushed the avoid_duplicate_tasks branch from b8d4e05 to 042fe43 on May 11, 2016 14:17
@sitalkedia
Author

Found a corner case where the job could get stuck; fixed it and added a test case as well.

@kayousterhout
Contributor

@sitalkedia can you update this to resolve the merge conflicts?

@sitalkedia
Author

@kayousterhout - Sure, I will resolve the conflicts. Can you take a cursory look at the diff and let me know if the approach is reasonable?

@kayousterhout
Contributor

@sitalkedia What's the use case for this? In the cases I've seen, if there's one fetch failure, it typically means that a machine that ran a map task has failed / gone down / been revoked by the cluster manager, and as a result, none of the reduce tasks will succeed. As a result, the tasks from the first attempt of the reduce stage eventually fail, because they require the output that's being re-computed in the map phase. Why isn't this happening in the cases you're seeing?

I do think it would be worthwhile to implement the TODO in TaskSetManager.abort (which says we should kill running tasks), which would be a simpler fix to avoid the duplicate tasks (but I'm wondering if there's some reason you're seeing that the still-running tasks might actually succeed?).
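
For reference, a rough, self-contained sketch of that TODO; the class shape and the `killTask` callback below are assumptions for illustration, not Spark's actual TaskSetManager/SchedulerBackend API.

```scala
import scala.collection.mutable

// Simplified stand-in for a task set manager whose abort() also kills in-flight tasks.
class SimpleTaskSetManager(killTask: (Long, String) => Unit) {
  // taskId -> executorId for tasks that are currently running.
  val runningTasks: mutable.Map[Long, String] = mutable.Map.empty
  var isZombie: Boolean = false

  def abort(message: String): Unit = {
    // Existing behaviour: stop launching any further tasks from this task set.
    isZombie = true
    // The TODO discussed above: also kill whatever is still running, so the
    // resubmitted stage cannot race against leftover duplicate attempts.
    runningTasks.foreach { case (taskId, executorId) => killTask(taskId, executorId) }
    runningTasks.clear()
  }
}
```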

@sitalkedia
Author

@kayousterhout - Our use case is a very large workload on Spark. We are processing around 100 TB of data in a single Spark job with 100k tasks in it (BTW, the single-threaded DAGScheduler is becoming a bottleneck for such large workloads). Each individual task can run for more than an hour, and the jobs run for more than 10 hours, so seeing a few machine reboots during a job run is very common. If we break down the task execution time, out of 1 hour we spend around 5-10 minutes fetching the data and the rest in actual execution. When a machine goes down, it is very common for us to see failures only in the few tasks that are still in the shuffle fetch phase, while other tasks that have already fetched their data are not affected. That's why we don't want already-running tasks to be rerun in the case of a fetch failure.

Again, killing all the tasks in the case of a fetch failure is not a good idea either, because it would just waste a lot of resources.

@sitalkedia
Author

ping.

@rxin
Contributor

rxin commented Jul 14, 2016

Is the idea here to not rerun tasks that are already running in the case of a fetch failure, because they might finish?

What happens after the change if those tasks end up coming back as failures?

@sitalkedia
Author

@rxin - The idea is not to rerun or kill already-running tasks in the case of a fetch failure, because they might finish. If those tasks end up failing later, the DAGScheduler will rerun them.

@markhamstra
Contributor

@sitalkedia
Author

ping.

@davies
Contributor

davies commented Sep 2, 2016

@sitalkedia Had a quick look at this one; the use case sounds good, and we should improve stability for long-running tasks. Could you explain a bit more how the current patch works (in the PR description)?

@sitalkedia
Author

@davies - Thanks for looking into this. I updated the PR description with details of the change. Let me know if the approach seems reasonable, and I will work on rebasing the change against the latest master.

@kayousterhout
Contributor

@sitalkedia I was thinking about this over the weekend and I'm not sure this is the right approach. I suspect it might be better to re-use the same task set manager for the new stage. This copying of information is confusing and I'm concerned it will be bug-prone in the future. Did you consider that approach?

Also, separately from what approach is used, how do you deal with the following: suppose map task 1 loses its output (e.g., the reducer where that task is located dies). Now, suppose reduce task A gets a fetch failure for map task 1, triggering map task 1 to be re-run. Meanwhile, reduce task B is still running. Now the re-run map task 1 completes and the scheduler launches the reduce phase again. Suppose after that happens, task B fails (this is the old task B, that started before the fetch failure) because it can't get the data from map task 1, but that's because it still has the old location for map task 1. My understanding is that, with the current code, that would cause the map stage to get re-triggered again, but really, reduce task B should be re-started with the correct location for the output from map 1.
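
A hypothetical sketch of the behaviour described as desirable here (the `MapOutputView` trait and these names are assumptions, not Spark's real MapOutputTracker API): when a late fetch failure points at map output that has since been regenerated, only the reduce task needs to be resubmitted with the fresh location, instead of re-triggering the whole map stage.

```scala
// Hypothetical decision logic for a late fetch failure; MapOutputView is an
// illustrative abstraction over "is the output of this map task registered?".
object FetchFailurePolicy {
  trait MapOutputView {
    def isOutputAvailable(mapId: Int): Boolean
  }

  sealed trait Action
  case object ResubmitReduceTaskOnly extends Action
  case object RerunMapStage extends Action

  def decide(mapId: Int, mapOutputs: MapOutputView): Action =
    if (mapOutputs.isOutputAvailable(mapId)) {
      // The reduce task only had a stale location (old task B after map 1 was
      // already re-run); re-launch just that task so it sees the new location.
      ResubmitReduceTaskOnly
    } else {
      // The map output really is missing, so the map stage must be re-executed.
      RerunMapStage
    }
}
```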

@mridulm
Contributor

mridulm commented Oct 5, 2016

I am curious how this is resilient to epoch changes, which will be triggered by executor loss when a shuffle map task's executor is gone.
Won't it create issues if we are trying to continue to (re)use the earlier stage? @rxin @kayousterhout

@kayousterhout
Contributor

Yeah @mridulm that also seems like an issue with this approach.

@kayousterhout
Contributor

@sitalkedia this has been inactive for a while and there were a few issues pointed out above that haven't yet been resolved. Do you have time to work on this? Otherwise, can you close the PR?

@sitalkedia
Author

@kayousterhout - Thanks for taking a look at the PR. Currently I don't have time to work on it. I will close this PR and open a new one with the issues addressed.

@sitalkedia sitalkedia closed this Dec 22, 2016
@jisookim0513
Contributor

@sitalkedia have you had a chance to work on this issue and open a new PR?

@sitalkedia
Author

> Also, separately from what approach is used, how do you deal with the following: suppose map task 1 loses its output (e.g., the reducer where that task is located dies). Now, suppose reduce task A gets a fetch failure for map task 1, triggering map task 1 to be re-run. Meanwhile, reduce task B is still running. Now the re-run map task 1 completes and the scheduler launches the reduce phase again. Suppose after that happens, task B fails (this is the old task B, that started before the fetch failure) because it can't get the data from map task 1, but that's because it still has the old location for map task 1. My understanding is that, with the current code, that would cause the map stage to get re-triggered again, but really, reduce task B should be re-started with the correct location for the output from map 1.

@kayousterhout - How do you think we can handle this issue?

@sitalkedia
Author

@jisookim0513 - created a new PR - #17297
