diff --git a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
index d3b38601ccc4f..e5ae20299f48d 100644
--- a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
+++ b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark
 
-import java.io.File
+import java.io.{BufferedInputStream, File, FileInputStream}
 
 import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.databind.exc.MismatchedInputException
 import org.json4s.{DefaultFormats, MappingException}
-import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
@@ -132,4 +132,20 @@ private[spark] object ResourceDiscoverer extends Logging {
       }
     }
   }
+
+  def parseAllocatedFromJsonFile(resourcesFile: String): Map[String, ResourceInformation] = {
+    implicit val formats = DefaultFormats
+    // case class to make json4s parsing easy
+    case class JsonResourceInformation(val name: String, val addresses: Array[String])
+    val resourceInput = new BufferedInputStream(new FileInputStream(resourcesFile))
+    val resources = try {
+      parse(resourceInput).extract[Seq[JsonResourceInformation]]
+    } catch {
+      case e@(_: MappingException | _: MismatchedInputException | _: ClassCastException) =>
+        throw new SparkException(s"Exception parsing the resources in $resourcesFile", e)
+    } finally {
+      resourceInput.close()
+    }
+    resources.map(r => (r.name, new ResourceInformation(r.name, r.addresses))).toMap
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 878010df94657..6266ce62669e3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -365,29 +365,24 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /**
    * Checks to see if any resources (GPU/FPGA/etc) are available to the driver by looking
-   * at and processing the spark.driver.resource.resourceName.addresses and
+   * at and processing the spark.driver.resourcesFile and
    * spark.driver.resource.resourceName.discoveryScript configs. The configs have to be
   * present when the driver starts, setting them after startup does not work.
    *
-   * If any resource addresses configs were specified then assume all resources will be specified
-   * in that way. Otherwise use the discovery scripts to find the resources. Users should
-   * not really be setting the addresses config directly and should not be mixing methods
-   * for different types of resources since the addresses config is meant for Standalone mode
+   * If a resources file was specified then assume all resources will be specified
+   * in that file. Otherwise use the discovery scripts to find the resources. Users should
+   * not be setting the resources file config directly and should not be mixing methods
+   * for different types of resources since the resources file config is meant for Standalone mode
    * and other cluster managers should use the discovery scripts.
    */
   private def setupDriverResources(): Unit = {
     // Only call getAllWithPrefix once and filter on those since there could be a lot of spark
     // configs.
     val allDriverResourceConfs = _conf.getAllWithPrefix(SPARK_DRIVER_RESOURCE_PREFIX)
-    val resourcesWithAddrsInConfs =
-      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_ADDRESSES_SUFFIX)
-
-    _resources = if (resourcesWithAddrsInConfs.nonEmpty) {
-      resourcesWithAddrsInConfs.map { case (rName, addrString) =>
-        val addrsArray = addrString.split(",").map(_.trim())
-        (rName -> new ResourceInformation(rName, addrsArray))
-      }.toMap
-    } else {
+    val resourcesFile = _conf.get(DRIVER_RESOURCES_FILE)
+    _resources = resourcesFile.map { rFile => {
+      ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
+    }}.getOrElse {
       // we already have the resources confs here so just pass in the unique resource names
       // rather then having the resource discoverer reparse all the configs.
       val uniqueResources = SparkConf.getBaseOfConfigs(allDriverResourceConfs)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index fac4d40a1c5af..b262c235d06c2 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -87,25 +87,11 @@ private[spark] class CoarseGrainedExecutorBackend(
   def parseOrFindResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = {
     // only parse the resources if a task requires them
     val resourceInfo = if (env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX).nonEmpty) {
-      val actualExecResources = resourcesFile.map { resourceFileStr => {
-        val source = new BufferedInputStream(new FileInputStream(resourceFileStr))
-        val resourceMap = try {
-          val parsedJson = parse(source).asInstanceOf[JArray].arr
-          parsedJson.map { json =>
-            val name = (json \ "name").extract[String]
-            val addresses = (json \ "addresses").extract[Array[String]]
-            new ResourceInformation(name, addresses)
-          }.map(x => (x.name -> x)).toMap
-        } catch {
-          case e @ (_: MappingException | _: MismatchedInputException) =>
-            throw new SparkException(
-              s"Exception parsing the resources in $resourceFileStr", e)
-        } finally {
-          source.close()
-        }
-        resourceMap
-      }}.getOrElse(ResourceDiscoverer.discoverResourcesInformation(env.conf,
-        SPARK_EXECUTOR_RESOURCE_PREFIX))
+      val actualExecResources = resourcesFile.map { rFile => {
+        ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
+      }}.getOrElse {
+        ResourceDiscoverer.discoverResourcesInformation(env.conf, SPARK_EXECUTOR_RESOURCE_PREFIX)
+      }
 
       if (actualExecResources.isEmpty) {
         throw new SparkException("User specified resources per task via: " +
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 469b54c842db0..a5d36b590d617 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -36,9 +36,17 @@ package object config {
   private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
 
   private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
-  private[spark] val SPARK_RESOURCE_ADDRESSES_SUFFIX = ".addresses"
   private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
 
+  private[spark] val DRIVER_RESOURCES_FILE =
+    ConfigBuilder("spark.driver.resourcesFile")
+      .internal()
+      .doc("Path to a file containing the resources allocated to the driver. " +
+        "The file should be formatted as a JSON array of ResourceInformation objects. " +
+        "Only used internally in standalone mode.")
+      .stringConf
+      .createOptional
+
   private[spark] val DRIVER_CLASS_PATH =
     ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index abd7d8ac01639..ded914d3d896f 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -29,10 +29,13 @@
 import scala.concurrent.duration._
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+import org.json4s.JsonAST.JArray
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods.{compact, render}
 import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually
@@ -760,19 +763,30 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
 
-  test("test gpu driver resource address and discovery under local-cluster mode") {
+  private def writeJsonFile(dir: File, strToWrite: JArray): String = {
+    val f1 = File.createTempFile("test-resource-parser1", "", dir)
+    JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes())
+    f1.getPath()
+  }
+
+  test("test gpu driver resource files and discovery under local-cluster mode") {
     withTempDir { dir =>
       val gpuFile = new File(dir, "gpuDiscoverScript")
       val scriptPath = mockDiscoveryScript(gpuFile,
        """'{"name": "gpu","addresses":["5", "6"]}'""")
+      val gpusAllocated =
+        ("name" -> "gpu") ~
+        ("addresses" -> Seq("0", "1", "8"))
+      val ja = JArray(List(gpusAllocated))
+      val resourcesFile = writeJsonFile(dir, ja)
+
       val conf = new SparkConf()
         .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "1")
-        .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
-          SPARK_RESOURCE_ADDRESSES_SUFFIX, "0, 1, 8")
         .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
           SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
+        .set(DRIVER_RESOURCES_FILE, resourcesFile)
         .setMaster("local-cluster[1, 1, 1024]")
         .setAppName("test-cluster")
       sc = new SparkContext(conf)
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 43913d1ddb022..b3d16d179cb42 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -18,7 +18,7 @@
 
 package org.apache.spark.executor
 
-import java.io.{File, PrintWriter}
+import java.io.File
 import java.net.URL
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files => JavaFiles}
@@ -26,7 +26,7 @@ import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, O
 import java.util.EnumSet
 
 import com.google.common.io.Files
-import org.json4s.JsonAST.{JArray, JObject, JString}
+import org.json4s.JsonAST.{JArray, JObject}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, render}
 import org.mockito.Mockito.when