[SPARK-17675] [CORE] Expand Blacklist for TaskSets #15249

Closed
wants to merge 65 commits into master from taskset_blacklist_only
Changes from 62 commits
Commits
65 commits
9a6aaed
enhance blacklist mechanism
wei-mao-intel Dec 29, 2015
5bfe941
Update for new design
squito May 10, 2016
d7adc67
(node,task) blacklisting
squito Jul 7, 2016
a34e9ae
go back to having the blacklist tracker as an option, rather than the…
squito Jul 7, 2016
cf58374
dont count shuffle-fetch failures
squito Jul 7, 2016
7fcb266
make sure we clear the (node, task) blacklist on stage completion, ad…
squito Jul 8, 2016
487eb66
review feedback
squito Jul 11, 2016
c22aaad
fix
squito Jul 12, 2016
dc2b3ed
all taskset specific blacklisting is now in TaskSetManager
squito Jul 13, 2016
338db65
fix
squito Jul 13, 2016
fa3e34a
Merge branch 'master' into blacklist-SPARK-8425
squito Jul 13, 2016
16afb43
review feedback
squito Jul 14, 2016
7aff08a
review feedback
squito Jul 14, 2016
e181546
rename conf
squito Jul 14, 2016
351a9a7
use typed confs consistently
squito Jul 14, 2016
572c777
docs
squito Jul 20, 2016
8cebb01
api simplification
squito Jul 20, 2016
dbf904e
review feedback
squito Jul 20, 2016
f0de0db
fix for config name change
squito Jul 20, 2016
8a12adf
exclude killed tasks and preempted tasks from blacklist
squito Jul 22, 2016
c9e3662
combine imports
squito Jul 26, 2016
497e626
review feedback
squito Aug 11, 2016
515b18a
add task timeouts
squito Aug 18, 2016
f0428b4
separate datastructure to track blacklisted execs in a tsm, to simpli…
squito Aug 18, 2016
a5fbce7
Merge branch 'master' into blacklist-SPARK-8425
squito Aug 18, 2016
b582d8e
fix missing import
squito Aug 18, 2016
cec36c9
fix line wrapping
squito Aug 18, 2016
290b315
fix test by turning off blacklist
squito Aug 18, 2016
8c58ad9
unused import
squito Aug 18, 2016
f012780
review feedback
squito Aug 22, 2016
fc45f5b
fix some typos
squito Aug 22, 2016
f8b1bff
add validation for blacklist confs
squito Aug 22, 2016
e56bb90
update test to turn off blacklist
squito Aug 22, 2016
cc3b968
fix timeout of individual tasks
squito Aug 26, 2016
5fdfe49
simplify task expiry by doing it lazily
squito Aug 26, 2016
e10fa10
review feedback
squito Aug 31, 2016
1297788
Merge branch 'master' into blacklist-SPARK-8425
squito Aug 31, 2016
c78964f
fix bad merge
squito Aug 31, 2016
b679953
Merge branch 'master' into blacklist-SPARK-8425
squito Sep 21, 2016
463b837
more cleanup of TaskEndReason -> TaskFailedReason
squito Sep 21, 2016
9a2cf84
review feedback
squito Sep 21, 2016
d0f43c7
review feedback
squito Sep 21, 2016
cfb653e
Merge branch 'master' into blacklist-SPARK-8425
squito Sep 22, 2016
18ef5c6
review feedback
squito Sep 22, 2016
0c3ceba
pull out TaskSetBlacklist helper
squito Sep 26, 2016
2381b25
oops, put class in the right place
squito Sep 26, 2016
3ca2f79
more refactor for TaskSetBlacklist
squito Sep 26, 2016
27b4bde
fix logging
squito Sep 26, 2016
278fff3
undo some un-intentional changes
squito Sep 26, 2016
882b385
remove app level blacklisting (wip)
squito Sep 26, 2016
21e6789
typos, formatting
squito Sep 27, 2016
9b953ea
review feedback
squito Sep 30, 2016
5568973
make spark.task.maxFailures a proper ConfigEntry
squito Sep 30, 2016
9c9d816
Merge branch 'master' into taskset_blacklist_only
squito Sep 30, 2016
b90930f
Merge branch 'master' into taskset_blacklist_only
squito Oct 4, 2016
ab2ad38
review feedback
squito Oct 5, 2016
89d3c5e
revert changes to the speculative execution test; instead add a seperate
squito Oct 5, 2016
a6c863f
review feedback
squito Oct 5, 2016
9086106
blacklisting off by default; small style changes
squito Oct 5, 2016
bb654bb
minor cleanup
squito Oct 6, 2016
354f36b
remove timeout from this pr, as its only relevant at the app-level
squito Oct 6, 2016
34eff27
review feedback
squito Oct 6, 2016
c805a0b
remove some unnecessary changes
squito Oct 10, 2016
445cc97
Merge branch 'master' into taskset_blacklist_only
squito Oct 12, 2016
4501e6c
fix merge
squito Oct 12, 2016
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging {
"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more.")
DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
"Please use the new blacklisting options, spark.blacklist.*")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -92,6 +92,16 @@ case class FetchFailed(
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
s"message=\n$message\n)"
}

/**
* Fetch failures lead to a different failure handling path: (1) we don't abort the stage after
* 4 task failures, instead we immediately go back to the stage which generated the map output,
* and regenerate the missing data. (2) we don't count fetch failures for blacklisting, since
* presumably it's not the fault of the executor where the task ran, but the executor which
* stored the data. This is especially important because we might rack up a bunch of
* fetch-failures in rapid succession, on all nodes of the cluster, due to one bad node.
*/
override def countTowardsTaskFailures: Boolean = false
}

/**
@@ -204,6 +214,7 @@ case object TaskResultLost extends TaskFailedReason {
@DeveloperApi
case object TaskKilled extends TaskFailedReason {
override def toErrorString: String = "TaskKilled (killed intentionally)"
override def countTowardsTaskFailures: Boolean = false
}

/**
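
A note on the two overrides above: together they make fetch failures and intentional kills invisible to per-task failure accounting. As a hedged sketch (illustrative only; the PR's real accounting lives in TaskSetManager), a failure handler consulting countTowardsTaskFailures might look like:

```scala
import org.apache.spark.TaskFailedReason

// Only reasons that count towards task failures move a task closer to
// exhausting spark.task.maxFailures; FetchFailed and TaskKilled do not.
def updatedFailureCount(reason: TaskFailedReason, count: Int, maxTaskFailures: Int): Int = {
  val newCount = if (reason.countTowardsTaskFailures) count + 1 else count
  if (newCount >= maxTaskFailures) {
    // the caller would abort the task set here
  }
  newCount
}
```
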
45 changes: 45 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -17,6 +17,8 @@

package org.apache.spark.internal

import java.util.concurrent.TimeUnit

import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.util.Utils
@@ -91,6 +93,49 @@ package object config {
.toSequence
.createWithDefault(Nil)

private[spark] val MAX_TASK_FAILURES =
ConfigBuilder("spark.task.maxFailures")
.intConf
.createWithDefault(4)

// Blacklist confs
private[spark] val BLACKLIST_ENABLED =
ConfigBuilder("spark.blacklist.enabled")
.booleanConf
.createOptional

private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor")
.intConf
.createWithDefault(1)

private[spark] val MAX_TASK_ATTEMPTS_PER_NODE =
ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode")
.intConf
.createWithDefault(2)

private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
.intConf
.createWithDefault(2)

private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
.intConf
.createWithDefault(2)

private[spark] val BLACKLIST_TIMEOUT_CONF =
ConfigBuilder("spark.blacklist.timeout")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
.internal()
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
// End blacklist confs

// Note: This is a SQL config but needs to be in core because the REPL depends on it
private[spark] val CATALOG_IMPLEMENTATION = ConfigBuilder("spark.sql.catalogImplementation")
.internal()
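
As a usage sketch of the entries above (the keys are from this PR; the values shown are simply the declared defaults made explicit):

```scala
import org.apache.spark.SparkConf

// Enable taskset blacklisting and spell out the default thresholds.
val conf = new SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
  .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
  .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
  .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "2")
```
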
114 changes: 114 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
Review comment (Contributor):

super nit: upper case comes before lowercase (so Logging should be before config)

import org.apache.spark.internal.config
import org.apache.spark.util.Utils

private[scheduler] object BlacklistTracker extends Logging {

private val DEFAULT_TIMEOUT = "1h"
Review comment (@mridulm, Contributor, Oct 4, 2016):

Is this the default blacklist timeout for a node/executor (before it is re-enabled)?
If yes, is it a bit too high?
In the past, we had observed:
a) If an executor or node was going 'down', a few seconds was sufficient.
b) A few seconds to tens of seconds is usually enough if the problem is due to memory or disk pressure.

Of course, this was specific to our cluster/jobs :-) I would like to know if the job/cluster characteristics were different for this value (or whether it comes from some other experiments/config).

Reply (squito, Contributor Author):

(longer top-level comment responding to this)


/**
* Returns true if the blacklist is enabled, based on checking the configuration in the following
* order:
* 1. Is it specifically enabled or disabled?
* 2. Is it enabled via the legacy timeout conf?
* 3. Default is off
*/
def isBlacklistEnabled(conf: SparkConf): Boolean = {
conf.get(config.BLACKLIST_ENABLED) match {
case Some(enabled) =>
enabled
case None =>
// if they've got a non-zero setting for the legacy conf, always enable the blacklist,
// otherwise, use the default.
val legacyKey = config.BLACKLIST_LEGACY_TIMEOUT_CONF.key
conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).exists { legacyTimeout =>
if (legacyTimeout == 0) {
logWarning(s"Turning off blacklisting due to legacy configuration: $legacyKey == 0")
false
} else {
logWarning(s"Turning on blacklisting due to legacy configuration: $legacyKey > 0")
true
}
}
}
}
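
To illustrate the resolution order (a hedged sketch; BlacklistTracker is private[scheduler], so this assumes it runs within org.apache.spark.scheduler):

```scala
val conf = new org.apache.spark.SparkConf()

// Case 2: no explicit setting, legacy timeout > 0 => enabled (with a warning).
conf.set("spark.scheduler.executorTaskBlacklistTime", "5000")
assert(BlacklistTracker.isBlacklistEnabled(conf))

// Case 1: an explicit setting always wins over the legacy conf.
conf.set("spark.blacklist.enabled", "false")
assert(!BlacklistTracker.isBlacklistEnabled(conf))
```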

def getBlacklistTimeout(conf: SparkConf): Long = {
conf.get(config.BLACKLIST_TIMEOUT_CONF).getOrElse {
conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF).getOrElse {
Utils.timeStringAsMs(DEFAULT_TIMEOUT)
}
}
}

/**
* Verify that blacklist configurations are consistent; if not, throw an exception. Should only
* be called if blacklisting is enabled.
*
* The configuration for the blacklist is expected to adhere to a few invariants. Default
* values follow these rules of course, but users may unwittingly change one configuration
* without making the corresponding adjustment elsewhere. This ensures we fail fast when
* there are such misconfigurations.
*/
def validateBlacklistConfs(conf: SparkConf): Unit = {

def mustBePos(k: String, v: String): Unit = {
throw new IllegalArgumentException(s"$k was $v, but must be > 0.")
}

Seq(
config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
config.MAX_TASK_ATTEMPTS_PER_NODE,
config.MAX_FAILURES_PER_EXEC_STAGE,
config.MAX_FAILED_EXEC_PER_NODE_STAGE
).foreach { config =>
val v = conf.get(config)
if (v <= 0) {
mustBePos(config.key, v.toString)
}
}

val timeout = getBlacklistTimeout(conf)
if (timeout <= 0) {
// first, figure out where the timeout came from, to include the right conf in the message.
conf.get(config.BLACKLIST_TIMEOUT_CONF) match {
Review comment (Contributor):

the duplicate code here and in getBlacklistTimeout makes me a little nervous. Do you think it's too verbose to have a method getBlacklistTimeoutConfigAndValue that returns (config_name, timeout), and then that method could be called here and by getBlacklistTimeout?

If you think that seems like overkill, ignore this comment.
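
For concreteness, a sketch of the helper suggested above (hypothetical; the name getBlacklistTimeoutConfigAndValue comes from the review comment, not from the PR):

```scala
// Return both the conf key that supplied the timeout and its value, so this
// error path and getBlacklistTimeout can share one lookup.
private def getBlacklistTimeoutConfigAndValue(conf: SparkConf): (String, Long) = {
  conf.get(config.BLACKLIST_TIMEOUT_CONF) match {
    case Some(t) => (config.BLACKLIST_TIMEOUT_CONF.key, t)
    case None =>
      val t = conf.get(config.BLACKLIST_LEGACY_TIMEOUT_CONF)
        .getOrElse(Utils.timeStringAsMs(DEFAULT_TIMEOUT))
      (config.BLACKLIST_LEGACY_TIMEOUT_CONF.key, t)
  }
}
```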

case Some(t) =>
mustBePos(config.BLACKLIST_TIMEOUT_CONF.key, timeout.toString)
case None =>
mustBePos(config.BLACKLIST_LEGACY_TIMEOUT_CONF.key, timeout.toString)
}
}

val maxTaskFailures = conf.get(config.MAX_TASK_FAILURES)
val maxNodeAttempts = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
Review comment (Contributor):

should we also check that maxNodeAttempts is >= max exec attempts?

Reply (squito, Contributor Author):

I think that would be OK, actually -- it is the same as turning executor blacklisting off. Gets back to the question of what we think a user might reasonably want.

An alternative would be to add the check, and use -1 as a special value if the user explicitly wants to turn blacklisting off. I dunno if we need to get too fancy in these checks right away, though.

Reply (Contributor):

Ok cool thanks for the explanation -- that makes sense and seems fine to leave as-is.


if (maxNodeAttempts >= maxTaskFailures) {
throw new IllegalArgumentException(s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " +
s"Spark will not be robust to one bad node. Decrease " +
s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
}
}
}
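
To make the last invariant concrete, a hedged example of a configuration this validation rejects (spark.task.maxFailures defaults to 4, per the config entry above):

```scala
// One bad node could absorb all 4 allowed attempts of a task, so Spark fails fast.
val badConf = new org.apache.spark.SparkConf()
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.task.maxTaskAttemptsPerNode", "4")
// BlacklistTracker.validateBlacklistConfs(badConf) throws IllegalArgumentException:
//   spark.blacklist.task.maxTaskAttemptsPerNode ( = 4) was >= spark.task.maxFailures ( = 4)
```
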
core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler

import scala.collection.mutable.HashMap

/**
* Small helper for tracking failed tasks for blacklisting purposes. Info on all failures on one
* executor, within one task set.
*/
private[scheduler] class ExecutorFailuresInTaskSet(val node: String) {
/**
* Mapping from the index of a task in the task set to the number of times that task has failed
* on this executor.
*/
val taskToFailureCount = HashMap[Int, Int]()

def updateWithFailure(taskIndex: Int): Unit = {
val prevFailureCount = taskToFailureCount.getOrElse(taskIndex, 0)
taskToFailureCount(taskIndex) = prevFailureCount + 1
}

def numUniqueTasksWithFailures: Int = taskToFailureCount.size

/**
* Return the number of times this executor has failed on the given task index.
*/
def getNumTaskFailures(index: Int): Int = {
taskToFailureCount.getOrElse(index, 0)
}

override def toString(): String = {
s"numUniqueTasksWithFailures = $numUniqueTasksWithFailures; " +
s"tasksToFailureCount = $taskToFailureCount"
}
}
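
A small usage sketch of the helper above (illustrative values; the class is private[scheduler]):

```scala
// Record two failures of task index 3 and one of task index 7 on this executor.
val failures = new ExecutorFailuresInTaskSet(node = "host1")
failures.updateWithFailure(taskIndex = 3)
failures.updateWithFailure(taskIndex = 3)
failures.updateWithFailure(taskIndex = 7)

assert(failures.getNumTaskFailures(3) == 2)
assert(failures.getNumTaskFailures(5) == 0)  // never failed => 0
assert(failures.numUniqueTasksWithFailures == 2)
```
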
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -22,14 +22,14 @@ import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -57,7 +57,7 @@ private[spark] class TaskSchedulerImpl(
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))
def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))

val conf = sc.conf

@@ -100,7 +100,7 @@

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
protected val executorsByHost = new HashMap[String, HashSet[String]]
protected val hostToExecutors = new HashMap[String, HashSet[String]]

protected val hostsByRack = new HashMap[String, HashSet[String]]

@@ -243,8 +243,8 @@
}
}
manager.parent.removeSchedulable(manager)
logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s"
.format(manager.taskSet.id, manager.parent.name))
logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
s" ${manager.parent.name}")
Review comment (Contributor):

Just curious - are we preferring String interpolation to format?

Reply (squito, Contributor Author):

yes, I think the general idea is: while touching something, update to string interpolation; I've seen that in a lot of PRs. (In this particular case, I could leave it -- probably I had touched the log msg somewhere along the way and then backed out.)

Reply (Contributor):

I've seen people using more string interpolation, but I never saw a discussion on this or anything. Do you know if there is a specific reason for it? A performance difference, or did someone just decide it should be standard?

Reply (Contributor Author):

I don't think there was ever an official discussion; it just seemed like there was a gradual switch until it became the norm. I think the motivation was readability, not performance.
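
For readers following the thread, the two styles in question, which produce identical strings:

```scala
val taskSetId = "0.0"
val poolName = "default"

// format style, being phased out as lines are touched:
val viaFormat = "Removed TaskSet %s from pool %s".format(taskSetId, poolName)

// string interpolation, the style the codebase has converged on:
val viaInterpolation = s"Removed TaskSet $taskSetId from pool $poolName"

assert(viaFormat == viaInterpolation)
```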

}

private def resourceOfferSingleTaskSet(
@@ -291,11 +291,11 @@
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToTaskCount.contains(o.executorId)) {
executorsByHost(o.host) += o.executorId
hostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount(o.executorId) = 0
Expand Down Expand Up @@ -334,7 +334,7 @@ private[spark] class TaskSchedulerImpl(
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}

@@ -542,10 +542,10 @@
executorIdToTaskCount -= executorId

val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
val execs = hostToExecutors.getOrElse(host, new HashSet)
execs -= executorId
if (execs.isEmpty) {
executorsByHost -= host
hostToExecutors -= host
for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
hosts -= host
if (hosts.isEmpty) {
@@ -565,11 +565,11 @@
}

def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
executorsByHost.get(host).map(_.toSet)
hostToExecutors.get(host).map(_.toSet)
}

def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
executorsByHost.contains(host)
hostToExecutors.contains(host)
}

def hasHostAliveOnRack(rack: String): Boolean = synchronized {
@@ -662,5 +662,4 @@ private[spark] object TaskSchedulerImpl {

retval.toList
}

}