[SPARK-14602][YARN] Use SparkConf to propagate the list of cached files.
This change avoids using the environment to pass this information, since
with many jars it's easy to hit limits on certain OSes. Instead, it encodes
the information into the Spark configuration propagated to the AM.

The first problem that needed to be solved is a chicken & egg issue: the
config file is distributed using the cache, and it needs to contain information
about the files that are being distributed. To solve that, the code now treats
the config archive specially, and uses slightly different code to distribute
it, so that only its cache path needs to be saved to the config file.

The second problem is that the extra information would show up in the Web UI,
which made the environment tab even more noisy than it already is when lots
of jars are listed. This is solved by two changes: the list of cached files
is now read only once in the AM, and propagated down to the ExecutorRunnable
code (which actually sends the list to the NMs when starting containers). The
second change is to unset those config entries after the list is read, so that
the SparkContext never sees them.

Tested with both client and cluster mode by running "run-example SparkPi". This
uploads a whole lot of files when run from a build dir (instead of a distribution,
where the list is cleaned up), and I verified that the configs do not show
up in the UI.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #12487 from vanzin/SPARK-14602.
Marcelo Vanzin committed Apr 20, 2016
1 parent 334c293 commit f47dbf2
Showing 11 changed files with 239 additions and 175 deletions.
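
For reference, the end state this change produces is a handful of Spark config entries carrying one index-aligned sequence per file attribute, plus an optional pointer to the uploaded conf archive. A minimal sketch with made-up paths and values, assuming the code lives inside Spark's yarn module (the config constants and the typed SparkConf setters used here are private[spark]):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    // Hypothetical entries for a job that distributes two files. Entry i of every
    // sequence describes the same file, which is what the AM relies on when it
    // rebuilds the YARN LocalResource records.
    val conf = new SparkConf(false)
    conf.set(CACHED_FILES, Seq(
      "hdfs:///user/alice/.sparkStaging/app_001/app.jar#app.jar",
      "hdfs:///user/alice/.sparkStaging/app_001/deps.zip#deps"))
    conf.set(CACHED_FILES_SIZES, Seq(1024L, 2048L))
    conf.set(CACHED_FILES_TIMESTAMPS, Seq(1461100000000L, 1461100000001L))
    conf.set(CACHED_FILES_VISIBILITIES, Seq("PRIVATE", "PRIVATE"))
    conf.set(CACHED_FILES_TYPES, Seq("FILE", "ARCHIVE"))
    conf.set(CACHED_CONF_ARCHIVE,
      "hdfs:///user/alice/.sparkStaging/app_001/__spark_conf__.zip")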
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -225,6 +225,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}

private[spark] def remove(entry: ConfigEntry[_]): SparkConf = {
remove(entry.key)
}

/** Get a parameter; throws a NoSuchElementException if it's not set */
def get(key: String): String = {
getOption(key).getOrElse(throw new NoSuchElementException(key))
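The new overload makes it possible to strip a typed entry without spelling out its key, which is how the cache-related settings are scrubbed later in this change. A minimal usage sketch, assuming it runs inside the Spark code base (both the overload and the CACHE_CONFIGS list from the yarn config object are Spark-internal):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    val conf = new SparkConf(false)
    // Drop every cache-related entry so it never shows up on the Web UI's
    // environment page once the SparkContext is created.
    CACHE_CONFIGS.foreach(conf.remove)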
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,15 +19,17 @@ package org.apache.spark.deploy.yarn

import java.io.{File, IOException}
import java.lang.reflect.InvocationTargetException
import java.net.{Socket, URL}
import java.net.{Socket, URI, URL}
import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -120,6 +122,61 @@ private[spark] class ApplicationMaster(

private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None

// Load the list of localized files set by the client. This is used when launching executors,
// and is loaded here so that these configs don't pollute the Web UI's environment page in
// cluster mode.
private val localResources = {
logInfo("Preparing Local resources")
val resources = HashMap[String, LocalResource]()

def setupDistributedCache(
file: String,
rtype: LocalResourceType,
timestamp: String,
size: String,
vis: String): Unit = {
val uri = new URI(file)
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(rtype)
amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
amJarRsrc.setTimestamp(timestamp.toLong)
amJarRsrc.setSize(size.toLong)

val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
resources(fileName) = amJarRsrc
}

val distFiles = sparkConf.get(CACHED_FILES)
val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
val resTypes = sparkConf.get(CACHED_FILES_TYPES)

for (i <- 0 to distFiles.size - 1) {
val resType = LocalResourceType.valueOf(resTypes(i))
setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
visibilities(i))
}

// Distribute the conf archive to executors.
sparkConf.get(CACHED_CONF_ARCHIVE).foreach { uri =>
val fs = FileSystem.get(new URI(uri), yarnConf)
val status = fs.getFileStatus(new Path(uri))
setupDistributedCache(uri, LocalResourceType.ARCHIVE, status.getModificationTime().toString,
status.getLen.toString, LocalResourceVisibility.PRIVATE.name())
}

// Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
CACHE_CONFIGS.foreach { e =>
sparkConf.remove(e)
sys.props.remove(e.key)
}

logInfo("Prepared Local resources " + resources)
resources.toMap
}

def getAttemptId(): ApplicationAttemptId = {
client.getAttemptId()
}
@@ -292,7 +349,8 @@ private[spark] class ApplicationMaster(
_sparkConf,
uiAddress,
historyAddress,
securityMgr)
securityMgr,
localResources)

allocator.allocateResources()
reporterThread = launchReporterThread()
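The Map[String, LocalResource] built above is what ultimately reaches YARN when containers are launched, both for the AM container (set up in Client.scala below) and for executor containers via ExecutorRunnable. A minimal sketch of that hand-off using only the YARN records API; the helper name is made up:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, LocalResource}
    import org.apache.hadoop.yarn.util.Records

    // Hypothetical helper: attach the localized resources to a container launch
    // context, which the NodeManager uses to download the files before it starts
    // the container process.
    def withLocalResources(resources: Map[String, LocalResource]): ContainerLaunchContext = {
      val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      ctx.setLocalResources(resources.asJava)
      ctx
    }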
42 changes: 34 additions & 8 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -328,12 +328,14 @@ private[spark] class Client(
private[yarn] def copyFileToRemote(
destDir: Path,
srcPath: Path,
replication: Short): Path = {
replication: Short,
force: Boolean = false,
destName: Option[String] = None): Path = {
val destFs = destDir.getFileSystem(hadoopConf)
val srcFs = srcPath.getFileSystem(hadoopConf)
var destPath = srcPath
if (!compareFs(srcFs, destFs)) {
destPath = new Path(destDir, srcPath.getName())
if (force || !compareFs(srcFs, destFs)) {
destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
logInfo(s"Uploading resource $srcPath -> $destPath")
FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
destFs.setReplication(destPath, replication)
@@ -553,12 +555,37 @@ private[spark] class Client(
distribute(f, targetDir = targetDir)
}

// Distribute an archive with Hadoop and Spark configuration for the AM and executors.
// Update the configuration with all the distributed files, minus the conf archive. The
// conf archive will be handled by the AM differently so that we avoid having to send
// this configuration by other means. See SPARK-14602 for one reason why this is needed.
distCacheMgr.updateConfiguration(sparkConf)

// Upload the conf archive to HDFS manually, and record its location in the configuration.
// This will allow the AM to know where the conf archive is in HDFS, so that it can be
// distributed to the containers.
//
// This code forces the archive to be copied, so that unit tests pass (since in that case both
// file systems are the same and the archive wouldn't normally be copied). In most (all?)
// deployments, the archive would be copied anyway, since it's a temp file in the local file
// system.
val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())

val localConfArchive = new Path(createConfArchive().toURI())
copyFileToRemote(destDir, localConfArchive, replication, force = true,
destName = Some(LOCALIZED_CONF_ARCHIVE))

val (_, confLocalizedPath) = distribute(createConfArchive().toURI().getPath(),
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_CONF_DIR))
require(confLocalizedPath != null)

// Clear the cache-related entries from the configuration to avoid them polluting the
// UI's environment page. This works for client mode; for cluster mode, this is handled
// by the AM.
CACHE_CONFIGS.foreach(sparkConf.remove)

localResources
}

@@ -787,10 +814,6 @@ private[spark] class Client(
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)

// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(launchEnv)
distCacheMgr.setDistArchivesEnv(launchEnv)

val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
@@ -1150,6 +1173,9 @@ private object Client extends Logging {
// Subdirectory where the user's Spark and Hadoop config files will be placed.
val LOCALIZED_CONF_DIR = "__spark_conf__"

// File containing the conf archive in the AM. See prepareLocalResources().
val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"

// Name of the file in the conf archive containing Spark configuration.
val SPARK_CONF_FILE = "__spark_conf__.properties"

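The two new copyFileToRemote parameters only change where the upload lands: force skips the same-filesystem shortcut and destName overrides the file name at the destination. A standalone sketch of the naming rule (paths are hypothetical; the real method also performs the copy and sets replication):

    import org.apache.hadoop.fs.Path

    // Mirrors the destination choice made inside copyFileToRemote: an explicit
    // destName wins, otherwise the source file name is kept.
    def destinationFor(destDir: Path, srcPath: Path, destName: Option[String]): Path =
      new Path(destDir, destName.getOrElse(srcPath.getName()))

    // The conf archive is renamed to __spark_conf__.zip in the staging directory.
    val confDest = destinationFor(
      new Path("hdfs:///user/alice/.sparkStaging/app_001"),
      new Path("file:///tmp/spark-1234/__spark_conf__8400.zip"),
      Some("__spark_conf__.zip"))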
yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -19,25 +19,29 @@ package org.apache.spark.deploy.yarn

import java.net.URI

import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
import scala.collection.mutable.{HashMap, ListBuffer, Map}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging

private case class CacheEntry(
uri: URI,
size: Long,
modTime: Long,
visibility: LocalResourceVisibility,
resType: LocalResourceType)

/** Client side methods to setup the Hadoop distributed cache */
private[spark] class ClientDistributedCacheManager() extends Logging {

// Mappings from remote URI to (file status, modification time, visibility)
private val distCacheFiles: Map[String, (String, String, String)] =
LinkedHashMap[String, (String, String, String)]()
private val distCacheArchives: Map[String, (String, String, String)] =
LinkedHashMap[String, (String, String, String)]()

private val distCacheEntries = new ListBuffer[CacheEntry]()

/**
* Add a resource to the list of distributed cache resources. This list can
@@ -72,61 +76,33 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
amJarRsrc.setTimestamp(destStatus.getModificationTime())
amJarRsrc.setSize(destStatus.getLen())
if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
require(link != null && link.nonEmpty, "You must specify a valid link name.")
localResources(link) = amJarRsrc

if (!appMasterOnly) {
val uri = destPath.toUri()
val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
if (resourceType == LocalResourceType.FILE) {
distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
} else {
distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
}
distCacheEntries += CacheEntry(pathURI, destStatus.getLen(), destStatus.getModificationTime(),
visibility, resourceType)
}
}

/**
* Adds the necessary cache file env variables to the env passed in
* Writes down information about cached files needed in executors to the given configuration.
*/
def setDistFilesEnv(env: Map[String, String]): Unit = {
val (keys, tupleValues) = distCacheFiles.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
}
}

/**
* Adds the necessary cache archive env variables to the env passed in
*/
def setDistArchivesEnv(env: Map[String, String]): Unit = {
val (keys, tupleValues) = distCacheArchives.unzip
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
}
def updateConfiguration(conf: SparkConf): Unit = {
conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
}

/**
* Returns the local resource visibility depending on the cache file permissions
* @return LocalResourceVisibility
*/
def getVisibility(
private[yarn] def getVisibility(
conf: Configuration,
uri: URI,
statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
@@ -141,7 +117,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* Returns a boolean to denote whether a cache file is visible to all (public)
* @return true if the path in the uri is visible to all, false otherwise
*/
def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
val fs = FileSystem.get(uri, conf)
val current = new Path(uri.getPath())
// the leaf level file should be readable by others
@@ -157,7 +133,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* the directory hierarchy to the given path)
* @return true if all ancestors have the 'execute' permission set for all users
*/
def ancestorsHaveExecutePermissions(
private def ancestorsHaveExecutePermissions(
fs: FileSystem,
path: Path,
statCache: Map[URI, FileStatus]): Boolean = {
@@ -177,7 +153,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* imply the permission in the passed FsAction
* @return true if the path in the uri is visible to all, false otherwise
*/
def checkPermissionOfOther(
private def checkPermissionOfOther(
fs: FileSystem,
path: Path,
action: FsAction,
@@ -194,7 +170,10 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* it in the cache, and returns the FileStatus.
* @return FileStatus
*/
def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
private[yarn] def getFileStatus(
fs: FileSystem,
uri: URI,
statCache: Map[URI, FileStatus]): FileStatus = {
val stat = statCache.get(uri) match {
case Some(existstat) => existstat
case None =>

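As a sanity check on the new plumbing (not part of this patch), the client-side manager and the config entries can be exercised round-trip: register a file, write the configuration, and confirm the sequences stay index-aligned. A sketch assuming it lives in the yarn module, e.g. under org.apache.spark.deploy.yarn, since the classes involved are Spark-internal:

    import java.io.File
    import java.net.URI

    import scala.collection.mutable.{HashMap, Map}

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
    import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType}

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.config._

    // Register a single local file with the cache manager, then write the cache
    // metadata into a SparkConf the way the client does before launching the AM.
    val hadoopConf = new Configuration()
    val fs = FileSystem.getLocal(hadoopConf)
    val jar = File.createTempFile("cache-example", ".jar")

    val localResources = new HashMap[String, LocalResource]()
    val statCache: Map[URI, FileStatus] = new HashMap[URI, FileStatus]()
    val manager = new ClientDistributedCacheManager()
    manager.addResource(fs, hadoopConf, new Path(jar.toURI), localResources,
      LocalResourceType.FILE, "example.jar", statCache)

    val conf = new SparkConf(false)
    manager.updateConfiguration(conf)

    // Entry 0 of every sequence should describe example.jar.
    assert(conf.get(CACHED_FILES).size == conf.get(CACHED_FILES_SIZES).size)
    assert(conf.get(CACHED_FILES).size == conf.get(CACHED_FILES_TYPES).size)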