diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 40d8d6d6e6961..83781b3e7515b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -147,7 +147,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
       "Options:\n" +
       "  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster mode)\n" +
       "  --class CLASS_NAME       Name of your application's main class (required)\n" +
-      "  --arg ARGS               Argument to be passed to your application's main class.\n" +
+      "  --arg ARG                Argument to be passed to your application's main class.\n" +
       "                           Multiple invocations are possible, each will be passed in order.\n" +
       "  --num-executors NUM      Number of executors to start (Default: 2)\n" +
       "  --executor-cores NUM     Number of cores for the executors (Default: 1).\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index a825c41667e25..da4196843629b 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -39,45 +39,22 @@ private[spark] class YarnClientSchedulerBackend(
   var stopping: Boolean = false
   var totalExpectedExecutors = 0
 
-  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
-      arrayBuf: ArrayBuffer[String]) {
-    if (System.getenv(envVar) != null) {
-      arrayBuf += (optionName, System.getenv(envVar))
-    } else if (sc.getConf.contains(sysProp)) {
-      arrayBuf += (optionName, sc.getConf.get(sysProp))
-    }
-  }
-
+  /**
+   * Create a Yarn client to submit an application to the ResourceManager.
+   * This waits until the application is running.
+   */
   override def start() {
     super.start()
-
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
     conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
 
     val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += (
-      "--args", hostport
-    )
-
-    // process any optional arguments, given either as environment variables
-    // or system properties. use the defaults already defined in ClientArguments
-    // if things aren't specified. system properties override environment
-    // variables.
-    List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
-      ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
-      ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
-      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
-      ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
-      ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
-      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-
-    logDebug("ClientArguments called with: " + argsArrayBuf)
+    argsArrayBuf += ("--arg", hostport)
+    argsArrayBuf ++= getExtraClientArguments
+
+    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
@@ -86,6 +63,35 @@ private[spark] class YarnClientSchedulerBackend(
     asyncMonitorApplication()
   }
 
+  /**
+   * Return any extra command line arguments to be passed to Client provided in the form of
+   * environment variables or Spark properties.
+   */
+  private def getExtraClientArguments: Seq[String] = {
+    val extraArgs = new ArrayBuffer[String]
+    val optionTuples = // List of (target Client argument, environment variable, Spark property)
+      List(
+        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+        ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
+      )
+    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
+      if (System.getenv(envVar) != null) {
+        extraArgs += (optionName, System.getenv(envVar))
+      } else if (sc.getConf.contains(sparkProp)) {
+        extraArgs += (optionName, sc.getConf.get(sparkProp))
+      }
+    }
+    extraArgs
+  }
+
   /**
    * Report the state of the application until it is running.
    * If the application has finished, failed or been killed in the process, throw an exception.
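For illustration, the resolution order implemented by getExtraClientArguments (an environment variable takes precedence over the corresponding Spark property, and a pair with neither set contributes no argument) can be sketched in isolation. This is a minimal sketch, not part of the patch: plain Maps stand in for the process environment and SparkConf, and the resolve helper is a hypothetical name.

import scala.collection.mutable.ArrayBuffer

// Minimal sketch of the option-resolution order in getExtraClientArguments.
// Plain Maps stand in for the process environment and SparkConf; the real
// code calls System.getenv and sc.getConf instead.
object ExtraArgsSketch {
  def resolve(
      optionTuples: Seq[(String, String, String)],
      env: Map[String, String],
      conf: Map[String, String]): Seq[String] = {
    val extraArgs = new ArrayBuffer[String]
    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
      // Environment variable first, Spark property as the fallback;
      // if neither is set, the option is simply omitted.
      env.get(envVar).orElse(conf.get(sparkProp)).foreach { value =>
        extraArgs += (optionName, value)
      }
    }
    extraArgs
  }

  def main(args: Array[String]): Unit = {
    val tuples = Seq(("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"))

    // Both set: the environment variable wins.
    println(resolve(tuples,
      env = Map("SPARK_EXECUTOR_MEMORY" -> "4g"),
      conf = Map("spark.executor.memory" -> "2g")))
    // Prints: ArrayBuffer(--executor-memory, 4g)

    // Only the property set: fall back to it.
    println(resolve(tuples, env = Map.empty, conf = Map("spark.executor.memory" -> "2g")))
    // Prints: ArrayBuffer(--executor-memory, 2g)
  }
}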