
SPARK-2083 Add support for spark.local.maxFailures configuration property #1465

Closed
wants to merge 2 commits into from
Changes from 1 commit
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1463,12 +1463,13 @@ object SparkContext extends Logging {
// Regular expression for connection to Simr cluster
val SIMR_REGEX = """simr://(.*)""".r

// When running locally, don't try to re-execute tasks on failure.
// When running locally, by default don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1

master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val localTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
Contributor: i'd rename the variable maxTaskFailures

Author: Will do.

val scheduler = new TaskSchedulerImpl(sc, localTaskFailures, isLocal = true)
val backend = new LocalBackend(scheduler, 1)
scheduler.initialize(backend)
scheduler
@@ -1477,7 +1478,8 @@
def localCpuCount = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val localTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
Contributor: here too

Author: Will do.

val scheduler = new TaskSchedulerImpl(sc, localTaskFailures, isLocal = true)
val backend = new LocalBackend(scheduler, threadCount)
scheduler.initialize(backend)
scheduler
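The diff above reads `spark.local.maxFailures` only inside the `local` and `local[N]` master cases, while cluster masters keep using `spark.task.maxFailures`. A minimal self-contained sketch of that selection logic (not code from this PR; the `maxFailuresFor` helper and the cluster default of 4 are assumptions for illustration):

```scala
// Sketch of the per-master-mode default selection this PR proposes.
// "local" and "local[N]" masters consult spark.local.maxFailures (default 1);
// any other master consults spark.task.maxFailures (4 is assumed here as the
// cluster-mode default).
def maxFailuresFor(master: String, conf: Map[String, String]): Int = {
  val LocalPattern = """local(\[.*\])?""".r
  master match {
    case LocalPattern(_) => conf.getOrElse("spark.local.maxFailures", "1").toInt
    case _               => conf.getOrElse("spark.task.maxFailures", "4").toInt
  }
}
```

For example, `maxFailuresFor("local[5]", Map("spark.local.maxFailures" -> "10"))` picks up the configured value, while an unset local master falls back to 1.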
@@ -59,6 +59,22 @@ class SparkContextSchedulerCreationSuite
}
}

test("local-conf-failures") {
val defaultLocalMaxFailures = System.getProperty("spark.local.maxFailures")
System.setProperty("spark.local.maxFailures", "10")
val sched = createTaskScheduler("local")
assert(sched.maxTaskFailures === 10)
sched.backend match {
case s: LocalBackend => assert(s.totalCores === 1)
case _ => fail()
}

Option(defaultLocalMaxFailures) match {
case Some(v) => System.setProperty("spark.local.maxFailures", v)
case _ => System.clearProperty("spark.local.maxFailures")
}
}

test("local-n") {
val sched = createTaskScheduler("local[5]")
assert(sched.maxTaskFailures === 1)
@@ -68,6 +84,22 @@ class SparkContextSchedulerCreationSuite
}
}

test("local-n-conf-failures") {
val defaultLocalMaxFailures = System.getProperty("spark.local.maxFailures")
System.setProperty("spark.local.maxFailures", "10")
val sched = createTaskScheduler("local[5]")
assert(sched.maxTaskFailures === 10)
sched.backend match {
case s: LocalBackend => assert(s.totalCores === 5)
case _ => fail()
}

Option(defaultLocalMaxFailures) match {
case Some(v) => System.setProperty("spark.local.maxFailures", v)
case _ => System.clearProperty("spark.local.maxFailures")
}
}

test("local-n-failures") {
val sched = createTaskScheduler("local[4, 2]")
assert(sched.maxTaskFailures === 2)
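The new tests above save and restore `spark.local.maxFailures` by hand; if an assertion fails before the restore runs, the property leaks into later tests. A sketch of a loan-pattern helper (not part of this PR; the name `withSystemProperty` is hypothetical) that restores the previous value even when the body throws:

```scala
// Run a block with a temporary system property, then restore the prior value
// (or clear the property if it was unset), even if the block throws.
def withSystemProperty[T](key: String, value: String)(body: => T): T = {
  val previous = Option(System.getProperty(key))
  System.setProperty(key, value)
  try body
  finally {
    previous match {
      case Some(v) => System.setProperty(key, v)
      case None    => System.clearProperty(key)
    }
  }
}
```

Each test could then wrap its scheduler creation in `withSystemProperty("spark.local.maxFailures", "10") { ... }` and drop the manual save/restore boilerplate.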
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -599,6 +599,15 @@ Apart from these, the following properties are also available, and may be useful
<td>
Number of individual task failures before giving up on the job.
Should be greater than or equal to 1. Number of allowed retries = this value - 1.
Does not apply to running Spark locally.
</td>
</tr>
<tr>
<td><code>spark.local.maxFailures</code></td>
<td>1</td>
<td>
Number of individual task failures before giving up on the job, when running Spark locally.
Should be greater than or equal to 1. No retries are allowed.
Contributor: "No retries are allowed." ? What does this mean?

Author: Ah, that's an error. It should say "Number of allowed retries is this value - 1." Will fix.

</td>
</tr>
<tr>
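If the documented property existed, it would be set like any other Spark configuration entry. A hypothetical usage fragment (the application class and jar names are placeholders, and since this PR is marked Closed the property may not exist in any released Spark):

```shell
# Hypothetical: raise local-mode task retries to allow 2 retries per task
spark-submit --master "local[4]" \
  --conf spark.local.maxFailures=3 \
  --class com.example.MyApp myapp.jar
```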