Skip to content

Commit

Permalink
Move abort logic out of inner loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Aug 30, 2016
1 parent f51111c commit 5d20b44
Showing 1 changed file with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,6 @@ private[spark] class TaskSchedulerImpl(
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
if (availableCpus.sum == 0) {
return false
}
var launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
Expand All @@ -281,9 +278,6 @@ private[spark] class TaskSchedulerImpl(
}
}
}
if (!launchedTask) {
taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
}
return launchedTask
}

Expand Down Expand Up @@ -329,12 +323,19 @@ private[spark] class TaskSchedulerImpl(
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = resourceOfferSingleTaskSet(
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtMaxLocality = false
for (maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTaskAtMaxLocality = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
launchedAnyTask |= launchedTaskAtMaxLocality
} while (launchedTaskAtMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
}
}

if (tasks.size > 0) {
Expand Down

0 comments on commit 5d20b44

Please sign in to comment.