From a8b369dad154b7773ec8b535cf399b428ec3952c Mon Sep 17 00:00:00 2001
From: Bill Havanki
Date: Thu, 17 Jul 2014 12:01:22 -0400
Subject: [PATCH 1/2] SPARK-2083 Add support for spark.local.maxFailures
 configuration property

---
 .../scala/org/apache/spark/SparkContext.scala |  8 +++--
 .../SparkContextSchedulerCreationSuite.scala  | 32 +++++++++++++++++++
 docs/configuration.md                         |  9 ++++++
 3 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8052499ab7526..c16619db521ef 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1463,12 +1463,13 @@ object SparkContext extends Logging {
     // Regular expression for connection to Simr cluster
     val SIMR_REGEX = """simr://(.*)""".r

-    // When running locally, don't try to re-execute tasks on failure.
+    // When running locally, by default don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1

     master match {
       case "local" =>
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val localTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
+        val scheduler = new TaskSchedulerImpl(sc, localTaskFailures, isLocal = true)
         val backend = new LocalBackend(scheduler, 1)
         scheduler.initialize(backend)
         scheduler
@@ -1477,7 +1478,8 @@ object SparkContext extends Logging {
         def localCpuCount = Runtime.getRuntime.availableProcessors()
         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
-        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
+        val localTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
+        val scheduler = new TaskSchedulerImpl(sc, localTaskFailures, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
         scheduler
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 67e3be21c3c93..83a1ea35813c2 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -59,6 +59,22 @@ class SparkContextSchedulerCreationSuite
     }
   }

+  test("local-conf-failures") {
+    val defaultLocalMaxFailures = System.getProperty("spark.local.maxFailures")
+    System.setProperty("spark.local.maxFailures", "10")
+    val sched = createTaskScheduler("local")
+    assert(sched.maxTaskFailures === 10)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 1)
+      case _ => fail()
+    }
+
+    Option(defaultLocalMaxFailures) match {
+      case Some(v) => System.setProperty("spark.local.maxFailures", v)
+      case _ => System.clearProperty("spark.local.maxFailures")
+    }
+  }
+
   test("local-n") {
     val sched = createTaskScheduler("local[5]")
     assert(sched.maxTaskFailures === 1)
@@ -68,6 +84,22 @@ class SparkContextSchedulerCreationSuite
     }
   }

+  test("local-n-conf-failures") {
+    val defaultLocalMaxFailures = System.getProperty("spark.local.maxFailures")
+    System.setProperty("spark.local.maxFailures", "10")
+    val sched = createTaskScheduler("local[5]")
+    assert(sched.maxTaskFailures === 10)
+    sched.backend match {
+      case s: LocalBackend => assert(s.totalCores === 5)
+      case _ => fail()
+    }
+
+    Option(defaultLocalMaxFailures) match {
+      case Some(v) => System.setProperty("spark.local.maxFailures", v)
+      case _ => System.clearProperty("spark.local.maxFailures")
+    }
+  }
+
   test("local-n-failures") {
     val sched = createTaskScheduler("local[4, 2]")
     assert(sched.maxTaskFailures === 2)
diff --git a/docs/configuration.md b/docs/configuration.md
index a70007c165442..59d38d0537ca1 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -599,6 +599,15 @@ Apart from these, the following properties are also available, and may be useful
     Number of individual task failures before giving up on the job.
     Should be greater than or equal to 1. Number of allowed retries = this value - 1.
+    Does not apply to running Spark locally.
   </td>
 </tr>
+<tr>
+  <td><code>spark.local.maxFailures</code></td>
+  <td>1</td>
+  <td>
+    Number of individual task failures before giving up on the job, when running Spark locally.
+    Should be greater than or equal to 1. No retries are allowed.
+  </td>
+</tr>

From 22cbe22e6981a0135835784ccb6429b866f263ca Mon Sep 17 00:00:00 2001
From: Bill Havanki
Date: Tue, 22 Jul 2014 14:41:43 -0400
Subject: [PATCH 2/2] SPARK-2083 Update variable name in SparkContext, fix
 documentation

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++----
 docs/configuration.md                                   | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c16619db521ef..4bb553f14f18d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1468,8 +1468,8 @@ object SparkContext extends Logging {

     master match {
       case "local" =>
-        val localTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
-        val scheduler = new TaskSchedulerImpl(sc, localTaskFailures, isLocal = true)
+        val maxTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
+        val scheduler = new TaskSchedulerImpl(sc, maxTaskFailures, isLocal = true)
         val backend = new LocalBackend(scheduler, 1)
         scheduler.initialize(backend)
         scheduler
@@ -1478,8 +1478,8 @@ object SparkContext extends Logging {
         def localCpuCount = Runtime.getRuntime.availableProcessors()
         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
-        val localTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
-        val scheduler = new TaskSchedulerImpl(sc, localTaskFailures, isLocal = true)
+        val maxTaskFailures = sc.conf.getInt("spark.local.maxFailures", MAX_LOCAL_TASK_FAILURES)
+        val scheduler = new TaskSchedulerImpl(sc, maxTaskFailures, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
         scheduler.initialize(backend)
         scheduler
diff --git a/docs/configuration.md b/docs/configuration.md
index 59d38d0537ca1..88486aef18fb2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -607,7 +607,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>1</td>
   <td>
     Number of individual task failures before giving up on the job, when running Spark locally.
-    Should be greater than or equal to 1. No retries are allowed.
+    Should be greater than or equal to 1. Number of allowed retries = this value - 1.
   </td>
 </tr>
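
Reviewer note: for anyone trying the series out, here is a minimal sketch of how the new property would be set from application code once the patches are applied. Only the property name spark.local.maxFailures and its default of 1 come from the diffs above; the master URL, app name, and the value 4 are illustrative assumptions.

    import org.apache.spark.{SparkConf, SparkContext}

    object LocalMaxFailuresDemo {
      def main(args: Array[String]): Unit = {
        // With this patch, the local-mode scheduler reads spark.local.maxFailures,
        // so each task gets up to 4 attempts (3 retries). Without it, local mode
        // is pinned to MAX_LOCAL_TASK_FAILURES = 1 (no retries).
        val conf = new SparkConf()
          .setMaster("local[2]")                // local mode, 2 worker threads
          .setAppName("LocalMaxFailuresDemo")   // illustrative app name
          .set("spark.local.maxFailures", "4")  // 4 attempts per task = 3 retries
        val sc = new SparkContext(conf)
        // ... run jobs; a failing task is now retried up to 3 times ...
        sc.stop()
      }
    }

The new tests set the property via System.setProperty instead, which reaches the same code path because SparkConf loads spark.* JVM system properties by default.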