This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Register executors using pod IPs instead of pod host names #215

Merged
@@ -252,8 +252,8 @@ private[spark] class Client(
     sparkConf.set("spark.app.id", kubernetesAppId)
     sparkConf.setIfMissing("spark.app.name", appName)
     sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
-    sparkConf.setIfMissing("spark.blockmanager.port",
-      DEFAULT_BLOCKMANAGER_PORT.toString)
+    sparkConf.setIfMissing("spark.driver.blockManager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
+    sparkConf.setIfMissing("spark.blockManager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
     sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ =>
       sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "<present_but_redacted>")
     }
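Note that this hunk also fixes a latent key typo: Spark reads the camel-cased `spark.blockManager.port` (and, in recent Spark versions, `spark.driver.blockManager.port` for the driver side), so the lowercase `spark.blockmanager.port` the old code set was never picked up. A minimal sketch, not from this PR, of the `setIfMissing` semantics the change relies on; port 7079 is a stand-in for `DEFAULT_BLOCKMANAGER_PORT`:

```scala
import org.apache.spark.SparkConf

object BlockManagerPortDefaults {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.blockManager.port", "10000")          // user-supplied value

    // setIfMissing only writes when no value is present, so user settings win:
    conf.setIfMissing("spark.driver.blockManager.port", "7079")
    conf.setIfMissing("spark.blockManager.port", "7079")

    println(conf.get("spark.driver.blockManager.port")) // 7079 (defaulted)
    println(conf.get("spark.blockManager.port"))        // 10000 (preserved)
  }
}
```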
@@ -68,6 +68,7 @@ package object constants {
   private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
   private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
   private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+  private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
   private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"

   // Annotation keys
@@ -18,7 +18,8 @@ package org.apache.spark.scheduler.cluster.kubernetes

 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

-import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder}
+import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder,
+  EnvVarSourceBuilder, Pod, QuantityBuilder}
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
@@ -177,11 +178,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
       (ENV_EXECUTOR_CORES, executorCores),
       (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId()),
-      (ENV_EXECUTOR_ID, executorId)
-    ).map(env => new EnvVarBuilder()
-      .withName(env._1)
-      .withValue(env._2)
-      .build())
+      (ENV_EXECUTOR_ID, executorId))
+      .map(env => new EnvVarBuilder()
+        .withName(env._1)
+        .withValue(env._2)
+        .build()
+      ) ++ Seq(
+        new EnvVarBuilder()
+          .withName(ENV_EXECUTOR_POD_IP)
+          .withValueFrom(new EnvVarSourceBuilder()
+            .withNewFieldRef("v1", "status.podIP")
+            .build())
+          .build()
+      )
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
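The `valueFrom`/`fieldRef` pair built here is the Kubernetes downward API: the kubelet injects the pod's own IP into the container environment at start-up, so the value is only known at runtime, never at submission time. A self-contained sketch, assuming only the fabric8 `kubernetes-model` classes already used in the diff, of the env var this chain produces:

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, EnvVarSourceBuilder}

object PodIpEnvVarSketch {
  def main(args: Array[String]): Unit = {
    val podIpEnv: EnvVar = new EnvVarBuilder()
      .withName("SPARK_EXECUTOR_POD_IP")
      .withValueFrom(new EnvVarSourceBuilder()
        .withNewFieldRef("v1", "status.podIP") // (apiVersion, fieldPath)
        .build())
      .build()

    // Serialized into the pod spec, this is equivalent to:
    //   env:
    //   - name: SPARK_EXECUTOR_POD_IP
    //     valueFrom:
    //       fieldRef:
    //         apiVersion: v1
    //         fieldPath: status.podIP
    println(podIpEnv.getValueFrom.getFieldRef.getFieldPath) // status.podIP
  }
}
```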
@@ -37,4 +37,4 @@ ENV SPARK_HOME /opt/spark
 WORKDIR /opt/spark

 # TODO support spark.executor.extraClassPath
-CMD exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $HOSTNAME
+CMD exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
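The `--hostname` switch is the substance of the PR title: inside a pod, `$HOSTNAME` is the bare pod name, which other pods generally cannot resolve unless a headless service publishes DNS records for it, so executors were registering addresses the driver could not dial back. The pod IP from the downward API is directly routable on the cluster network. An illustrative sketch of the difference, using hypothetical host names (not from this PR):

```scala
import java.net.{InetAddress, UnknownHostException}

object PodAddressResolution {
  private def resolvable(host: String): Boolean =
    try { InetAddress.getByName(host); true }
    catch { case _: UnknownHostException => false }

  def main(args: Array[String]): Unit = {
    // "spark-exec-1" stands in for a bare executor pod name; without DNS
    // records for the pod it does not resolve from a peer. An IP literal
    // is parsed directly and needs no lookup at all.
    println(resolvable("spark-exec-1")) // typically false
    println(resolvable("10.2.3.4"))     // true
  }
}
```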