Additional improvements.
- Made driver & executor options consistent.
- Some doc improvements to YARN code.
- Handled special flags on YARN.
pwendell committed Apr 13, 2014
1 parent ace4ead · commit b08893b
Showing 15 changed files with 96 additions and 53 deletions.
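For reference, the driver and executor options made consistent here are the spark.driver.extra* / spark.executor.extra* properties that appear throughout the diff below. A minimal sketch of setting them on a SparkConf (values are invented for illustration and are not part of this commit; the old names shown in comments are the ones replaced in this diff):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.extraClassPath", "/opt/ext/driver/*")         // formerly spark.driver.classPath
  .set("spark.driver.extraJavaOptions", "-XX:+UseConcMarkSweepGC") // formerly spark.driver.javaOpts
  .set("spark.driver.extraLibraryPath", "/opt/ext/native")         // formerly spark.driver.libraryPath
  .set("spark.executor.extraClassPath", "/opt/ext/executor/*")
  .set("spark.executor.extraJavaOptions", "-verbose:gc")
  .set("spark.executor.extraLibraryPath", "/opt/ext/native")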
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -224,8 +224,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
throw new Exception(msg)
}
if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) {
val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). Please use " +
"spark.executor.memory."
val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). Use " +
"spark.executor.memory instead."
throw new Exception(msg)
}
}
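As a quick illustration of the check in this hunk, a sketch (the property values are invented; the check is assumed to run when the configuration is validated, e.g. while constructing a SparkContext):

import org.apache.spark.SparkConf

// Rejected by the check above: executor heap size must go through spark.executor.memory.
val bad = new SparkConf().set("spark.executor.extraJavaOptions", "-Xmx4g")

// Fine: heap size via the dedicated property, other JVM flags via the opts.
val good = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails")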
15 changes: 8 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -246,27 +246,28 @@ class SparkContext(config: SparkConf) extends Logging {
.map(Utils.memoryStringToMb)
.getOrElse(512)

// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Environment variables to pass to our executors.
// NOTE: This should only be used for test related settings.
private[spark] val testExecutorEnvs = HashMap[String, String]()

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
testExecutorEnvs(envKey) = value
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= conf.getExecutorEnv
testExecutorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
testExecutorEnvs ++= conf.getExecutorEnv

// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
executorEnvs("SPARK_USER") = sparkUser
testExecutorEnvs("SPARK_USER") = sparkUser

// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -54,10 +54,10 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
System.getenv().foreach{case (k, v) => env(k) = v}

val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
val classPathEntries = sys.props.get("spark.driver.classPath").toSeq.flatMap { cp =>
val classPathEntries = sys.props.get("spark.driver.extraClassPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathEntries = sys.props.get("spark.driver.libraryPath").toSeq.flatMap { cp =>
val libraryPathEntries = sys.props.get("spark.driver.extraLibraryPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val javaOpts = sys.props.get("spark.driver.extraJavaOptions")
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -144,11 +144,11 @@ object SparkSubmit {
new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),

new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.classPath"),
sysProp = "spark.driver.extraClassPath"),
new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.javaOpts"),
sysProp = "spark.driver.extraJavaOptions"),
new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.libraryPath"),
sysProp = "spark.driver.extraLibraryPath"),

new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
@@ -47,11 +47,16 @@ private[spark] class SparkDeploySchedulerBackend(
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}",
"{{CORES}}", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
val classPathEntries = sys.props.get("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathEntries = sys.props.get("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}

// TODO (pwendell) LOOK AT THIS
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
Seq(), Seq(), extraJavaOpts)
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.testExecutorEnvs,
classPathEntries, libraryPathEntries, extraJavaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
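Note on the splitting above: entries are separated by the platform path separator, so a multi-entry value behaves as in this sketch (the value is invented):

// java.io.File.pathSeparator is ":" on Unix-like systems and ";" on Windows.
val cp = sys.props.get("spark.executor.extraClassPath")            // e.g. Some("/opt/a.jar:/opt/b.jar")
val entries = cp.toSeq.flatMap(_.split(java.io.File.pathSeparator))
// entries == Seq("/opt/a.jar", "/opt/b.jar") when the separator is ":"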
@@ -111,7 +111,15 @@ private[spark] class CoarseMesosSchedulerBackend(

def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
val extraClassPath = conf.getOption("spark.executor.extraClassPath")
extraClassPath.foreach { cp =>
environment.addVariables(Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
}
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
val extraLibraryPath = conf.getOption("spark.executor.extraLibraryPath").map(p => s"-Djava.library.path=$p")
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")

sc.testExecutorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)
@@ -123,7 +131,7 @@ private[spark] class CoarseMesosSchedulerBackend(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val extraOpts = conf.get("spark.executor.extraJavaOptions")

val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
@@ -90,7 +90,7 @@ private[spark] class MesosSchedulerBackend(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
sc.testExecutorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)
16 changes: 16 additions & 0 deletions docs/configuration.md
@@ -655,6 +655,22 @@ Apart from these, the following properties are also available, and may be useful
option.
</td>
</tr>
<tr>
<td>spark.executor.extraClassPath</td>
<td>(none)</td>
<td>
Extra classpath entries to append to the classpath of executors. This exists primarily
for backwards-compatibility with older versions of Spark. Users typically should not need
to set this option.
</td>
</tr>
<tr>
<td>spark.executor.extraLibraryPath</td>
<td>(none)</td>
<td>
Set a special library path to use when launching executor JVMs.
</td>
</tr>

</table>
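To connect the two new properties to the scheduler changes above, a sketch of how they reach an executor JVM on the coarse-grained backends (values invented; the -Djava.library.path translation mirrors the Mesos hunk earlier in this diff):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraClassPath", "/opt/legacy/udfs.jar")   // appended to the executor classpath
  .set("spark.executor.extraLibraryPath", "/opt/native/lib")      // native library search path

// Folded into the executor's JVM options roughly as:
val libraryPathOpt = conf.getOption("spark.executor.extraLibraryPath").map(p => s"-Djava.library.path=$p")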

@@ -33,7 +33,9 @@ import org.apache.hadoop.yarn.util.{Apps, Records}

import org.apache.spark.{Logging, SparkConf}


/**
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
*/
class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {

@@ -39,8 +39,11 @@ import org.apache.spark.{Logging, SparkConf}

/**
* The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
* Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
* which will launch a Spark master process and negotiate resources throughout its duration.
* Client submits an application to the YARN ResourceManager.
*
* Depending on the deployment mode, this will launch one of two application master classes:
* 1. In standalone mode, it launches an [[org.apache.spark.deploy.yarn.ApplicationMaster]],
*    which embeds a driver running inside the cluster.
* 2. In client mode, it launches an [[org.apache.spark.deploy.yarn.ExecutorLauncher]], which
*    allocates executors on behalf of a driver running outside the cluster.
*/
trait ClientBase extends Logging {
val args: ClientArguments
@@ -259,8 +262,8 @@ trait ClientBase extends Logging {

val env = new HashMap[String, String]()

ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP),
env)
val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
ClientBase.populateClasspath(yarnConf, sparkConf, localResources.contains(ClientBase.LOG4J_PROP), env, extraCp)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -272,9 +275,6 @@ trait ClientBase extends Logging {
// Allow users to specify some environment variables.
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))

// Add each SPARK_* key to the environment.
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }

env
}

@@ -427,29 +427,29 @@ object ClientBase {
}
}

def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String],
extraClassPath: Option[String] = None) {

/** Add entry to the classpath. */
def addClasspathEntry(entry: String) = Apps.addToEnvironment(env, Environment.CLASSPATH.name, entry)
/** Add entry to the classpath. Interpreted as a path relative to the working directory. */
def addPwdClasspathEntry(path: String) = addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + path)

extraClassPath.foreach(addClasspathEntry)

addClasspathEntry(Environment.PWD.$())
// If log4j present, ensure ours overrides all others
if (addLog4j) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP)
}
if (addLog4j) addPwdClasspathEntry(LOG4J_PROP)
// Normally the user's app.jar is last, in case of conflicts with Spark jars
val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
.toBoolean
if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)

if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR)
if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
addPwdClasspathEntry(APP_JAR)
addPwdClasspathEntry(SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)
} else {
addPwdClasspathEntry(SPARK_JAR)
ClientBase.populateHadoopClasspath(conf, env)
addPwdClasspathEntry(APP_JAR)
}
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*")
addPwdClasspathEntry("*")
}
}
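To make the resulting ordering explicit, a sketch of the CLASSPATH entries populateClasspath now emits, in order (assuming extraClassPath is set and addLog4j is true; $PWD stands for Environment.PWD.$(), and LOG4J_PROP, APP_JAR and SPARK_JAR are the ClientBase constants referenced above):

// spark.yarn.user.classpath.first = true:
//   <extraClassPath> : $PWD : $PWD/LOG4J_PROP : $PWD/APP_JAR : $PWD/SPARK_JAR : <hadoop classpath> : $PWD/*
//
// spark.yarn.user.classpath.first = false (the default):
//   <extraClassPath> : $PWD : $PWD/LOG4J_PROP : $PWD/SPARK_JAR : <hadoop classpath> : $PWD/APP_JAR : $PWD/*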
@@ -158,7 +158,8 @@ trait ExecutorRunnableUtil extends Logging {
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()

ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env, extraCp)

// Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
@@ -42,6 +42,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils


/**
* An application master that runs the user's driver program and allocates executors.
*/
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf: SparkConf) extends Logging {

@@ -34,9 +34,7 @@ import org.apache.spark.{Logging, SparkConf}


/**
* The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
* Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
* which will launch a Spark master process and negotiate resources throughout its duration.
* Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
*/
class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
extends YarnClientImpl with ClientBase with Logging {
@@ -35,6 +35,11 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

/**
* An application master that allocates executors on behalf of a driver that is running outside the cluster.
*
* This is used only in yarn-client mode.
*/
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {

@@ -56,6 +56,10 @@ object AllocationType extends Enumeration {
// Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
// more info on how we request containers.

/**
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
*/
private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val amClient: AMRMClient[ContainerRequest],