[SPARK-22845][SCHEDULER] Modify spark.kubernetes.allocation.batch.delay to take time instead of int

## What changes were proposed in this pull request?

Fixes a configuration that took an int (seconds) but should take a time value. Discussion in #19946 (comment).
The granularity is milliseconds rather than seconds, since there is a use case for sub-second reactions to scale up rapidly, especially with dynamic allocation.
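
Not part of the original description, but as a hedged illustration of what the change enables: once the entry is time-typed, the setting can be supplied with a standard Spark duration suffix. The values below are examples, not defaults from this patch.

```scala
import org.apache.spark.SparkConf

// Sketch only: a sub-second batch delay becomes expressible once the entry
// parses duration strings instead of a bare number of seconds.
val conf = new SparkConf()
  .set("spark.kubernetes.allocation.batch.delay", "500ms")
// other accepted forms would include "1s" or "2min"
```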

## How was this patch tested?

TODO: manual run of integration tests against this PR.
PTAL

cc/ mccheah liyinan926 kimoonkim vanzin mridulm jiangxb1987 ueshin

Author: foxish <ramanathana@google.com>

Closes #20032 from foxish/fix-time-conf.
foxish authored and Marcelo Vanzin committed Dec 21, 2017
1 parent b176014 commit 0114c89
Showing 3 changed files with 9 additions and 9 deletions.
@@ -102,10 +102,10 @@ private[spark] object Config extends Logging {

   val KUBERNETES_ALLOCATION_BATCH_DELAY =
     ConfigBuilder("spark.kubernetes.allocation.batch.delay")
-      .doc("Number of seconds to wait between each round of executor allocation.")
-      .longConf
-      .checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
-      .createWithDefault(1)
+      .doc("Time to wait between each round of executor allocation.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(value => value > 0, "Allocation batch delay must be a positive time value.")
+      .createWithDefaultString("1s")

   val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
     ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
@@ -217,7 +217,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .watch(new ExecutorPodsWatcher()))

     allocatorExecutor.scheduleWithFixedDelay(
-      allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
+      allocatorRunnable, 0L, podAllocationInterval, TimeUnit.MILLISECONDS)

     if (!Utils.isDynamicAllocationEnabled(conf)) {
       doRequestTotalExecutors(initialExecutors)
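
A hedged aside on the JDK scheduler used here, not part of the patch: `scheduleWithFixedDelay` takes a numeric delay plus a `TimeUnit`, and the two must describe the same quantity, so a config value that now arrives in milliseconds has to be paired with `TimeUnit.MILLISECONDS`. A self-contained sketch:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object AllocationLoopSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the parsed config value, e.g. "1s" -> 1000 ms.
    val podAllocationIntervalMs = 1000L
    val allocator = Executors.newSingleThreadScheduledExecutor()
    allocator.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = println("allocate next batch of executor pods")
    }, 0L, podAllocationIntervalMs, TimeUnit.MILLISECONDS)
    Thread.sleep(3500)      // let a few rounds fire
    allocator.shutdownNow() // stop the loop
  }
}
```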
@@ -46,7 +46,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private val NAMESPACE = "test-namespace"
   private val SPARK_DRIVER_HOST = "localhost"
   private val SPARK_DRIVER_PORT = 7077
-  private val POD_ALLOCATION_INTERVAL = 60L
+  private val POD_ALLOCATION_INTERVAL = "1m"
   private val DRIVER_URL = RpcEndpointAddress(
     SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
   private val FIRST_EXECUTOR_POD = new PodBuilder()
@@ -144,7 +144,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       .set(KUBERNETES_NAMESPACE, NAMESPACE)
       .set("spark.driver.host", SPARK_DRIVER_HOST)
       .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
-      .set(KUBERNETES_ALLOCATION_BATCH_DELAY, POD_ALLOCATION_INTERVAL)
+      .set(KUBERNETES_ALLOCATION_BATCH_DELAY.key, POD_ALLOCATION_INTERVAL)
     executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
     allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
     requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
@@ -162,8 +162,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     when(allocatorExecutor.scheduleWithFixedDelay(
       allocatorRunnable.capture(),
       mockitoEq(0L),
-      mockitoEq(POD_ALLOCATION_INTERVAL),
-      mockitoEq(TimeUnit.SECONDS))).thenReturn(null)
+      mockitoEq(TimeUnit.MINUTES.toMillis(1)),
+      mockitoEq(TimeUnit.MILLISECONDS))).thenReturn(null)
     // Creating Futures in Scala backed by a Java executor service resolves to running
     // ExecutorService#execute (as opposed to submit)
     doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
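
A short reading of the test changes (an inference from the diff, not stated in the commit message): `POD_ALLOCATION_INTERVAL` is now the string "1m", so the test sets it through the config's string key rather than the typed entry, which would expect the already-converted Long; and since the entry is read back in milliseconds, the mocked `scheduleWithFixedDelay` call now expects `TimeUnit.MINUTES.toMillis(1)` with `TimeUnit.MILLISECONDS` instead of `60L` with `TimeUnit.SECONDS`.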
