From 7321a3effb71ff8f65f664b1dc858c0b99514f6a Mon Sep 17 00:00:00 2001
From: mcheah
Date: Thu, 2 Mar 2017 16:17:42 -0800
Subject: [PATCH] Address comments

---
 .../spark/deploy/kubernetes/Client.scala        | 24 +++++++++----------
 .../spark/deploy/kubernetes/config.scala        |  9 +++----
 .../KubernetesClusterSchedulerBackend.scala     | 19 ++++++++-------
 3 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index fc7ae954d00f2..24f8647a1641e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -64,17 +64,17 @@ private[spark] class Client(
     .getOrElse(Array.empty[String])
 
   // Memory settings
-  private val driverMemory = sparkConf.get("spark.driver.memory", "1g")
-  private val driverMemoryBytes = Utils.byteStringAsBytes(driverMemory)
-  private val driverSubmitServerMemory = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
-  private val driverSubmitServerMemoryBytes = Utils.byteStringAsBytes(driverSubmitServerMemory)
-  private val driverContainerMemoryBytes = driverMemoryBytes + driverSubmitServerMemoryBytes
-  private val memoryOverheadBytes = sparkConf
+  private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
+  private val driverSubmitServerMemoryString = sparkConf.get(
+    KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
+    KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
+  private val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb
+  private val memoryOverheadMb = sparkConf
     .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
-    .map(overhead => Utils.byteStringAsBytes(overhead))
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryBytes).toInt,
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryMb).toInt,
       MEMORY_OVERHEAD_MIN))
-  private val driverContainerMemoryWithOverhead = driverContainerMemoryBytes + memoryOverheadBytes
+  private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb
 
   private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
 
@@ -387,10 +387,10 @@ private[spark] class Client(
       .withNewPort(SUBMISSION_SERVER_PORT_NAME)
       .build()
     val driverMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(driverContainerMemoryBytes.toString)
+      .withAmount(s"${driverContainerMemoryMb}M")
       .build()
     val driverMemoryLimitQuantity = new QuantityBuilder(false)
-      .withAmount(driverContainerMemoryWithOverhead.toString)
+      .withAmount(s"${driverContainerMemoryWithOverhead}M")
       .build()
     kubernetesClient.pods().createNew()
       .withNewMetadata()
@@ -428,7 +428,7 @@ private[spark] class Client(
           // Note that SPARK_DRIVER_MEMORY only affects the REST server via spark-class.
           .addNewEnv()
             .withName(ENV_DRIVER_MEMORY)
-            .withValue(driverSubmitServerMemory)
+            .withValue(driverSubmitServerMemoryString)
             .endEnv()
           .addToEnv(sslConfiguration.sslPodEnvVars: _*)
           .withNewResources()
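A note on the sizing above: the arithmetic now stays in mebibytes end to end, so the container's memory request is the driver heap plus the submission server heap, and the limit adds a non-heap overhead of max(10% of container memory, a fixed floor). Below is a minimal, self-contained sketch of that computation; the object name is hypothetical, and MEMORY_OVERHEAD_FACTOR = 0.10 / MEMORY_OVERHEAD_MIN = 384 MiB are assumed values matching the heuristic Spark uses elsewhere (e.g. on YARN), not values visible in this diff.

object DriverMemorySketch {
  // Assumed constants; the real ones live in this module's constants file.
  val MemoryOverheadFactor = 0.10
  val MemoryOverheadMinMb = 384

  // Mirrors driverContainerMemoryMb / memoryOverheadMb /
  // driverContainerMemoryWithOverhead from the hunk above.
  def containerMemoryWithOverheadMb(driverMemoryMb: Long, submitServerMemoryMb: Long): Long = {
    val containerMemoryMb = driverMemoryMb + submitServerMemoryMb
    val overheadMb =
      math.max((MemoryOverheadFactor * containerMemoryMb).toInt, MemoryOverheadMinMb)
    containerMemoryMb + overheadMb
  }

  def main(args: Array[String]): Unit = {
    // Defaults: 1g driver heap + 256m submission server = 1280 MiB request.
    // The 384 MiB floor beats 10% (128 MiB), so the limit is 1664 MiB.
    println(containerMemoryWithOverheadMb(1024L, 256L)) // 1664
  }
}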
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 978816565fb8f..c7f97b70e5181 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.{SPARK_VERSION => sparkVersion}
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
 
 package object config {
 
@@ -103,7 +104,7 @@ package object config {
       | overheads, etc. This tends to grow with the executor size
       | (typically 6-10%).
     """.stripMargin)
-      .stringConf
+      .bytesConf(ByteUnit.MiB)
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
@@ -115,7 +116,7 @@ package object config {
       | interned strings, other native overheads, etc. This tends
       | to grow with the driver's memory size (typically 6-10%).
     """.stripMargin)
-      .stringConf
+      .bytesConf(ByteUnit.MiB)
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_LABELS =
@@ -173,8 +174,8 @@ package object config {
       .doc("""
         | The amount of memory to allocate for the driver submission server.
       """.stripMargin)
-      .stringConf
-      .createWithDefault("256m")
+      .bytesConf(ByteUnit.MiB)
+      .createWithDefaultString("256m")
 
   private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
     ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
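A note on the config change: moving from .stringConf to .bytesConf(ByteUnit.MiB) makes these entries parse suffixed byte strings into a Long count of mebibytes, and createWithDefaultString("256m") preserves the default's original rendering, so the .defaultValueString that Client.scala reads above still yields "256m" rather than a bare number. A small sketch of the parsing rules, using the public JavaUtils helper that Spark's byte configs are built on (assumes spark-network-common on the classpath; the object name is hypothetical):

import org.apache.spark.network.util.JavaUtils

object ByteConfSketch {
  def main(args: Array[String]): Unit = {
    // Suffixed strings convert across units; a bare number is read in the
    // target unit (MiB for byteStringAsMb, matching bytesConf(ByteUnit.MiB)).
    println(JavaUtils.byteStringAsMb("256m")) // 256
    println(JavaUtils.byteStringAsMb("1g"))   // 1024
    println(JavaUtils.byteStringAsMb("512"))  // 512
  }
}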
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index f2cf325b16eda..90907ff83ed84 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -60,15 +60,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .getOrElse(
       throw new SparkException("Must specify the driver pod name"))
 
-  private val executorMemory = conf.get("spark.executor.memory", "1g")
-  private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
+  private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = conf.get(
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
 
-  private val memoryOverheadBytes = conf
+  private val memoryOverheadMb = conf
     .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
-    .map(overhead => Utils.byteStringAsBytes(overhead))
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt,
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt,
       MEMORY_OVERHEAD_MIN))
-  private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes
+  private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb
 
   private val executorCores =
     conf.getOption("spark.executor.cores").getOrElse("1")
@@ -165,10 +166,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
       SPARK_APP_ID_LABEL -> applicationId()).asJava
     val executorMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(executorMemoryBytes.toString)
+      .withAmount(s"${executorMemoryMb}M")
       .build()
     val executorMemoryLimitQuantity = new QuantityBuilder(false)
-      .withAmount(executorMemoryWithOverhead.toString)
+      .withAmount(s"${executorMemoryWithOverhead}M")
       .build()
     val executorCpuQuantity = new QuantityBuilder(false)
       .withAmount(executorCores)
@@ -177,7 +178,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       (ENV_EXECUTOR_CORES, executorCores),
-      (ENV_EXECUTOR_MEMORY, executorMemory),
+      (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId()),
       (ENV_EXECUTOR_ID, executorId)
     ).map(env => new EnvVarBuilder()
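A note on the Kubernetes quantities: the pod specs now render MiB counts with an "M" suffix instead of a raw byte count. The sketch below shows the resulting executor memory limit, using the fabric8 QuantityBuilder already used in this module; the object name and the 0.10 factor / 384 MiB floor are assumptions, as above. One caveat worth flagging for review: in Kubernetes quantity notation "M" is decimal (10^6 bytes) while these values are mebibytes, so "Mi" would be the exact binary suffix and "M" slightly under-sizes the container.

import io.fabric8.kubernetes.api.model.QuantityBuilder

object ExecutorQuantitySketch {
  def main(args: Array[String]): Unit = {
    val executorMemoryMb = 1024L // spark.executor.memory default of 1g
    val overheadMb = math.max((0.10 * executorMemoryMb).toInt, 384) // assumed heuristic
    val limit = new QuantityBuilder(false) // false = skip fabric8 validation
      .withAmount(s"${executorMemoryMb + overheadMb}M")
      .build()
    println(limit.getAmount) // 1408M
  }
}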