From 5befed79b2ead90f4b08ac1b6f2cbc9d09636b51 Mon Sep 17 00:00:00 2001
From: Janos Matyas
Date: Tue, 17 Oct 2017 20:56:21 +0200
Subject: [PATCH 1/3] Support metrics namespace in executors and include the
 namespace in shuffleService metrics

---
 .../CoarseGrainedExecutorBackend.scala        | 13 ++++++--
 .../apache/spark/metrics/MetricsSystem.scala  |  2 ++
 .../spark/metrics/MetricsSystemSuite.scala    | 31 +++++++++++++++++++
 .../apache/spark/deploy/k8s/constants.scala   |  1 +
 .../cluster/k8s/ExecutorPodFactory.scala      |  3 ++
 .../KubernetesClusterSchedulerBackend.scala   |  2 ++
 .../cluster/k8s/ExecutorPodFactorySuite.scala | 15 ++++-----
 ...bernetesClusterSchedulerBackendSuite.scala |  1 +
 .../src/main/docker/executor-py/Dockerfile    |  2 +-
 .../src/main/docker/executor/Dockerfile       |  2 +-
 10 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9fffa536c1296..89c08c3a3e7d6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -181,7 +181,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: Seq[URL]) {
+      userClassPath: Seq[URL],
+      metricsNamespace: String) {
 
     Utils.initDaemon(log)
 
@@ -201,7 +202,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       clientMode = true)
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
     val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(executorId))
-    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
+    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId),
+      ("spark.metrics.namespace", metricsNamespace))
     fetcher.shutdown()
 
     // Create SparkEnv using properties we fetched from the driver.
@@ -239,6 +241,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var hostname: String = null
     var cores: Int = 0
     var appId: String = null
+    var metricsNameSpace: String = null
     var workerUrl: Option[String] = None
     val userClassPath = new mutable.ListBuffer[URL]()
 
@@ -267,6 +270,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       case ("--user-class-path") :: value :: tail =>
         userClassPath += new URL(value)
         argv = tail
+      case ("--metrics-namespace") :: value :: tail =>
+        metricsNameSpace = value
+        argv = tail
       case Nil =>
       case tail =>
         // scalastyle:off println
@@ -281,7 +287,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       printUsageAndExit()
     }
 
-    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
+    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath, metricsNameSpace)
     System.exit(0)
   }
 
@@ -297,6 +303,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       |   --hostname
       |   --cores
       |   --app-id
+      |   --metrics-namespace
       |   --worker-url
       |   --user-class-path
       |""".stripMargin)
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 1d494500cdb5c..4ed198b1f36da 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -147,6 +147,8 @@ private[spark] class MetricsSystem private (
         }
         defaultName
       }
+    } else if (instance == "shuffleService") {
+      MetricRegistry.name(metricsNamespace.getOrElse(""), defaultName)
     } else { defaultName }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 61db6af830cc5..2f8cfe072bebe 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -268,4 +268,35 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
     assert(metricName === source.sourceName)
   }
 
+  test("MetricsSystem with shuffleService instance with custom namespace") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val metricsNamespace = "testMetricsNamespace"
+    conf.set(METRICS_NAMESPACE, metricsNamespace)
+
+    val instanceName = "shuffleService"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+
+    assert(metricName === s"$metricsNamespace.${source.sourceName}")
+  }
+
+  test("MetricsSystem with shuffleService instance with no custom namespace") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val instanceName = "shuffleService"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+
+    assert(metricName === s"${source.sourceName}")
+  }
+
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
index 1dad5393ca301..78a26fb1e6e73 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
@@ -72,6 +72,7 @@ package object constants {
   private[spark] val ENV_R_FILE = "R_FILE"
   private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
   private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
+  private[spark] val ENV_METRICS_NAMESPACE = "METRICS_NAMESPACE"
 
   // Bootstrapping dependencies with the init-container
   private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 98a0d879b6a58..3d5a1845dc8d9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -32,6 +32,7 @@ private[spark] trait ExecutorPodFactory {
   def createExecutorPod(
       executorId: String,
       applicationId: String,
+      metricsNameSpace: String,
       driverUrl: String,
       executorEnvs: Seq[(String, String)],
       driverPod: Pod,
@@ -105,6 +106,7 @@ private[spark] class ExecutorPodFactoryImpl(
   override def createExecutorPod(
       executorId: String,
       applicationId: String,
+      metricsNameSpace: String,
       driverUrl: String,
       executorEnvs: Seq[(String, String)],
       driverPod: Pod,
@@ -151,6 +153,7 @@ private[spark] class ExecutorPodFactoryImpl(
         (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
         (ENV_EXECUTOR_MEMORY, executorMemoryString),
         (ENV_APPLICATION_ID, applicationId),
+        (ENV_METRICS_NAMESPACE, metricsNameSpace),
         (ENV_EXECUTOR_ID, executorId),
         (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
       .map(env => new EnvVarBuilder()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index d30c88fcc74bf..0671a63f8bf23 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -194,6 +194,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def applicationId(): String = conf.get("spark.app.id", super.applicationId())
+  def metricsNameSpace(): String = conf.get("spark.metrics.namespace", applicationId())
 
   override def sufficientResourcesRegistered(): Boolean = {
     totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
@@ -285,6 +286,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         val executorPod = executorPodFactory.createExecutorPod(
           executorId,
           applicationId(),
+          metricsNameSpace(),
           driverUrl,
           conf.getExecutorEnv,
           driverPod,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index bb09cb801b5a9..952ea8505b2c6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -79,7 +79,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -119,7 +119,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -140,7 +140,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -174,7 +174,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -205,7 +205,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
     assert(executor.getSpec.getVolumes.size === 1)
     assert(executor.getSpec.getVolumes.contains(localDirVolume))
     assert(executor.getSpec.getContainers.size() === 1)
@@ -226,7 +226,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -259,7 +259,8 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())
+      "1", "dummy", "default", "dummy", Seq[(String, String)]("qux" -> "quux"),
+      driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index a9a2937869edd..22d72dd060158 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -374,6 +374,7 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(executorPodFactory.createExecutorPod(
       executorId.toString,
       APP_ID,
+      APP_ID,
       DRIVER_URL,
       sparkConf.getExecutorEnv,
       driverPod,
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
index a8bb5b362ab52..459fb61f28b7f 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
@@ -45,4 +45,4 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --metrics-namespace $METRICS_NAMESPACE
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
index ab9f67e95a8e5..00636df354183 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
@@ -31,4 +31,4 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --metrics-namespace $METRICS_NAMESPACE
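
Note on PATCH 1/3: the shuffleService branch added to MetricsSystem.buildRegistryName prefixes the
source name with the configured namespace. Because com.codahale.metrics.MetricRegistry.name skips
null and empty parts, an unset namespace leaves the default name unchanged, which is exactly what
the two new MetricsSystemSuite tests assert. The following standalone Scala sketch reproduces that
resolution logic; it is illustrative only, not part of the patch, and the names
ShuffleServiceNamingSketch and buildName are invented here:

    // Mirrors the naming rule added to MetricsSystem.buildRegistryName in PATCH 1/3.
    import com.codahale.metrics.MetricRegistry

    object ShuffleServiceNamingSketch {
      // metricsNamespace would come from spark.metrics.namespace, when set.
      def buildName(instance: String, metricsNamespace: Option[String],
          defaultName: String): String =
        if (instance == "shuffleService") {
          // MetricRegistry.name drops empty parts, so None yields defaultName unchanged.
          MetricRegistry.name(metricsNamespace.getOrElse(""), defaultName)
        } else {
          defaultName
        }

      def main(args: Array[String]): Unit = {
        // Matches the expectations in the two new MetricsSystemSuite tests.
        println(buildName("shuffleService", Some("testMetricsNamespace"), "dummySource"))
        // -> testMetricsNamespace.dummySource
        println(buildName("shuffleService", None, "dummySource"))
        // -> dummySource
      }
    }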
From 4878ae59418c3c5fefc6c902c81bd5fb56486a9c Mon Sep 17 00:00:00 2001
From: Janos Matyas
Date: Fri, 10 Nov 2017 09:21:00 +0100
Subject: [PATCH 2/3] Revert passing metrics namespace to executors via
 environment variable, as it is pulled from the driver's Spark conf

---
 .../executor/CoarseGrainedExecutorBackend.scala    | 13 +++----------
 .../org/apache/spark/deploy/k8s/constants.scala    |  1 -
 .../scheduler/cluster/k8s/ExecutorPodFactory.scala |  3 ---
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  2 --
 .../cluster/k8s/ExecutorPodFactorySuite.scala      | 14 +++++++-------
 .../KubernetesClusterSchedulerBackendSuite.scala   |  1 -
 6 files changed, 10 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 89c08c3a3e7d6..9fffa536c1296 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -181,8 +181,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: Seq[URL],
-      metricsNamespace: String) {
+      userClassPath: Seq[URL]) {
 
     Utils.initDaemon(log)
 
@@ -202,8 +201,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       clientMode = true)
     val driver = fetcher.setupEndpointRefByURI(driverUrl)
     val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(executorId))
-    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId),
-      ("spark.metrics.namespace", metricsNamespace))
+    val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
     fetcher.shutdown()
 
     // Create SparkEnv using properties we fetched from the driver.
@@ -241,7 +239,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var hostname: String = null
     var cores: Int = 0
     var appId: String = null
-    var metricsNameSpace: String = null
     var workerUrl: Option[String] = None
     val userClassPath = new mutable.ListBuffer[URL]()
 
@@ -270,9 +267,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       case ("--user-class-path") :: value :: tail =>
         userClassPath += new URL(value)
         argv = tail
-      case ("--metrics-namespace") :: value :: tail =>
-        metricsNameSpace = value
-        argv = tail
       case Nil =>
       case tail =>
         // scalastyle:off println
@@ -287,7 +281,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       printUsageAndExit()
     }
 
-    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath, metricsNameSpace)
+    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
     System.exit(0)
   }
 
@@ -303,7 +297,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       |   --hostname
       |   --cores
       |   --app-id
-      |   --metrics-namespace
       |   --worker-url
       |   --user-class-path
       |""".stripMargin)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
index 117d9e4dea7d1..26cdcaa7f67c8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala
@@ -73,7 +73,6 @@ package object constants {
   private[spark] val ENV_R_FILE = "R_FILE"
   private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
   private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
-  private[spark] val ENV_METRICS_NAMESPACE = "METRICS_NAMESPACE"
 
   // Bootstrapping dependencies with the init-container
   private[spark] val INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH =
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
index 3d5a1845dc8d9..98a0d879b6a58 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -32,7 +32,6 @@ private[spark] trait ExecutorPodFactory {
   def createExecutorPod(
       executorId: String,
       applicationId: String,
-      metricsNameSpace: String,
       driverUrl: String,
       executorEnvs: Seq[(String, String)],
       driverPod: Pod,
@@ -106,7 +105,6 @@ private[spark] class ExecutorPodFactoryImpl(
   override def createExecutorPod(
       executorId: String,
       applicationId: String,
-      metricsNameSpace: String,
       driverUrl: String,
       executorEnvs: Seq[(String, String)],
       driverPod: Pod,
@@ -153,7 +151,6 @@ private[spark] class ExecutorPodFactoryImpl(
         (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
         (ENV_EXECUTOR_MEMORY, executorMemoryString),
         (ENV_APPLICATION_ID, applicationId),
-        (ENV_METRICS_NAMESPACE, metricsNameSpace),
         (ENV_EXECUTOR_ID, executorId),
         (ENV_MOUNTED_CLASSPATH, s"$executorJarsDownloadDir/*")) ++ executorEnvs)
       .map(env => new EnvVarBuilder()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 0671a63f8bf23..d30c88fcc74bf 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -194,7 +194,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def applicationId(): String = conf.get("spark.app.id", super.applicationId())
-  def metricsNameSpace(): String = conf.get("spark.metrics.namespace", applicationId())
 
   override def sufficientResourcesRegistered(): Boolean = {
     totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
@@ -286,7 +285,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
         val executorPod = executorPodFactory.createExecutorPod(
           executorId,
           applicationId(),
-          metricsNameSpace(),
           driverUrl,
           conf.getExecutorEnv,
           driverPod,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index 1d3347d02f9c0..02853ebbf2ca2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -79,7 +79,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -119,7 +119,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -140,7 +140,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -174,7 +174,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -204,7 +204,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
     assert(executor.getSpec.getVolumes.size === 1)
     assert(executor.getSpec.getVolumes.contains(localDirVolume))
     assert(executor.getSpec.getContainers.size() === 1)
@@ -225,7 +225,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "default", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+      "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
       .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))
@@ -258,7 +258,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       executorLocalDirVolumeProvider)
     val executor = factory.createExecutorPod(
-      "1", "dummy", "default", "dummy", Seq[(String, String)]("qux" -> "quux"),
+      "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"),
       driverPod, Map[String, Int]())
 
     verify(nodeAffinityExecutorPodModifier, times(1))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 22d72dd060158..a9a2937869edd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -374,7 +374,6 @@ private[spark] class KubernetesClusterSchedulerBackendSuite
     when(executorPodFactory.createExecutorPod(
       executorId.toString,
       APP_ID,
-      APP_ID,
       DRIVER_URL,
       sparkConf.getExecutorEnv,
       driverPod,
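
Note on PATCH 2/3: the revert relies on the fact that an executor fetches the driver's Spark
properties during registration (the SparkAppConfig reply to RetrieveSparkAppConfig in
CoarseGrainedExecutorBackend.run, visible in the context above), so spark.metrics.namespace only
has to be set on the driver and the dedicated environment variable and --metrics-namespace flag
become redundant. A minimal driver-side sketch, using only the standard SparkConf API; the object
name and the "myTeam.myApp" value are invented for illustration:

    import org.apache.spark.SparkConf

    object MetricsNamespaceConfSketch {
      def main(args: Array[String]): Unit = {
        // Setting the namespace once on the driver is enough; every executor
        // receives it together with the rest of the fetched Spark properties.
        val conf = new SparkConf()
          .setAppName("metrics-namespace-demo")
          .set("spark.metrics.namespace", "myTeam.myApp")
        println(conf.get("spark.metrics.namespace")) // myTeam.myApp
      }
    }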
From 8c7d90359bba992d8117ccbecc7faa8efd927e4c Mon Sep 17 00:00:00 2001
From: matyix
Date: Tue, 14 Nov 2017 11:02:45 +0100
Subject: [PATCH 3/3] Remove unused metrics namespace from Dockerfiles

---
 .../src/main/docker/executor-py/Dockerfile                    | 3 +--
 .../docker-minimal-bundle/src/main/docker/executor/Dockerfile | 3 +--
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
index d1ad752c3287d..6c48c0bb2533a 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
@@ -45,5 +45,4 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --metrics-namespace $METRICS_NAMESPACE
-
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
index a12a8a6608269..310fe2843cd0b 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
@@ -31,5 +31,4 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
     if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --metrics-namespace $METRICS_NAMESPACE
-
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
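
Note on the series as a whole: executor metric names keep Spark's stock
<namespace>.<executorId>.<sourceName> layout, while shuffleService metrics gain the
<namespace>.<sourceName> prefix from PATCH 1/3. A short sketch of the names a metrics sink would
report, assuming spark.metrics.namespace=myApp; the names below are illustrative and built
directly with MetricRegistry rather than through MetricsSystem:

    import com.codahale.metrics.MetricRegistry

    object ReportedNamesSketch {
      def main(args: Array[String]): Unit = {
        val namespace = "myApp"
        // driver/executor instances: <namespace>.<executorId>.<sourceName>
        println(MetricRegistry.name(namespace, "1", "executor"))  // myApp.1.executor
        // shuffleService instance after PATCH 1/3: <namespace>.<sourceName>
        println(MetricRegistry.name(namespace, "shuffleService")) // myApp.shuffleService
      }
    }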