Skip to content

Commit

Permalink
[SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-…
Browse files Browse the repository at this point in the history
…container is used

## What changes were proposed in this pull request?

User-specified secrets are mounted into both the main container and init-container (when it is used) in a Spark driver/executor pod, using the `MountSecretsBootstrap`. Because `MountSecretsBootstrap` always adds new secret volumes for the secrets to the pod, the same secret volumes get added twice, one when mounting the secrets to the main container, and the other when mounting the secrets to the init-container. This PR fixes the issue by separating `MountSecretsBootstrap.mountSecrets` out into two methods: `addSecretVolumes` for adding secret volumes to a pod and `mountSecrets` for mounting secret volumes to a container, respectively. `addSecretVolumes` is only called once for each pod, whereas `mountSecrets` is called individually for the main container and the init-container (if it is used).

Ref: apache-spark-on-k8s#594.

## How was this patch tested?
Unit tested and manually tested.

vanzin This replaces #20148.
hex108 foxish kimoonkim

Author: Yinan Li <liyinan926@gmail.com>

Closes #20159 from liyinan926/master.

(cherry picked from commit e288fc8)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
liyinan926 authored and Marcelo Vanzin committed Jan 4, 2018
1 parent 2ab4012 commit 84707f0
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,36 @@ import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBui
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {

/**
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
* Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod.
*
* @param pod the pod into which the secret volumes are being added.
* @param container the container into which the secret volumes are being mounted.
* @return the updated pod and container with the secrets mounted.
* @return the updated pod with the secret volumes added.
*/
def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
def addSecretVolumes(pod: Pod): Pod = {
var podBuilder = new PodBuilder(pod)
secretNamesToMountPaths.keys.foreach { name =>
podBuilder = podBuilder
.editOrNewSpec()
.addNewVolume()
.withName(secretVolumeName(name))
.withNewSecret()
.withSecretName(name)
.endSecret()
.endVolume()
.withName(secretVolumeName(name))
.withNewSecret()
.withSecretName(name)
.endSecret()
.endVolume()
.endSpec()
}

podBuilder.build()
}

/**
* Mounts Kubernetes secret volumes of the secrets specified in secretNamesToMountPaths into the
* given container.
*
* @param container the container into which the secret volumes are being mounted.
* @return the updated container with the secrets mounted.
*/
def mountSecrets(container: Container): Container = {
var containerBuilder = new ContainerBuilder(container)
secretNamesToMountPaths.foreach { case (name, path) =>
containerBuilder = containerBuilder
Expand All @@ -53,7 +63,7 @@ private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String,
.endVolumeMount()
}

(podBuilder.build(), containerBuilder.build())
containerBuilder.build()
}

private def secretVolumeName(secretName: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ private[spark] class DriverConfigOrchestrator(
Nil
}

val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
} else {
Nil
}

val initContainerBootstrapStep = if (existNonContainerLocalFiles(sparkJars ++ sparkFiles)) {
val orchestrator = new InitContainerConfigOrchestrator(
sparkJars,
Expand All @@ -147,19 +153,13 @@ private[spark] class DriverConfigOrchestrator(
Nil
}

val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
} else {
Nil
}

Seq(
initialSubmissionStep,
serviceBootstrapStep,
kubernetesCredentialsStep) ++
dependencyResolutionStep ++
initContainerBootstrapStep ++
mountSecretsStep
mountSecretsStep ++
initContainerBootstrapStep
}

private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private[spark] class BasicDriverConfigurationStep(
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_ARGS)
.withValue(appArgs.map(arg => "\"" + arg + "\"").mkString(" "))
.withValue(appArgs.mkString(" "))
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ private[spark] class DriverMountSecretsStep(
bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val (pod, container) = bootstrap.mountSecrets(
driverSpec.driverPod, driverSpec.driverContainer)
val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
val container = bootstrap.mountSecrets(driverSpec.driverContainer)
driverSpec.copy(
driverPod = pod,
driverContainer = container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ private[spark] class InitContainerMountSecretsStep(
bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {

override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
val (driverPod, initContainer) = bootstrap.mountSecrets(
spec.driverPod,
spec.initContainer)
spec.copy(
driverPod = driverPod,
initContainer = initContainer
)
// Mount the secret volumes given that the volumes have already been added to the driver pod
// when mounting the secrets into the main driver container.
val initContainer = bootstrap.mountSecrets(spec.initContainer)
spec.copy(initContainer = initContainer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private[spark] class ExecutorPodFactory(

val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
mountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(executorPod, containerWithLimitCores)
(bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
}.getOrElse((executorPod, containerWithLimitCores))

val (bootstrappedPod, bootstrappedContainer) =
Expand All @@ -227,7 +227,9 @@ private[spark] class ExecutorPodFactory(

val (pod, mayBeSecretsMountedInitContainer) =
initContainerMountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(podWithInitContainer.pod, podWithInitContainer.initContainer)
// Mount the secret volumes given that the volumes have already been added to the
// executor pod when mounting the secrets into the main executor container.
(podWithInitContainer.pod, bootstrap.mountSecrets(podWithInitContainer.initContainer))
}.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))

val bootstrappedPod = KubernetesUtils.appendInitContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
package org.apache.spark.deploy.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, Pod}

private[spark] object SecretVolumeUtils {

def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
def podHasVolume(pod: Pod, volumeName: String): Boolean = {
pod.getSpec.getVolumes.asScala.exists { volume =>
volume.getName == volumeName
}
}

def containerHasVolume(
driverContainer: Container,
volumeName: String,
mountPath: String): Boolean = {
driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
def containerHasVolume(container: Container, volumeName: String, mountPath: String): Boolean = {
container.getVolumeMounts.asScala.exists { volumeMount =>
volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
private val APP_NAME = "spark-test"
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
private val APP_ARGS = Array("arg1", "arg2", "arg 3")
private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
Expand Down Expand Up @@ -82,7 +82,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
assert(envs(ENV_DRIVER_MEMORY) === "256M")
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
assert(envs(ENV_DRIVER_ARGS) === "\"arg1\" \"arg2\" \"arg 3\"")
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2 \"arg 3\"")
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.spark.deploy.k8s.submit.steps

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec

class DriverMountSecretsStepSuite extends SparkFunSuite {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.SecretVolumeUtils
import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}

class InitContainerMountSecretsStepSuite extends SparkFunSuite {

Expand All @@ -44,12 +43,8 @@ class InitContainerMountSecretsStepSuite extends SparkFunSuite {
val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
baseInitContainerSpec)

val podWithSecretsMounted = configuredInitContainerSpec.driverPod
val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer

Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName)))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.containerHasVolume(
initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

Expand Down Expand Up @@ -165,17 +165,19 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef

val factory = new ExecutorPodFactory(
conf,
None,
Some(secretsBootstrap),
Some(initContainerBootstrap),
Some(secretsBootstrap))
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

assert(executor.getSpec.getVolumes.size() === 1)
assert(SecretVolumeUtils.podHasVolume(executor, "secret1-volume"))
assert(SecretVolumeUtils.containerHasVolume(
executor.getSpec.getContainers.get(0), "secret1-volume", "/var/secret1"))
assert(executor.getSpec.getInitContainers.size() === 1)
assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0).getName
=== "secret1-volume")
assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0)
.getMountPath === "/var/secret1")
assert(SecretVolumeUtils.containerHasVolume(
executor.getSpec.getInitContainers.get(0), "secret1-volume", "/var/secret1"))

checkOwnerReferences(executor, driverPodUid)
}
Expand Down

0 comments on commit 84707f0

Please sign in to comment.