Skip to content

Commit

Permalink
Extract SSL configuration handling to a separate class (apache-spark-…
Browse files Browse the repository at this point in the history
…on-k8s#123)

* Extract SSL configuration to a separate class

* KubernetesSsl -> Ssl, container -> local
  • Loading branch information
mccheah authored and ash211 committed Feb 24, 2017
1 parent 622bfdb commit 1f8fca0
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
*/
package org.apache.spark.deploy.kubernetes

import java.io.{File, FileInputStream}
import java.security.{KeyStore, SecureRandom}
import java.io.File
import java.security.SecureRandom
import java.util
import java.util.concurrent.{CountDownLatch, TimeUnit}
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}

import com.google.common.base.Charsets
import com.google.common.io.Files
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.codec.binary.Base64
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
Expand All @@ -56,8 +53,6 @@ private[spark] class Client(
private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId"
private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
Expand Down Expand Up @@ -95,7 +90,6 @@ private[spark] class Client(
throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
s" is a directory.")
}
val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
val parsedCustomLabels = parseCustomLabels(customLabels)
var k8ConfBuilder = new K8SConfigBuilder()
.withApiVersion("v1")
Expand All @@ -115,6 +109,8 @@ private[spark] class Client(
Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
ShutdownHookManager.addShutdownHook(() =>
kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
val sslConfigurationProvider = new SslConfigurationProvider(
sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner)
val submitServerSecret = kubernetesClient.secrets().createNew()
.withNewMetadata()
.withName(secretName)
Expand All @@ -124,10 +120,7 @@ private[spark] class Client(
.done()
kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
try {
val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
kubernetesClient,
driverSubmitSslOptions,
isKeyStoreLocalFile)
val sslConfiguration = sslConfigurationProvider.getSslConfiguration()
// start outer watch for status logging of driver pod
val driverPodCompletedLatch = new CountDownLatch(1)
// only enable interval logging if in waitForAppCompletion mode
Expand All @@ -142,21 +135,16 @@ private[spark] class Client(
kubernetesClient,
parsedCustomLabels,
submitServerSecret,
driverSubmitSslOptions,
sslSecrets,
sslVolumes,
sslVolumeMounts,
sslEnvs,
isKeyStoreLocalFile)
sslConfiguration)
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslSecrets,
sslConfiguration.sslSecrets,
driverPod,
driverService)
submitApplicationToDriverServer(
kubernetesClient,
driverSubmitSslOptions,
sslConfiguration,
driverService,
submitterLocalFiles,
submitterLocalJars)
Expand All @@ -182,7 +170,7 @@ private[spark] class Client(

private def submitApplicationToDriverServer(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
sslConfiguration: SslConfiguration,
driverService: Service,
submitterLocalFiles: Iterable[String],
submitterLocalJars: Iterable[String]): Unit = {
Expand All @@ -198,7 +186,7 @@ private[spark] class Client(
sparkConf.setIfMissing("spark.blockmanager.port",
DEFAULT_BLOCKMANAGER_PORT.toString)
val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, driverService,
driverSubmitSslOptions)
sslConfiguration)
// Sanity check to see if the driver submitter is even reachable.
driverSubmitter.ping()
logInfo(s"Submitting local resources to driver pod for application " +
Expand Down Expand Up @@ -229,20 +217,15 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
parsedCustomLabels: Map[String, String],
submitServerSecret: Secret,
driverSubmitSslOptions: SSLOptions,
sslSecrets: Array[Secret],
sslVolumes: Array[Volume],
sslVolumeMounts: Array[VolumeMount],
sslEnvs: Array[EnvVar],
isKeyStoreLocalFile: Boolean): (Pod, Service) = {
val endpointsReadyFuture = SettableFuture.create[Endpoints]
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
val serviceReadyFuture = SettableFuture.create[Service]
sslConfiguration: SslConfiguration): (Pod, Service) = {
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels).asJava
val endpointsReadyFuture = SettableFuture.create[Endpoints]
val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
val serviceReadyFuture = SettableFuture.create[Service]
val serviceReadyWatcher = new DriverServiceReadyWatcher(serviceReadyFuture)
val podReadyFuture = SettableFuture.create[Pod]
val podWatcher = new DriverPodReadyWatcher(podReadyFuture)
Expand All @@ -267,10 +250,7 @@ private[spark] class Client(
kubernetesClient,
driverKubernetesSelectors,
submitServerSecret,
driverSubmitSslOptions,
sslVolumes,
sslVolumeMounts,
sslEnvs)
sslConfiguration)
kubernetesResourceCleaner.registerOrUpdateResource(driverPod)
waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
serviceReadyFuture, podReadyFuture)
Expand Down Expand Up @@ -386,13 +366,10 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
driverKubernetesSelectors: util.Map[String, String],
submitServerSecret: Secret,
driverSubmitSslOptions: SSLOptions,
sslVolumes: Array[Volume],
sslVolumeMounts: Array[VolumeMount],
sslEnvs: Array[EnvVar]): Pod = {
sslConfiguration: SslConfiguration): Pod = {
val containerPorts = buildContainerPorts()
val probePingHttpGet = new HTTPGetActionBuilder()
.withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
.withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP")
.withPath("/v1/submissions/ping")
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
.build()
Expand All @@ -409,7 +386,7 @@ private[spark] class Client(
.withSecretName(submitServerSecret.getMetadata.getName)
.endSecret()
.endVolume()
.addToVolumes(sslVolumes: _*)
.addToVolumes(sslConfiguration.sslPodVolumes: _*)
.withServiceAccount(serviceAccount)
.addNewContainer()
.withName(DRIVER_CONTAINER_NAME)
Expand All @@ -420,7 +397,7 @@ private[spark] class Client(
.withMountPath(secretDirectory)
.withReadOnly(true)
.endVolumeMount()
.addToVolumeMounts(sslVolumeMounts: _*)
.addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*)
.addNewEnv()
.withName(ENV_SUBMISSION_SECRET_LOCATION)
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
Expand All @@ -429,7 +406,7 @@ private[spark] class Client(
.withName(ENV_SUBMISSION_SERVER_PORT)
.withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
.addToEnv(sslEnvs: _*)
.addToEnv(sslConfiguration.sslPodEnvVars: _*)
.withPorts(containerPorts.asJava)
.withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
.endContainer()
Expand Down Expand Up @@ -486,108 +463,6 @@ private[spark] class Client(
}
}

private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = {
val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE)
val resolvedSparkConf = sparkConf.clone()
val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => {
(KubernetesFileUtils.isUriLocalFile(keyStore),
Option.apply(Utils.resolveURI(keyStore).getPath))
}).getOrElse((false, Option.empty[String]))
resolvedKeyStore.foreach {
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _)
}
sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore =>
if (KubernetesFileUtils.isUriLocalFile(trustStore)) {
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE,
Utils.resolveURI(trustStore).getPath)
} else {
throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" +
" for submit server must have no scheme, or scheme file://")
}
}
val securityManager = new SecurityManager(resolvedSparkConf)
(securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
}

