Commit afc9ed8

Cleaning up line limits and two compile errors.
pwendell committed Apr 14, 2014
1 parent b08893b commit afc9ed8
Showing 9 changed files with 91 additions and 25 deletions.
4 changes: 2 additions & 2 deletions conf/spark-env.sh.template
@@ -32,11 +32,11 @@
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory to workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_DAEMON_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
64 changes: 58 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -52,8 +52,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}

validateSettings()

/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
if (key == null) {
@@ -210,25 +208,79 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
new SparkConf(false).setAll(settings)
}

/** Checks for illegal or deprecated config settings. Throws an exception for the former. */
private def validateSettings() {
/** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
* idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
private[spark] def validateSettings() {
if (settings.contains("spark.local.dir")) {
val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
"the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
logWarning(msg)
}

val executorOptsKey = "spark.executor.extraJavaOptions"
val executorClasspathKey = "spark.executor.extraClassPath"
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"

// Validate spark.executor.extraJavaOptions
settings.get(executorOptsKey).map { javaOpts =>
if (javaOpts.contains("-Dspark")) {
val msg = s"$executorOptsKey is not allowed to set Spark options. Was '$javaOpts'"
throw new Exception(msg)
}
if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) {
val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). Use " +
"spark.executor.memory instead."
val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). " +
"Use spark.executor.memory instead."
throw new Exception(msg)
}
}

// Check for legacy configs
sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
val error =
s"""
|SPARK_JAVA_OPTS was detected (set to '$value').
|This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+.
|
|Please instead use:
| - ./spark-submit with conf/spark-defaults.conf to set properties for an application
| - ./spark-submit with --driver-java-options to set -X options for a driver
| - spark.executor.extraJavaOptions to set -X options for executors
| - SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master, worker)
""".stripMargin
logError(error)

for (key <- Seq(executorOptsKey, driverOptsKey)) {
if (getOption(key).isDefined) {
throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
} else {
logWarning(s"Setting '$key' to '$value' as a work-around.")
set(key, value)
}
}
}

sys.env.get("SPARK_CLASSPATH").foreach { value =>
val error =
s"""
|SPARK_CLASSPATH was detected (set to '$value').
|This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+.
|
|Please instead use:
| - ./spark-submit with --driver-class-path to augment the driver classpath
| - spark.executor.extraClassPath to augment the executor classpath
""".stripMargin
logError(error)

for (key <- Seq(executorClasspathKey, driverClassPathKey)) {
if (getOption(key).isDefined) {
throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.")
} else {
logWarning(s"Setting '$key' to '$value' as a work-around.")
set(key, value)
}
}
}
}

/**
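
For context, a minimal hypothetical sketch (not part of this commit) of how the new validation surfaces to applications: because SparkContext now calls conf.validateSettings() (see the SparkContext.scala change below), heap flags passed through spark.executor.extraJavaOptions are rejected when the context is created. The local master, app name, and flag value are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

object ValidateSettingsExample {
  def main(args: Array[String]): Unit = {
    // Illustrative settings; "local[*]" and the app name are placeholders.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("validate-settings-example")
      .set("spark.executor.extraJavaOptions", "-Xmx2g") // heap size belongs in spark.executor.memory

    try {
      new SparkContext(conf) // triggers conf.validateSettings()
    } catch {
      case e: Exception =>
        // Expected: the -Xmx/-Xms check added in this commit rejects the setting.
        println(s"Rejected: ${e.getMessage}")
    }
  }
}
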
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -147,6 +147,7 @@ class SparkContext(config: SparkConf) extends Logging {
this(master, appName, sparkHome, jars, Map(), Map())

private[spark] val conf = config.clone()
conf.validateSettings()

/**
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -54,13 +54,19 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
System.getenv().foreach{case (k, v) => env(k) = v}

val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
val classPathEntries = sys.props.get("spark.driver.extraClassPath").toSeq.flatMap { cp =>

val classPathConf = "spark.driver.extraClassPath"
val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathEntries = sys.props.get("spark.driver.extraLibraryPath").toSeq.flatMap { cp =>

val libraryPathConf = "spark.driver.extraLibraryPath"
val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val javaOpts = sys.props.get("spark.driver.extraJavaOptions")

val javaOptionsConf = "spark.driver.extraJavaOptions"
val javaOpts = sys.props.get(javaOptionsConf)
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)

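
As an aside, a self-contained sketch of the lookup-and-split pattern used above: a missing system property simply yields an empty sequence rather than a null. The property name example.extraClassPath and the jar paths are hypothetical.

object PathSplitExample {
  def main(args: Array[String]): Unit = {
    val sep = java.io.File.pathSeparator
    sys.props("example.extraClassPath") = s"/opt/libs/a.jar$sep/opt/libs/b.jar" // hypothetical key and paths

    val classPathEntries = sys.props.get("example.extraClassPath").toSeq.flatMap { cp =>
      cp.split(sep)
    }
    val libraryPathEntries = sys.props.get("example.extraLibraryPath").toSeq.flatMap { cp =>
      cp.split(sep)
    }

    println(classPathEntries)   // List(/opt/libs/a.jar, /opt/libs/b.jar)
    println(libraryPathEntries) // List() -- the property is not set
  }
}
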
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -146,7 +146,7 @@ object SparkSubmit {
new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
sysProp = "spark.driver.extraJavaOpts"),
new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),

@@ -259,7 +259,9 @@ object SparkSubmit {
try {
properties.load(inputStream)
} catch {
case e: IOException => throw new SparkException(s"Failed when loading Spark properties file ${file.getName}", e)
case e: IOException =>
val message = s"Failed when loading Spark properties file ${file.getName}"
throw new SparkException(message, e)
}
properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
}
@@ -113,10 +113,13 @@ private[spark] class CoarseMesosSchedulerBackend(
val environment = Environment.newBuilder()
val extraClassPath = conf.getOption("spark.executor.extraClassPath")
extraClassPath.foreach { cp =>
envrionment.addVariables(Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
}
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions", "")
val extraLibraryPath = conf.getOption("spark.executor.extraLibraryPath").map(p => s"-Djava.library.path=$p")
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")

val libraryPathOption = "spark.executor.extraLibraryPath"
val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")

sc.testExecutorEnvs.foreach { case (key, value) =>
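
A small standalone sketch of the Option-combining idiom used above: optional JVM settings are flattened into one space-separated string, dropping whichever are unset. The values are illustrative, not taken from the commit.

object ExtraOptsExample {
  def main(args: Array[String]): Unit = {
    val extraJavaOpts: Option[String] = Some("-verbose:gc") // illustrative value
    val extraLibraryPath: Option[String] =
      Some("/opt/native").map(p => s"-Djava.library.path=$p") // illustrative path

    // Seq[Option[String]].flatten drops the Nones; mkString joins what remains.
    val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
    println(extraOpts) // -verbose:gc -Djava.library.path=/opt/native
  }
}
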
7 changes: 3 additions & 4 deletions docs/configuration.md
@@ -652,7 +652,9 @@ Apart from these, the following properties are also available, and may be useful
<td>
A string of extra JVM options to pass to executors. For instance, GC settings or other
logging. Note that it is illegal to set Spark properties or heap size settings with this
option.
option. Spark properties should be set using a SparkConf object or the
spark-defaults.conf file used with the spark-submit script. Heap size settings can be set
with spark.executor.memory.
</td>
</tr>
<tr>
@@ -694,9 +696,6 @@ The following variables can be set in `spark-env.sh`:
* `JAVA_HOME`, the location where Java is installed (if it's not on your default `PATH`)
* `PYSPARK_PYTHON`, the Python binary to use for PySpark
* `SPARK_LOCAL_IP`, to configure which IP address of the machine to bind to.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath that you want to be present for _all_ applications.
Note that applications can also add dependencies for themselves through `SparkContext.addJar` -- we recommend
doing that when possible.
* `SPARK_PUBLIC_DNS`, the hostname your Spark program will advertise to other machines.
* Options for the Spark [standalone cluster scripts](spark-standalone.html#cluster-launch-scripts), such as number of cores
to use on each machine and maximum memory.
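
A minimal sketch of what the revised documentation recommends, assuming a Spark 1.0-style SparkConf: heap size goes through spark.executor.memory, while spark.executor.extraJavaOptions carries only non-memory, non-Spark JVM flags (the GC and logging flags shown are illustrative).

import org.apache.spark.SparkConf

object ExecutorOptionsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.executor.memory", "2g") // heap size, set the supported way
      .set("spark.executor.extraJavaOptions",
        "-XX:+PrintGCDetails -verbose:gc") // JVM flags only; no -Xmx/-Xms and no -Dspark.* here

    println(conf.get("spark.executor.extraJavaOptions"))
  }
}
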
@@ -42,8 +42,10 @@ import org.apache.spark.{Logging, SparkConf}
* Client submits an application to the YARN ResourceManager.
*
* Depending on the deployment mode this will launch one of two application master classes:
* 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]] which embeds a driver.
* 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]].
* 1. In standalone mode, it will launch an [[org.apache.spark.deploy.yarn.ApplicationMaster]]
* which launches a driver program inside of the cluster.
* 2. In client mode, it will launch an [[org.apache.spark.deploy.yarn.ExecutorLauncher]] to
* request executors on behalf of a driver running outside of the cluster.
*/
trait ClientBase extends Logging {
val args: ClientArguments
@@ -263,7 +265,8 @@ trait ClientBase extends Logging {
val env = new HashMap[String, String]()

val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP), env, extraCp)
ClientBase.populateClasspath(yarnConf, sparkConf,
localResources.contains(ClientBase.LOG4J_PROP), env, extraCp)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -342,7 +345,6 @@ trait ClientBase extends Logging {
for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
JAVA_OPTS += s"-D$k=$v"
}
// TODO: honor driver classpath here: sys.props.get("spark.driver.classPath")
sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
}
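
For illustration, a self-contained sketch of the forwarding loop in the hunk above: each spark.* system property becomes a -Dkey=value JVM argument. The property value is a placeholder.

import scala.collection.mutable.ListBuffer

object ForwardSparkPropsExample {
  def main(args: Array[String]): Unit = {
    sys.props("spark.app.name") = "example-app" // placeholder property

    val JAVA_OPTS = ListBuffer[String]()
    for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
      JAVA_OPTS += s"-D$k=$v"
    }
    println(JAVA_OPTS.mkString(" ")) // e.g. -Dspark.app.name=example-app
  }
}
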
@@ -159,7 +159,8 @@ trait ExecutorRunnableUtil extends Logging {
val env = new HashMap[String, String]()

val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env, extraCp)
ClientBase.populateClasspath(yarnConf, sparkConf,
System.getenv("SPARK_YARN_LOG4J_PATH") != null, env, extraCp)

// Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
