[SPARK-21403][MESOS] fix --packages for mesos
Fixes the --packages flag for Mesos in cluster mode. I will probably handle standalone and YARN in another commit; those cases are different and need further investigation.

Tested with a community 1.9 DC/OS cluster. Packages were successfully resolved in cluster mode within a container.
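
For reference, a cluster-mode submission that exercises the flag looks roughly like this (master URL, package coordinates, and jar location are illustrative):

```
spark-submit \
  --master mesos://leader.mesos:5050 \
  --deploy-mode cluster \
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.5 \
  --class org.example.Main \
  http://www.example.com/app.jar
```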

andrewor14 susanxhuynh ArtRand srowen please review.

Author: Stavros Kontopoulos <st.kontopoulos@gmail.com>

Closes apache#18587 from skonto/fix_packages_mesos_cluster.
skonto authored and ArtRand committed Nov 28, 2017
1 parent 3bde8d4 commit a6e67dd
Showing 1 changed file with 20 additions and 32 deletions.
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -96,8 +96,7 @@ object SparkSubmit extends CommandLineUtils {
 
   // scalastyle:off println
   private[spark] def printVersionAndExit(): Unit = {
-    printStream.println(
-      """Welcome to
+    printStream.println("""Welcome to
       ____              __
      / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
@@ -113,7 +112,6 @@ object SparkSubmit extends CommandLineUtils {
     printStream.println("Type --help for more information.")
     exitFn(0)
   }
-
   // scalastyle:on println
 
   override def main(args: Array[String]): Unit = {
@@ -208,7 +206,7 @@ object SparkSubmit extends CommandLineUtils {
           args.useRest = false
           submit(args)
       }
-      // In all other modes, just run the main class as prepared
+    // In all other modes, just run the main class as prepared
     } else {
       doRunMain()
     }
@@ -217,14 +215,14 @@ object SparkSubmit extends CommandLineUtils {
   /**
    * Prepare the environment for submitting an application.
    * This returns a 4-tuple:
-   *   (1) the arguments for the child process,
-   *   (2) a list of classpath entries for the child,
-   *   (3) a map of system properties, and
-   *   (4) the main class for the child
+   * (1) the arguments for the child process,
+   * (2) a list of classpath entries for the child,
+   * (3) a map of system properties, and
+   * (4) the main class for the child
    * Exposed for testing.
    */
   private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
-      : (Seq[String], Seq[String], Map[String, String], String) = {
+    : (Seq[String], Seq[String], Map[String, String], String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
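
A minimal sketch of how a caller consumes that 4-tuple, assuming `args` is an already-parsed SparkSubmitArguments (names follow the doc comment above; the real downstream consumer is submit()/runMain):

```scala
// Minimal sketch: unpack the 4-tuple into the values handed to the child JVM.
val (childArgs, childClasspath, sysProps, childMainClass) =
  prepareSubmitEnvironment(args)
```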
@@ -273,7 +271,7 @@ object SparkSubmit extends CommandLineUtils {
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
         printErrorAndExit(
           "Could not load YARN classes. " +
-          "This copy of Spark may not have been compiled with YARN support.")
+            "This copy of Spark may not have been compiled with YARN support.")
       }
     }
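
The Utils.classIsLoadable probe used above can be sketched as a plain reflection check (a simplification; the real helper resolves against Spark's context class loader):

```scala
// Simplified stand-in for Utils.classIsLoadable: ask the class loader to
// locate the class without initializing it.
def classIsLoadable(clazz: String): Boolean =
  try {
    Class.forName(clazz, false, Thread.currentThread().getContextClassLoader)
    true
  } catch {
    case _: ClassNotFoundException => false
  }
```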

@@ -334,8 +332,8 @@ object SparkSubmit extends CommandLineUtils {
         FileUtils.deleteQuietly(targetDir)
       }
     })
-
     // scalastyle:on runtimeaddshutdownhook
+
     // Resolve glob path for different resources.
     args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
     args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
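
The Option(x).map(f).orNull chain above is the null-tolerant transform applied to each comma-separated path list; a tiny sketch with a stand-in for resolveGlobPaths:

```scala
// Stand-in for resolveGlobPaths, which expands glob patterns via Hadoop.
def expandGlobs(paths: String): String = paths

val jars: String = null
// Stays null when the input is null; applies the transform otherwise.
val resolved = Option(jars).map(expandGlobs).orNull
```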
@@ -509,28 +507,20 @@ object SparkSubmit extends CommandLineUtils {
       if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
-      if (args.jars != null) {
-        childClasspath ++= args.jars.split(",")
-      }
+      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
     }
 
     if (deployMode == CLIENT) {
-      if (args.childArgs != null) {
-        childArgs ++= args.childArgs
-      }
+      if (args.childArgs != null) { childArgs ++= args.childArgs }
     }
 
     // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null &&
-        (deployMode & opt.deployMode) != 0 &&
-        (clusterManager & opt.clusterManager) != 0) {
-        if (opt.clOption != null) {
-          childArgs += (opt.clOption, opt.value)
-        }
-        if (opt.sysProp != null) {
-          sysProps.put(opt.sysProp, opt.value)
-        }
+          (deployMode & opt.deployMode) != 0 &&
+          (clusterManager & opt.clusterManager) != 0) {
+        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
+        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
       }
     }
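
This loop drives a table of OptionAssigner entries: each entry carries bitmasks for the cluster managers and deploy modes it applies to, plus an optional command-line flag and/or system property name. A self-contained sketch of that dispatch pattern (the flag constants and the two table entries are illustrative, not Spark's full table):

```scala
import scala.collection.mutable

object AssignerSketch {
  // Bit flags, so one entry can target several managers/modes at once.
  val YARN = 1; val STANDALONE = 2; val MESOS = 4
  val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS
  val CLIENT = 1; val CLUSTER = 2
  val ALL_DEPLOY_MODES = CLIENT | CLUSTER

  case class OptionAssigner(value: String, clusterManager: Int, deployMode: Int,
      clOption: String = null, sysProp: String = null)

  def main(cmdArgs: Array[String]): Unit = {
    val clusterManager = MESOS
    val deployMode = CLUSTER
    val options = Seq(
      OptionAssigner("2g", ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        sysProp = "spark.executor.memory"),
      OptionAssigner("my.Main", STANDALONE | MESOS, CLUSTER, clOption = "--class"))

    val childArgs = mutable.ArrayBuffer[String]()
    val sysProps = mutable.Map[String, String]()
    for (opt <- options) {
      // Keep an entry only when its masks overlap the chosen manager and mode.
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) { childArgs ++= Seq(opt.clOption, opt.value) }
        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
      }
    }
    println(childArgs)  // ArrayBuffer(--class, my.Main)
    println(sysProps)   // Map(spark.executor.memory -> 2g)
  }
}
```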

@@ -554,9 +544,7 @@ object SparkSubmit extends CommandLineUtils {
     } else {
       // In legacy standalone cluster mode, use Client as a wrapper around the user class
       childMainClass = "org.apache.spark.deploy.Client"
-      if (args.supervise) {
-        childArgs += "--supervise"
-      }
+      if (args.supervise) { childArgs += "--supervise" }
       Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
       Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
       childArgs += "launch"
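
For intuition, with --supervise, a 2g driver, and 2 driver cores, the wrapper arguments assembled above come out roughly as below (values illustrative; in the full method "launch" is then followed by the master URL, primary resource, and main class):

```scala
// Hypothetical childArgs for legacy standalone cluster mode.
val childArgs = Seq("--supervise", "--memory", "2g", "--cores", "2", "launch")
```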
@@ -896,10 +884,10 @@ object SparkSubmit extends CommandLineUtils {
    * @return A comma separated local files list.
    */
   private[deploy] def downloadFile(
-      path: String,
-      targetDir: File,
-      sparkProperties: Map[String, String],
-      hadoopConf: HadoopConfiguration): String = {
+    path: String,
+    targetDir: File,
+    sparkProperties: Map[String, String],
+    hadoopConf: HadoopConfiguration): String = {
     require(path != null, "path cannot be null.")
     val uri = Utils.resolveURI(path)
     uri.getScheme match {
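
The body of the match (elided by the diff) dispatches on the URI scheme. A self-contained sketch of that fetch-to-local pattern, assuming only hadoop-common and that file:/ and local:/ paths pass through unchanged (the real method also consults sparkProperties for security options):

```scala
import java.io.File
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: fetch a remote resource into targetDir and return a local URI string.
def fetchToLocal(path: String, targetDir: File, hadoopConf: Configuration): String = {
  require(path != null, "path cannot be null.")
  val uri = new URI(path)
  uri.getScheme match {
    case "file" | "local" =>
      path  // already local: return as-is
    case _ =>
      val src = new Path(uri)
      val dest = new File(targetDir, src.getName)
      // Copy via the scheme's Hadoop FileSystem (hdfs, s3a, ...).
      FileSystem.get(uri, hadoopConf).copyToLocalFile(src, new Path(dest.getAbsolutePath))
      dest.toURI.toString
  }
}
```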