Skip to content

Commit

Permalink
Avoids adding duplicated secret volumes when init-container is used (a…
Browse files Browse the repository at this point in the history
…pache#597)

* Avoids adding duplicated secret volumes when init-container is used

Cherry-picked from apache#20148.

* Added the missing commit from upstream
  • Loading branch information
liyinan926 authored Jan 12, 2018
1 parent d7dd259 commit 12d590c
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.HadoopStepsOrchestrator
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{SystemClock, Utils}

Expand Down Expand Up @@ -117,6 +116,13 @@ private[spark] class DriverConfigurationStepsOrchestrator(
val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep(
submissionSparkConf)

val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrap(driverSecretNamesToMountPaths)
Some(new MountSecretsStep(mountSecretsBootstrap))
} else {
None
}

val hadoopConfigSteps =
hadoopConfDir.map { conf =>
val hadoopStepsOrchestrator =
Expand Down Expand Up @@ -204,23 +210,16 @@ private[spark] class DriverConfigurationStepsOrchestrator(
jarsDownloadPath,
localFilesDownloadPath)

val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
Some(new MountSecretsStep(mountSecretsBootstrap))
} else {
None
}

Seq(
initialSubmissionStep,
driverAddressStep,
kubernetesCredentialsStep,
dependencyResolutionStep,
localDirectoryMountConfigurationStep) ++
mountSecretsStep.toSeq ++
submittedDependenciesBootstrapSteps ++
hadoopConfigSteps.toSeq ++
resourceStep.toSeq ++
mountSecretsStep.toSeq
resourceStep.toSeq
}

private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,17 @@ package org.apache.spark.deploy.k8s.submit

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

