From dc3c825934cbd62566d09d3f2b4334dcc444879a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 21 May 2014 10:51:43 -0700 Subject: [PATCH 1/6] add secondary jars to classpath in yarn --- .../apache/spark/deploy/yarn/ClientBase.scala | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 27a518ccda459..b3d7a2d7fcc36 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -220,10 +220,11 @@ trait ClientBase extends Logging { } } + var cachedJarLinks = ListBuffer.empty[String] val fileLists = List( (args.addJars, LocalResourceType.FILE, true), (args.files, LocalResourceType.FILE, false), (args.archives, LocalResourceType.ARCHIVE, false) ) - fileLists.foreach { case (flist, resType, appMasterOnly) => + fileLists.foreach { case (flist, resType, addToClasspath) => if (flist != null && !flist.isEmpty()) { flist.split(',').foreach { case file: String => val localURI = new URI(file.trim()) @@ -232,11 +233,15 @@ trait ClientBase extends Logging { val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) val destPath = copyRemoteFile(dst, localPath, replication) distCacheMgr.addResource(fs, conf, destPath, localResources, resType, - linkname, statCache, appMasterOnly) + linkname, statCache) + if (addToClasspath) { + cachedJarLinks += linkname + } } } } } + conf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedJarLinks.mkString(",")) UserGroupInformation.getCurrentUser().addCredentials(credentials) localResources @@ -379,6 +384,7 @@ object ClientBase { val LOG4J_PROP: String = "log4j.properties" val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF" val LOCAL_SCHEME = "local" + val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head) @@ -479,36 +485,22 @@ object ClientBase { extraClassPath.foreach(addClasspathEntry) - addClasspathEntry(Environment.PWD.$()) + val localSecondaryJarLinks = conf.getStrings(CONF_SPARK_YARN_SECONDARY_JARS) // Normally the users app.jar is last in case conflicts with spark jars if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { addPwdClasspathEntry(APP_JAR) + localSecondaryJarLinks.foreach(addPwdClasspathEntry) addPwdClasspathEntry(SPARK_JAR) ClientBase.populateHadoopClasspath(conf, env) } else { addPwdClasspathEntry(SPARK_JAR) ClientBase.populateHadoopClasspath(conf, env) addPwdClasspathEntry(APP_JAR) + localSecondaryJarLinks.foreach(addPwdClasspathEntry) } addPwdClasspathEntry("*") } - /** - * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly - * to the classpath. - */ - private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = { - if (args != null) { - addClasspathEntry(args.userJar, APP_JAR, env) - } - - if (args != null && args.addJars != null) { - args.addJars.split(",").foreach { case file: String => - addClasspathEntry(file, null, env) - } - } - } - /** * Adds the given path to the classpath, handling "local:" URIs correctly. * From 3e7e1c4a2fe1a9d8512c19e56df91b34bea58108 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 21 May 2014 11:21:09 -0700 Subject: [PATCH 2/6] use sparkConf instead of hadoop conf --- .../main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index b3d7a2d7fcc36..2d391d58dd677 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -241,7 +241,7 @@ trait ClientBase extends Logging { } } } - conf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedJarLinks.mkString(",")) + sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedJarLinks.mkString(",")) UserGroupInformation.getCurrentUser().addCredentials(credentials) localResources @@ -485,7 +485,7 @@ object ClientBase { extraClassPath.foreach(addClasspathEntry) - val localSecondaryJarLinks = conf.getStrings(CONF_SPARK_YARN_SECONDARY_JARS) + val localSecondaryJarLinks = sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") // Normally the users app.jar is last in case conflicts with spark jars if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { addPwdClasspathEntry(APP_JAR) From 11e535434940d0809bd8c1380b2d4a92d87ebb6a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 21 May 2014 11:45:25 -0700 Subject: [PATCH 3/6] minor changes --- .../org/apache/spark/deploy/yarn/ClientBase.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 2d391d58dd677..9b7f86126d0b0 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -220,7 +220,7 @@ trait ClientBase extends Logging { } } - var cachedJarLinks = ListBuffer.empty[String] + var cachedSecondaryJarLinks = ListBuffer.empty[String] val fileLists = List( (args.addJars, LocalResourceType.FILE, true), (args.files, LocalResourceType.FILE, false), (args.archives, LocalResourceType.ARCHIVE, false) ) @@ -235,13 +235,13 @@ trait ClientBase extends Logging { distCacheMgr.addResource(fs, conf, destPath, localResources, resType, linkname, statCache) if (addToClasspath) { - cachedJarLinks += linkname + cachedSecondaryJarLinks += linkname } } } } } - sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedJarLinks.mkString(",")) + sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) UserGroupInformation.getCurrentUser().addCredentials(credentials) localResources @@ -485,18 +485,19 @@ object ClientBase { extraClassPath.foreach(addClasspathEntry) - val localSecondaryJarLinks = sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") + val cachedSecondaryJarLinks = + sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") // Normally the users app.jar is last in case conflicts with spark jars if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { addPwdClasspathEntry(APP_JAR) - localSecondaryJarLinks.foreach(addPwdClasspathEntry) + cachedSecondaryJarLinks.foreach(addPwdClasspathEntry) addPwdClasspathEntry(SPARK_JAR) ClientBase.populateHadoopClasspath(conf, env) } else { addPwdClasspathEntry(SPARK_JAR) ClientBase.populateHadoopClasspath(conf, env) addPwdClasspathEntry(APP_JAR) - localSecondaryJarLinks.foreach(addPwdClasspathEntry) + cachedSecondaryJarLinks.foreach(addPwdClasspathEntry) } addPwdClasspathEntry("*") } From 65e04ad8296969445e4ecfaa8921d55fe1e39c74 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 21 May 2014 11:52:02 -0700 Subject: [PATCH 4/6] update spark-submit help message and add a comment for yarn-client --- .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 3 +-- .../spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 264d4544cd31c..0cc05fb95aef0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -326,8 +326,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { | --class CLASS_NAME Your application's main class (for Java / Scala apps). | --name NAME A name of your application. | --jars JARS Comma-separated list of local jars to include on the driver - | and executor classpaths. Doesn't work for drivers in - | standalone mode with "cluster" deploy mode. + | and executor classpaths. | --py-files PY_FILES Comma-separated list of .zip or .egg files to place on the | PYTHONPATH for Python apps. | --files FILES Comma-separated list of files to be placed in the working diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 2924189077b7d..c989375bac55a 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend( val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ( "--class", "notused", - "--jar", null, + "--jar", null, // The primary jar will be added dynamically in SparkContext. "--args", hostport, "--am-class", classOf[ExecutorLauncher].getName ) From a40f6ed9fc634a8158e89022b68f8d3338b6f05a Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 21 May 2014 14:22:51 -0700 Subject: [PATCH 5/6] standalone -> cluster --- .../main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 9b7f86126d0b0..41c6198d982ec 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext} * Client submits an application to the YARN ResourceManager. * * Depending on the deployment mode this will launch one of two application master classes: - * 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]] + * 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]] * which launches a driver program inside of the cluster. * 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to * request executors on behalf of a driver running outside of the cluster. From 23e7df4d442060851459018cef22333bd1cbbb92 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Wed, 21 May 2014 15:54:38 -0700 Subject: [PATCH 6/6] rename spark.jar to __spark__.jar and app.jar to __app__.jar to avoid confliction apped $CWD/ and $CWD/* to the classpath remove unused methods --- .../apache/spark/deploy/yarn/ClientBase.scala | 36 +++---------------- 1 file changed, 4 insertions(+), 32 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 41c6198d982ec..aeb3f0062df3b 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -379,8 +379,8 @@ trait ClientBase extends Logging { } object ClientBase { - val SPARK_JAR: String = "spark.jar" - val APP_JAR: String = "app.jar" + val SPARK_JAR: String = "__spark__.jar" + val APP_JAR: String = "__app__.jar" val LOG4J_PROP: String = "log4j.properties" val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF" val LOCAL_SCHEME = "local" @@ -499,39 +499,11 @@ object ClientBase { addPwdClasspathEntry(APP_JAR) cachedSecondaryJarLinks.foreach(addPwdClasspathEntry) } + // Append all class files and jar files under the working directory to the classpath. + addClasspathEntry(Environment.PWD.$()) addPwdClasspathEntry("*") } - /** - * Adds the given path to the classpath, handling "local:" URIs correctly. - * - * If an alternate name for the file is given, and it's not a "local:" file, the alternate - * name will be added to the classpath (relative to the job's work directory). - * - * If not a "local:" file and no alternate name, the environment is not modified. - * - * @param path Path to add to classpath (optional). - * @param fileName Alternate name for the file (optional). - * @param env Map holding the environment variables. - */ - private def addClasspathEntry(path: String, fileName: String, - env: HashMap[String, String]) : Unit = { - if (path != null) { - scala.util.control.Exception.ignoring(classOf[URISyntaxException]) { - val localPath = getLocalPath(path) - if (localPath != null) { - YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath, - File.pathSeparator) - return - } - } - } - if (fileName != null) { - YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, - Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator); - } - } - /** * Returns the local path if the URI is a "local:" URI, or null otherwise. */