From 1f8fca02dc7f01a78233a4afbc3e3531dfb5d23f Mon Sep 17 00:00:00 2001 From: mccheah Date: Thu, 23 Feb 2017 23:54:23 -0800 Subject: [PATCH] Extract SSL configuration handling to a separate class (#123) * Extract SSL configuration to a separate class * KubernetesSsl -> Ssl, container -> local --- .../spark/deploy/kubernetes/Client.scala | 210 +++--------------- .../kubernetes/SslConfigurationProvider.scala | 203 +++++++++++++++++ 2 files changed, 232 insertions(+), 181 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index f55d71acfd6a0..c787d5917e381 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -16,13 +16,11 @@ */ package org.apache.spark.deploy.kubernetes -import java.io.{File, FileInputStream} -import java.security.{KeyStore, SecureRandom} +import java.io.File +import java.security.SecureRandom import java.util import java.util.concurrent.{CountDownLatch, TimeUnit} -import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} -import com.google.common.base.Charsets import com.google.common.io.Files import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model._ @@ -30,9 +28,8 @@ import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultK import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ -import scala.collection.mutable -import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions} +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource} @@ -56,8 +53,6 @@ private[spark] class Client( private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId" private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId" - private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl" - private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) @@ -95,7 +90,6 @@ private[spark] class Client( throw new SparkException(s"Main app resource file $mainAppResource is not a file or" + s" is a directory.") } - val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() val parsedCustomLabels = parseCustomLabels(customLabels) var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") @@ -115,6 +109,8 @@ private[spark] class Client( Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => ShutdownHookManager.addShutdownHook(() => kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) + val sslConfigurationProvider = new SslConfigurationProvider( + sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner) val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() .withName(secretName) @@ -124,10 +120,7 @@ private[spark] class Client( .done() kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) try { - val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl( - kubernetesClient, - driverSubmitSslOptions, - isKeyStoreLocalFile) + val sslConfiguration = sslConfigurationProvider.getSslConfiguration() // start outer watch for status logging of driver pod val driverPodCompletedLatch = new CountDownLatch(1) // only enable interval logging if in waitForAppCompletion mode @@ -142,21 +135,16 @@ private[spark] class Client( kubernetesClient, parsedCustomLabels, submitServerSecret, - driverSubmitSslOptions, - sslSecrets, - sslVolumes, - sslVolumeMounts, - sslEnvs, - isKeyStoreLocalFile) + sslConfiguration) configureOwnerReferences( kubernetesClient, submitServerSecret, - sslSecrets, + sslConfiguration.sslSecrets, driverPod, driverService) submitApplicationToDriverServer( kubernetesClient, - driverSubmitSslOptions, + sslConfiguration, driverService, submitterLocalFiles, submitterLocalJars) @@ -182,7 +170,7 @@ private[spark] class Client( private def submitApplicationToDriverServer( kubernetesClient: KubernetesClient, - driverSubmitSslOptions: SSLOptions, + sslConfiguration: SslConfiguration, driverService: Service, submitterLocalFiles: Iterable[String], submitterLocalJars: Iterable[String]): Unit = { @@ -198,7 +186,7 @@ private[spark] class Client( sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, driverService, - driverSubmitSslOptions) + sslConfiguration) // Sanity check to see if the driver submitter is even reachable. driverSubmitter.ping() logInfo(s"Submitting local resources to driver pod for application " + @@ -229,20 +217,15 @@ private[spark] class Client( kubernetesClient: KubernetesClient, parsedCustomLabels: Map[String, String], submitServerSecret: Secret, - driverSubmitSslOptions: SSLOptions, - sslSecrets: Array[Secret], - sslVolumes: Array[Volume], - sslVolumeMounts: Array[VolumeMount], - sslEnvs: Array[EnvVar], - isKeyStoreLocalFile: Boolean): (Pod, Service) = { - val endpointsReadyFuture = SettableFuture.create[Endpoints] - val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture) - val serviceReadyFuture = SettableFuture.create[Service] + sslConfiguration: SslConfiguration): (Pod, Service) = { val driverKubernetesSelectors = (Map( SPARK_DRIVER_LABEL -> kubernetesAppId, SPARK_APP_ID_LABEL -> kubernetesAppId, SPARK_APP_NAME_LABEL -> appName) ++ parsedCustomLabels).asJava + val endpointsReadyFuture = SettableFuture.create[Endpoints] + val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture) + val serviceReadyFuture = SettableFuture.create[Service] val serviceReadyWatcher = new DriverServiceReadyWatcher(serviceReadyFuture) val podReadyFuture = SettableFuture.create[Pod] val podWatcher = new DriverPodReadyWatcher(podReadyFuture) @@ -267,10 +250,7 @@ private[spark] class Client( kubernetesClient, driverKubernetesSelectors, submitServerSecret, - driverSubmitSslOptions, - sslVolumes, - sslVolumeMounts, - sslEnvs) + sslConfiguration) kubernetesResourceCleaner.registerOrUpdateResource(driverPod) waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, serviceReadyFuture, podReadyFuture) @@ -386,13 +366,10 @@ private[spark] class Client( kubernetesClient: KubernetesClient, driverKubernetesSelectors: util.Map[String, String], submitServerSecret: Secret, - driverSubmitSslOptions: SSLOptions, - sslVolumes: Array[Volume], - sslVolumeMounts: Array[VolumeMount], - sslEnvs: Array[EnvVar]): Pod = { + sslConfiguration: SslConfiguration): Pod = { val containerPorts = buildContainerPorts() val probePingHttpGet = new HTTPGetActionBuilder() - .withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP") + .withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP") .withPath("/v1/submissions/ping") .withNewPort(SUBMISSION_SERVER_PORT_NAME) .build() @@ -409,7 +386,7 @@ private[spark] class Client( .withSecretName(submitServerSecret.getMetadata.getName) .endSecret() .endVolume() - .addToVolumes(sslVolumes: _*) + .addToVolumes(sslConfiguration.sslPodVolumes: _*) .withServiceAccount(serviceAccount) .addNewContainer() .withName(DRIVER_CONTAINER_NAME) @@ -420,7 +397,7 @@ private[spark] class Client( .withMountPath(secretDirectory) .withReadOnly(true) .endVolumeMount() - .addToVolumeMounts(sslVolumeMounts: _*) + .addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*) .addNewEnv() .withName(ENV_SUBMISSION_SECRET_LOCATION) .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME") @@ -429,7 +406,7 @@ private[spark] class Client( .withName(ENV_SUBMISSION_SERVER_PORT) .withValue(SUBMISSION_SERVER_PORT.toString) .endEnv() - .addToEnv(sslEnvs: _*) + .addToEnv(sslConfiguration.sslPodEnvVars: _*) .withPorts(containerPorts.asJava) .withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe() .endContainer() @@ -486,108 +463,6 @@ private[spark] class Client( } } - private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = { - val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE) - val resolvedSparkConf = sparkConf.clone() - val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => { - (KubernetesFileUtils.isUriLocalFile(keyStore), - Option.apply(Utils.resolveURI(keyStore).getPath)) - }).getOrElse((false, Option.empty[String])) - resolvedKeyStore.foreach { - resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _) - } - sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore => - if (KubernetesFileUtils.isUriLocalFile(trustStore)) { - resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, - Utils.resolveURI(trustStore).getPath) - } else { - throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" + - " for submit server must have no scheme, or scheme file://") - } - } - val securityManager = new SecurityManager(resolvedSparkConf) - (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore) - } - - private def configureSsl( - kubernetesClient: KubernetesClient, - driverSubmitSslOptions: SSLOptions, - isKeyStoreLocalFile: Boolean): - (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = { - if (driverSubmitSslOptions.enabled) { - val sslSecretsMap = mutable.HashMap[String, String]() - val sslEnvs = mutable.Buffer[EnvVar]() - val secrets = mutable.Buffer[Secret]() - driverSubmitSslOptions.keyStore.foreach(store => { - val resolvedKeyStoreFile = if (isKeyStoreLocalFile) { - if (!store.isFile) { - throw new SparkException(s"KeyStore specified at $store is not a file or" + - s" does not exist.") - } - val keyStoreBytes = Files.toByteArray(store) - val keyStoreBase64 = Base64.encodeBase64String(keyStoreBytes) - sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64) - s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME" - } else { - store.getAbsolutePath - } - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_KEYSTORE_FILE) - .withValue(resolvedKeyStoreFile) - .build() - }) - driverSubmitSslOptions.keyStorePassword.foreach(password => { - val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8)) - sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64) - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE) - .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME") - .build() - }) - driverSubmitSslOptions.keyPassword.foreach(password => { - val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8)) - sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64) - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE) - .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME") - .build() - }) - driverSubmitSslOptions.keyStoreType.foreach(storeType => { - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_KEYSTORE_TYPE) - .withValue(storeType) - .build() - }) - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_USE_SSL) - .withValue("true") - .build() - val sslVolume = new VolumeBuilder() - .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) - .withNewSecret() - .withSecretName(sslSecretsName) - .endSecret() - .build() - val sslVolumeMount = new VolumeMountBuilder() - .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) - .withReadOnly(true) - .withMountPath(sslSecretsDirectory) - .build() - val sslSecrets = kubernetesClient.secrets().createNew() - .withNewMetadata() - .withName(sslSecretsName) - .endMetadata() - .withData(sslSecretsMap.asJava) - .withType("Opaque") - .done() - kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets) - secrets += sslSecrets - (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray) - } else { - (Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]()) - } - } - private def buildSubmitFailedErrorMessage( kubernetesClient: KubernetesClient, e: Throwable): String = { @@ -688,8 +563,8 @@ private[spark] class Client( private def buildDriverSubmissionClient( kubernetesClient: KubernetesClient, service: Service, - driverSubmitSslOptions: SSLOptions): KubernetesSparkRestApi = { - val urlScheme = if (driverSubmitSslOptions.enabled) { + sslConfiguration: SslConfiguration): KubernetesSparkRestApi = { + val urlScheme = if (sslConfiguration.sslOptions.enabled) { "https" } else { logWarning("Submitting application details, application secret, and local" + @@ -714,45 +589,18 @@ private[spark] class Client( s"$urlScheme://${address.getAddress}:$servicePort" }).toSet require(nodeUrls.nonEmpty, "No nodes found to contact the driver!") - val (trustManager, sslContext): (X509TrustManager, SSLContext) = - if (driverSubmitSslOptions.enabled) { - buildSslConnectionConfiguration(driverSubmitSslOptions) - } else { - (null, SSLContext.getDefault) - } HttpClientUtil.createClient[KubernetesSparkRestApi]( uris = nodeUrls, maxRetriesPerServer = 3, - sslSocketFactory = sslContext.getSocketFactory, - trustContext = trustManager, + sslSocketFactory = sslConfiguration + .driverSubmitClientSslContext + .getSocketFactory, + trustContext = sslConfiguration + .driverSubmitClientTrustManager + .orNull, connectTimeoutMillis = 5000) } - private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions) = { - driverSubmitSslOptions.trustStore.map(trustStoreFile => { - val trustManagerFactory = TrustManagerFactory.getInstance( - TrustManagerFactory.getDefaultAlgorithm) - val trustStore = KeyStore.getInstance( - driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) - if (!trustStoreFile.isFile) { - throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + - s" does not exist or is not a file.") - } - Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => - driverSubmitSslOptions.trustStorePassword match { - case Some(password) => - trustStore.load(trustStoreStream, password.toCharArray) - case None => trustStore.load(trustStoreStream, null) - } - } - trustManagerFactory.init(trustStore) - val trustManagers = trustManagerFactory.getTrustManagers - val sslContext = SSLContext.getInstance("TLSv1.2") - sslContext.init(null, trustManagers, SECURE_RANDOM) - (trustManagers(0).asInstanceOf[X509TrustManager], sslContext) - }).getOrElse((null, SSLContext.getDefault)) - } - private def parseCustomLabels(maybeLabels: Option[String]): Map[String, String] = { maybeLabels.map(labels => { labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala new file mode 100644 index 0000000000000..4c031fcba91ab --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import java.io.FileInputStream +import java.security.{KeyStore, SecureRandom} +import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} + +import com.google.common.base.Charsets +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, Secret, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.util.Utils + +private[spark] case class SslConfiguration( + sslOptions: SSLOptions, + isKeyStoreLocalFile: Boolean, + sslPodEnvVars: Array[EnvVar], + sslPodVolumes: Array[Volume], + sslPodVolumeMounts: Array[VolumeMount], + sslSecrets: Array[Secret], + driverSubmitClientTrustManager: Option[X509TrustManager], + driverSubmitClientSslContext: SSLContext) + +private[spark] class SslConfigurationProvider( + sparkConf: SparkConf, + kubernetesAppId: String, + kubernetesClient: KubernetesClient, + kubernetesResourceCleaner: KubernetesResourceCleaner) { + private val SECURE_RANDOM = new SecureRandom() + private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" + private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl" + + def getSslConfiguration(): SslConfiguration = { + val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() + if (driverSubmitSslOptions.enabled) { + val sslSecretsMap = mutable.HashMap[String, String]() + val sslEnvs = mutable.Buffer[EnvVar]() + val secrets = mutable.Buffer[Secret]() + driverSubmitSslOptions.keyStore.foreach(store => { + val resolvedKeyStoreFile = if (isKeyStoreLocalFile) { + if (!store.isFile) { + throw new SparkException(s"KeyStore specified at $store is not a file or" + + s" does not exist.") + } + val keyStoreBytes = Files.toByteArray(store) + val keyStoreBase64 = BaseEncoding.base64().encode(keyStoreBytes) + sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64) + s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME" + } else { + store.getAbsolutePath + } + sslEnvs += new EnvVarBuilder() + .withName(ENV_SUBMISSION_KEYSTORE_FILE) + .withValue(resolvedKeyStoreFile) + .build() + }) + driverSubmitSslOptions.keyStorePassword.foreach(password => { + val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)) + sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64) + sslEnvs += new EnvVarBuilder() + .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE) + .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME") + .build() + }) + driverSubmitSslOptions.keyPassword.foreach(password => { + val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)) + sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64) + sslEnvs += new EnvVarBuilder() + .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE) + .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME") + .build() + }) + driverSubmitSslOptions.keyStoreType.foreach(storeType => { + sslEnvs += new EnvVarBuilder() + .withName(ENV_SUBMISSION_KEYSTORE_TYPE) + .withValue(storeType) + .build() + }) + sslEnvs += new EnvVarBuilder() + .withName(ENV_SUBMISSION_USE_SSL) + .withValue("true") + .build() + val sslVolume = new VolumeBuilder() + .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) + .withNewSecret() + .withSecretName(sslSecretsName) + .endSecret() + .build() + val sslVolumeMount = new VolumeMountBuilder() + .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) + .withReadOnly(true) + .withMountPath(sslSecretsDirectory) + .build() + val sslSecrets = kubernetesClient.secrets().createNew() + .withNewMetadata() + .withName(sslSecretsName) + .endMetadata() + .withData(sslSecretsMap.asJava) + .withType("Opaque") + .done() + kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets) + secrets += sslSecrets + val (driverSubmitClientTrustManager, driverSubmitClientSslContext) = + buildSslConnectionConfiguration(driverSubmitSslOptions) + SslConfiguration( + driverSubmitSslOptions, + isKeyStoreLocalFile, + sslEnvs.toArray, + Array(sslVolume), + Array(sslVolumeMount), + secrets.toArray, + driverSubmitClientTrustManager, + driverSubmitClientSslContext) + } else { + SslConfiguration( + driverSubmitSslOptions, + isKeyStoreLocalFile, + Array[EnvVar](), + Array[Volume](), + Array[VolumeMount](), + Array[Secret](), + None, + SSLContext.getDefault) + } + } + + private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = { + val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE) + val resolvedSparkConf = sparkConf.clone() + val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => { + val keyStoreURI = Utils.resolveURI(keyStore) + val isProvidedKeyStoreLocal = keyStoreURI.getScheme match { + case "file" | null => true + case "local" => false + case _ => throw new SparkException(s"Invalid KeyStore URI $keyStore; keyStore URI" + + " for submit server must have scheme file:// or local:// (no scheme defaults" + + " to file://)") + } + (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath)) + }).getOrElse((false, Option.empty[String])) + resolvedKeyStore.foreach { + resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _) + } + sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore => + val trustStoreURI = Utils.resolveURI(trustStore) + trustStoreURI.getScheme match { + case "file" | null => + resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, trustStoreURI.getPath) + case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" + + " for submit server must have no scheme, or scheme file://") + } + } + val securityManager = new SparkSecurityManager(resolvedSparkConf) + (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore) + } + + private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions): + (Option[X509TrustManager], SSLContext) = { + driverSubmitSslOptions.trustStore.map(trustStoreFile => { + val trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm) + val trustStore = KeyStore.getInstance( + driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) + if (!trustStoreFile.isFile) { + throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + + s" does not exist or is not a file.") + } + Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => + driverSubmitSslOptions.trustStorePassword match { + case Some(password) => + trustStore.load(trustStoreStream, password.toCharArray) + case None => trustStore.load(trustStoreStream, null) + } + } + trustManagerFactory.init(trustStore) + val trustManagers = trustManagerFactory.getTrustManagers + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(null, trustManagers, SECURE_RANDOM) + (Option.apply(trustManagers(0).asInstanceOf[X509TrustManager]), sslContext) + }).getOrElse((Option.empty[X509TrustManager], SSLContext.getDefault)) + } +}