/**
* Bootstraps a driver or executor pod with needed secrets mounted.
*/
private[spark] trait MountSecretsBootstrap {
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {

/**
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
* Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod.
*
* @param pod the pod into which the secret volumes are being added.
* @param container the container into which the secret volumes are being mounted.
* @return the updated pod and container with the secrets mounted.
* @return the updated pod with the secret volumes added.
*/
def mountSecrets(pod: Pod, container: Container): (Pod, Container)
}

private[spark] class MountSecretsBootstrapImpl(
secretNamesToMountPaths: Map[String, String]) extends MountSecretsBootstrap {

override def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
def addSecretVolumes(pod: Pod): Pod = {
var podBuilder = new PodBuilder(pod)
secretNamesToMountPaths.keys.foreach(name =>
secretNamesToMountPaths.keys.foreach { name =>
podBuilder = podBuilder
.editOrNewSpec()
.addNewVolume()
Expand All @@ -47,18 +37,30 @@ private[spark] class MountSecretsBootstrapImpl(
.withSecretName(name)
.endSecret()
.endVolume()
.endSpec())
.endSpec()
}

podBuilder.build()
}

/**
* Mounts Kubernetes secret volumes of the secrets specified in secretNamesToMountPaths into the
* given container.
*
* @param container the container into which the secret volumes are being mounted.
* @return the updated container with the secrets mounted.
*/
def mountSecrets(container: Container): Container = {
var containerBuilder = new ContainerBuilder(container)
secretNamesToMountPaths.foreach(namePath =>
secretNamesToMountPaths.foreach { case (name, path) =>
containerBuilder = containerBuilder
.addNewVolumeMount()
.withName(secretVolumeName(namePath._1))
.withMountPath(namePath._2)
.withName(secretVolumeName(name))
.withMountPath(path)
.endVolumeMount()
)
}

(podBuilder.build(), containerBuilder.build())
containerBuilder.build()
}

private def secretVolumeName(secretName: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap
/**
* A driver configuration step for mounting user-specified secrets onto user-specified paths.
*
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
* @param bootstrap a utility actually handling mounting of the secrets.
*/
private[spark] class MountSecretsStep(
mountSecretsBootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
mountSecretsBootstrap.mountSecrets(driverSpec.driverPod, driverSpec.driverContainer)
val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
val container = bootstrap.mountSecrets(driverSpec.driverContainer)
driverSpec.copy(
driverPod = driverPodWithSecretsMounted,
driverContainer = driverContainerWithSecretsMounted
driverPod = pod,
driverContainer = container
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl}
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrap, SubmittedDependencyUploaderImpl}
import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -140,7 +140,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
KUBERNETES_DRIVER_SECRETS_PREFIX,
"driver secrets")
val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
val mountSecretsBootstrap = new MountSecretsBootstrap(driverSecretNamesToMountPaths)
Some(new InitContainerMountSecretsStep(mountSecretsBootstrap))
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@ import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap
* An init-container configuration step for mounting user-specified secrets onto user-specified
* paths.
*
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
* @param bootstrap a utility actually handling mounting of the secrets.
*/
private[spark] class InitContainerMountSecretsStep(
mountSecretsBootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {

override def configureInitContainer(initContainerSpec: InitContainerSpec) : InitContainerSpec = {
val (podWithSecretsMounted, initContainerWithSecretsMounted) =
mountSecretsBootstrap.mountSecrets(
initContainerSpec.podToInitialize,
initContainerSpec.initContainer)
initContainerSpec.copy(
podToInitialize = podWithSecretsMounted,
initContainer = initContainerWithSecretsMounted
)
override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
// Mount the secret volumes given that the volumes have already been added to the driver pod
// when mounting the secrets into the main driver container.
val initContainer = bootstrap.mountSecrets(spec.initContainer)
spec.copy(initContainer = initContainer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package org.apache.spark.scheduler.cluster.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrap, HadoopConfSparkUserBootstrap, InitContainerResourceStagingServerSecretPlugin, KerberosTokenConfBootstrap, PodWithDetachedInitContainer, PodWithMainContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
Expand Down Expand Up @@ -235,7 +235,8 @@ private[spark] class ExecutorPodFactoryImpl(

val (withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) =
mountSecretsBootstrap.map {bootstrap =>
bootstrap.mountSecrets(executorPod, containerWithExecutorLimitCores)
(bootstrap.addSecretVolumes(executorPod),
bootstrap.mountSecrets(containerWithExecutorLimitCores))
}.getOrElse((executorPod, containerWithExecutorLimitCores))
val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) =
mountSmallFilesBootstrap.map { bootstrap =>
Expand All @@ -258,7 +259,7 @@ private[spark] class ExecutorPodFactoryImpl(

val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) =
executorInitContainerMountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer)
(podWithDetachedInitContainer.pod, bootstrap.mountSecrets(resolvedInitContainer))
}.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer)

val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopConfUtils, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrap, MountSmallFilesBootstrapImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl
Expand Down Expand Up @@ -125,12 +125,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val executorSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(sparkConf,
KUBERNETES_EXECUTOR_SECRETS_PREFIX, "executor secrets")
val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
val executorInitContainerMountSecretsBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
package org.apache.spark.deploy.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, Pod}

private[spark] object SecretVolumeUtils {

def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
def podHasVolume(pod: Pod, volumeName: String): Boolean = {
pod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
}

def containerHasVolume(
driverContainer: Container,
container: Container,
volumeName: String,
mountPath: String): Boolean = {
driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
container.getVolumeMounts.asScala.exists(volumeMount =>
volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.SecretVolumeUtils

private[spark] class MountSecretsBootstrapSuite extends SparkFunSuite {

Expand All @@ -34,9 +35,9 @@ private[spark] class MountSecretsBootstrapSuite extends SparkFunSuite {
val driverContainer = new ContainerBuilder().build()
val driverPod = new PodBuilder().build()

val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
val bootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
mountSecretsBootstrap.mountSecrets(driverPod, driverContainer)
(bootstrap.addSecretVolumes(driverPod), bootstrap.mountSecrets(driverContainer))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName)))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.submitsteps
package org.apache.spark.deploy.k8s.submit.submitsteps

import java.nio.file.Paths

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{KubernetesDriverSpec, LocalDirectoryMountConfigurationStep}

private[spark] class LocalDirectoryMountConfigurationStepSuite extends SparkFunSuite {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.apache.spark.deploy.k8s.submit.submitsteps

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.SecretVolumeUtils
import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap

private[spark] class MountSecretsStepSuite extends SparkFunSuite {

Expand All @@ -31,7 +32,7 @@ private[spark] class MountSecretsStepSuite extends SparkFunSuite {
SECRET_FOO -> SECRET_MOUNT_PATH,
SECRET_BAR -> SECRET_MOUNT_PATH)

val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val mountSecretsStep = new MountSecretsStep(mountSecretsBootstrap)
val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec)
val driverPodWithSecretsMounted = configuredDriverSpec.driverPod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.SecretVolumeUtils
import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap

class InitContainerMountSecretsStepSuite extends SparkFunSuite {

Expand All @@ -39,16 +40,12 @@ class InitContainerMountSecretsStepSuite extends SparkFunSuite {
SECRET_FOO -> SECRET_MOUNT_PATH,
SECRET_BAR -> SECRET_MOUNT_PATH)

val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
baseInitContainerSpec)

val podWithSecretsMounted = configuredInitContainerSpec.podToInitialize
val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer

Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName)))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.containerHasVolume(
initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
Expand Down
Loading

0 comments on commit 12d590c

Please sign in to comment.