[SPARK-6650] [core] Stop ExecutorAllocationManager when context stops. #5311

Closed · wants to merge 4 commits
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

@@ -17,10 +17,12 @@

 package org.apache.spark
 
+import java.util.concurrent.{Executors, TimeUnit}
+
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
 
+  // Executor that handles the scheduling task.
+  private val executor = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
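Utils.namedThreadFactory preserves what the old hand-rolled thread did with setName/setDaemon: the pool's single thread gets a recognizable name and daemon status. For illustration only, a standalone equivalent (hypothetical namedDaemonFactory helper, not Spark's implementation) might look like:

import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object NamedFactoryDemo {
  // Hypothetical stand-in for Utils.namedThreadFactory: daemon threads with a
  // stable, numbered name so they are easy to spot in thread dumps.
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val count = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${count.incrementAndGet()}")
      t.setDaemon(true) // mirrors setDaemon(true) on the old polling thread
      t
    }
  }

  val executor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
    namedDaemonFactory("spark-dynamic-executor-allocation"))
}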
@@ -173,32 +179,24 @@
   }
 
   /**
-   * Register for scheduler callbacks to decide when to add and remove executors.
+   * Register for scheduler callbacks to decide when to add and remove executors, and start
+   * the scheduling task.
    */
   def start(): Unit = {
     listenerBus.addListener(listener)
-    startPolling()
+
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+    }
+    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }
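A subtlety behind the Utils.logUncaughtExceptions wrapper: scheduleAtFixedRate silently suppresses all subsequent executions once a task lets a throwable escape, so an unlogged failure would stop dynamic allocation without a trace. A sketch of a guard in that spirit (hypothetical helper; this variant logs and swallows to keep the task scheduled, which may differ from the Spark utility's exact policy):

object LogUncaughtDemo {
  // Illustrative guard: log and swallow, so the periodic task stays scheduled.
  // Whether to rethrow after logging is a policy choice; this sketch swallows.
  def logUncaught(body: => Unit): Unit =
    try body catch {
      case t: Throwable =>
        System.err.println(s"Uncaught exception in scheduled task: $t")
    }
}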

   /**
-   * Start the main polling thread that keeps track of when to add and remove executors.
+   * Stop the allocation manager.
    */
-  private def startPolling(): Unit = {
-    val t = new Thread {
-      override def run(): Unit = {
-        while (true) {
-          try {
-            schedule()
-          } catch {
-            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
-          }
-          Thread.sleep(intervalMillis)
-        }
-      }
-    }
-    t.setName("spark-dynamic-executor-allocation")
-    t.setDaemon(true)
-    t.start()
+  def stop(): Unit = {
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
   }

Contributor: Thinking aloud here: we stop the ExecutorAllocationManager after stopping the DAGScheduler, which means, in yarn-client mode, we'll have already torn down the YARN application. If schedule() is called after that, we could be trying to make an RPC to the AM, which is no longer there. So it seems like waiting the full 10 seconds for that RPC to time out could be a common case.

Author: I tried to follow the order of initialization during shutdown (the alloc manager is started after the event logger, so it should be stopped right before it - yeah, I know my code doesn't do exactly that). But we could move the stop before the scheduler's stop.
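Taken together, the change swaps an unstoppable while (true) polling thread for a scheduled task whose owner controls its lifecycle. A self-contained miniature of the pattern (hypothetical PollingService, not the Spark class):

import java.util.concurrent.{Executors, TimeUnit}

// Miniature of the before/after: a periodic task whose owner can start it
// once and shut it down deterministically, unlike a while(true) thread.
class PollingService(intervalMillis: Long) {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    val task = new Runnable {
      override def run(): Unit =
        try doWork() catch { case t: Throwable => t.printStackTrace() }
    }
    executor.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    executor.shutdown() // stop scheduling new runs
    // Bounded wait for an in-flight run; if doWork() is blocked on a dead
    // remote endpoint (the yarn-client concern above), give up after 10s.
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
      executor.shutdownNow() // last resort: interrupt the stuck run
    }
  }

  private def doWork(): Unit = println("polling...")
}

The shutdownNow() fallback goes a step beyond what this PR does; it is included only to show how the 10-second wait discussed above could be bounded harder.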

   /**
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1138,7 +1138,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
    * Return whether dynamically adjusting the amount of resources allocated to
    * this application is supported. This is currently only available for YARN.
    */
-  private[spark] def supportDynamicAllocation = 
+  private[spark] def supportDynamicAllocation =
     master.contains("yarn") || dynamicAllocationTesting
 
   /**
@@ -1402,6 +1402,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     env.metricsSystem.report()
     metadataCleaner.cancel()
     cleaner.foreach(_.stop())
+    executorAllocationManager.foreach(_.stop())
     dagScheduler.stop()
     dagScheduler = null
     listenerBus.stop()
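The new stop() call sits immediately before dagScheduler.stop(), matching the ordering concern raised in the review: stop the component that periodically calls into the scheduler before tearing the scheduler down. A small sketch of that idea with hypothetical component names:

// Illustration of teardown ordering: stop producers of periodic work
// before the services they call into.
trait Stoppable { def stop(): Unit }

class AppLifecycle(allocationManager: Option[Stoppable], scheduler: Stoppable) {
  def stop(): Unit = {
    // 1. Stop the periodic caller so no further tick fires mid-teardown...
    allocationManager.foreach(_.stop())
    // 2. ...then stop the scheduler it would otherwise have called into.
    scheduler.stop()
  }
}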
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala

@@ -19,7 +19,7 @@ package org.apache.spark

 import scala.collection.mutable
 
-import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -28,29 +28,40 @@ import org.apache.spark.util.ManualClock
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
+  private val contexts = new mutable.ListBuffer[SparkContext]()

Contributor: Will this ever have more than a single element?

Author: Yes. See test("verify min/max executors").
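This buffer-plus-before/after arrangement is a stock ScalaTest pattern for suites that create heavyweight resources: register each one as it is created, then tear everything down even when a test body throws. A condensed, self-contained sketch of the pattern, with a hypothetical Resource type standing in for SparkContext:

import scala.collection.mutable
import org.scalatest.{BeforeAndAfter, FunSuite}

// Hypothetical resource standing in for SparkContext.
class Resource { def stop(): Unit = () }

class CleanupSuite extends FunSuite with BeforeAndAfter {
  private val resources = new mutable.ListBuffer[Resource]()

  before { resources.clear() }
  // after{} runs even when a test fails, so no Resource leaks across tests.
  after { resources.foreach(_.stop()) }

  private def createResource(): Resource = {
    val r = new Resource
    resources += r // track immediately so after{} can clean it up
    r
  }

  test("every created resource is tracked") {
    val r = createResource()
    assert(resources.contains(r))
  }
}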


+  before {
+    contexts.clear()
+  }
+
+  after {
+    contexts.foreach(_.stop())
+  }
+
   test("verify min/max executors") {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.testing", "true")
     val sc0 = new SparkContext(conf)
+    contexts += sc0
     assert(sc0.executorAllocationManager.isDefined)
     sc0.stop()
 
     // Min < 0
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
-    intercept[SparkException] { new SparkContext(conf1) }
+    intercept[SparkException] { contexts += new SparkContext(conf1) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
     // Max < 0
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
-    intercept[SparkException] { new SparkContext(conf2) }
+    intercept[SparkException] { contexts += new SparkContext(conf2) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
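One subtlety worth spelling out: the intercept bodies now append to contexts inside the block. Annotated excerpt of the line above (not standalone code):

intercept[SparkException] {
  // If the constructor throws (the expected path), nothing is appended.
  // If a regression lets it succeed, the context is still tracked, so the
  // after{} hook stops it instead of leaking it into the next test.
  contexts += new SparkContext(conf1)
}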

@@ -665,16 +676,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).contains("executor-2"))
     assert(!removeTimes(manager).contains("executor-1"))
   }
-}
-
-/**
- * Helper methods for testing ExecutorAllocationManager.
- * This includes methods to access private methods and fields in ExecutorAllocationManager.
- */
-private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
-  private val schedulerBacklogTimeout = 1L
-  private val sustainedSchedulerBacklogTimeout = 2L
-  private val executorIdleTimeout = 3L
 
   private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
     val conf = new SparkConf()
@@ -688,9 +689,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
       sustainedSchedulerBacklogTimeout.toString)
       .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
       .set("spark.dynamicAllocation.testing", "true")
-    new SparkContext(conf)
+    val sc = new SparkContext(conf)
+    contexts += sc
+    sc
   }
+
+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+  private val schedulerBacklogTimeout = 1L
+  private val sustainedSchedulerBacklogTimeout = 2L
+  private val executorIdleTimeout = 3L
 
   private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
     new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
   }
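For readers unfamiliar with how the helper object reaches into ExecutorAllocationManager: ScalaTest's PrivateMethodTester invokes private methods reflectively by name. A minimal sketch with a hypothetical Counter class; the Symbol must match the private method's name exactly:

import org.scalatest.PrivateMethodTester

class Counter {
  private def bump(n: Int): Int = n + 1
}

object PrivateAccessDemo extends PrivateMethodTester {
  // The type parameter is the private method's result type; the Symbol is its name.
  private val bump = PrivateMethod[Int]('bump)

  def main(args: Array[String]): Unit = {
    val c = new Counter
    // invokePrivate reflectively dispatches to c.bump(41).
    println(c invokePrivate bump(41)) // prints 42
  }
}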