From 3e92d44db372c51fa6db3b962372b42bdeeec1c4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 17 Aug 2014 12:47:56 -0700 Subject: [PATCH] Move local dirs override logic into Utils; fix bugs: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Now, the logic for determining the precedence of the different configuration options is in Utils.getOrCreateLocalRootDirs(). DiskBlockManager now accepts a SparkConf rather than a list of root directories and I’ve updated other tests to reflect this. --- .../org/apache/spark/executor/Executor.scala | 25 ---------- .../apache/spark/storage/BlockManager.scala | 3 +- .../spark/storage/DiskBlockManager.scala | 14 +++--- .../scala/org/apache/spark/util/Utils.scala | 50 ++++++++++++++++++- .../spark/storage/BlockManagerSuite.scala | 3 +- .../spark/storage/DiskBlockManagerSuite.scala | 4 +- 6 files changed, 60 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index f25c8167fd3b6..2f76e532aeb76 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -62,16 +62,6 @@ private[spark] class Executor( val conf = new SparkConf(true) conf.setAll(properties) - // If we are in yarn mode, systems can have different disk layouts so we must set it - // to what Yarn on this system said was available. This will be used later when SparkEnv - // created. - if (java.lang.Boolean.valueOf( - System.getProperty("SPARK_YARN_MODE", conf.getenv("SPARK_YARN_MODE")))) { - conf.set("spark.local.dir", getYarnLocalDirs(conf)) - } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) { - conf.set("spark.local.dir", conf.getenv("SPARK_LOCAL_DIRS")) - } - if (!isLocal) { // Setup an uncaught exception handler for non-local mode. // Make any thread terminations due to uncaught exceptions kill the entire @@ -134,21 +124,6 @@ private[spark] class Executor( threadPool.shutdown() } - /** Get the Yarn approved local directories. */ - private def getYarnLocalDirs(conf: SparkConf): String = { - // Hadoop 0.23 and 2.x have different Environment variable names for the - // local dirs, so lets check both. We assume one of the 2 is set. - // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X - val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS")) - .getOrElse(Option(conf.getenv("LOCAL_DIRS")) - .getOrElse("")) - - if (localDirs.isEmpty) { - throw new Exception("Yarn Local dirs can't be empty") - } - localDirs - } - class TaskRunner( execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer) extends Runnable { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e4c3d58905e7f..7d710d96cf3b7 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -64,8 +64,7 @@ private[spark] class BlockManager( private val port = conf.getInt("spark.blockManager.port", 0) val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager) - val diskBlockManager = new DiskBlockManager(shuffleBlockManager, - conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) + val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) val connectionManager = new ConnectionManager(port, conf, securityManager, "Connection manager for block manager") diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 4d66ccea211fa..741b42ec814dd 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -21,7 +21,7 @@ import java.io.File import java.text.SimpleDateFormat import java.util.{Date, Random, UUID} -import org.apache.spark.{SparkEnv, Logging} +import org.apache.spark.{SparkConf, SparkEnv, Logging} import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.network.netty.{PathResolver, ShuffleSender} import org.apache.spark.util.Utils @@ -33,9 +33,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager * However, it is also possible to have a block map to only a segment of a file, by calling * mapBlockToFileSegment(). * - * @param rootDirs The directories to use for storing block files. Data will be hashed among these. + * Block files are hashed among the directories listed in spark.local.dir (or in + * SPARK_LOCAL_DIRS, if it's set). */ -private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String) +private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf) extends PathResolver with Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 @@ -46,7 +47,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, /* Create one local directory for each path mentioned in spark.local.dir; then, inside this * directory, create multiple subdirectories that we will hash files into, in order to avoid * having really large inodes at the top level. */ - val localDirs: Array[File] = createLocalDirs() + val localDirs: Array[File] = createLocalDirs(conf) if (localDirs.isEmpty) { logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) @@ -131,10 +132,9 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, (blockId, getFile(blockId)) } - private def createLocalDirs(): Array[File] = { - logDebug(s"Creating local directories at root dirs '$rootDirs'") + private def createLocalDirs(conf: SparkConf): Array[File] = { val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") - rootDirs.split(",").flatMap { rootDir => + Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir => var foundLocalDir = false var localDir: File = null var localDirId: String = null diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 019f68b160894..939bfc9d43efb 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -451,10 +451,56 @@ private[spark] object Utils extends Logging { /** * Get a temporary directory using Spark's spark.local.dir property, if set. This will always * return a single directory, even though the spark.local.dir property might be a list of - * multiple paths. + * multiple paths. If the SPARK_LOCAL_DIRS environment variable is set, then this will return + * a directory from that variable. */ def getLocalDir(conf: SparkConf): String = { - conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0) + getOrCreateLocalRootDirs(conf)(0) + } + + /** + * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS, + * and returns only the directories that exist / could be created. + */ + private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = { + val isYarn = java.lang.Boolean.valueOf( + System.getProperty("SPARK_YARN_MODE", conf.getenv("SPARK_YARN_MODE"))) + val confValue = if (isYarn) { + // If we are in yarn mode, systems can have different disk layouts so we must set it + // to what Yarn on this system said was available. + getYarnLocalDirs(conf) + } else { + Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse( + conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) + } + val rootDirs = confValue.split(',') + logDebug(s"Getting/creating local root dirs at '$rootDirs'") + + rootDirs.flatMap { rootDir => + val localDir: File = new File(rootDir) + val foundLocalDir = localDir.exists || localDir.mkdirs() + if (!foundLocalDir) { + logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.") + None + } else { + Some(rootDir) + } + } + } + + /** Get the Yarn approved local directories. */ + private def getYarnLocalDirs(conf: SparkConf): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS")) + .getOrElse(Option(conf.getenv("LOCAL_DIRS")) + .getOrElse("")) + + if (localDirs.isEmpty) { + throw new Exception("Yarn Local dirs can't be empty") + } + localDirs } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 20bac66105a69..f32ce6f9fcc7f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -825,8 +825,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val blockManager = mock(classOf[BlockManager]) val shuffleBlockManager = mock(classOf[ShuffleBlockManager]) when(shuffleBlockManager.conf).thenReturn(conf) - val diskBlockManager = new DiskBlockManager(shuffleBlockManager, - System.getProperty("java.io.tmpdir")) + val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString)) val diskStoreMapped = new DiskStore(blockManager, diskBlockManager) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 777579bc570db..aabaeadd7a071 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -71,7 +71,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before } override def beforeEach() { - diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs) + val conf = testConf.clone + conf.set("spark.local.dir", rootDirs) + diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf) shuffleBlockManager.idToSegmentMap.clear() }