Skip to content

Commit

Permalink
Allow client certificate PEM for resource staging server. (apache-spa…
Browse files Browse the repository at this point in the history
  • Loading branch information
mccheah authored and Puneet Loya committed Mar 8, 2019
1 parent f31ca1b commit 4b06c18
Show file tree
Hide file tree
Showing 17 changed files with 256 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,15 @@ package object config extends Logging {
private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
.doc("Certificate PEM file to use when having the Kubernetes dependency server" +
.doc("Certificate PEM file to use when having the resource staging server" +
" listen on TLS.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.clientCertPem")
.doc("Certificate PEM file to use when the client contacts the resource staging server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -267,8 +268,9 @@ private[spark] object Client {
val appName = sparkConf.getOption("spark.app.name")
.getOrElse("spark")
val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
sparkConf, kubernetesAppId, sparkJars, sparkFiles)
sparkConf, kubernetesAppId, sparkJars, sparkFiles, sslOptionsProvider.getSslOptions)
val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
val kubernetesCredentialsMounterProvider =
new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes.submit.v2

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.{SparkConf, SSLOptions}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
Expand Down Expand Up @@ -46,12 +46,11 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
sparkConf: SparkConf,
kubernetesAppId: String,
sparkJars: Seq[String],
sparkFiles: Seq[String])
sparkFiles: Seq[String],
resourceStagingServerSslOptions: SSLOptions)
extends DriverInitContainerComponentsProvider {

private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
private val resourceStagingServerSslOptions = new SecurityManager(sparkConf)
.getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE)
private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.SSLOptions
import org.apache.spark.deploy.kubernetes.constants._

private[spark] trait SubmittedDependencySecretBuilder {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,18 +414,20 @@ private[spark] object KubernetesSparkRestServer {
// If keystore password isn't set but we're using PEM files, generate a password
.orElse(parsedArguments.keyPemFile.map(_ => randomPassword()))
val resolvedKeyStore = parsedArguments.keyStoreFile.map(new File(_)).orElse(
parsedArguments.keyPemFile.map(keyPemFile => {
parsedArguments.certPemFile.map(certPemFile => {
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
new File(keyPemFile),
new File(certPemFile),
"provided-key",
keyStorePassword,
keyPassword,
parsedArguments.keyStoreType)
})
}).getOrElse(throw new SparkException("When providing PEM files to set up TLS for the" +
" submission server, both the key and the certificate must be specified.")))
for {
keyPemFile <- parsedArguments.keyPemFile
certPemFile <- parsedArguments.certPemFile
resolvedKeyStorePassword <- keyStorePassword
resolvedKeyPassword <- keyPassword
} yield {
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
new File(keyPemFile),
new File(certPemFile),
"provided-key",
resolvedKeyStorePassword,
resolvedKeyPassword,
parsedArguments.keyStoreType)
})
new SSLOptions(
enabled = true,
keyStore = resolvedKeyStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ private[spark] object PemsToKeyStoreConverter {
keyPemFile: File,
certPemFile: File,
keyAlias: String,
keyStorePassword: Option[String],
keyPassword: Option[String],
keyStorePassword: String,
keyPassword: String,
keyStoreType: Option[String]): File = {
require(keyPemFile.isFile, s"Key PEM file provided at ${keyPemFile.getAbsolutePath}" +
" does not exist or is not a file.")
Expand All @@ -58,12 +58,12 @@ private[spark] object PemsToKeyStoreConverter {
keyStore.setKeyEntry(
keyAlias,
privateKey,
keyPassword.map(_.toCharArray).orNull,
keyPassword.toCharArray,
certificates)
val keyStoreDir = Utils.createTempDir("temp-keystores")
val keyStoreFile = new File(keyStoreDir, s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType")
Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { storeStream =>
keyStore.store(storeStream, keyStorePassword.map(_.toCharArray).orNull)
keyStore.store(storeStream, keyStorePassword.toCharArray)
}
keyStoreFile
}
Expand All @@ -81,6 +81,20 @@ private[spark] object PemsToKeyStoreConverter {
trustStore
}

def convertCertPemToTempTrustStoreFile(
certPemFile: File,
trustStorePassword: String,
trustStoreType: Option[String]): File = {
val trustStore = convertCertPemToTrustStore(certPemFile, trustStoreType)
val tempTrustStoreDir = Utils.createTempDir(namePrefix = "temp-trustStore")
val tempTrustStoreFile = new File(tempTrustStoreDir,
s"trustStore.${trustStoreType.getOrElse(KeyStore.getDefaultType)}")
Utils.tryWithResource(new FileOutputStream(tempTrustStoreFile)) {
trustStore.store(_, trustStorePassword.toCharArray)
}
tempTrustStoreFile
}

private def withPemParsedFromFile[T](pemFile: File)(f: (PEMParser => T)): T = {
Utils.tryWithResource(new FileInputStream(pemFile)) { pemStream =>
Utils.tryWithResource(new InputStreamReader(pemStream, Charsets.UTF_8)) { pemReader =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import retrofit2.{Call, Callback, Response}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SSLOptions}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.CompressionUtils
Expand Down Expand Up @@ -95,7 +95,7 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer(
sparkConf: SparkConf,
retrofitClientFactory: RetrofitClientFactory,
fileFetcher: FileFetcher,
securityManager: SparkSecurityManager) extends Logging {
resourceStagingServerSslOptions: SSLOptions) extends Logging {

private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("download-executor"))
Expand Down Expand Up @@ -177,9 +177,10 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer(
maybeResourceId.foreach { resourceId =>
require(resourceSecretLocation.isFile, errMessageOnSecretNotAFile)
require(resourceDownloadDir.isDirectory, errMessageOnDownloadDirNotADirectory)
val sslOptions = securityManager.getSSLOptions("kubernetes.resourceStagingServer")
val service = retrofitClientFactory.createRetrofitClient(
resourceStagingServerUri, classOf[ResourceStagingServiceRetrofit], sslOptions)
resourceStagingServerUri,
classOf[ResourceStagingServiceRetrofit],
resourceStagingServerSslOptions)
val resourceSecret = Files.toString(resourceSecretLocation, Charsets.UTF_8)
val downloadResourceCallback = new DownloadTarGzCallback(resourceDownloadDir)
logInfo(downloadStartMessage)
Expand Down Expand Up @@ -219,12 +220,14 @@ object KubernetesSparkDependencyDownloadInitContainer extends Logging {
new SparkConf(true)
}
val securityManager = new SparkSecurityManager(sparkConf)
val resourceStagingServerSslOptions =
new ResourceStagingServerSslOptionsProviderImpl(sparkConf).getSslOptions
val fileFetcher = new FileFetcherImpl(sparkConf, securityManager)
new KubernetesSparkDependencyDownloadInitContainer(
sparkConf,
RetrofitClientFactoryImpl,
fileFetcher,
securityManager).run()
resourceStagingServerSslOptions).run()
logInfo("Finished downloading application dependencies.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package org.apache.spark.deploy.rest.kubernetes.v2

import java.io.File
import java.security.SecureRandom

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.commons.lang3.RandomStringUtils

import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions}
import org.apache.spark.deploy.kubernetes.config._
Expand All @@ -32,63 +34,98 @@ private[spark] trait ResourceStagingServerSslOptionsProvider {

private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: SparkConf)
extends ResourceStagingServerSslOptionsProvider with Logging {

private val SECURE_RANDOM = new SecureRandom()

def getSslOptions: SSLOptions = {
val baseSslOptions = new SparkSecurityManager(sparkConf)
.getSSLOptions("kubernetes.resourceStagingServer")
.getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE)
val maybeKeyPem = sparkConf.get(RESOURCE_STAGING_SERVER_KEY_PEM)
val maybeCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM)
val maybeServerCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM)
val maybeKeyStorePasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE)
val maybeKeyPasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE)
val maybeClientCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM)

logSslConfigurations(
baseSslOptions, maybeKeyPem, maybeCertPem, maybeKeyStorePasswordFile, maybeKeyPasswordFile)
baseSslOptions,
maybeKeyPem,
maybeServerCertPem,
maybeKeyStorePasswordFile,
maybeKeyPasswordFile,
maybeClientCertPem)