private def configureSsl(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean):
(Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
if (driverSubmitSslOptions.enabled) {
val sslSecretsMap = mutable.HashMap[String, String]()
val sslEnvs = mutable.Buffer[EnvVar]()
val secrets = mutable.Buffer[Secret]()
driverSubmitSslOptions.keyStore.foreach(store => {
val resolvedKeyStoreFile = if (isKeyStoreLocalFile) {
if (!store.isFile) {
throw new SparkException(s"KeyStore specified at $store is not a file or" +
s" does not exist.")
}
val keyStoreBytes = Files.toByteArray(store)
val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes)
sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64)
s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME"
} else {
store.getAbsolutePath
}
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_FILE)
.withValue(resolvedKeyStoreFile)
.build()
})
driverSubmitSslOptions.keyStorePassword.foreach(password => {
val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
.withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
.build()
})
driverSubmitSslOptions.keyPassword.foreach(password => {
val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
.withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
.build()
})
driverSubmitSslOptions.keyStoreType.foreach(storeType => {
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_TYPE)
.withValue(storeType)
.build()
})
sslEnvs += new EnvVarBuilder()
.withName(ENV_SUBMISSION_USE_SSL)
.withValue("true")
.build()
val sslVolume = new VolumeBuilder()
.withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
.withNewSecret()
.withSecretName(sslSecretsName)
.endSecret()
.build()
val sslVolumeMount = new VolumeMountBuilder()
.withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
.withReadOnly(true)
.withMountPath(sslSecretsDirectory)
.build()
val sslSecrets = kubernetesClient.secrets().createNew()
.withNewMetadata()
.withName(sslSecretsName)
.endMetadata()
.withData(sslSecretsMap.asJava)
.withType("Opaque")
.done()
kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
secrets += sslSecrets
(sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
} else {
(Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
}
}

private def buildSubmitFailedErrorMessage(
kubernetesClient: KubernetesClient,
e: Throwable): String = {
Expand Down Expand Up @@ -688,8 +563,8 @@ private[spark] class Client(
private def buildDriverSubmissionClient(
kubernetesClient: KubernetesClient,
service: Service,
driverSubmitSslOptions: SSLOptions): KubernetesSparkRestApi = {
val urlScheme = if (driverSubmitSslOptions.enabled) {
sslConfiguration: SslConfiguration): KubernetesSparkRestApi = {
val urlScheme = if (sslConfiguration.sslOptions.enabled) {
"https"
} else {
logWarning("Submitting application details, application secret, and local" +
Expand All @@ -714,45 +589,18 @@ private[spark] class Client(
s"$urlScheme://${address.getAddress}:$servicePort"
}).toSet
require(nodeUrls.nonEmpty, "No nodes found to contact the driver!")
val (trustManager, sslContext): (X509TrustManager, SSLContext) =
if (driverSubmitSslOptions.enabled) {
buildSslConnectionConfiguration(driverSubmitSslOptions)
} else {
(null, SSLContext.getDefault)
}
HttpClientUtil.createClient[KubernetesSparkRestApi](
uris = nodeUrls,
maxRetriesPerServer = 3,
sslSocketFactory = sslContext.getSocketFactory,
trustContext = trustManager,
sslSocketFactory = sslConfiguration
.driverSubmitClientSslContext
.getSocketFactory,
trustContext = sslConfiguration
.driverSubmitClientTrustManager
.orNull,
connectTimeoutMillis = 5000)
}

private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions) = {
driverSubmitSslOptions.trustStore.map(trustStoreFile => {
val trustManagerFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm)
val trustStore = KeyStore.getInstance(
driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
if (!trustStoreFile.isFile) {
throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
s" does not exist or is not a file.")
}
Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
driverSubmitSslOptions.trustStorePassword match {
case Some(password) =>
trustStore.load(trustStoreStream, password.toCharArray)
case None => trustStore.load(trustStoreStream, null)
}
}
trustManagerFactory.init(trustStore)
val trustManagers = trustManagerFactory.getTrustManagers
val sslContext = SSLContext.getInstance("TLSv1.2")
sslContext.init(null, trustManagers, SECURE_RANDOM)
(trustManagers(0).asInstanceOf[X509TrustManager], sslContext)
}).getOrElse((null, SSLContext.getDefault))
}

private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = {
maybeLabels.map(labels => {
labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => {
Expand Down
Loading

0 comments on commit 1f8fca0

Please sign in to comment.