[SPARK-27835][Core] Resource Scheduling: change driver config from addresses #24730

Closed
20 changes: 18 additions & 2 deletions core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
@@ -17,11 +17,11 @@

package org.apache.spark

import java.io.File
import java.io.{BufferedInputStream, File, FileInputStream}

import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.exc.MismatchedInputException
import org.json4s.{DefaultFormats, MappingException}
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

import org.apache.spark.internal.Logging
@@ -132,4 +132,20 @@ private[spark] object ResourceDiscoverer extends Logging {
}
}
}

def parseAllocatedFromJsonFile(resourcesFile: String): Map[String, ResourceInformation] = {
implicit val formats = DefaultFormats
// case class to make json4s parsing easy
case class JsonResourceInformation(val name: String, val addresses: Array[String])
val resourceInput = new BufferedInputStream(new FileInputStream(resourcesFile))
val resources = try {
parse(resourceInput).extract[Seq[JsonResourceInformation]]
} catch {
case e@(_: MappingException | _: MismatchedInputException | _: ClassCastException) =>
throw new SparkException(s"Exception parsing the resources in $resourcesFile", e)
} finally {
resourceInput.close()
}
resources.map(r => (r.name, new ResourceInformation(r.name, r.addresses))).toMap
Member

could Seq[JsonResourceInformation] contain a duplicated name? It might be (very marginally) better to do
resource.toMap.map(...)

Contributor Author

Sorry, I'm missing what you are saying here with the toMap.map? I can't do the toMap until I do the first map to (name, ResourceInformation), otherwise you just have a Seq[JsonResourceInformation] and toMap doesn't know how to make that a map. If there are 2 resources with the same name, the toMap in the current code will choose the last one.

I had actually tested this and found that the json4s parse and extract handle duplicates as well; it looks like they choose the last one, though I couldn't find docs on that behavior either. The resources file is built by the standalone master/worker, so it shouldn't really have duplicates. I'm happy to update it to be explicit, though, so just let me know.

Member

ah yes ;) I just meant that it walks through a Seq when the goal is a Map

}
}
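
A quick illustration of the duplicate-name behavior discussed in the review thread above (this snippet is not part of the PR): Seq.toMap keeps the last entry for a repeated key, so a duplicated resource name in the JSON would silently resolve to the last occurrence parsed.

// Illustration only: mirrors the (name, value) -> toMap shape used in parseAllocatedFromJsonFile.
case class JsonResourceInformation(name: String, addresses: Array[String])

val parsed = Seq(
  JsonResourceInformation("gpu", Array("0", "1")),
  JsonResourceInformation("gpu", Array("8"))      // duplicated resource name
)

val byName = parsed.map(r => (r.name, r.addresses)).toMap
println(byName("gpu").mkString(","))              // prints "8": the last occurrence wins
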
23 changes: 9 additions & 14 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -365,29 +365,24 @@ class SparkContext(config: SparkConf) extends Logging {

/**
* Checks to see if any resources (GPU/FPGA/etc) are available to the driver by looking
* at and processing the spark.driver.resource.resourceName.addresses and
* at and processing the spark.driver.resourcesFile and
* spark.driver.resource.resourceName.discoveryScript configs. The configs have to be
* present when the driver starts, setting them after startup does not work.
*
* If any resource addresses configs were specified then assume all resources will be specified
* in that way. Otherwise use the discovery scripts to find the resources. Users should
* not really be setting the addresses config directly and should not be mixing methods
* for different types of resources since the addresses config is meant for Standalone mode
* If a resources file was specified then assume all resources will be specified
* in that file. Otherwise use the discovery scripts to find the resources. Users should
* not be setting the resources file config directly and should not be mixing methods
* for different types of resources since the resources file config is meant for Standalone mode
* and other cluster managers should use the discovery scripts.
*/
private def setupDriverResources(): Unit = {
// Only call getAllWithPrefix once and filter on those since there could be a lot of spark
// configs.
val allDriverResourceConfs = _conf.getAllWithPrefix(SPARK_DRIVER_RESOURCE_PREFIX)
val resourcesWithAddrsInConfs =
SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_ADDRESSES_SUFFIX)

_resources = if (resourcesWithAddrsInConfs.nonEmpty) {
resourcesWithAddrsInConfs.map { case (rName, addrString) =>
val addrsArray = addrString.split(",").map(_.trim())
(rName -> new ResourceInformation(rName, addrsArray))
}.toMap
} else {
val resourcesFile = _conf.get(DRIVER_RESOURCES_FILE)
_resources = resourcesFile.map { rFile => {
ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
}}.getOrElse {
// we already have the resources confs here so just pass in the unique resource names
// rather then having the resource discoverer reparse all the configs.
val uniqueResources = SparkConf.getBaseOfConfigs(allDriverResourceConfs)
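
To make the fallback described in the setupDriverResources doc comment above concrete, here is a hedged sketch of the user-facing path: the user only sets the discovery script config, and because spark.driver.resourcesFile (an internal config set by the standalone master/worker) is absent, the script is run to find the resources. The script path below is hypothetical; the expected script output is a JSON object like the one used in the SparkContextSuite test further down.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical script location; the script should print e.g. {"name": "gpu", "addresses": ["0", "1"]}.
val conf = new SparkConf()
  .set("spark.driver.resource.gpu.count", "1")
  .set("spark.driver.resource.gpu.discoveryScript", "/opt/spark/scripts/getGpus.sh")
  .setMaster("local-cluster[1, 1, 1024]")
  .setAppName("gpu-driver-example")

// spark.driver.resourcesFile is not set, so setupDriverResources() falls back to the discovery script.
val sc = new SparkContext(conf)
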
@@ -87,25 +87,11 @@ private[spark] class CoarseGrainedExecutorBackend(
def parseOrFindResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = {
// only parse the resources if a task requires them
val resourceInfo = if (env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX).nonEmpty) {
val actualExecResources = resourcesFile.map { resourceFileStr => {
val source = new BufferedInputStream(new FileInputStream(resourceFileStr))
val resourceMap = try {
val parsedJson = parse(source).asInstanceOf[JArray].arr
parsedJson.map { json =>
val name = (json \ "name").extract[String]
val addresses = (json \ "addresses").extract[Array[String]]
new ResourceInformation(name, addresses)
}.map(x => (x.name -> x)).toMap
} catch {
case e @ (_: MappingException | _: MismatchedInputException) =>
throw new SparkException(
s"Exception parsing the resources in $resourceFileStr", e)
} finally {
source.close()
}
resourceMap
}}.getOrElse(ResourceDiscoverer.discoverResourcesInformation(env.conf,
SPARK_EXECUTOR_RESOURCE_PREFIX))
val actualExecResources = resourcesFile.map { rFile => {
ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
}}.getOrElse {
ResourceDiscoverer.discoverResourcesInformation(env.conf, SPARK_EXECUTOR_RESOURCE_PREFIX)
}

if (actualExecResources.isEmpty) {
throw new SparkException("User specified resources per task via: " +
@@ -36,9 +36,17 @@ package object config {
private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."

private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
private[spark] val SPARK_RESOURCE_ADDRESSES_SUFFIX = ".addresses"
private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"

private[spark] val DRIVER_RESOURCES_FILE =
ConfigBuilder("spark.driver.resourcesFile")
.internal()
.doc("Path to a file containing the resources allocated to the driver. " +
"The file should be formatted as a JSON array of ResourceInformation objects. " +
"Only used internally in standalone mode.")
.stringConf
.createOptional

private[spark] val DRIVER_CLASS_PATH =
ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional

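
As a hedged sketch of the file format that DRIVER_RESOURCES_FILE points at, the snippet below builds the JSON array of ResourceInformation objects that the standalone master/worker would write and parses it back with the helper added in this PR. The temp file is illustrative, and since parseAllocatedFromJsonFile is private[spark], code like this would need to live in the org.apache.spark package (as the tests do).

import java.nio.file.Files

import org.json4s.JsonAST.JArray
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Produces [{"name":"gpu","addresses":["0","1","8"]}] in a temp file (illustrative path).
val gpuJson = ("name" -> "gpu") ~ ("addresses" -> Seq("0", "1", "8"))
val resourcesFile = Files.createTempFile("driver-resources", ".json")
Files.write(resourcesFile, compact(render(JArray(List(gpuJson)))).getBytes())

// The helper returns a Map from resource name to ResourceInformation(name, addresses).
val resources = ResourceDiscoverer.parseAllocatedFromJsonFile(resourcesFile.toString)
assert(resources("gpu").addresses.sameElements(Array("0", "1", "8")))
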
22 changes: 18 additions & 4 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -29,10 +29,13 @@ import scala.concurrent.duration._

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.json4s.JsonAST.JArray
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}
import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually

Expand Down Expand Up @@ -760,19 +763,30 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}

test("test gpu driver resource address and discovery under local-cluster mode") {
private def writeJsonFile(dir: File, strToWrite: JArray): String = {
val f1 = File.createTempFile("test-resource-parser1", "", dir)
JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes())
f1.getPath()
}

test("test gpu driver resource files and discovery under local-cluster mode") {
withTempDir { dir =>
val gpuFile = new File(dir, "gpuDiscoverScript")
val scriptPath = mockDiscoveryScript(gpuFile,
"""'{"name": "gpu","addresses":["5", "6"]}'""")

val gpusAllocated =
("name" -> "gpu") ~
("addresses" -> Seq("0", "1", "8"))
val ja = JArray(List(gpusAllocated))
val resourcesFile = writeJsonFile(dir, ja)

val conf = new SparkConf()
.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_COUNT_SUFFIX, "1")
.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_ADDRESSES_SUFFIX, "0, 1, 8")
.set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
.set(DRIVER_RESOURCES_FILE, resourcesFile)
.setMaster("local-cluster[1, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
@@ -18,15 +18,15 @@
package org.apache.spark.executor


import java.io.{File, PrintWriter}
import java.io.File
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.{Files => JavaFiles}
import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE}
import java.util.EnumSet

import com.google.common.io.Files
import org.json4s.JsonAST.{JArray, JObject, JString}
import org.json4s.JsonAST.{JArray, JObject}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}
import org.mockito.Mockito.when