requireNandDefined(baseSslOptions.keyStore, maybeKeyPem,
"Shouldn't provide both key PEM and keyStore files for TLS.")
requireNandDefined(baseSslOptions.keyStore, maybeCertPem,
requireNandDefined(baseSslOptions.keyStore, maybeServerCertPem,
"Shouldn't provide both certificate PEM and keyStore files for TLS.")
requireNandDefined(baseSslOptions.keyStorePassword, maybeKeyStorePasswordFile,
"Shouldn't provide both the keyStore password value and the keyStore password file.")
requireNandDefined(baseSslOptions.keyPassword, maybeKeyPasswordFile,
"Shouldn't provide both the keyStore key password value and the keyStore key password file.")
requireBothOrNeitherDefined(
maybeKeyPem,
maybeCertPem,
maybeServerCertPem,
"When providing a certificate PEM file, the key PEM file must also be provided.",
"When providing a key PEM file, the certificate PEM file must also be provided.")
requireNandDefined(baseSslOptions.trustStore, maybeClientCertPem,
"Shouldn't provide both the trustStore and a client certificate PEM file.")

val resolvedKeyStorePassword = baseSslOptions.keyStorePassword
.orElse(maybeKeyStorePasswordFile.map { keyStorePasswordFile =>
safeFileToString(keyStorePasswordFile, "KeyStore password file")
})
.orElse(maybeKeyPem.map { _ => randomPassword()})
val resolvedKeyStoreKeyPassword = baseSslOptions.keyPassword
.orElse(maybeKeyPasswordFile.map { keyPasswordFile =>
safeFileToString(keyPasswordFile, "KeyStore key password file")
})
val resolvedKeyStore = baseSslOptions.keyStore
.orElse(maybeKeyPem.map { keyPem =>
.orElse(maybeKeyPem.map { _ => randomPassword()})
val resolvedKeyStore = baseSslOptions.keyStore.orElse {
for {
keyPem <- maybeKeyPem
certPem <- maybeServerCertPem
keyStorePassword <- resolvedKeyStorePassword
keyPassword <- resolvedKeyStoreKeyPassword
} yield {
val keyPemFile = new File(keyPem)
val certPemFile = new File(maybeCertPem.get)
val certPemFile = new File(certPem)
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
keyPemFile,
certPemFile,
"key",
resolvedKeyStorePassword,
resolvedKeyStoreKeyPassword,
keyStorePassword,
keyPassword,
baseSslOptions.keyStoreType)
})
}
}
val resolvedTrustStorePassword = baseSslOptions.trustStorePassword
.orElse(maybeClientCertPem.map( _ => "defaultTrustStorePassword"))
val resolvedTrustStore = baseSslOptions.trustStore.orElse {
for {
clientCertPem <- maybeClientCertPem
trustStorePassword <- resolvedTrustStorePassword
} yield {
val certPemFile = new File(clientCertPem)
PemsToKeyStoreConverter.convertCertPemToTempTrustStoreFile(
certPemFile,
trustStorePassword,
baseSslOptions.trustStoreType)
}
}
baseSslOptions.copy(
keyStore = resolvedKeyStore,
keyStorePassword = resolvedKeyStorePassword,
keyPassword = resolvedKeyStoreKeyPassword)
keyPassword = resolvedKeyStoreKeyPassword,
trustStore = resolvedTrustStore)
}

