Merge pull request apache#399 from palantir/os/pod-template

[SPARK-24434][K8S] pod template files

onursatici authored Sep 3, 2018
2 parents 9698d3b + 10c6420 commit febcd5c

Showing 23 changed files with 876 additions and 44 deletions.

179 changes: 179 additions & 0 deletions docs/running-on-kubernetes.md
@@ -185,6 +185,21 @@ To use a secret through an environment variable use the following options to the
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
```

## Pod Template
Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates).
Spark users can similarly use template files to define driver or executor pod settings that Spark's configuration properties do not cover.
To do so, specify the spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile`
to point to local files accessible to the `spark-submit` process. To allow the driver pod to access the executor pod template
file, the file will be automatically mounted onto a volume in the driver pod when it's created.
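The same two properties can also be set programmatically; a minimal sketch on a `SparkConf` (the paths are placeholders):

```scala
import org.apache.spark.SparkConf

// Equivalent to passing --conf pairs to spark-submit; the paths are placeholders.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.podTemplateFile", "/path/to/driver-pod-template.yaml")
  .set("spark.kubernetes.executor.podTemplateFile", "/path/to/executor-pod-template.yaml")
```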

It is important to note that Spark is opinionated about certain pod configurations, so there are values in the
pod template that will always be overwritten by Spark. Specifying a pod template file therefore only lets Spark
start the pod-building process from the template instead of from an empty pod; it does not make every field
user-controllable. For details, see the [full list](#pod-template-properties) of pod template values that will be
overwritten by Spark.

Pod template files can also define multiple containers. In such cases, Spark will always assume that the first container in
the list will be the driver or executor container.

## Introspection and Debugging

These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -775,4 +790,168 @@ specific to Spark on Kubernetes.
This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.podTemplateFile</code></td>
<td>(none)</td>
<td>
Specify the local file that contains the driver [pod template](#pod-template). For example,
<code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml</code>
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.podTemplateFile</code></td>
<td>(none)</td>
<td>
Specify the local file that contains the executor [pod template](#pod-template). For example,
<code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml</code>
</td>
</tr>
</table>

### Pod template properties

See the tables below for the full list of pod specification values that will be overwritten by Spark.

#### Pod Metadata

<table class="table">
<tr><th>Pod metadata key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>name</td>
<td>Value of <code>spark.kubernetes.driver.pod.name</code></td>
<td>
The driver pod name will be overwritten with either the configured or default value of
<code>spark.kubernetes.driver.pod.name</code>. The executor pod names will be unaffected.
</td>
</tr>
<tr>
<td>namespace</td>
<td>Value of <code>spark.kubernetes.namespace</code></td>
<td>
Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will
be replaced by either the configured or default spark conf value.
</td>
</tr>
<tr>
<td>labels</td>
<td>Adds the labels from <code>spark.kubernetes.{driver,executor}.label.*</code></td>
<td>
Spark will add additional labels specified by the spark configuration.
</td>
</tr>
<tr>
<td>annotations</td>
<td>Adds the annotations from <code>spark.kubernetes.{driver,executor}.annotation.*</code></td>
<td>
Spark will add additional annotations specified by the spark configuration.
</td>
</tr>
</table>
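A minimal sketch of the merge semantics above, using the same fabric8 builder API as the code in this change (the template pod and the conf-derived values are illustrative placeholders):

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import scala.collection.JavaConverters._

val templatePod: Pod = new PodBuilder()
  .withNewMetadata().addToLabels("team", "data-eng").endMetadata()
  .build()
val confLabels = Map("spark-app-selector" -> "spark-app-id")  // from spark.kubernetes.driver.label.*

// name and namespace are overwritten outright; labels and annotations are merged,
// so the template's "team" label survives alongside the conf-supplied ones.
val mergedPod = new PodBuilder(templatePod)
  .editOrNewMetadata()
  .withName("my-spark-driver")   // value of spark.kubernetes.driver.pod.name
  .withNamespace("spark-jobs")   // value of spark.kubernetes.namespace
  .addToLabels(confLabels.asJava)
  .endMetadata()
  .build()
```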

#### Pod Spec

<table class="table">
<tr><th>Pod spec key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>imagePullSecrets</td>
<td>Adds image pull secrets from <code>spark.kubernetes.container.image.pullSecrets</code></td>
<td>
Additional pull secrets will be added from the spark configuration to both the driver and executor pods.
</td>
</tr>
<tr>
<td>nodeSelector</td>
<td>Adds node selectors from <code>spark.kubernetes.node.selector.*</code></td>
<td>
Additional node selectors will be added from the spark configuration to both the driver and executor pods.
</td>
</tr>
<tr>
<td>restartPolicy</td>
<td><code>"never"</code></td>
<td>
Spark assumes that both drivers and executors never restart.
</td>
</tr>
<tr>
<td>serviceAccount</td>
<td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
<td>
Spark will override <code>serviceAccount</code> with the value of the spark configuration, but only for
driver pods, and only if that configuration is specified. Executor pods will remain unaffected.
</td>
</tr>
<tr>
<td>serviceAccountName</td>
<td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
<td>
Spark will override <code>serviceAccountName</code> with the value of the spark configuration, but only for
driver pods, and only if that configuration is specified. Executor pods will remain unaffected.
</td>
</tr>
<tr>
<td>volumes</td>
<td>Adds volumes from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>
Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing
spark conf and pod template files.
</td>
</tr>
</table>

#### Container Spec

The following affect the driver and executor containers. All other containers in the pod spec will be unaffected.

<table class="table">
<tr><th>Container spec key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>env</td>
<td>Adds env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>
Spark will add driver env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code>, and
executor env variables from <code>spark.executorEnv.[EnvironmentVariableName]</code>.
</td>
</tr>
<tr>
<td>image</td>
<td>Value of <code>spark.kubernetes.{driver,executor}.container.image</code></td>
<td>
The container image will be defined by the spark configuration.
</td>
</tr>
<tr>
<td>imagePullPolicy</td>
<td>Value of <code>spark.kubernetes.container.image.pullPolicy</code></td>
<td>
Spark will override the pull policy for both driver and executors.
</td>
</tr>
<tr>
<td>name</td>
<td>See description</td>
<td>
The container name will be assigned by Spark ("spark-kubernetes-driver" for the driver container, and
"spark-kubernetes-executor" for each executor container) if not defined by the pod template. If the container is
defined by the template, the template's name will be used.
</td>
</tr>
<tr>
<td>resources</td>
<td>See description</td>
<td>
The CPU limit is set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The CPU request is set by
<code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
<code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>; see the
sketch after this table for the arithmetic.
</tr>
<tr>
<td>volumeMounts</td>
<td>Adds volume mounts from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.{path,readOnly}</code></td>
<td>
Spark will add volume mounts as specified by the spark conf, as well as the additional volume mounts necessary
for passing spark conf and pod template files.
</td>
</tr>
</table>
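For the <code>resources</code> row above, a sketch of the memory arithmetic using the same fabric8 `QuantityBuilder` the feature steps use (the MiB figures are illustrative):

```scala
import io.fabric8.kubernetes.api.model.QuantityBuilder

val executorMemoryMiB = 4096L                                              // spark.executor.memory
val memoryOverheadMiB = math.max((0.10 * executorMemoryMiB).toLong, 384L)  // default overhead, floored at 384 MiB
// The memory request and limit are both the sum of heap memory and overhead.
val executorMemoryQuantity = new QuantityBuilder(false)
  .withAmount(s"${executorMemoryMiB + memoryOverheadMiB}Mi")
  .build()
```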
@@ -235,6 +235,18 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")

val KUBERNETES_DRIVER_PODTEMPLATE_FILE =
ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
.doc("File containing a template pod spec for the driver")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE =
ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
.doc("File containing a template pod spec for executors")
.stringConf
.createOptional

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

@@ -77,8 +77,16 @@ private[spark] object Constants {
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"

// Pod spec templates
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH = "/opt/spark/pod-template"
val POD_TEMPLATE_VOLUME = "podspec-volume"
val POD_TEMPLATE_CONFIGMAP = "podspec-configmap"
val POD_TEMPLATE_KEY = "podspec-configmap-key"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
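The template constants compose into the path at which the mounted template is read back; a sketch (assuming the ConfigMap is mounted at the path above):

```scala
// Resolves to /opt/spark/pod-template/pod-spec-template.yml inside the driver pod.
val templatePath =
  s"$EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH/$EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME"
```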
@@ -22,10 +22,3 @@ private[spark] case class KubernetesDriverSpec(
pod: SparkPod,
driverKubernetesResources: Seq[HasMetadata],
systemProperties: Map[String, String])

private[spark] object KubernetesDriverSpec {
def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
SparkPod.initialPod(),
Seq.empty,
initialProps)
}
@@ -18,10 +18,15 @@ package org.apache.spark.deploy.k8s

import java.io.File

import org.apache.spark.SparkConf
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[spark] object KubernetesUtils {
private[spark] object KubernetesUtils extends Logging {

/**
* Extract and parse Spark configuration properties with a given name prefix and
@@ -72,5 +77,28 @@ private[spark] object KubernetesUtils {
}
}

def loadPodFromTemplate(
kubernetesClient: KubernetesClient,
templateFile: File): SparkPod = {
try {
val pod = kubernetesClient.pods().load(templateFile).get()
pod.getSpec.getContainers.asScala.toList match {
case first :: rest => SparkPod(
new PodBuilder(pod)
.editSpec()
.withContainers(rest.asJava)
.endSpec()
.build(),
first)
case Nil => SparkPod(pod, new ContainerBuilder().build())
}
} catch {
case e: Exception =>
logError(
s"Encountered exception while attempting to load initial pod spec from file", e)
throw new SparkException("Could not load pod from template file.", e)
}
}

def parseMasterUrl(url: String): String = url.substring("k8s://".length)
}
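A hypothetical call site for `loadPodFromTemplate` (client construction and the path are illustrative):

```scala
import java.io.File
import io.fabric8.kubernetes.client.DefaultKubernetesClient

val client = new DefaultKubernetesClient()
// The first container in the template becomes the Spark driver/executor
// container; any remaining containers are kept on the pod as sidecars.
val sparkPod = KubernetesUtils.loadPodFromTemplate(
  client, new File("/path/to/driver-pod-template.yaml"))
```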
@@ -80,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(
)
val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
val driverContainer = new ContainerBuilder(pod.container)
.withName(DRIVER_CONTAINER_NAME)
.withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
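// A container name supplied by the pod template takes precedence; the default is only a fallback.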
.withImage(driverContainerImage)
.withImagePullPolicy(conf.imagePullPolicy())
.addNewPort()
@@ -105,7 +105,7 @@
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
.withNewResources()
.editOrNewResources()
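// editOrNewResources (previously withNewResources) keeps resource settings already present in the template container.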
.addToRequests("cpu", driverCpuQuantity)
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
.addToRequests("memory", driverMemoryQuantity)
@@ -119,9 +119,9 @@
.addToLabels(conf.roleLabels.asJava)
.addToAnnotations(conf.roleAnnotations.asJava)
.endMetadata()
.withNewSpec()
.editOrNewSpec()
.withRestartPolicy("Never")
.withNodeSelector(conf.nodeSelector().asJava)
.addToNodeSelector(conf.nodeSelector().asJava)
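// addToNodeSelector merges conf-supplied selectors with any from the template instead of replacing them.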
.addToImagePullSecrets(conf.imagePullSecrets(): _*)
.endSpec()
.build()
@@ -129,10 +129,10 @@ private[spark] class BasicExecutorFeatureStep(
}

val executorContainer = new ContainerBuilder(pod.container)
.withName("executor")
.withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
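// As for the driver: a template-supplied container name wins over the default.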
.withImage(executorContainerImage)
.withImagePullPolicy(kubernetesConf.imagePullPolicy())
.withNewResources()
.editOrNewResources()
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("cpu", executorCpuQuantity)
@@ -163,14 +163,14 @@
val executorPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(name)
.withLabels(kubernetesConf.roleLabels.asJava)
.withAnnotations(kubernetesConf.roleAnnotations.asJava)
.addToLabels(kubernetesConf.roleLabels.asJava)
.addToAnnotations(kubernetesConf.roleAnnotations.asJava)
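// addToLabels/addToAnnotations (previously withLabels/withAnnotations) merge with template metadata rather than overwrite it.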
.addToOwnerReferences(ownerReference.toSeq: _*)
.endMetadata()
.editOrNewSpec()
.withHostname(hostname)
.withRestartPolicy("Never")
.withNodeSelector(kubernetesConf.nodeSelector().asJava)
.addToNodeSelector(kubernetesConf.nodeSelector().asJava)
.addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
.endSpec()
.build()