diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 45e5a46a26258..ab442131ad271 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -364,10 +364,15 @@ package object config extends Logging { private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer" private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem") - .doc("Certificate PEM file to use when having the Kubernetes dependency server" + + .doc("Certificate PEM file to use when having the resource staging server" + " listen on TLS.") .stringConf .createOptional + private[spark] val RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM = + ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.clientCertPem") + .doc("Certificate PEM file to use when the client contacts the resource staging server.") + .stringConf + .createOptional private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE = ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala index da08e17dee85b..23e3e09834372 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.Utils @@ -267,8 +268,9 @@ private[spark] object Client { val appName = sparkConf.getOption("spark.app.name") .getOrElse("spark") val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") + val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf) val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl( - sparkConf, kubernetesAppId, sparkJars, sparkFiles) + sparkConf, kubernetesAppId, sparkJars, sparkFiles, sslOptionsProvider.getSslOptions) val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf) val kubernetesCredentialsMounterProvider = new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala index 5b649735f2b3d..7f6ae2ec47675 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverInitContainerComponentsProvider.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.deploy.kubernetes.submit.v2 -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.{SparkConf, SSLOptions} import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ @@ -46,12 +46,11 @@ private[spark] class DriverInitContainerComponentsProviderImpl( sparkConf: SparkConf, kubernetesAppId: String, sparkJars: Seq[String], - sparkFiles: Seq[String]) + sparkFiles: Seq[String], + resourceStagingServerSslOptions: SSLOptions) extends DriverInitContainerComponentsProvider { private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI) - private val resourceStagingServerSslOptions = new SecurityManager(sparkConf) - .getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE) private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION) private val maybeSecretName = maybeResourceStagingServerUri.map { _ => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala index 1853b2ecce6d2..b8fa43d0573f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencySecretBuilder.scala @@ -22,7 +22,6 @@ import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder} import scala.collection.JavaConverters._ import org.apache.spark.SSLOptions -import org.apache.spark.deploy.kubernetes.constants._ private[spark] trait SubmittedDependencySecretBuilder { /** diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala index 52ca3ef956a79..5cd24a8f9b75e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala @@ -414,18 +414,20 @@ private[spark] object KubernetesSparkRestServer { // If keystore password isn't set but we're using PEM files, generate a password .orElse(parsedArguments.keyPemFile.map(_ => randomPassword())) val resolvedKeyStore = parsedArguments.keyStoreFile.map(new File(_)).orElse( - parsedArguments.keyPemFile.map(keyPemFile => { - parsedArguments.certPemFile.map(certPemFile => { - PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile( - new File(keyPemFile), - new File(certPemFile), - "provided-key", - keyStorePassword, - keyPassword, - parsedArguments.keyStoreType) - }) - }).getOrElse(throw new SparkException("When providing PEM files to set up TLS for the" + - " submission server, both the key and the certificate must be specified."))) + for { + keyPemFile <- parsedArguments.keyPemFile + certPemFile <- parsedArguments.certPemFile + resolvedKeyStorePassword <- keyStorePassword + resolvedKeyPassword <- keyPassword + } yield { + PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile( + new File(keyPemFile), + new File(certPemFile), + "provided-key", + resolvedKeyStorePassword, + resolvedKeyPassword, + parsedArguments.keyStoreType) + }) new SSLOptions( enabled = true, keyStore = resolvedKeyStore, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala index 2c68b150baf91..178956a136d1c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala @@ -43,8 +43,8 @@ private[spark] object PemsToKeyStoreConverter { keyPemFile: File, certPemFile: File, keyAlias: String, - keyStorePassword: Option[String], - keyPassword: Option[String], + keyStorePassword: String, + keyPassword: String, keyStoreType: Option[String]): File = { require(keyPemFile.isFile, s"Key PEM file provided at ${keyPemFile.getAbsolutePath}" + " does not exist or is not a file.") @@ -58,12 +58,12 @@ private[spark] object PemsToKeyStoreConverter { keyStore.setKeyEntry( keyAlias, privateKey, - keyPassword.map(_.toCharArray).orNull, + keyPassword.toCharArray, certificates) val keyStoreDir = Utils.createTempDir("temp-keystores") val keyStoreFile = new File(keyStoreDir, s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType") Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { storeStream => - keyStore.store(storeStream, keyStorePassword.map(_.toCharArray).orNull) + keyStore.store(storeStream, keyStorePassword.toCharArray) } keyStoreFile } @@ -81,6 +81,20 @@ private[spark] object PemsToKeyStoreConverter { trustStore } + def convertCertPemToTempTrustStoreFile( + certPemFile: File, + trustStorePassword: String, + trustStoreType: Option[String]): File = { + val trustStore = convertCertPemToTrustStore(certPemFile, trustStoreType) + val tempTrustStoreDir = Utils.createTempDir(namePrefix = "temp-trustStore") + val tempTrustStoreFile = new File(tempTrustStoreDir, + s"trustStore.${trustStoreType.getOrElse(KeyStore.getDefaultType)}") + Utils.tryWithResource(new FileOutputStream(tempTrustStoreFile)) { + trustStore.store(_, trustStorePassword.toCharArray) + } + tempTrustStoreFile + } + private def withPemParsedFromFile[T](pemFile: File)(f: (PEMParser => T)): T = { Utils.tryWithResource(new FileInputStream(pemFile)) { pemStream => Utils.tryWithResource(new InputStreamReader(pemStream, Charsets.UTF_8)) { pemReader => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala index 67caa176930ea..7f21087159145 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainer.scala @@ -28,7 +28,7 @@ import retrofit2.{Call, Callback, Response} import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration -import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf} +import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.CompressionUtils @@ -95,7 +95,7 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer( sparkConf: SparkConf, retrofitClientFactory: RetrofitClientFactory, fileFetcher: FileFetcher, - securityManager: SparkSecurityManager) extends Logging { + resourceStagingServerSslOptions: SSLOptions) extends Logging { private implicit val downloadExecutor = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("download-executor")) @@ -177,9 +177,10 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer( maybeResourceId.foreach { resourceId => require(resourceSecretLocation.isFile, errMessageOnSecretNotAFile) require(resourceDownloadDir.isDirectory, errMessageOnDownloadDirNotADirectory) - val sslOptions = securityManager.getSSLOptions("kubernetes.resourceStagingServer") val service = retrofitClientFactory.createRetrofitClient( - resourceStagingServerUri, classOf[ResourceStagingServiceRetrofit], sslOptions) + resourceStagingServerUri, + classOf[ResourceStagingServiceRetrofit], + resourceStagingServerSslOptions) val resourceSecret = Files.toString(resourceSecretLocation, Charsets.UTF_8) val downloadResourceCallback = new DownloadTarGzCallback(resourceDownloadDir) logInfo(downloadStartMessage) @@ -219,12 +220,14 @@ object KubernetesSparkDependencyDownloadInitContainer extends Logging { new SparkConf(true) } val securityManager = new SparkSecurityManager(sparkConf) + val resourceStagingServerSslOptions = + new ResourceStagingServerSslOptionsProviderImpl(sparkConf).getSslOptions val fileFetcher = new FileFetcherImpl(sparkConf, securityManager) new KubernetesSparkDependencyDownloadInitContainer( sparkConf, RetrofitClientFactoryImpl, fileFetcher, - securityManager).run() + resourceStagingServerSslOptions).run() logInfo("Finished downloading application dependencies.") } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala index 2744ed0a74616..6b88426d00e72 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala @@ -17,9 +17,11 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.File +import java.security.SecureRandom import com.google.common.base.Charsets import com.google.common.io.Files +import org.apache.commons.lang3.RandomStringUtils import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.kubernetes.config._ @@ -32,20 +34,29 @@ private[spark] trait ResourceStagingServerSslOptionsProvider { private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: SparkConf) extends ResourceStagingServerSslOptionsProvider with Logging { + + private val SECURE_RANDOM = new SecureRandom() + def getSslOptions: SSLOptions = { val baseSslOptions = new SparkSecurityManager(sparkConf) - .getSSLOptions("kubernetes.resourceStagingServer") + .getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE) val maybeKeyPem = sparkConf.get(RESOURCE_STAGING_SERVER_KEY_PEM) - val maybeCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM) + val maybeServerCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM) val maybeKeyStorePasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE) val maybeKeyPasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE) + val maybeClientCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM) logSslConfigurations( - baseSslOptions, maybeKeyPem, maybeCertPem, maybeKeyStorePasswordFile, maybeKeyPasswordFile) + baseSslOptions, + maybeKeyPem, + maybeServerCertPem, + maybeKeyStorePasswordFile, + maybeKeyPasswordFile, + maybeClientCertPem) requireNandDefined(baseSslOptions.keyStore, maybeKeyPem, "Shouldn't provide both key PEM and keyStore files for TLS.") - requireNandDefined(baseSslOptions.keyStore, maybeCertPem, + requireNandDefined(baseSslOptions.keyStore, maybeServerCertPem, "Shouldn't provide both certificate PEM and keyStore files for TLS.") requireNandDefined(baseSslOptions.keyStorePassword, maybeKeyStorePasswordFile, "Shouldn't provide both the keyStore password value and the keyStore password file.") @@ -53,42 +64,68 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar "Shouldn't provide both the keyStore key password value and the keyStore key password file.") requireBothOrNeitherDefined( maybeKeyPem, - maybeCertPem, + maybeServerCertPem, "When providing a certificate PEM file, the key PEM file must also be provided.", "When providing a key PEM file, the certificate PEM file must also be provided.") + requireNandDefined(baseSslOptions.trustStore, maybeClientCertPem, + "Shouldn't provide both the trustStore and a client certificate PEM file.") val resolvedKeyStorePassword = baseSslOptions.keyStorePassword .orElse(maybeKeyStorePasswordFile.map { keyStorePasswordFile => safeFileToString(keyStorePasswordFile, "KeyStore password file") }) + .orElse(maybeKeyPem.map { _ => randomPassword()}) val resolvedKeyStoreKeyPassword = baseSslOptions.keyPassword .orElse(maybeKeyPasswordFile.map { keyPasswordFile => safeFileToString(keyPasswordFile, "KeyStore key password file") }) - val resolvedKeyStore = baseSslOptions.keyStore - .orElse(maybeKeyPem.map { keyPem => + .orElse(maybeKeyPem.map { _ => randomPassword()}) + val resolvedKeyStore = baseSslOptions.keyStore.orElse { + for { + keyPem <- maybeKeyPem + certPem <- maybeServerCertPem + keyStorePassword <- resolvedKeyStorePassword + keyPassword <- resolvedKeyStoreKeyPassword + } yield { val keyPemFile = new File(keyPem) - val certPemFile = new File(maybeCertPem.get) + val certPemFile = new File(certPem) PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile( keyPemFile, certPemFile, "key", - resolvedKeyStorePassword, - resolvedKeyStoreKeyPassword, + keyStorePassword, + keyPassword, baseSslOptions.keyStoreType) - }) + } + } + val resolvedTrustStorePassword = baseSslOptions.trustStorePassword + .orElse(maybeClientCertPem.map( _ => "defaultTrustStorePassword")) + val resolvedTrustStore = baseSslOptions.trustStore.orElse { + for { + clientCertPem <- maybeClientCertPem + trustStorePassword <- resolvedTrustStorePassword + } yield { + val certPemFile = new File(clientCertPem) + PemsToKeyStoreConverter.convertCertPemToTempTrustStoreFile( + certPemFile, + trustStorePassword, + baseSslOptions.trustStoreType) + } + } baseSslOptions.copy( keyStore = resolvedKeyStore, keyStorePassword = resolvedKeyStorePassword, - keyPassword = resolvedKeyStoreKeyPassword) + keyPassword = resolvedKeyStoreKeyPassword, + trustStore = resolvedTrustStore) } private def logSslConfigurations( baseSslOptions: SSLOptions, maybeKeyPem: Option[String], - maybeCertPem: Option[String], + maybeServerCertPem: Option[String], maybeKeyStorePasswordFile: Option[String], - maybeKeyPasswordFile: Option[String]) = { + maybeKeyPasswordFile: Option[String], + maybeClientCertPem: Option[String]) = { logDebug("The following SSL configurations were provided for the resource staging server:") logDebug(s"KeyStore File: ${baseSslOptions.keyStore.map(_.getAbsolutePath).getOrElse("N/A")}") logDebug("KeyStore Password: " + @@ -99,7 +136,8 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar logDebug(s"Key Password File: ${maybeKeyPasswordFile.getOrElse("N/A")}") logDebug(s"KeyStore Type: ${baseSslOptions.keyStoreType.getOrElse("N/A")}") logDebug(s"Key PEM: ${maybeKeyPem.getOrElse("N/A")}") - logDebug(s"Certificate PEM: ${maybeCertPem.getOrElse("N/A")}") + logDebug(s"Server-side certificate PEM: ${maybeServerCertPem.getOrElse("N/A")}") + logDebug(s"Client-side certificate PEM: ${maybeClientCertPem.getOrElse("N/A")}") } private def requireBothOrNeitherDefined( @@ -130,4 +168,8 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar } Files.toString(file, Charsets.UTF_8) } + + private def randomPassword(): String = { + RandomStringUtils.random(1024, 0, Integer.MAX_VALUE, false, false, null, SECURE_RANDOM) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SSLUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SSLUtils.scala index 0cb056dcf5493..886484ffb4692 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SSLUtils.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SSLUtils.scala @@ -30,6 +30,7 @@ import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3Certi import org.bouncycastle.openssl.jcajce.JcaPEMWriter import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder +import org.apache.spark.deploy.kubernetes.submit.v2.{KeyAndCertPem, KeyStoreAndTrustStore} import org.apache.spark.util.Utils private[spark] object SSLUtils { @@ -38,7 +39,7 @@ private[spark] object SSLUtils { ipAddress: String, keyStorePassword: String, keyPassword: String, - trustStorePassword: String): (File, File) = { + trustStorePassword: String): KeyStoreAndTrustStore = { val keyPairGenerator = KeyPairGenerator.getInstance("RSA") keyPairGenerator.initialize(512) val keyPair = keyPairGenerator.generateKeyPair() @@ -60,10 +61,10 @@ private[spark] object SSLUtils { Utils.tryWithResource(new FileOutputStream(trustStoreFile)) { trustStore.store(_, trustStorePassword.toCharArray) } - (keyStoreFile, trustStoreFile) + KeyStoreAndTrustStore(keyStoreFile, trustStoreFile) } - def generateKeyCertPemPair(ipAddress: String): (File, File) = { + def generateKeyCertPemPair(ipAddress: String): KeyAndCertPem = { val keyPairGenerator = KeyPairGenerator.getInstance("RSA") keyPairGenerator.initialize(512) val keyPair = keyPairGenerator.generateKeyPair() @@ -90,7 +91,7 @@ private[spark] object SSLUtils { } } } - (keyPemFile, certPemFile) + KeyAndCertPem(keyPemFile, certPemFile) } private def generateCertificate(ipAddress: String, keyPair: KeyPair): X509Certificate = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SSLFilePairs.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SSLFilePairs.scala new file mode 100644 index 0000000000000..3d3ff7ad7011a --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/SSLFilePairs.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import java.io.File + +case class KeyAndCertPem(keyPem: File, certPem: File) + +case class KeyStoreAndTrustStore(keyStore: File, trustStore: File) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala index 6ab37185b8d07..c551fbc01d060 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyDownloadInitContainerSuite.scala @@ -99,7 +99,7 @@ class KubernetesSparkDependencyDownloadInitContainerSuite sparkConf, retrofitClientFactory, fileFetcher, - securityManager = new SparkSecurityManager(sparkConf)) + resourceStagingServerSslOptions = STAGING_SERVER_SSL_OPTIONS) when(retrofitClient.downloadResources(JARS_RESOURCE_ID, downloadJarsSecretValue)) .thenReturn(downloadJarsCall) when(retrofitClient.downloadResources(FILES_RESOURCE_ID, downloadFilesSecretValue)) @@ -126,7 +126,7 @@ class KubernetesSparkDependencyDownloadInitContainerSuite sparkConf, retrofitClientFactory, fileFetcher, - securityManager = new SparkSecurityManager(sparkConf)) + resourceStagingServerSslOptions = STAGING_SERVER_SSL_OPTIONS) initContainerUnderTest.run() Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir) Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProviderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProviderSuite.scala index 10aced9000bf8..c33d8beb2c397 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProviderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProviderSuite.scala @@ -66,10 +66,12 @@ class ResourceStagingServerSslOptionsProviderSuite extends SparkFunSuite with Be } test("Setting key and certificate pem files should write an appropriate keyStore.") { - val (keyPemFile, certPemFile) = SSLUtils.generateKeyCertPemPair("127.0.0.1") + val keyAndCertPem = SSLUtils.generateKeyCertPemPair("127.0.0.1") sparkConf.set("spark.ssl.kubernetes.resourceStagingServer.enabled", "true") - .set("spark.ssl.kubernetes.resourceStagingServer.keyPem", keyPemFile.getAbsolutePath) - .set("spark.ssl.kubernetes.resourceStagingServer.serverCertPem", certPemFile.getAbsolutePath) + .set("spark.ssl.kubernetes.resourceStagingServer.keyPem", + keyAndCertPem.keyPem.getAbsolutePath) + .set("spark.ssl.kubernetes.resourceStagingServer.serverCertPem", + keyAndCertPem.certPem.getAbsolutePath) .set("spark.ssl.kubernetes.resourceStagingServer.keyStorePassword", "keyStorePassword") .set("spark.ssl.kubernetes.resourceStagingServer.keyPassword", "keyPassword") val sslOptions = sslOptionsProvider.getSslOptions @@ -81,9 +83,37 @@ class ResourceStagingServerSslOptionsProviderSuite extends SparkFunSuite with Be keyStore.load(_, "keyStorePassword".toCharArray) } val key = keyStore.getKey("key", "keyPassword".toCharArray) - compareJcaPemObjectToFileString(key, keyPemFile) + compareJcaPemObjectToFileString(key, keyAndCertPem.keyPem) val certificate = keyStore.getCertificateChain("key")(0) - compareJcaPemObjectToFileString(certificate, certPemFile) + compareJcaPemObjectToFileString(certificate, keyAndCertPem.certPem) + } + } + + test("Setting pem files without setting passwords should use random passwords.") { + val keyAndCertPem = SSLUtils.generateKeyCertPemPair("127.0.0.1") + sparkConf.set("spark.ssl.kubernetes.resourceStagingServer.enabled", "true") + .set("spark.ssl.kubernetes.resourceStagingServer.keyPem", + keyAndCertPem.keyPem.getAbsolutePath) + .set("spark.ssl.kubernetes.resourceStagingServer.serverCertPem", + keyAndCertPem.certPem.getAbsolutePath) + val sslOptions = sslOptionsProvider.getSslOptions + assert(sslOptions.enabled, "SSL should be enabled.") + assert(sslOptions.keyStore.isDefined, "KeyStore should be defined.") + assert(sslOptions.keyStorePassword.isDefined) + assert(sslOptions.keyPassword.isDefined) + for { + keyStoreFile <- sslOptions.keyStore + keyStorePassword <- sslOptions.keyStorePassword + keyPassword <- sslOptions.keyPassword + } { + val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) + Utils.tryWithResource(new FileInputStream(keyStoreFile)) { + keyStore.load(_, keyStorePassword.toCharArray) + } + val key = keyStore.getKey("key", keyPassword.toCharArray) + compareJcaPemObjectToFileString(key, keyAndCertPem.keyPem) + val certificate = keyStore.getCertificateChain("key")(0) + compareJcaPemObjectToFileString(certificate, keyAndCertPem.certPem) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index 4ef12e8686bb0..4ffb0d4dfa887 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -57,17 +57,17 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter { } test("Enable SSL on the server") { - val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair( + val keyStoreAndTrustStore = SSLUtils.generateKeyStoreTrustStorePair( ipAddress = "127.0.0.1", keyStorePassword = "keyStore", keyPassword = "key", trustStorePassword = "trustStore") val sslOptions = SSLOptions( enabled = true, - keyStore = Some(keyStore), + keyStore = Some(keyStoreAndTrustStore.keyStore), keyStorePassword = Some("keyStore"), keyPassword = Some("key"), - trustStore = Some(trustStore), + trustStore = Some(keyStoreAndTrustStore.trustStore), trustStorePassword = Some("trustStore")) sslOptionsProvider.setOptions(sslOptions) server.start() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala index f09339a9c3e08..559cb281c7c62 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala @@ -34,7 +34,7 @@ import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube -import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND} +import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager} import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} @@ -190,16 +190,17 @@ private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend) test("Enable SSL on the driver submit server") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) - val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair( + val keyStoreAndTrustStore = SSLUtils.generateKeyStoreTrustStorePair( Minikube.getMinikubeIp, "changeit", "changeit", "changeit") - sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}") + sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, + s"file://${keyStoreAndTrustStore.keyStore.getAbsolutePath}") sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyStorePassword", "changeit") sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyPassword", "changeit") sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE, - s"file://${trustStoreFile.getAbsolutePath}") + s"file://${keyStoreAndTrustStore.trustStore.getAbsolutePath}") sparkConf.set("spark.ssl.kubernetes.driversubmitserver.trustStorePassword", "changeit") sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true) new Client( @@ -212,10 +213,12 @@ private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend) test("Enable SSL on the driver submit server using PEM files") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) - val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp) - sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}") - sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}") - sparkConf.set(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM, s"file://${certPem.getAbsolutePath}") + val keyAndCertPem = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp) + sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyAndCertPem.keyPem.getAbsolutePath}") + sparkConf.set( + DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${keyAndCertPem.certPem.getAbsolutePath}") + sparkConf.set( + DRIVER_SUBMIT_SSL_SERVER_CERT_PEM, s"file://${keyAndCertPem.certPem.getAbsolutePath}") sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true) new Client( sparkConf = sparkConf, diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala index ba9d088bfcfcc..e9900b90cb588 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala @@ -29,7 +29,7 @@ import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND -import org.apache.spark.deploy.kubernetes.submit.v2.Client +import org.apache.spark.deploy.kubernetes.submit.v2.{Client, KeyAndCertPem} import org.apache.spark.launcher.SparkLauncher @DoNotDiscover @@ -65,31 +65,34 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) test("Use submission v2.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) - launchStagingServer(SSLOptions()) + launchStagingServer(SSLOptions(), None) runSparkPiAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) } test("Enable SSL on the submission server") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) - val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair( + val keyStoreAndTrustStore = SSLUtils.generateKeyStoreTrustStorePair( ipAddress = Minikube.getMinikubeIp, keyStorePassword = "keyStore", keyPassword = "key", trustStorePassword = "trustStore") sparkConf.set(RESOURCE_STAGING_SERVER_SSL_ENABLED, true) - .set("spark.ssl.kubernetes.resourceStagingServer.keyStore", keyStore.getAbsolutePath) - .set("spark.ssl.kubernetes.resourceStagingServer.trustStore", trustStore.getAbsolutePath) + .set("spark.ssl.kubernetes.resourceStagingServer.keyStore", + keyStoreAndTrustStore.keyStore.getAbsolutePath) + .set("spark.ssl.kubernetes.resourceStagingServer.trustStore", + keyStoreAndTrustStore.trustStore.getAbsolutePath) .set("spark.ssl.kubernetes.resourceStagingServer.keyStorePassword", "keyStore") .set("spark.ssl.kubernetes.resourceStagingServer.keyPassword", "key") .set("spark.ssl.kubernetes.resourceStagingServer.trustStorePassword", "trustStore") launchStagingServer(SSLOptions( enabled = true, - keyStore = Some(keyStore), - trustStore = Some(trustStore), + keyStore = Some(keyStoreAndTrustStore.keyStore), + trustStore = Some(keyStoreAndTrustStore.trustStore), keyStorePassword = Some("keyStore"), keyPassword = Some("key"), - trustStorePassword = Some("trustStore"))) + trustStorePassword = Some("trustStore")), + None) runSparkPiAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) } @@ -104,7 +107,7 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) test("Dynamic executor scaling basic test") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) - launchStagingServer(SSLOptions()) + launchStagingServer(SSLOptions(), None) createShuffleServiceDaemonSet() sparkConf.setJars(Seq(KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH)) @@ -117,6 +120,7 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) } test("Use remote resources without the resource staging server.") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer() sparkConf.setJars(Seq( s"$assetServerUri/${KubernetesSuite.EXAMPLES_JAR_FILE.getName}", @@ -126,7 +130,8 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) } test("Mix remote resources with submitted ones.") { - launchStagingServer(SSLOptions()) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + launchStagingServer(SSLOptions(), None) val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer() sparkConf.setJars(Seq( KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE, @@ -135,7 +140,20 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE) } + test("Use key and certificate PEM files for TLS.") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + val keyAndCertificate = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp) + launchStagingServer( + SSLOptions(enabled = true), + Some(keyAndCertificate)) + sparkConf.set(RESOURCE_STAGING_SERVER_SSL_ENABLED, true) + .set( + RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM.key, keyAndCertificate.certPem.getAbsolutePath) + runSparkPiAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) + } + test("Use client key and client cert file when requesting executors") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) sparkConf.setJars(Seq( KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE, KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH)) @@ -148,11 +166,12 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE) } - private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = { + private def launchStagingServer( + resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = { assume(testBackend.name == MINIKUBE_TEST_BACKEND) val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer( - resourceStagingServerSslOptions) + resourceStagingServerSslOptions, keyAndCertPem) val resourceStagingServerUriScheme = if (resourceStagingServerSslOptions.enabled) { "https" } else { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ResourceStagingServerLauncher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ResourceStagingServerLauncher.scala index 3a99f907d15fd..1ba54c131c196 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ResourceStagingServerLauncher.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ResourceStagingServerLauncher.scala @@ -16,21 +16,17 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest -import java.io.StringWriter +import java.io.{File, StringWriter} import java.util.Properties -import java.util.concurrent.TimeUnit import com.google.common.io.{BaseEncoding, Files} -import com.google.common.util.concurrent.SettableFuture -import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, Endpoints, HasMetadata, HTTPGetActionBuilder, KeyToPathBuilder, Pod, PodBuilder, SecretBuilder, ServiceBuilder} -import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} -import io.fabric8.kubernetes.client.Watcher.Action -import io.fabric8.kubernetes.client.internal.readiness.Readiness +import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, Endpoints, HTTPGetActionBuilder, KeyToPathBuilder, Pod, PodBuilder, SecretBuilder, ServiceBuilder} +import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.JavaConverters._ import org.apache.spark.SSLOptions import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.submit.v2.ContainerNameEqualityPredicate +import org.apache.spark.deploy.kubernetes.submit.v2.{ContainerNameEqualityPredicate, KeyAndCertPem} import org.apache.spark.util.Utils /** @@ -38,23 +34,39 @@ import org.apache.spark.util.Utils */ private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesClient) { - private val KEYSTORE_DIR = "/mnt/secrets/spark-staging" - private val KEYSTORE_FILE = s"$KEYSTORE_DIR/keyStore" + private val SECRETS_ROOT_DIR = "/mnt/secrets/spark-staging" + private val KEYSTORE_SECRET_KEY = "keyStore" + private val KEYSTORE_FILE = s"$SECRETS_ROOT_DIR/$KEYSTORE_SECRET_KEY" + private val KEY_PEM_SECRET_KEY = "keyPem" + private val CERT_PEM_SECRET_KEY = "certPem" + private val KEY_PEM_FILE = s"$SECRETS_ROOT_DIR/$KEY_PEM_SECRET_KEY" + private val CERT_PEM_FILE = s"$SECRETS_ROOT_DIR/$CERT_PEM_SECRET_KEY" + private val SSL_SECRET_NAME = "resource-staging-server-ssl-secrets" private val PROPERTIES_FILE_NAME = "staging-server.properties" private val PROPERTIES_DIR = "/var/data/spark-staging-server" private val PROPERTIES_FILE_PATH = s"$PROPERTIES_DIR/$PROPERTIES_FILE_NAME" // Returns the NodePort the staging server is listening on - def launchStagingServer(sslOptions: SSLOptions): Int = { + def launchStagingServer( + sslOptions: SSLOptions, + keyAndCertPem: Option[KeyAndCertPem] = None): Int = { val stagingServerProperties = new Properties() val stagingServerSecret = sslOptions.keyStore.map { keyStore => val keyStoreBytes = Files.toByteArray(keyStore) val keyStoreBase64 = BaseEncoding.base64().encode(keyStoreBytes) + Map(KEYSTORE_SECRET_KEY -> keyStoreBase64) + }.orElse { + keyAndCertPem.map { keyAndCert => + val keyPemBytes = Files.toByteArray(keyAndCert.keyPem) + val keyPemBase64 = BaseEncoding.base64().encode(keyPemBytes) + val certPemBytes = Files.toByteArray(keyAndCert.certPem) + val certPemBase64 = BaseEncoding.base64().encode(certPemBytes) + Map(KEY_PEM_SECRET_KEY -> keyPemBase64, CERT_PEM_SECRET_KEY -> certPemBase64) + } + }.map { secretData => new SecretBuilder() - .withNewMetadata() - .withName("resource-staging-server-keystore") - .endMetadata() - .addToData("keyStore", keyStoreBase64) + .withNewMetadata().withName(SSL_SECRET_NAME).endMetadata() + .withData(secretData.asJava) .build() } stagingServerProperties.setProperty( @@ -67,10 +79,18 @@ private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesC stagingServerProperties.setProperty( "spark.ssl.kubernetes.resourceStagingServer.keyPassword", password) } - stagingServerSecret.foreach { _ => + sslOptions.keyStore.foreach { _ => stagingServerProperties.setProperty( "spark.ssl.kubernetes.resourceStagingServer.keyStore", KEYSTORE_FILE) } + keyAndCertPem.foreach { _ => + stagingServerProperties.setProperty( + RESOURCE_STAGING_SERVER_KEY_PEM.key, KEY_PEM_FILE) + } + keyAndCertPem.foreach { _ => + stagingServerProperties.setProperty( + RESOURCE_STAGING_SERVER_CERT_PEM.key, CERT_PEM_FILE) + } val propertiesWriter = new StringWriter() stagingServerProperties.store(propertiesWriter, "Resource staging server properties.") val stagingServerConfigMap = new ConfigMapBuilder() @@ -126,7 +146,7 @@ private[spark] class ResourceStagingServerLauncher(kubernetesClient: KubernetesC .editMatchingContainer(new ContainerNameEqualityPredicate("staging-server-container")) .addNewVolumeMount() .withName("keystore-volume") - .withMountPath(KEYSTORE_DIR) + .withMountPath(SECRETS_ROOT_DIR) .endVolumeMount() .endContainer() .endSpec() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala index 6e0049b813719..461264877edc2 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -42,6 +42,4 @@ private[spark] class MinikubeTestBackend extends IntegrationTestBackend { } override def name(): String = MINIKUBE_TEST_BACKEND - - }