
Clean up YarnClientSchedulerBackend more
andrewor14 committed Sep 9, 2014
1 parent 6de9072 commit ef7069a
Showing 2 changed files with 38 additions and 32 deletions.
ClientArguments.scala

@@ -147,7 +147,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
" --arg ARGS Argument to be passed to your application's main class.\n" +
" --arg ARG Argument to be passed to your application's main class.\n" +
" Multiple invocations are possible, each will be passed in order.\n" +
" --num-executors NUM Number of executors to start (Default: 2)\n" +
" --executor-cores NUM Number of cores for the executors (Default: 1).\n" +
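Aside: the one-word fix above matches the flag's actual behavior. Each --arg occurrence carries exactly one value, and repeated occurrences accumulate in order ("Multiple invocations are possible"). Below is a minimal, self-contained Scala sketch of that accumulation pattern; it is an illustration only, not Spark's actual ClientArguments parser, and its names are hypothetical.

import scala.collection.mutable.ArrayBuffer

// Illustrative sketch only, not Spark's ClientArguments parser:
// each "--arg" occurrence contributes one value, in order.
object RepeatedArgSketch {
  def collectArgs(args: List[String]): Seq[String] = {
    val userArgs = new ArrayBuffer[String]
    var remaining = args
    while (remaining.nonEmpty) {
      remaining = remaining match {
        case "--arg" :: value :: tail => userArgs += value; tail
        case _ :: tail => tail // skip flags this sketch does not model
        case Nil => Nil
      }
    }
    userArgs
  }

  def main(args: Array[String]): Unit = {
    // prints: first, second
    println(collectArgs(List("--arg", "first", "--arg", "second")).mkString(", "))
  }
}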
YarnClientSchedulerBackend.scala

@@ -39,45 +39,22 @@ private[spark] class YarnClientSchedulerBackend(
   var stopping: Boolean = false
   var totalExpectedExecutors = 0
 
-  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
-      arrayBuf: ArrayBuffer[String]) {
-    if (System.getenv(envVar) != null) {
-      arrayBuf += (optionName, System.getenv(envVar))
-    } else if (sc.getConf.contains(sysProp)) {
-      arrayBuf += (optionName, sc.getConf.get(sysProp))
-    }
-  }
-
+  /**
+   * Create a Yarn client to submit an application to the ResourceManager.
+   * This waits until the application is running.
+   */
   override def start() {
     super.start()
-
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
     conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
 
     val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += (
-      "--args", hostport
-    )
-
-    // process any optional arguments, given either as environment variables
-    // or system properties. use the defaults already defined in ClientArguments
-    // if things aren't specified. system properties override environment
-    // variables.
-    List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
-      ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
-      ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
-      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
-      ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
-      ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
-      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-
-    logDebug("ClientArguments called with: " + argsArrayBuf)
+    argsArrayBuf += ("--arg", hostport)
+    argsArrayBuf ++= getExtraClientArguments
+
+    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
@@ -86,6 +63,35 @@
     asyncMonitorApplication()
   }
 
+  /**
+   * Return any extra command line arguments to be passed to Client provided in the form of
+   * environment variables or Spark properties.
+   */
+  private def getExtraClientArguments: Seq[String] = {
+    val extraArgs = new ArrayBuffer[String]
+    val optionTuples = // List of (target Client argument, environment variable, Spark property)
+      List(
+        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+        ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
+      )
+    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
+      if (System.getenv(envVar) != null) {
+        extraArgs += (optionName, System.getenv(envVar))
+      } else if (sc.getConf.contains(sparkProp)) {
+        extraArgs += (optionName, sc.getConf.get(sparkProp))
+      }
+    }
+    extraArgs
+  }
+
   /**
    * Report the state of the application until it is running.
    * If the application has finished, failed or been killed in the process, throw an exception.
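Aside: the new getExtraClientArguments helper prefers the environment variable and falls back to the Spark property only when the variable is unset, so an environment variable shadows its property (note that the deleted comment claimed the opposite precedence). Below is a self-contained Scala sketch of that lookup pattern, with plain Maps standing in for the process environment and SparkConf; it is an illustration under those assumptions, not Spark's code, and its names are hypothetical.

import scala.collection.mutable.ArrayBuffer

// Standalone sketch of the fallback pattern in getExtraClientArguments:
// the environment variable wins when set; otherwise the Spark property
// is used; tuples with neither source set contribute nothing.
// Plain Maps stand in for the process environment and SparkConf.
object ExtraArgsSketch {
  def extraArgs(
      optionTuples: Seq[(String, String, String)],
      env: Map[String, String],
      props: Map[String, String]): Seq[String] = {
    val buf = new ArrayBuffer[String]
    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
      env.get(envVar).orElse(props.get(sparkProp)).foreach { value =>
        buf ++= Seq(optionName, value) // emit "--flag value" pairs in order
      }
    }
    buf
  }

  def main(args: Array[String]): Unit = {
    val tuples = Seq(
      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"))
    val env = Map("SPARK_EXECUTOR_MEMORY" -> "2g") // env var set: wins
    val props = Map(
      "spark.executor.memory" -> "1g",  // shadowed by the env var above
      "spark.yarn.queue" -> "default")  // no env var: property is used
    // prints: --executor-memory 2g --queue default
    println(extraArgs(tuples, env, props).mkString(" "))
  }
}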
