Skip to content
This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Allow client certificate PEM for resource staging server #257

Merged
merged 1 commit into from
May 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,15 @@ package object config extends Logging {
private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
.doc("Certificate PEM file to use when having the Kubernetes dependency server" +
.doc("Certificate PEM file to use when having the resource staging server" +
" listen on TLS.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.clientCertPem")
.doc("Certificate PEM file to use when the client contacts the resource staging server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -267,8 +268,9 @@ private[spark] object Client {
val appName = sparkConf.getOption("spark.app.name")
.getOrElse("spark")
val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
sparkConf, kubernetesAppId, sparkJars, sparkFiles)
sparkConf, kubernetesAppId, sparkJars, sparkFiles, sslOptionsProvider.getSslOptions)
val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
val kubernetesCredentialsMounterProvider =
new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes.submit.v2

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.{SparkConf, SSLOptions}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
Expand Down Expand Up @@ -46,12 +46,11 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
sparkConf: SparkConf,
kubernetesAppId: String,
sparkJars: Seq[String],
sparkFiles: Seq[String])
sparkFiles: Seq[String],
resourceStagingServerSslOptions: SSLOptions)
extends DriverInitContainerComponentsProvider {

private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
private val resourceStagingServerSslOptions = new SecurityManager(sparkConf)
.getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE)
private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.SSLOptions
import org.apache.spark.deploy.kubernetes.constants._

private[spark] trait SubmittedDependencySecretBuilder {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,18 +414,20 @@ private[spark] object KubernetesSparkRestServer {
// If keystore password isn't set but we're using PEM files, generate a password
.orElse(parsedArguments.keyPemFile.map(_ => randomPassword()))
val resolvedKeyStore = parsedArguments.keyStoreFile.map(new File(_)).orElse(
parsedArguments.keyPemFile.map(keyPemFile => {
parsedArguments.certPemFile.map(certPemFile => {
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
new File(keyPemFile),
new File(certPemFile),
"provided-key",
keyStorePassword,
keyPassword,
parsedArguments.keyStoreType)
})
}).getOrElse(throw new SparkException("When providing PEM files to set up TLS for the" +
" submission server, both the key and the certificate must be specified.")))
for {
keyPemFile <- parsedArguments.keyPemFile
certPemFile <- parsedArguments.certPemFile
resolvedKeyStorePassword <- keyStorePassword
resolvedKeyPassword <- keyPassword
} yield {
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
new File(keyPemFile),
new File(certPemFile),
"provided-key",
resolvedKeyStorePassword,
resolvedKeyPassword,
parsedArguments.keyStoreType)
})
new SSLOptions(
enabled = true,
keyStore = resolvedKeyStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ private[spark] object PemsToKeyStoreConverter {
keyPemFile: File,
certPemFile: File,
keyAlias: String,
keyStorePassword: Option[String],
keyPassword: Option[String],
keyStorePassword: String,
keyPassword: String,
keyStoreType: Option[String]): File = {
require(keyPemFile.isFile, s"Key PEM file provided at ${keyPemFile.getAbsolutePath}" +
" does not exist or is not a file.")
Expand All @@ -58,12 +58,12 @@ private[spark] object PemsToKeyStoreConverter {
keyStore.setKeyEntry(
keyAlias,
privateKey,
keyPassword.map(_.toCharArray).orNull,
keyPassword.toCharArray,
certificates)
val keyStoreDir = Utils.createTempDir("temp-keystores")
val keyStoreFile = new File(keyStoreDir, s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType")
Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { storeStream =>
keyStore.store(storeStream, keyStorePassword.map(_.toCharArray).orNull)
keyStore.store(storeStream, keyStorePassword.toCharArray)
}
keyStoreFile
}
Expand All @@ -81,6 +81,20 @@ private[spark] object PemsToKeyStoreConverter {
trustStore
}

def convertCertPemToTempTrustStoreFile(
certPemFile: File,
trustStorePassword: String,
trustStoreType: Option[String]): File = {
val trustStore = convertCertPemToTrustStore(certPemFile, trustStoreType)
val tempTrustStoreDir = Utils.createTempDir(namePrefix = "temp-trustStore")
val tempTrustStoreFile = new File(tempTrustStoreDir,
s"trustStore.${trustStoreType.getOrElse(KeyStore.getDefaultType)}")
Utils.tryWithResource(new FileOutputStream(tempTrustStoreFile)) {
trustStore.store(_, trustStorePassword.toCharArray)
}
tempTrustStoreFile
}

private def withPemParsedFromFile[T](pemFile: File)(f: (PEMParser => T)): T = {
Utils.tryWithResource(new FileInputStream(pemFile)) { pemStream =>
Utils.tryWithResource(new InputStreamReader(pemStream, Charsets.UTF_8)) { pemReader =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import retrofit2.{Call, Callback, Response}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf}
import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SSLOptions}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.CompressionUtils
Expand Down Expand Up @@ -95,7 +95,7 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer(
sparkConf: SparkConf,
retrofitClientFactory: RetrofitClientFactory,
fileFetcher: FileFetcher,
securityManager: SparkSecurityManager) extends Logging {
resourceStagingServerSslOptions: SSLOptions) extends Logging {

private implicit val downloadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("download-executor"))
Expand Down Expand Up @@ -177,9 +177,10 @@ private[spark] class KubernetesSparkDependencyDownloadInitContainer(
maybeResourceId.foreach { resourceId =>
require(resourceSecretLocation.isFile, errMessageOnSecretNotAFile)
require(resourceDownloadDir.isDirectory, errMessageOnDownloadDirNotADirectory)
val sslOptions = securityManager.getSSLOptions("kubernetes.resourceStagingServer")
val service = retrofitClientFactory.createRetrofitClient(
resourceStagingServerUri, classOf[ResourceStagingServiceRetrofit], sslOptions)
resourceStagingServerUri,
classOf[ResourceStagingServiceRetrofit],
resourceStagingServerSslOptions)
val resourceSecret = Files.toString(resourceSecretLocation, Charsets.UTF_8)
val downloadResourceCallback = new DownloadTarGzCallback(resourceDownloadDir)
logInfo(downloadStartMessage)
Expand Down Expand Up @@ -219,12 +220,14 @@ object KubernetesSparkDependencyDownloadInitContainer extends Logging {
new SparkConf(true)
}
val securityManager = new SparkSecurityManager(sparkConf)
val resourceStagingServerSslOptions =
new ResourceStagingServerSslOptionsProviderImpl(sparkConf).getSslOptions
val fileFetcher = new FileFetcherImpl(sparkConf, securityManager)
new KubernetesSparkDependencyDownloadInitContainer(
sparkConf,
RetrofitClientFactoryImpl,
fileFetcher,
securityManager).run()
resourceStagingServerSslOptions).run()
logInfo("Finished downloading application dependencies.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package org.apache.spark.deploy.rest.kubernetes.v2

import java.io.File
import java.security.SecureRandom

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.commons.lang3.RandomStringUtils

import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions}
import org.apache.spark.deploy.kubernetes.config._
Expand All @@ -32,63 +34,98 @@ private[spark] trait ResourceStagingServerSslOptionsProvider {

private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: SparkConf)
extends ResourceStagingServerSslOptionsProvider with Logging {

private val SECURE_RANDOM = new SecureRandom()

def getSslOptions: SSLOptions = {
val baseSslOptions = new SparkSecurityManager(sparkConf)
.getSSLOptions("kubernetes.resourceStagingServer")
.getSSLOptions(RESOURCE_STAGING_SERVER_SSL_NAMESPACE)
val maybeKeyPem = sparkConf.get(RESOURCE_STAGING_SERVER_KEY_PEM)
val maybeCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM)
val maybeServerCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM)
val maybeKeyStorePasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE)
val maybeKeyPasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE)
val maybeClientCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM)

logSslConfigurations(
baseSslOptions, maybeKeyPem, maybeCertPem, maybeKeyStorePasswordFile, maybeKeyPasswordFile)
baseSslOptions,
maybeKeyPem,
maybeServerCertPem,
maybeKeyStorePasswordFile,
maybeKeyPasswordFile,
maybeClientCertPem)