private def logSslConfigurations(
baseSslOptions: SSLOptions,
maybeKeyPem: Option[String],
maybeCertPem: Option[String],
maybeServerCertPem: Option[String],
maybeKeyStorePasswordFile: Option[String],
maybeKeyPasswordFile: Option[String]) = {
maybeKeyPasswordFile: Option[String],
maybeClientCertPem: Option[String]) = {
logDebug("The following SSL configurations were provided for the resource staging server:")
logDebug(s"KeyStore File: ${baseSslOptions.keyStore.map(_.getAbsolutePath).getOrElse("N/A")}")
logDebug("KeyStore Password: " +
Expand All @@ -99,7 +136,8 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar
logDebug(s"Key Password File: ${maybeKeyPasswordFile.getOrElse("N/A")}")
logDebug(s"KeyStore Type: ${baseSslOptions.keyStoreType.getOrElse("N/A")}")
logDebug(s"Key PEM: ${maybeKeyPem.getOrElse("N/A")}")
logDebug(s"Certificate PEM: ${maybeCertPem.getOrElse("N/A")}")
logDebug(s"Server-side certificate PEM: ${maybeServerCertPem.getOrElse("N/A")}")
logDebug(s"Client-side certificate PEM: ${maybeClientCertPem.getOrElse("N/A")}")
}

private def requireBothOrNeitherDefined(
Expand Down Expand Up @@ -130,4 +168,8 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar
}
Files.toString(file, Charsets.UTF_8)
}

private def randomPassword(): String = {
RandomStringUtils.random(1024, 0, Integer.MAX_VALUE, false, false, null, SECURE_RANDOM)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3Certi
import org.bouncycastle.openssl.jcajce.JcaPEMWriter
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder

import org.apache.spark.deploy.kubernetes.submit.v2.{KeyAndCertPem, KeyStoreAndTrustStore}
import org.apache.spark.util.Utils

private[spark] object SSLUtils {
Expand All @@ -38,7 +39,7 @@ private[spark] object SSLUtils {
ipAddress: String,
keyStorePassword: String,
keyPassword: String,
trustStorePassword: String): (File, File) = {
trustStorePassword: String): KeyStoreAndTrustStore = {
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(512)
val keyPair = keyPairGenerator.generateKeyPair()
Expand All @@ -60,10 +61,10 @@ private[spark] object SSLUtils {
Utils.tryWithResource(new FileOutputStream(trustStoreFile)) {
trustStore.store(_, trustStorePassword.toCharArray)
}
(keyStoreFile, trustStoreFile)
KeyStoreAndTrustStore(keyStoreFile, trustStoreFile)
}

def generateKeyCertPemPair(ipAddress: String): (File, File) = {
def generateKeyCertPemPair(ipAddress: String): KeyAndCertPem = {
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(512)
val keyPair = keyPairGenerator.generateKeyPair()
Expand All @@ -90,7 +91,7 @@ private[spark] object SSLUtils {
}
}
}
(keyPemFile, certPemFile)
KeyAndCertPem(keyPemFile, certPemFile)
}

private def generateCertificate(ipAddress: String, keyPair: KeyPair): X509Certificate = {
Expand Down
Loading

0 comments on commit 4b06c18

Please sign in to comment.