Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit

Permalink
Register executors using pod IPs instead of pod host names (#215)
Browse files Browse the repository at this point in the history
* Register executors using pod IPs

* Fix block manager port typo

* Fix import

* Keep requiredEnv to be a val

* Clean up indentation
  • Loading branch information
kimoonkim authored and mccheah committed Apr 5, 2017
1 parent 2bf3f9e commit 5acf58f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,28 @@ private[spark] class Client(
sparkConf.set("spark.app.id", kubernetesAppId)
sparkConf.setIfMissing("spark.app.name", appName)
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
// Resolved merge conflict: keep the incoming side, which fixes the block
// manager port key typo (the correct Spark keys are camelCase
// "spark.driver.blockManager.port" / "spark.blockManager.port") and redacts
// OAuth tokens so they are not leaked when the SparkConf is logged or
// displayed in the UI.
sparkConf.setIfMissing("spark.driver.blockManager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
sparkConf.setIfMissing("spark.blockManager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
// Replace token values with a sentinel; presence is preserved, secret is not.
sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ =>
sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "<present_but_redacted>")
}
sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ =>
sparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "<present_but_redacted>")
}
val driverSubmitter = buildDriverSubmissionClient(
kubernetesClient,
driverServiceManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ package object constants {
// Environment variable names injected into executor pods.
private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
// Pod IP, populated via the Kubernetes downward API (status.podIP); used as
// the executor hostname so executors register with an IP, not a pod DNS name.
private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"

// Annotation keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ import java.util.UUID
import java.util.concurrent.Executors
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

// Resolved merge conflict: union of both sides — HEAD added
// ThreadFactoryBuilder; the incoming change added EnvVarSourceBuilder
// (needed for the downward-API fieldRef that exposes the pod IP).
import com.google.common.util.concurrent.ThreadFactoryBuilder
import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder,
  EnvVarSourceBuilder, Pod, QuantityBuilder}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -180,11 +187,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
(ENV_EXECUTOR_CORES, executorCores),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId()),
// Diff residue removed: the pre-change closing of this expression was left in
// the text alongside the new one, producing an invalid duplicate. Keep only
// the new form, which appends the pod-IP env var sourced from the Kubernetes
// downward API (fieldRef "status.podIP") so the executor can register with
// its pod IP instead of a host name.
(ENV_EXECUTOR_ID, executorId))
.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build()
) ++ Seq(
new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_IP)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.build()
)
val requiredPorts = Seq(
(EXECUTOR_PORT_NAME, executorPort),
(BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark

# TODO support spark.executor.extraClassPath
# Diff residue removed: the superseded CMD line (which passed --hostname
# $HOSTNAME) was left above the new one; Docker only honors the last CMD, so
# the stale line is dead text. Keep the new version, which passes the pod IP
# (SPARK_EXECUTOR_POD_IP, injected via the Kubernetes downward API) as the
# executor hostname so the driver reaches executors by IP.
CMD exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

0 comments on commit 5acf58f

Please sign in to comment.