requireNandDefined(baseSslOptions.keyStore, maybeKeyPem,
"Shouldn't provide both key PEM and keyStore files for TLS.")
requireNandDefined(baseSslOptions.keyStore, maybeCertPem,
requireNandDefined(baseSslOptions.keyStore, maybeServerCertPem,
"Shouldn't provide both certificate PEM and keyStore files for TLS.")
requireNandDefined(baseSslOptions.keyStorePassword, maybeKeyStorePasswordFile,
"Shouldn't provide both the keyStore password value and the keyStore password file.")
requireNandDefined(baseSslOptions.keyPassword, maybeKeyPasswordFile,
"Shouldn't provide both the keyStore key password value and the keyStore key password file.")
requireBothOrNeitherDefined(
maybeKeyPem,
maybeCertPem,
maybeServerCertPem,
"When providing a certificate PEM file, the key PEM file must also be provided.",
"When providing a key PEM file, the certificate PEM file must also be provided.")
requireNandDefined(baseSslOptions.trustStore, maybeClientCertPem,
"Shouldn't provide both the trustStore and a client certificate PEM file.")

val resolvedKeyStorePassword = baseSslOptions.keyStorePassword
.orElse(maybeKeyStorePasswordFile.map { keyStorePasswordFile =>
safeFileToString(keyStorePasswordFile, "KeyStore password file")
})
.orElse(maybeKeyPem.map { _ => randomPassword()})
val resolvedKeyStoreKeyPassword = baseSslOptions.keyPassword
.orElse(maybeKeyPasswordFile.map { keyPasswordFile =>
safeFileToString(keyPasswordFile, "KeyStore key password file")
})
val resolvedKeyStore = baseSslOptions.keyStore
.orElse(maybeKeyPem.map { keyPem =>
.orElse(maybeKeyPem.map { _ => randomPassword()})
val resolvedKeyStore = baseSslOptions.keyStore.orElse {
for {
keyPem <- maybeKeyPem
certPem <- maybeServerCertPem
keyStorePassword <- resolvedKeyStorePassword
keyPassword <- resolvedKeyStoreKeyPassword
} yield {
val keyPemFile = new File(keyPem)
val certPemFile = new File(maybeCertPem.get)
val certPemFile = new File(certPem)
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
keyPemFile,
certPemFile,
"key",
resolvedKeyStorePassword,
resolvedKeyStoreKeyPassword,
keyStorePassword,
keyPassword,
baseSslOptions.keyStoreType)
})
}
}
val resolvedTrustStorePassword = baseSslOptions.trustStorePassword
.orElse(maybeClientCertPem.map( _ => "defaultTrustStorePassword"))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this default password ever actually work? I wonder if it's better to require a password upfront rather than failing later when the default inevitably doesn't match

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will match because the SSL options provider is generating the trustStore with the given password from the certificate file.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah got it -- didn't realize this same codepath was being used both during creating the temp truststore and reading it on the other side

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to ideas of making the flow clearer here

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine as is -- if I'd read more closely the context around I think I would've seen that

