[SPARK-1870] Make spark-submit --jars work in yarn-cluster mode. #848

Closed
wants to merge 6 commits
@@ -326,8 +326,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| --class CLASS_NAME Your application's main class (for Java / Scala apps).
| --name NAME A name of your application.
| --jars JARS Comma-separated list of local jars to include on the driver
| and executor classpaths. Doesn't work for drivers in
| standalone mode with "cluster" deploy mode.
| and executor classpaths.
Contributor
Was there a reason for taking this out? My impression is that this still won't work on standalone with cluster deploy mode.

Contributor
This should not have been taken out, actually; it can be put back in. But we just found out that the "cluster" deploy mode of the Spark standalone cluster manager is somewhat broken with spark-submit.

| --py-files PY_FILES Comma-separated list of .zip or .egg files to place on the
| PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
@@ -44,7 +44,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
* Client submits an application to the YARN ResourceManager.
*
* Depending on the deployment mode this will launch one of two application master classes:
* 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
* 1. In cluster mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
* which launches a driver program inside of the cluster.
* 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
* request executors on behalf of a driver running outside of the cluster.
@@ -220,10 +220,11 @@ trait ClientBase extends Logging {
}
}

var cachedSecondaryJarLinks = ListBuffer.empty[String]
val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
(args.files, LocalResourceType.FILE, false),
(args.archives, LocalResourceType.ARCHIVE, false) )
fileLists.foreach { case (flist, resType, appMasterOnly) =>
fileLists.foreach { case (flist, resType, addToClasspath) =>
if (flist != null && !flist.isEmpty()) {
flist.split(',').foreach { case file: String =>
val localURI = new URI(file.trim())
@@ -232,11 +233,15 @@
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyRemoteFile(dst, localPath, replication)
distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
linkname, statCache, appMasterOnly)
linkname, statCache)
if (addToClasspath) {
cachedSecondaryJarLinks += linkname
}
}
}
}
}
sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))

UserGroupInformation.getCurrentUser().addCredentials(credentials)
localResources
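A minimal, self-contained sketch of the round trip this hunk sets up, assuming only what is visible in the diff: the client joins the cached secondary-jar link names into a single comma-separated value under spark.yarn.secondary.jars, and the classpath code later splits that value back into individual entries. The object name and sample jar names below are illustrative.

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object SecondaryJarsRoundTrip {
  // Same key as ClientBase.CONF_SPARK_YARN_SECONDARY_JARS in this diff.
  val confKey = "spark.yarn.secondary.jars"

  def main(args: Array[String]): Unit = {
    // Client side: collect the link name of each --jars entry as it is
    // uploaded to the distributed cache (sample names, for illustration).
    val cachedSecondaryJarLinks = ListBuffer.empty[String]
    cachedSecondaryJarLinks += "mylib.jar"
    cachedSecondaryJarLinks += "util.jar"

    // Stand-in for sparkConf.set(...): persist the names as one CSV value.
    val conf = mutable.Map(confKey -> cachedSecondaryJarLinks.mkString(","))

    // Classpath side: read the CSV back and add each link name, mirroring
    // sparkConf.getOption(...).getOrElse("").split(",") later in the diff.
    val links = conf.getOrElse(confKey, "").split(",").filter(_.nonEmpty)
    links.foreach(link => println(s"add to classpath: $link"))
  }
}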
@@ -374,11 +379,12 @@ trait ClientBase extends Logging {
}

object ClientBase {
val SPARK_JAR: String = "spark.jar"
val APP_JAR: String = "app.jar"
val SPARK_JAR: String = "__spark__.jar"
val APP_JAR: String = "__app__.jar"
val LOG4J_PROP: String = "log4j.properties"
val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF"
val LOCAL_SCHEME = "local"
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head)

@@ -479,66 +485,25 @@ object ClientBase {

extraClassPath.foreach(addClasspathEntry)

addClasspathEntry(Environment.PWD.$())
val cachedSecondaryJarLinks =
sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",")
// Normally the users app.jar is last in case conflicts with spark jars
if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
Member
What's the difference between spark.yarn.user.classpath.first and spark.files.userClassPathFirst? To me, they seem to be the same thing under two different configuration names.

Member
PS: regarding line 47 ("* 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]"), should it be cluster mode now?

Contributor Author
spark.files.userClassPathFirst is a global configuration that controls the ordering of dynamically added jars, while spark.yarn.user.classpath.first applies only to YARN. I agree it is a little confusing, but that is independent of this PR. We can create a new JIRA for it.

Contributor Author
I will update the doc. Thanks!

addPwdClasspathEntry(APP_JAR)
cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
addPwdClasspathEntry(SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)
} else {
addPwdClasspathEntry(SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)
addPwdClasspathEntry(APP_JAR)
cachedSecondaryJarLinks.foreach(addPwdClasspathEntry)
}
// Append all class files and jar files under the working directory to the classpath.
addClasspathEntry(Environment.PWD.$())
addPwdClasspathEntry("*")
}

/**
* Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
* to the classpath.
*/
private def addUserClasspath(args: ClientArguments, env: HashMap[String, String]) = {
if (args != null) {
addClasspathEntry(args.userJar, APP_JAR, env)
}

if (args != null && args.addJars != null) {
args.addJars.split(",").foreach { case file: String =>
addClasspathEntry(file, null, env)
}
}
}

/**
* Adds the given path to the classpath, handling "local:" URIs correctly.
*
* If an alternate name for the file is given, and it's not a "local:" file, the alternate
* name will be added to the classpath (relative to the job's work directory).
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
* @param path Path to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
*/
private def addClasspathEntry(path: String, fileName: String,
env: HashMap[String, String]) : Unit = {
if (path != null) {
scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
val localPath = getLocalPath(path)
if (localPath != null) {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, localPath,
File.pathSeparator)
return
}
}
}
if (fileName != null) {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name,
Environment.PWD.$() + Path.SEPARATOR + fileName, File.pathSeparator);
}
}

/**
* Returns the local path if the URI is a "local:" URI, or null otherwise.
*/
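To make the ordering controlled by spark.yarn.user.classpath.first easier to follow, here is a standalone sketch of the two classpath orders produced by the if/else above. It is illustrative only: the object and method names are invented, the Hadoop classpath is a placeholder for populateHadoopClasspath, and the real code also adds extraClassPath before and appends the working directory and "*" after this block.

import java.io.File

object ClasspathOrderSketch {
  // Link names from ClientBase in this diff.
  val sparkJar = "__spark__.jar"
  val appJar = "__app__.jar"

  // Returns the entries in the order the if/else above would add them.
  def order(userClassPathFirst: Boolean, secondaryJars: Seq[String]): Seq[String] = {
    val hadoopClasspath = Seq("<hadoop classpath>") // placeholder
    if (userClassPathFirst) {
      // User code wins conflicts: app jar and --jars entries come before Spark's jar.
      Seq(appJar) ++ secondaryJars ++ Seq(sparkJar) ++ hadoopClasspath
    } else {
      // Default: Spark and Hadoop jars first, user jars last.
      Seq(sparkJar) ++ hadoopClasspath ++ Seq(appJar) ++ secondaryJars
    }
  }

  def main(args: Array[String]): Unit = {
    val jars = Seq("mylib.jar", "util.jar")
    println(order(userClassPathFirst = true, jars).mkString(File.pathSeparator))
    println(order(userClassPathFirst = false, jars).mkString(File.pathSeparator))
  }
}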
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
"--class", "notused",
"--jar", null,
"--jar", null, // The primary jar will be added dynamically in SparkContext.
"--args", hostport,
"--am-class", classOf[ExecutorLauncher].getName
)
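The new comment on "--jar" says the primary jar is added dynamically by SparkContext in yarn-client mode rather than being passed to the YARN client here. A hedged example of what that typically looks like from the application side (the app name and jar paths are illustrative; spark-submit --jars has the same effect):

import org.apache.spark.{SparkConf, SparkContext}

object YarnClientJarsExample {
  def main(args: Array[String]): Unit = {
    // In yarn-client mode the driver runs locally; jars registered on the
    // SparkConf (or passed via spark-submit --jars) are distributed by
    // SparkContext itself, which is why the backend above can pass "--jar" as null.
    val conf = new SparkConf()
      .setAppName("yarn-client-jars-example")
      .setJars(Seq("/path/to/app.jar", "/path/to/mylib.jar")) // illustrative paths
    val sc = new SparkContext(conf)
    try {
      // ... run jobs that use classes from the jars above ...
    } finally {
      sc.stop()
    }
  }
}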