[SPARK-19326] Speculated task attempts do not get launched in few scenarios #18492
Conversation
@tejasapatil Do you want to look at this PR? Thanks!
  }

  @Override
  public void onExtraExecutorNeeded() {
Maybe I'm missing something obvious, but what's the use of this except for adding some log?
When there is one executor left (even one with multiple CPU cores) and the task is running on that executor, the speculative attempt cannot be launched on the same host because of the locality requirement. We have to request one extra executor. That is what this event is for.
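A minimal sketch of the constraint being described, with assumed names (Spark's TaskSetManager enforces something similar when dequeuing speculative tasks):
def canOfferSpeculativeCopy(hostsWithRunningAttempt: Set[String], offeredHost: String): Boolean =
  !hostsWithRunningAttempt.contains(offeredHost)
// With a single executor host left, every offer comes from that host, the check
// always fails, and an extra executor has to be requested.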
@@ -572,20 +572,35 @@ private[spark] class ExecutorAllocationManager(
  }

  /**
   * Callback invokded when an extra executor is needed (See SPARK-19326)
nit: invokded -> invoked
updated.
@@ -281,6 +281,20 @@ class DAGScheduler(
    eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
  }

  /**
   * Called by the TaskSetManager when it needs a speculative task is needed.
nit: needs -> decides
updated
   * Callback invoked when an extra executor is needed (See SPARK-19326)
   */
  private def onExtraExecutorNeeded(): Unit = synchronized {
    val maxNeeded = math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
Perhaps what we need is just to ensure there are more than two active executors left, so we may meet the locality requirement and launch the speculative jobs?
BTW how do we ensure we run new executors on a different host?
We cannot just ensure there are more than two active executors left; it depends on whether there are any speculative tasks that have not been launched yet.
hostToLocalTaskCount makes the new executor request satisfy the locality requirement. On the other hand, even if the new executor is not guaranteed to land on a different host, it will sit idle and then die, and the pending speculative task will trigger a request for another executor.
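For reference, the idle-and-die behavior mentioned here comes from dynamic allocation; a sketch using real config keys with illustrative values:
import org.apache.spark.SparkConf

// If the extra executor cannot take the speculative copy (e.g. it lands on the
// same host), it stays idle and is released after the idle timeout, and the
// still-pending speculative task triggers another executor request.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")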
ok to test
Test build #80022 has finished for PR 18492 at commit
  private var numRunningTasks: Int = _

  private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
At first glance I thought stageIdToNumSpeculativeTasks is just stageIdToSpeculativeTaskIndices.mapValues(_.size), but it seems that's not true. Can you add some comments to explain these two variables?
Added comments
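For readers of this thread, the distinction the added comments draw is roughly the following (a sketch based on the discussion above, not the exact wording in the code):
import scala.collection.mutable

// Speculative tasks that have actually started running, keyed by stage and
// task index (so re-attempts are not double counted).
private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]

// Number of speculative tasks submitted for each stage, including ones that
// have not been launched yet -- which is why it is not always equal to
// stageIdToSpeculativeTaskIndices.mapValues(_.size).
private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]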
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerExtraExecutorNeeded() extends SparkListenerEvent
case object?
case object won't compile.
  @Override
  public void onExtraExecutorNeeded() {
    onEvent(null);
SparkFirehoseListener is a public API and users may assume onEvent will never accept null. How about we just do nothing here?
ok. updated.
Test build #80378 has finished for PR 18492 at commit
Test build #80380 has finished for PR 18492 at commit
Jenkins test this again please.
retest this please
Test build #80387 has finished for PR 18492 at commit
  /**
   * Called when an extra executor is needed
   */
  def onExtraExecutorNeeded(): Unit
I'm hesitant to add this. SparkListener is public and should be the interface for listening to various Spark internal events and doing whatever the listener wants. However, onExtraExecutorNeeded sounds like something Spark asks the listener to do, which goes against that pattern.
In other words, onExtraExecutorNeeded looks specific to the executor allocation manager, not like a general Spark event.
Do you know how the executor allocation manager adjusts the number of executors currently? Can we follow that instead of hacking the SparkListener?
@cloud-fan After some thought, yes, I think we can get rid of the extraExecutorNeeded event and handle it in ExecutorAllocationManager.scala.
ping @janewangfb
Test build #80714 has finished for PR 18492 at commit
Test build #80712 has finished for PR 18492 at commit
retest this please
Test build #80742 has finished for PR 18492 at commit
@@ -291,6 +294,11 @@ private[spark] trait SparkListenerInterface {
  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit

  /**
   * Called when a speculative task is submitted
   */
  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
For normal tasks, we have onTaskStart, onTaskEnd, etc., but don't have onTaskSubmitted. Shall we make the name consistent for speculative tasks?
I would keep the name onSpeculativeTaskSubmitted, because when the event happens it only submits a speculative task to be launched in the future; the task has not started yet.
oh i see. So we don't track the submission of normal tasks?
I grepped, and I don't think we have events related to normal task submission.
@@ -980,10 +1014,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
      taskLocalityPreferences = taskLocalityPreferences)
  }

  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
    new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String,
nit: use the style
private def xxxx(
    para1: XX,
    para2: XX)
i.e. 4-space indentation for the parameters.
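Concretely, the requested style for the test helper would look something like the sketch below (the exact parameter added by this PR is assumed here to be a speculative flag):
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}

// Test helper sketch with the 4-space parameter indentation the reviewer asks for.
private def createTaskInfo(
    taskId: Int,
    taskIndex: Int,
    executorId: String,
    speculative: Boolean = false): TaskInfo = {
  new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative)
}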
updated.
LGTM except 2 minor comments, thanks for working on it!
Test build #80791 has finished for PR 18492 at commit
LGTM
    assert(addExecutors(manager) === 1)
    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
Why is the stage-submitted event posted after the speculative-task-submitted event?
In real life, it is possible that a job has multiple stages; one stage is still running some tasks while the next stage has already started. This test tries to mimic that.
    val manager = sc.executorAllocationManager.get

    // Verify that we're capped at number of tasks including the speculative ones in the stage
    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
Is this a possible case, where the very first event is a speculative task submission?
That is not likely. A speculative task is only submitted after a certain percentage of the tasks have finished successfully.
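For context, speculation is gated by standard Spark settings; the values shown below are the documented defaults:
import org.apache.spark.SparkConf

// A speculative copy is only considered once a fraction of the stage's tasks
// (the quantile) has finished, and a running task is slower than the median
// successful run time by the given multiplier.
val conf = new SparkConf()
  .set("spark.speculation", "true")            // off by default
  .set("spark.speculation.quantile", "0.75")   // default
  .set("spark.speculation.multiplier", "1.5")  // default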
    assert(numExecutorsTarget(manager) === 5)
    assert(numExecutorsToAdd(manager) === 1)

    // Verify that running a task doesn't affect the target
Can you explain more about this test? Why can the first 3 SparkListenerSpeculativeTaskSubmitted events trigger allocating more executors, but here we don't?
It is because we use the sum of (running + pending) tasks to calculate how many executors are needed (maxNumExecutorsNeeded), so whether a task is pending or running, the number of executors needed is the same.
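A rough sketch of the sizing logic being described (names follow ExecutorAllocationManager, but this is a simplification, not the exact implementation):
def maxNumExecutorsNeeded(totalRunningTasks: Int, totalPendingTasks: Int,
    tasksPerExecutor: Int): Int = {
  // Pending and running tasks are counted the same way, so moving a task from
  // "pending" to "running" does not change the executor target.
  val totalTasks = totalRunningTasks + totalPendingTasks
  (totalTasks + tasksPerExecutor - 1) / tasksPerExecutor  // ceiling division
}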
Then why does submitting a speculative task add to the running/pending tasks?
A speculative task is also a task that needs an executor to execute it, so when we calculate how many executors are needed, we need to count the speculative tasks.
thanks, merging to master!
    // Check if there is any speculative jobs pending
    if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
      numExecutorsTarget =
        math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
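To make the discussion below concrete, a worked example with assumed numbers:
// Assume maxNumExecutorsNeeded = 4, maxNumExecutors = 10, minNumExecutors = 1,
// and only speculative tasks are pending (pendingTasks == 0):
val numExecutorsTarget = math.max(math.min(4 + 1, 10), 1)  // = 5, i.e. one above the computed need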
@janewangfb Would you please explain the + 1 here? If there are pending speculative tasks, shouldn't the number of executors be calculated based on the number of pending tasks? Thanks!
Same here. maxNumExecutorsNeeded + 1 doesn't quite make sense.
@janewangfb could you please post/update some comments here? And I wonder why we didn't take pendingSpeculativeTasks into account when calculating maxNumExecutorsNeeded().
Or @jerryshao, do you know the rationale?
Also confused by the +1 here. And I think we have already taken pendingSpeculativeTasks into account, @advancedxy:
def totalPendingTasks(): Int = {
  pendingTasks + pendingSpeculativeTasks
}
This check seems redundant. And it doesn't sync to the cluster manager if numExecutorsTarget changes (after the +1).
@janewangfb @cloud-fan Sorry, I realize this is a very old PR, but I found it because I was confused by this logic as well. Is there a reason we are adding 1 here for speculative tasks?
I can't remember the details as it's too old. But when I look at it again now, this looks like a mistake to me: the +1 seems to try to match the numExecutorsToAdd = 1 in the previous code. However, numExecutorsToAdd = 1 doesn't mean we want to allocate one more executor right now.
thanks, filed https://issues.apache.org/jira/browse/SPARK-28403
What changes were proposed in this pull request?
Add a new listener event that is posted when a speculative task is created, and notify ExecutorAllocationManager about it so that it can request more executors.
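A minimal sketch of how an external listener could observe the new event (the event class and callback come from the diff snippets above; the handler body here is only illustrative, while the real handling lives in ExecutorAllocationManager's internal listener):
import org.apache.spark.scheduler.{SparkListener, SparkListenerSpeculativeTaskSubmitted}

class SpeculationLoggingListener extends SparkListener {
  override def onSpeculativeTaskSubmitted(
      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = {
    // The allocation manager records the pending speculative task here so
    // that the executor target grows on the next scheduling tick.
    println(s"Speculative task submitted for stage ${speculativeTask.stageId}")
  }
}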
How was this patch tested?
val n = 100
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
  if (index == 1) {
    // fake long running task(s): partition 1 never finishes, so once enough
    // other tasks complete, a speculative attempt is submitted for it
    Thread.sleep(Long.MaxValue)
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
With this code change, Spark shows 101 task attempts for the stage (99 succeeded, 2 running, one of which is the speculative attempt).