Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit

Permalink
Address comments
Browse files — browse the repository at this point in the history
  • Loading branch information
mccheah committed Mar 3, 2017
1 parent 3c4aff2 commit 7321a3e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@ private[spark] class Client(
.getOrElse(Array.empty[String])

// Memory settings
private val driverMemory = sparkConf.get("spark.driver.memory", "1g")
private val driverMemoryBytes = Utils.byteStringAsBytes(driverMemory)
private val driverSubmitServerMemory = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
private val driverSubmitServerMemoryBytes = Utils.byteStringAsBytes(driverSubmitServerMemoryBytes)
private val driverContainerMemoryBytes = driverMemoryBytes + driverSubmitServerMemoryBytes
private val memoryOverheadBytes = sparkConf
private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
private val driverSubmitServerMemoryString = sparkConf.get(
KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
private val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb
private val memoryOverheadMb = sparkConf
.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
.map(overhead => Utils.byteStringAsBytes(overhead))
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryBytes).toInt,
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryMb).toInt,
MEMORY_OVERHEAD_MIN))
private val driverContainerMemoryWithOverhead = driverContainerMemoryBytes + memoryOverheadBytes
private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb

private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)

Expand Down Expand Up @@ -387,10 +387,10 @@ private[spark] class Client(
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
.build()
val driverMemoryQuantity = new QuantityBuilder(false)
.withAmount(driverContainerMemoryBytes.toString)
.withAmount(s"${driverContainerMemoryMb}M")
.build()
val driverMemoryLimitQuantity = new QuantityBuilder(false)
.withAmount(driverContainerMemoryWithOverhead.toString)
.withAmount(s"${driverContainerMemoryWithOverhead}M")
.build()
kubernetesClient.pods().createNew()
.withNewMetadata()
Expand Down Expand Up @@ -428,7 +428,7 @@ private[spark] class Client(
// Note that SPARK_DRIVER_MEMORY only affects the REST server via spark-class.
.addNewEnv()
.withName(ENV_DRIVER_MEMORY)
.withValue(driverSubmitServerMemory)
.withValue(driverSubmitServerMemoryString)
.endEnv()
.addToEnv(sslConfiguration.sslPodEnvVars: _*)
.withNewResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

package object config {

Expand Down Expand Up @@ -103,7 +104,7 @@ package object config {
| overheads, etc. This tends to grow with the executor size
| (typically 6-10%).
""".stripMargin)
.stringConf
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
Expand All @@ -115,7 +116,7 @@ package object config {
| interned strings, other native overheads, etc. This tends
| to grow with the driver's memory size (typically 6-10%).
""".stripMargin)
.stringConf
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val KUBERNETES_DRIVER_LABELS =
Expand Down Expand Up @@ -173,8 +174,8 @@ package object config {
.doc("""
| The amount of memory to allocate for the driver submission server.
""".stripMargin)
.stringConf
.createWithDefault("256m")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("256m")

private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
.getOrElse(
throw new SparkException("Must specify the driver pod name"))

private val executorMemory = conf.get("spark.executor.memory", "1g")
private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
private val executorMemoryString = conf.get(
org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)

private val memoryOverheadBytes = conf
private val memoryOverheadMb = conf
.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
.map(overhead => Utils.byteStringAsBytes(overhead))
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt,
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt,
MEMORY_OVERHEAD_MIN))
private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes
private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb

private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")

Expand Down Expand Up @@ -165,10 +166,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> applicationId()).asJava
val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(executorMemoryBytes.toString)
.withAmount(s"${executorMemoryMb}M")
.build()
val executorMemoryLimitQuantity = new QuantityBuilder(false)
.withAmount(executorMemoryWithOverhead.toString)
.withAmount(s"${executorMemoryWithOverhead}M")
.build()
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCores)
Expand All @@ -177,7 +178,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
(ENV_EXECUTOR_PORT, executorPort.toString),
(ENV_DRIVER_URL, driverUrl),
(ENV_EXECUTOR_CORES, executorCores),
(ENV_EXECUTOR_MEMORY, executorMemory),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId()),
(ENV_EXECUTOR_ID, executorId)
).map(env => new EnvVarBuilder()
Expand Down

0 comments on commit 7321a3e

Please sign in to comment.