val resolvedTrustStore = baseSslOptions.trustStore.orElse {
for {
clientCertPem <- maybeClientCertPem
trustStorePassword <- resolvedTrustStorePassword
} yield {
val certPemFile = new File(clientCertPem)
PemsToKeyStoreConverter.convertCertPemToTempTrustStoreFile(
certPemFile,
trustStorePassword,
baseSslOptions.trustStoreType)
}
}
baseSslOptions.copy(
keyStore = resolvedKeyStore,
keyStorePassword = resolvedKeyStorePassword,
keyPassword = resolvedKeyStoreKeyPassword)
keyPassword = resolvedKeyStoreKeyPassword,
trustStore = resolvedTrustStore)
}

private def logSslConfigurations(
baseSslOptions: SSLOptions,
maybeKeyPem: Option[String],
maybeCertPem: Option[String],
maybeServerCertPem: Option[String],
maybeKeyStorePasswordFile: Option[String],
maybeKeyPasswordFile: Option[String]) = {
maybeKeyPasswordFile: Option[String],
maybeClientCertPem: Option[String]) = {
logDebug("The following SSL configurations were provided for the resource staging server:")
logDebug(s"KeyStore File: ${baseSslOptions.keyStore.map(_.getAbsolutePath).getOrElse("N/A")}")
logDebug("KeyStore Password: " +
Expand All @@ -99,7 +136,8 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar
logDebug(s"Key Password File: ${maybeKeyPasswordFile.getOrElse("N/A")}")
logDebug(s"KeyStore Type: ${baseSslOptions.keyStoreType.getOrElse("N/A")}")
logDebug(s"Key PEM: ${maybeKeyPem.getOrElse("N/A")}")
logDebug(s"Certificate PEM: ${maybeCertPem.getOrElse("N/A")}")
logDebug(s"Server-side certificate PEM: ${maybeServerCertPem.getOrElse("N/A")}")
logDebug(s"Client-side certificate PEM: ${maybeClientCertPem.getOrElse("N/A")}")
}

private def requireBothOrNeitherDefined(
Expand Down Expand Up @@ -130,4 +168,8 @@ private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: Spar
}
Files.toString(file, Charsets.UTF_8)
}

private def randomPassword(): String = {
RandomStringUtils.random(1024, 0, Integer.MAX_VALUE, false, false, null, SECURE_RANDOM)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3Certi
import org.bouncycastle.openssl.jcajce.JcaPEMWriter
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder

import org.apache.spark.deploy.kubernetes.submit.v2.{KeyAndCertPem, KeyStoreAndTrustStore}
import org.apache.spark.util.Utils

private[spark] object SSLUtils {
Expand All @@ -38,7 +39,7 @@ private[spark] object SSLUtils {
ipAddress: String,
keyStorePassword: String,
keyPassword: String,
trustStorePassword: String): (File, File) = {
trustStorePassword: String): KeyStoreAndTrustStore = {
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(512)
val keyPair = keyPairGenerator.generateKeyPair()
Expand All @@ -60,10 +61,10 @@ private[spark] object SSLUtils {
Utils.tryWithResource(new FileOutputStream(trustStoreFile)) {
trustStore.store(_, trustStorePassword.toCharArray)
}
(keyStoreFile, trustStoreFile)
KeyStoreAndTrustStore(keyStoreFile, trustStoreFile)
}

def generateKeyCertPemPair(ipAddress: String): (File, File) = {
def generateKeyCertPemPair(ipAddress: String): KeyAndCertPem = {
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(512)
val keyPair = keyPairGenerator.generateKeyPair()
Expand All @@ -90,7 +91,7 @@ private[spark] object SSLUtils {
}
}
}
(keyPemFile, certPemFile)
KeyAndCertPem(keyPemFile, certPemFile)
}

private def generateCertificate(ipAddress: String, keyPair: KeyPair): X509Certificate = {
Expand Down
Loading