Cleanup of spark-submit script and Scala quick start guide

pwendell committed Apr 20, 2014
1 parent af0adf7 commit b16e6a2
Showing 6 changed files with 192 additions and 79 deletions.
14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -171,11 +171,11 @@ class SparkContext(config: SparkConf) extends Logging {
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")

val jars: Seq[String] = if (conf.contains("spark.jars")) {
conf.get("spark.jars").split(",").filter(_.size != 0)
} else {
null
}
val jars: Seq[String] =
conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

val files: Seq[String] =
conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")
@@ -236,6 +236,10 @@ class SparkContext(config: SparkConf) extends Logging {
jars.foreach(addJar)
}

if (files != null) {
files.foreach(addFile)
}

private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
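With this change, comma-separated entries in spark.jars and spark.files are read from the SparkConf (empty entries dropped) and registered through addJar and addFile while the SparkContext is constructed. A minimal usage sketch, where the jar and file paths are purely illustrative and assumed to exist locally:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local paths; each comma-separated entry is split out and
// registered when the SparkContext is created.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("jars-and-files-demo")
  .set("spark.jars", "/tmp/dep-a.jar,/tmp/dep-b.jar")   // handed to addJar
  .set("spark.files", "/tmp/lookup.txt")                // handed to addFile

val sc = new SparkContext(conf)
// ... run jobs that rely on the shipped jars and files ...
sc.stop()
```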
47 changes: 11 additions & 36 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,14 +17,11 @@

package org.apache.spark.deploy

import java.io.{File, FileInputStream, IOException, PrintStream}
import java.io.{File, PrintStream}
import java.net.{URI, URL}
import java.util.Properties

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.spark.SparkException
import org.apache.spark.executor.ExecutorURLClassLoader

/**
@@ -110,23 +107,6 @@ object SparkSubmit {
val sysProps = new HashMap[String, String]()
var childMainClass = ""

// Load system properties by default from the file, if present
if (appArgs.verbose) printStream.println(s"Using properties file: ${appArgs.propertiesFile}")
Option(appArgs.propertiesFile).foreach { filename =>
val file = new File(filename)
getDefaultProperties(file).foreach { case (k, v) =>
if (k.startsWith("spark")) {
if (k == "spark.master")
throw new Exception("Setting spark.master in spark-defaults.properties is not " +
"supported. Use MASTER environment variable or --master.")
sysProps(k) = v
if (appArgs.verbose) printStream.println(s"Adding default property: $k=$v")
}
else {
printWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}

if (clusterManager == MESOS && deployOnCluster) {
printErrorAndExit("Mesos does not support running the driver on the cluster")
@@ -166,10 +146,11 @@ object SparkSubmit {
sysProp = "spark.cores.max"),
new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"),
new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
new OptionAssigner(appArgs.files, STANDALONE | MESOS, true, sysProp = "spark.files"),
new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
new OptionAssigner(appArgs.jars, STANDALONE | YARN | MESOS, true, sysProp = "spark.jars")
new OptionAssigner(appArgs.jars, STANDALONE | MESOS, false, sysProp = "spark.jars")
)

// For client mode make any added jars immediately visible on the classpath
@@ -219,6 +200,10 @@
}
}

for ((k, v) <- appArgs.getDefaultSparkProperties) {
if (!sysProps.contains(k)) sysProps(k) = v
}

(childArgs, childClasspath, sysProps, childMainClass)
}

@@ -259,22 +244,12 @@ object SparkSubmit {
val url = localJarFile.getAbsoluteFile.toURI.toURL
loader.addURL(url)
}

private def getDefaultProperties(file: File): Seq[(String, String)] = {
require(file.exists(), s"Default properties file ${file.getName} does not exist")
val inputStream = new FileInputStream(file)
val properties = new Properties()
try {
properties.load(inputStream)
} catch {
case e: IOException =>
val message = s"Failed when loading Spark properties file ${file.getName}"
throw new SparkException(message, e)
}
properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
}
}

/**
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
*/
private[spark] class OptionAssigner(val value: String,
val clusterManager: Int,
val deployOnCluster: Boolean,
128 changes: 104 additions & 24 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,14 +17,19 @@

package org.apache.spark.deploy

import scala.collection.mutable.ArrayBuffer
import java.io.File
import java.io.{File, FileInputStream, IOException}
import java.util.Properties

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ArrayBuffer}

import org.apache.spark.SparkException

/**
* Parses and encapsulates arguments from the spark-submit script.
*/
private[spark] class SparkSubmitArguments(args: Array[String]) {
var master: String = "local"
var master: String = null
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
@@ -47,22 +52,70 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
var jars: String = null
var verbose: Boolean = false

loadEnvVars()
parseOpts(args.toList)
loadDefaults()
checkRequiredArguments()

/** Return the default properties present in the currently defined defaults file. */
def getDefaultSparkProperties = {
val defaultProperties = new HashMap[String, String]()
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
val file = new File(filename)
SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
if (k.startsWith("spark")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
}
else {
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
}
}
}
defaultProperties
}

// Sanity checks
if (args.length == 0) printUsageAndExit(-1)
if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
if (propertiesFile == null) {
sys.env.get("SPARK_HOME").foreach { sparkHome =>
val sep = File.separator
val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.properties"
val file = new File(defaultPath)
if (file.exists()) {
propertiesFile = file.getAbsolutePath
}
/** Fill in any undefined values based on the current properties file or built-in defaults. */
private def loadDefaults() = {

// Use common defaults file, if not specified by user
if (propertiesFile == null) {
sys.env.get("SPARK_HOME").foreach { sparkHome =>
val sep = File.separator
val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.properties"
val file = new File(defaultPath)
if (file.exists()) {
propertiesFile = file.getAbsolutePath
}
}
}

val defaultProperties = getDefaultSparkProperties
// Use properties file as fallback for values which have a direct analog to
// arguments in this script.
master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
executorMemory = Option(executorMemory)
.getOrElse(defaultProperties.get("spark.executor.memory").orNull)
executorCores = Option(executorCores)
.getOrElse(defaultProperties.get("spark.executor.cores").orNull)
totalExecutorCores = Option(totalExecutorCores)
.getOrElse(defaultProperties.get("spark.cores.max").orNull)
name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)

// This supports env vars in older versions of Spark
master = Option(master).getOrElse(System.getenv("MASTER"))
deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))

// Global defaults. These should be kept to a minimum to avoid confusing behavior.
master = Option(master).getOrElse("local")
}

/** Ensure that required fields exist. Call this only once all defaults are loaded. */
private def checkRequiredArguments() = {
if (args.length == 0) printUsageAndExit(-1)
if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
}

override def toString = {
@@ -89,14 +142,12 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
| childArgs [${childArgs.mkString(" ")}]
| jars $jars
| verbose $verbose
|
|Default properties from $propertiesFile:
|${getDefaultSparkProperties.mkString(" ", "\n ", "\n")}
""".stripMargin
}

private def loadEnvVars() {
Option(System.getenv("MASTER")).map(master = _)
Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
}

private def parseOpts(opts: List[String]): Unit = opts match {
case ("--name") :: value :: tail =>
name = value
@@ -189,6 +240,18 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
parseOpts(tail)

case value :: tail =>
if (value.startsWith("-")) {
val errMessage = s"Unrecognized option '$value'."
val suggestion: Option[String] = value match {
case v if v.startsWith("--") && v.contains("=") =>
val parts = v.split("=")
Some(s"Perhaps you meant '${parts(0)} ${parts(1)}'?")
case _ =>
None
}
SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
}

if (primaryResource != null) {
val error = s"Found two conflicting resources, $value and $primaryResource." +
" Expecting only one resource."
@@ -217,6 +280,8 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
| --jars JARS A comma-separated list of local jars to include on the
| driver classpath and that SparkContext.addJar will work
| with. Doesn't work on standalone with 'cluster' deploy mode.
| --files FILES Comma separated list of files to be placed in the working dir
| of each executor.
| --properties-file FILE Path to a file from which to load extra properties. If not
| specified, this will look for conf/spark-defaults.properties.
|
@@ -225,6 +290,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
| --driver-library-path Extra library path entries to pass to the driver
| --driver-class-path Extra class path entries to pass to the driver
|
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
| Spark standalone with cluster deploy mode only:
| --driver-cores NUM Cores for driver (Default: 1).
@@ -235,14 +301,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
|
| YARN-only:
| --executor-cores NUM Number of cores per executor (Default: 1).
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
| --queue QUEUE_NAME The YARN queue to submit to (Default: 'default').
| --num-executors NUM Number of executors to launch (Default: 2).
| --files FILES Comma separated list of files to be placed in the working dir
| of each executor.
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working dir of each executor.""".stripMargin
)
SparkSubmit.exitFn()
}
}

object SparkSubmitArguments {
/** Load properties present in the given file. */
def getPropertiesFromFile(file: File): Seq[(String, String)] = {
require(file.exists(), s"Properties file ${file.getName} does not exist")
val inputStream = new FileInputStream(file)
val properties = new Properties()
try {
properties.load(inputStream)
} catch {
case e: IOException =>
val message = s"Failed when loading Spark properties file ${file.getName}"
throw new SparkException(message, e)
}
properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
}
}
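The getPropertiesFromFile helper above is what getDefaultSparkProperties builds on. A small sketch of calling it directly, assuming code that lives under the org.apache.spark package (the surrounding classes are package-private) and a hypothetical defaults-file path:

```scala
import java.io.File
import org.apache.spark.deploy.SparkSubmitArguments

// Hypothetical location; when --properties-file is not given, SparkSubmitArguments
// falls back to $SPARK_HOME/conf/spark-defaults.properties.
val defaultsFile = new File("/opt/spark/conf/spark-defaults.properties")

if (defaultsFile.exists()) {
  // Returns every key/value pair in the file; the caller in this patch keeps only
  // keys starting with "spark" and warns about the rest.
  SparkSubmitArguments.getPropertiesFromFile(defaultsFile).foreach { case (k, v) =>
    println(s"$k=$v")
  }
}
```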
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -17,14 +17,17 @@

package org.apache.spark.deploy

import java.io.{OutputStream, PrintStream}
import java.io.{File, OutputStream, PrintStream}

import scala.collection.mutable.ArrayBuffer

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

import org.apache.spark.deploy.SparkSubmit._
import org.scalatest.prop.Tables.Table
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.apache.spark.util.Utils


class SparkSubmitSuite extends FunSuite with ShouldMatchers {
@@ -71,6 +74,13 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true)
}

test("prints error with unrecognized option") {
testPrematureExit(Array("my.jar --blarg"), "Unrecognized option '--blarg'") should be (true)
testPrematureExit(Array("my.jar -bleg"), "Unrecognized option: '-bleg'") should be (true)
testPrematureExit(Array("my.jar --master=abc"),
"Unrecognized option: '--master=abc'. Perhaps you want '--master abc'?") should be (true)
}

test("handles multiple binary definitions") {
val adjacentJars = Array("foo.jar", "bar.jar")
testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true)
@@ -175,4 +185,14 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
}

def runSparkSubmit(args: Seq[String]): String = {
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
Utils.executeAndGetOutput(
Seq("./bin/spark-submit") ++ args,
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
}


}
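For reference, a hypothetical standalone variant of the runSparkSubmit helper above, exercising flags documented in the SparkSubmitArguments usage text; the main class and application jar are illustrative, SPARK_HOME must point at a built distribution, and the snippet assumes it lives under the org.apache.spark package since Utils is package-private:

```scala
import java.io.File
import org.apache.spark.util.Utils

val sparkHome = sys.env.getOrElse("SPARK_HOME", sys.props("spark.home"))

// Launch bin/spark-submit as an external process, mirroring runSparkSubmit above.
val output = Utils.executeAndGetOutput(
  Seq("./bin/spark-submit",
    "--class", "org.example.SimpleApp",                  // hypothetical main class
    "--master", "local[2]",
    "--name", "submit-smoke-test",
    "target/scala-2.10/simple-app_2.10-0.1.jar"),        // hypothetical app jar
  new File(sparkHome),
  Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))

println(output)
```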
8 changes: 8 additions & 0 deletions docs/cluster-overview.md
@@ -138,6 +138,14 @@ The following table summarizes terms you'll see used to refer to cluster concept
<td>Application</td>
<td>User program built on Spark. Consists of a <em>driver program</em> and <em>executors</em> on the cluster.</td>
</tr>
<tr>
<td>Application jar</td>
<td>
A jar containing the user's Spark application. In some cases users will want to create
an "uber jar" containing their application along with its dependencies. The user's jar
should never include Hadoop or Spark libraries; these will be added at runtime.
</td>
</tr>
<tr>
<td>Driver program</td>
<td>The process running the main() function of the application and creating the SparkContext</td>
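To make the "Application jar" entry above concrete: a minimal sbt sketch (project name and versions are hypothetical, and the sbt-assembly plugin is assumed) that keeps Spark and its Hadoop dependencies out of the assembled uber jar by marking them as provided:

```scala
// build.sbt -- sketch only; run `sbt assembly` (sbt-assembly plugin) to build the uber jar.
name := "my-spark-app"

version := "0.1"

scalaVersion := "2.10.4"

// "provided" excludes Spark (and transitively Hadoop) from the assembly;
// the cluster supplies them at runtime, as the table above notes.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"
```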