diff --git a/dev/.rat-excludes b/dev/.rat-excludes index a588ce3fc8c19..61dcbd1353210 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -109,3 +109,4 @@ circle.yml publish.sh structured-streaming/* kafka-source-initial-offset-version-2.1.0.bin +org.apache.spark.deploy.rest.kubernetes.DriverServiceManager diff --git a/docs/running-on-kubernetes-cloud.md b/docs/running-on-kubernetes-cloud.md new file mode 100644 index 0000000000000..244c64d696ab3 --- /dev/null +++ b/docs/running-on-kubernetes-cloud.md @@ -0,0 +1,24 @@ +--- +layout: global +title: Running Spark in the cloud with Kubernetes +--- + +For general information about running Spark on Kubernetes, refer to [running Spark on Kubernetes](running-on-kubernetes.md). + +A Kubernetes cluster may be brought up on different cloud providers or on premises. It is commonly provisioned through [Google Container Engine](https://cloud.google.com/container-engine/), or using [kops](https://github.com/kubernetes/kops) on AWS, or on premises using [kubeadm](https://kubernetes.io/docs/getting-started-guides/kubeadm/). + +## Running on Google Container Engine (GKE) + +* Create a GKE [container cluster](https://cloud.google.com/container-engine/docs/clusters/operations). +* Obtain kubectl and [configure](https://cloud.google.com/container-engine/docs/clusters/operations#configuring_kubectl) it appropriately. +* Find the identity of the master associated with this project. + + > kubectl cluster-info + Kubernetes master is running at https://<master-ip>:443 + +* Run spark-submit with the master option set to `k8s://https://<master-ip>:443`. The instructions for running spark-submit are provided in the [running on Kubernetes](running-on-kubernetes.md) tutorial. +* Check that your driver pod, and subsequently your executor pods, are launched using `kubectl get pods`. +* Read the stdout and stderr of the driver pod using `kubectl logs <driver-pod-name>`, or stream the logs using `kubectl logs -f <driver-pod-name>`. + +Known issues: +* If you face OAuth token expiry errors when you run spark-submit, it is likely because the token needs to be refreshed. The easiest way to fix this is to run any `kubectl` command, say, `kubectl version`, and then retry your submission. diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 27ddc4b04062f..3b6935560a575 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -10,17 +10,30 @@ currently limited and not well-tested. This should not be used in production env * You must have a running Kubernetes cluster with access configured to it using [kubectl](https://kubernetes.io/docs/user-guide/prereqs/). If you do not already have a working Kubernetes cluster, you may setup a test cluster on your local machine using [minikube](https://kubernetes.io/docs/getting-started-guides/minikube/). * You must have appropriate permissions to create and list [pods](https://kubernetes.io/docs/user-guide/pods/), [nodes](https://kubernetes.io/docs/admin/node/) and [services](https://kubernetes.io/docs/user-guide/services/) in your cluster. You can verify that you can list these resources by running `kubectl get nodes`, `kubectl get pods` and `kubectl get svc` which should give you a list of nodes, pods and services (if any) respectively. -* You must have an extracted spark distribution with Kubernetes support, or build one from [source](https://github.com/apache-spark-on-k8s/spark). +* You must [build Spark with Kubernetes support](../resource-managers/kubernetes/README.md#building-spark-with-kubernetes-support) from source.
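To tie the pieces above together, the sketch below walks through one possible end-to-end flow: discovering the API server address, verifying access, submitting an example application, and tailing the driver logs. The master address, driver pod name, and image tags are illustrative placeholders and assume a cluster reachable over HTTPS on port 443; the submission options themselves are documented in the sections that follow.

```sh
# Discover the API server URL; the HTTPS address printed here becomes the
# k8s:// master passed to spark-submit.
kubectl cluster-info

# Verify that the current credentials can list nodes, pods and services.
kubectl get nodes
kubectl get pods
kubectl get svc

# Submit an example application (placeholder master address and image tags).
bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://https://<master-ip>:443 \
  --kubernetes-namespace default \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.1.0-k8s-support-0.1.0-alpha.1 \
  --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.1.0-k8s-support-0.1.0-alpha.1 \
  examples/jars/spark_examples_2.11-2.2.0.jar

# Watch the driver and executor pods come up, then stream the driver logs.
kubectl get pods
kubectl logs -f <driver-pod-name>
```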
-## Setting Up Docker Images +## Driver & Executor Images Kubernetes requires users to supply images that can be deployed into containers within pods. The images are built to be run in a container runtime environment that Kubernetes supports. Docker is a container runtime environment that is frequently used with Kubernetes, so Spark provides some support for working with Docker to get started quickly. -To use Spark on Kubernetes with Docker, images for the driver and the executors need to built and published to an -accessible Docker registry. Spark distributions include the Docker files for the driver and the executor at -`dockerfiles/driver/Dockerfile` and `docker/executor/Dockerfile`, respectively. Use these Docker files to build the +If you wish to use pre-built docker images, you may use the images published in [kubespark](https://hub.docker.com/u/kubespark/). The images are as follows:
| Component | Image |
| --- | --- |
| Spark Driver Image | `kubespark/spark-driver:v2.1.0-k8s-support-0.1.0-alpha.1` |
| Spark Executor Image | `kubespark/spark-executor:v2.1.0-k8s-support-0.1.0-alpha.1` |
+ +You may also build these docker images from sources, or customize them as required. Spark distributions include the Docker files for the driver and the executor at +`dockerfiles/driver/Dockerfile` and `dockerfiles/executor/Dockerfile`, respectively. Use these Docker files to build the Docker images, and then tag them with the registry that the images should be sent to. Finally, push the images to the registry. @@ -44,8 +57,8 @@ are set up as described above: --kubernetes-namespace default \ --conf spark.executor.instances=5 \ --conf spark.app.name=spark-pi \ - --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \ - --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \ + --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.1.0-k8s-support-0.1.0-alpha.1 \ + --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.1.0-k8s-support-0.1.0-alpha.1 \ examples/jars/spark_examples_2.11-2.2.0.jar The Spark master, specified either via passing the `--master` command line argument to `spark-submit` or by setting @@ -53,8 +66,7 @@ The Spark master, specified either via passing the `--master` command line argum master string with `k8s://` will cause the Spark application to launch on the Kubernetes cluster, with the API server being contacted at `api_server_url`. If no HTTP protocol is specified in the URL, it defaults to `https`. For example, setting the master to `k8s://example.com:443` is equivalent to setting it to `k8s://https://example.com:443`, but to -connect without SSL on a different port, the master would be set to `k8s://http://example.com:8443`. - +connect without TLS on a different port, the master would be set to `k8s://http://example.com:8443`. If you have a Kubernetes cluster setup, one way to discover the apiserver URL is by executing `kubectl cluster-info`. @@ -67,33 +79,17 @@ In the above example, the specific Kubernetes cluster can be used with spark sub Note that applications can currently only be executed in cluster mode, where the driver and its executors are running on the cluster. -### Dependency Management and Docker Containers +### Specifying input files Spark supports specifying JAR paths that are either on the submitting host's disk, or are located on the disk of the driver and executors. Refer to the [application submission](submitting-applications.html#advanced-dependency-management) section for details. Note that files specified with the `local://` scheme should be added to the container image of both the driver and the executors. Files without a scheme or with the scheme `file://` are treated as being on the disk of the submitting machine, and are uploaded to the driver running in Kubernetes before launching the application. - -### Setting Up SSL For Submitting the Driver -When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. This HTTP server -receives the driver's configuration, including uploaded driver jars, from the client before starting the application. -Spark supports using SSL to encrypt the traffic in this bootstrapping process. It is recommended to configure this -whenever possible. +### Accessing Kubernetes Clusters -See the [security page](security.html) and [configuration](configuration.html) sections for more information on -configuring SSL; use the prefix `spark.ssl.kubernetes.submit` in configuring the SSL-related fields in the context -of submitting to Kubernetes. 
For example, to set the trustStore used when the local machine communicates with the driver -pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`. - -One note about the keyStore is that it can be specified as either a file on the client machine or a file in the -container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:` -or `local:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto -the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme -`local:`, the file is assumed to already be on the container's disk at the appropriate path. - -### Kubernetes Clusters and the authenticated proxy endpoint +For details about running on public cloud environments, such as Google Container Engine (GKE), refer to [running Spark in the cloud with Kubernetes](running-on-kubernetes-cloud.md). Spark-submit also supports submission through the [local kubectl proxy](https://kubernetes.io/docs/user-guide/accessing-the-cluster/#using-kubectl-proxy). One can use the @@ -112,16 +108,44 @@ If our local proxy were listening on port 8001, we would have our submission loo --kubernetes-namespace default \ --conf spark.executor.instances=5 \ --conf spark.app.name=spark-pi \ - --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \ - --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \ + --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.1.0-k8s-support-0.1.0-alpha.1 \ + --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.1.0-k8s-support-0.1.0-alpha.1 \ examples/jars/spark_examples_2.11-2.2.0.jar Communication between Spark and Kubernetes clusters is performed using the fabric8 kubernetes-client library. The above mechanism using `kubectl proxy` can be used when we have authentication providers that the fabric8 -kubernetes-client library does not support. Authentication using X509 Client Certs and oauth tokens +kubernetes-client library does not support. Authentication using X509 Client Certs and OAuth tokens is currently supported. -### Determining the Driver Base URI +## Advanced + +### Setting Up TLS For Submitting the Driver + +When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. This HTTP server +receives the driver's configuration, including uploaded driver jars, from the client before starting the application. +Spark supports using TLS to encrypt the traffic in this bootstrapping process. It is recommended to configure this +whenever possible. + +See the [security page](security.html) and [configuration](configuration.html) sections for more information on +configuring TLS; use the prefix `spark.ssl.kubernetes.driversubmitserver` in configuring the TLS-related fields in the context +of submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver +pod in starting the application, set `spark.ssl.kubernetes.driversubmitserver.trustStore`. + +One note about the keyStore is that it can be specified as either a file on the client machine or a file in the +container image's disk. Thus `spark.ssl.kubernetes.driversubmitserver.keyStore` can be a URI with a scheme of either `file:` +or `local:`. 
A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto +the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme +`local:`, the file is assumed to already be on the container's disk at the appropriate path. + +Finally, the submission server and client can be configured to use PEM files instead of Java keyStores. When using +this mode, set `spark.ssl.kubernetes.driversubmitserver.keyPem` and +`spark.ssl.kubernetes.driversubmitserver.serverCertPem` to configure the key and certificate files on the driver +submission server. These files can be uploaded from the submitter's machine if they have no scheme or a scheme of +`file:`, or they can be located on the container's disk if they have the scheme `local:`. The client's certificate +file should be provided via setting `spark.ssl.kubernetes.driversubmitserver.clientCertPem`, and this file must be +located on the submitting machine's local disk. + +### Submission of Local Files through Ingress/External controller Kubernetes pods run with their own IP address space. If Spark is run in cluster mode, the driver pod may not be accessible to the submitter. However, the submitter needs to send local dependencies from its local disk to the driver @@ -184,34 +208,88 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.submit.caCertFile + spark.kubernetes.authenticate.submission.caCertFile + (none) + + Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file + must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide + a scheme). + + + + spark.kubernetes.authenticate.submission.clientKeyFile + (none) + + Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file + must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide + a scheme). + + + + spark.kubernetes.authenticate.submission.clientCertFile + (none) + + Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This + file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not + provide a scheme). + + + + spark.kubernetes.authenticate.submission.oauthToken + (none) + + OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note + that unlike the other authentication options, this is expected to be the exact string value of the token to use for + the authentication. + + + + spark.kubernetes.authenticate.driver.caCertFile + (none) + + Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting + executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + + + + spark.kubernetes.authenticate.driver.clientKeyFile (none) - CA cert file for connecting to Kubernetes over SSL. This file should be located on the submitting machine's disk. + Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting + executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. 
+ Specify this as a path as opposed to a URI (i.e. do not provide + a scheme). If this is specified, it is highly + recommended to set up TLS for the driver submission server, as this value is sensitive information that would be + passed to the driver pod in plaintext otherwise. - spark.kubernetes.submit.clientKeyFile + spark.kubernetes.authenticate.driver.clientCertFile (none) - Client key file for authenticating against the Kubernetes API server. This file should be located on the submitting - machine's disk. + Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when + requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the + driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). - spark.kubernetes.submit.clientCertFile + spark.kubernetes.authenticate.driver.oauthToken (none) - Client cert file for authenticating against the Kubernetes API server. This file should be located on the submitting - machine's disk. + OAuth token to use when authenticating against the Kubernetes API server from the driver pod when + requesting executors. Note that unlike the other authentication options, this must be the exact string value of + the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is + highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would + be passed to the driver pod in plaintext otherwise. - spark.kubernetes.submit.serviceAccountName + spark.kubernetes.authenticate.driver.serviceAccountName default Service account that is used when running the driver pod. The driver pod uses this service account when requesting - executor pods from the API server. + executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file, + client cert file, and/or OAuth token. @@ -257,7 +335,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.driverSubmitTimeout + spark.kubernetes.driverSubmissionTimeout 60s Time to wait for the driver pod to start running before aborting its execution. @@ -272,7 +350,7 @@ from the other deployment modes. See the [configuration page](configuration.html - spark.kubernetes.submit.waitAppCompletion + spark.kubernetes.submission.waitAppCompletion true In cluster mode, whether to wait for the application to finish before exiting the launcher process. When changed to diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md index 25b62ba35a193..d70c38fdc64d5 100644 --- a/resource-managers/kubernetes/README.md +++ b/resource-managers/kubernetes/README.md @@ -53,6 +53,14 @@ Afterwards, the integration tests can be executed with Maven or your IDE. Note t `pre-integration-test` phase must be run every time the Spark main code changes. When running tests from the command line, the `pre-integration-test` phase should automatically be invoked if the `integration-test` phase is run.
+After the above step, the integration test can be run using the following command: + +```sh +build/mvn integration-test \ + -Pkubernetes -Pkubernetes-integration-tests \ + -pl resource-managers/kubernetes/integration-tests -am +``` + # Preserve the Minikube VM The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 1c26af6593d37..5549dbe6200a4 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -87,6 +87,10 @@ com.google.guava guava + + org.bouncycastle + bcpkix-jdk15on + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 770821e97d12c..7e700b569a3fb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -32,7 +32,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource} +import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, KubernetesCredentials, RemoteAppResource, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} @@ -52,7 +52,7 @@ private[spark] class Client( .getOrElse("spark") private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId" - private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId" + private val secretDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId" private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) @@ -118,19 +118,23 @@ private[spark] class Client( customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations") + val driverPodKubernetesCredentials = new DriverPodKubernetesCredentialsProvider(sparkConf).get() var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) .withNamespace(namespace) - sparkConf.get(KUBERNETES_CA_CERT_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CA_CERT_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f) } - sparkConf.get(KUBERNETES_CLIENT_KEY_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CLIENT_KEY_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f) } - sparkConf.get(KUBERNETES_CLIENT_CERT_FILE).foreach { + sparkConf.get(KUBERNETES_SUBMIT_CLIENT_CERT_FILE).foreach { f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f) } + sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { token => + k8ConfBuilder = k8ConfBuilder.withOauthToken(token) + } val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new 
DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => @@ -157,7 +161,7 @@ private[spark] class Client( driverServiceManager.handleSubmissionError( new SparkException("Submission shutting down early..."))) try { - val sslConfigurationProvider = new SslConfigurationProvider( + val sslConfigurationProvider = new DriverSubmitSslConfigurationProvider( sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner) val submitServerSecret = kubernetesClient.secrets().createNew() .withNewMetadata() @@ -168,11 +172,6 @@ private[spark] class Client( .done() kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) val sslConfiguration = sslConfigurationProvider.getSslConfiguration() - val driverKubernetesSelectors = (Map( - SPARK_DRIVER_LABEL -> kubernetesAppId, - SPARK_APP_ID_LABEL -> kubernetesAppId, - SPARK_APP_NAME_LABEL -> appName) - ++ parsedCustomLabels) val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, driverServiceManager, @@ -183,7 +182,7 @@ private[spark] class Client( configureOwnerReferences( kubernetesClient, submitServerSecret, - sslConfiguration.sslSecrets, + sslConfiguration.sslSecret, driverPod, driverService) submitApplicationToDriverServer( @@ -192,7 +191,8 @@ private[spark] class Client( sslConfiguration, driverService, submitterLocalFiles, - submitterLocalJars) + submitterLocalJars, + driverPodKubernetesCredentials) // Now that the application has started, persist the components that were created beyond // the shutdown hook. We still want to purge the one-time secrets, so do not unregister // those. @@ -209,7 +209,6 @@ private[spark] class Client( Utils.tryLogNonFatalError { driverServiceManager.stop() } - // Remove the shutdown hooks that would be redundant Utils.tryLogNonFatalError { ShutdownHookManager.removeShutdownHook(resourceCleanShutdownHook) @@ -236,10 +235,11 @@ private[spark] class Client( private def submitApplicationToDriverServer( kubernetesClient: KubernetesClient, driverServiceManager: DriverServiceManager, - sslConfiguration: SslConfiguration, + sslConfiguration: DriverSubmitSslConfiguration, driverService: Service, submitterLocalFiles: Iterable[String], - submitterLocalJars: Iterable[String]): Unit = { + submitterLocalJars: Iterable[String], + driverPodKubernetesCredentials: KubernetesCredentials): Unit = { sparkConf.getOption("spark.app.id").foreach { id => logWarning(s"Warning: Provided app id in spark.app.id as $id will be" + s" overridden as $kubernetesAppId") @@ -251,6 +251,12 @@ private[spark] class Client( sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) + sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ => + sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "") + } + sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ => + sparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "") + } val driverSubmitter = buildDriverSubmissionClient( kubernetesClient, driverServiceManager, @@ -260,7 +266,10 @@ private[spark] class Client( driverSubmitter.ping() logInfo(s"Submitting local resources to driver pod for application " + s"$kubernetesAppId ...") - val submitRequest = buildSubmissionRequest(submitterLocalFiles, submitterLocalJars) + val submitRequest = buildSubmissionRequest( + submitterLocalFiles, + submitterLocalJars, + driverPodKubernetesCredentials) driverSubmitter.submitApplication(submitRequest) logInfo("Successfully submitted local resources and driver configuration to" + " driver 
pod.") @@ -288,7 +297,7 @@ private[spark] class Client( customLabels: Map[String, String], customAnnotations: Map[String, String], submitServerSecret: Secret, - sslConfiguration: SslConfiguration): (Pod, Service) = { + sslConfiguration: DriverSubmitSslConfiguration): (Pod, Service) = { val driverKubernetesSelectors = (Map( SPARK_DRIVER_LABEL -> kubernetesAppId, SPARK_APP_ID_LABEL -> kubernetesAppId, @@ -339,7 +348,7 @@ private[spark] class Client( private def configureOwnerReferences( kubernetesClient: KubernetesClient, submitServerSecret: Secret, - sslSecrets: Array[Secret], + sslSecret: Option[Secret], driverPod: Pod, driverService: Service): Service = { val driverPodOwnerRef = new OwnerReferenceBuilder() @@ -349,7 +358,7 @@ private[spark] class Client( .withKind(driverPod.getKind) .withController(true) .build() - sslSecrets.foreach(secret => { + sslSecret.foreach(secret => { val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit() .editMetadata() .addToOwnerReferences(driverPodOwnerRef) @@ -415,10 +424,10 @@ private[spark] class Client( driverKubernetesSelectors: Map[String, String], customAnnotations: Map[String, String], submitServerSecret: Secret, - sslConfiguration: SslConfiguration): Pod = { + sslConfiguration: DriverSubmitSslConfiguration): Pod = { val containerPorts = buildContainerPorts() val probePingHttpGet = new HTTPGetActionBuilder() - .withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP") + .withScheme(if (sslConfiguration.enabled) "HTTPS" else "HTTP") .withPath("/v1/submissions/ping") .withNewPort(SUBMISSION_SERVER_PORT_NAME) .build() @@ -442,8 +451,8 @@ private[spark] class Client( .withSecretName(submitServerSecret.getMetadata.getName) .endSecret() .endVolume() - .addToVolumes(sslConfiguration.sslPodVolumes: _*) - .withServiceAccount(serviceAccount) + .addToVolumes(sslConfiguration.sslPodVolume.toSeq: _*) + .withServiceAccount(serviceAccount.getOrElse("default")) .addNewContainer() .withName(DRIVER_CONTAINER_NAME) .withImage(driverDockerImage) @@ -453,7 +462,7 @@ private[spark] class Client( .withMountPath(secretDirectory) .withReadOnly(true) .endVolumeMount() - .addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*) + .addToVolumeMounts(sslConfiguration.sslPodVolumeMount.toSeq: _*) .addNewEnv() .withName(ENV_SUBMISSION_SECRET_LOCATION) .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME") @@ -619,7 +628,8 @@ private[spark] class Client( private def buildSubmissionRequest( submitterLocalFiles: Iterable[String], - submitterLocalJars: Iterable[String]): KubernetesCreateSubmissionRequest = { + submitterLocalJars: Iterable[String], + driverPodKubernetesCredentials: KubernetesCredentials): KubernetesCreateSubmissionRequest = { val mainResourceUri = Utils.resolveURI(mainAppResource) val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme) .getOrElse("file") match { @@ -642,14 +652,15 @@ private[spark] class Client( secret = secretBase64String, sparkProperties = sparkConf.getAll.toMap, uploadedJarsBase64Contents = uploadJarsBase64Contents, - uploadedFilesBase64Contents = uploadFilesBase64Contents) + uploadedFilesBase64Contents = uploadFilesBase64Contents, + driverPodKubernetesCredentials = driverPodKubernetesCredentials) } private def buildDriverSubmissionClient( kubernetesClient: KubernetesClient, driverServiceManager: DriverServiceManager, service: Service, - sslConfiguration: SslConfiguration): KubernetesSparkRestApi = { + sslConfiguration: DriverSubmitSslConfiguration): KubernetesSparkRestApi = { 
val serviceUris = driverServiceManager.getDriverServiceSubmissionServerUris(service) require(serviceUris.nonEmpty, "No uris found to contact the driver!") HttpClientUtil.createClient[KubernetesSparkRestApi]( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala new file mode 100644 index 0000000000000..cee47aad79393 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import java.io.File + +import com.google.common.io.{BaseEncoding, Files} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.internal.config.OptionalConfigEntry + +private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) { + + def get(): KubernetesCredentials = { + sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).foreach { _ => + require(sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).isEmpty, + "Cannot specify both a service account and a driver pod OAuth token.") + require(sparkConf.get(KUBERNETES_DRIVER_CA_CERT_FILE).isEmpty, + "Cannot specify both a service account and a driver pod CA cert file.") + require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_KEY_FILE).isEmpty, + "Cannot specify both a service account and a driver pod client key file.") + require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty, + "Cannot specify both a service account and a driver pod client cert file.") + } + val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN) + val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE, + s"Driver CA cert file provided at %s does not exist or is not a file.") + val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE, + s"Driver client key file provided at %s does not exist or is not a file.") + val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE, + s"Driver client cert file provided at %s does not exist or is not a file.") + val serviceAccountName = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + KubernetesCredentials( + oauthToken = oauthToken, + caCertDataBase64 = caCertDataBase64, + clientKeyDataBase64 = clientKeyDataBase64, + clientCertDataBase64 = clientCertDataBase64) + } + + private def safeFileConfToBase64( + conf: OptionalConfigEntry[String], + fileNotFoundFormatString: String): Option[String] = { + 
sparkConf.get(conf) + .map(new File(_)) + .map { file => + require(file.isFile, String.format(fileNotFoundFormatString, file.getAbsolutePath)) + BaseEncoding.base64().encode(Files.toByteArray(file)) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala new file mode 100644 index 0000000000000..a83c9a9896a08 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import java.io.{File, FileInputStream} +import java.security.{KeyStore, SecureRandom} +import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} + +import com.google.common.base.Charsets +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, Secret, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.rest.kubernetes.{KubernetesFileUtils, PemsToKeyStoreConverter} +import org.apache.spark.util.Utils + +/** + * Raw SSL configuration as the user specified in SparkConf for setting up the driver + * submission server. + */ +private case class DriverSubmitSslConfigurationParameters( + storeBasedSslOptions: SSLOptions, + isKeyStoreLocalFile: Boolean, + driverSubmitServerKeyPem: Option[File], + isDriverSubmitKeyPemLocalFile: Boolean, + driverSubmitServerCertPem: Option[File], + isDriverSubmitServerCertPemLocalFile: Boolean, + submissionClientCertPem: Option[File]) + +/** + * Resolved from translating options provided in + * {@link DriverSubmitSslConfigurationParameters} into Kubernetes volumes, environment variables + * for the driver pod, Kubernetes secrets, client-side trust managers, and the client-side SSL + * context. This is used for setting up the SSL connection for the submission server where the + * application local dependencies and configuration is provided from. 
+ */ +private[spark] case class DriverSubmitSslConfiguration( + enabled: Boolean, + sslPodEnvVars: Array[EnvVar], + sslPodVolume: Option[Volume], + sslPodVolumeMount: Option[VolumeMount], + sslSecret: Option[Secret], + driverSubmitClientTrustManager: Option[X509TrustManager], + driverSubmitClientSslContext: SSLContext) + +/** + * Provides the SSL configuration for bootstrapping the driver pod to listen for the driver + * submission over SSL, and then supply the client-side configuration for establishing the + * SSL connection. This is done in two phases: first, interpreting the raw configuration + * values from the SparkConf object; then second, converting the configuration parameters + * into the appropriate Kubernetes constructs, namely the volume and volume mount to add to the + * driver pod, and the secret to create at the API server; and finally, constructing the + * client-side trust manager and SSL context for sending the local dependencies. + */ +private[spark] class DriverSubmitSslConfigurationProvider( + sparkConf: SparkConf, + kubernetesAppId: String, + kubernetesClient: KubernetesClient, + kubernetesResourceCleaner: KubernetesResourceCleaner) { + private val SECURE_RANDOM = new SecureRandom() + private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" + private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR + + s"/$kubernetesAppId-ssl" + + def getSslConfiguration(): DriverSubmitSslConfiguration = { + val sslConfigurationParameters = parseSslConfigurationParameters() + if (sslConfigurationParameters.storeBasedSslOptions.enabled) { + val storeBasedSslOptions = sslConfigurationParameters.storeBasedSslOptions + val keyStoreSecret = resolveFileToSecretMapping( + sslConfigurationParameters.isKeyStoreLocalFile, + SUBMISSION_SSL_KEYSTORE_SECRET_NAME, + storeBasedSslOptions.keyStore, + "KeyStore") + val keyStorePathEnv = resolveFilePathEnv( + sslConfigurationParameters.isKeyStoreLocalFile, + ENV_SUBMISSION_KEYSTORE_FILE, + SUBMISSION_SSL_KEYSTORE_SECRET_NAME, + storeBasedSslOptions.keyStore) + val storePasswordSecret = storeBasedSslOptions.keyStorePassword.map(password => { + val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)) + (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME, passwordBase64) + }).toMap + val storePasswordLocationEnv = storeBasedSslOptions.keyStorePassword.map(_ => { + new EnvVarBuilder() + .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE) + .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME") + .build() + }) + val storeKeyPasswordSecret = storeBasedSslOptions.keyPassword.map(password => { + val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)) + (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME, passwordBase64) + }).toMap + val storeKeyPasswordEnv = storeBasedSslOptions.keyPassword.map(_ => { + new EnvVarBuilder() + .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE) + .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME") + .build() + }) + val storeTypeEnv = storeBasedSslOptions.keyStoreType.map(storeType => { + new EnvVarBuilder() + .withName(ENV_SUBMISSION_KEYSTORE_TYPE) + .withValue(storeType) + .build() + }) + val keyPemSecret = resolveFileToSecretMapping( + sslConfigurationParameters.isDriverSubmitKeyPemLocalFile, + secretName = SUBMISSION_SSL_KEY_PEM_SECRET_NAME, + secretType = "Key pem", + secretFile = sslConfigurationParameters.driverSubmitServerKeyPem) + val keyPemLocationEnv = resolveFilePathEnv( + 
sslConfigurationParameters.isDriverSubmitKeyPemLocalFile, + envName = ENV_SUBMISSION_KEY_PEM_FILE, + secretName = SUBMISSION_SSL_KEY_PEM_SECRET_NAME, + maybeFile = sslConfigurationParameters.driverSubmitServerKeyPem) + val certPemSecret = resolveFileToSecretMapping( + sslConfigurationParameters.isDriverSubmitServerCertPemLocalFile, + secretName = SUBMISSION_SSL_CERT_PEM_SECRET_NAME, + secretType = "Cert pem", + secretFile = sslConfigurationParameters.driverSubmitServerCertPem) + val certPemLocationEnv = resolveFilePathEnv( + sslConfigurationParameters.isDriverSubmitServerCertPemLocalFile, + envName = ENV_SUBMISSION_CERT_PEM_FILE, + secretName = SUBMISSION_SSL_CERT_PEM_SECRET_NAME, + maybeFile = sslConfigurationParameters.driverSubmitServerCertPem) + val useSslEnv = new EnvVarBuilder() + .withName(ENV_SUBMISSION_USE_SSL) + .withValue("true") + .build() + val sslVolume = new VolumeBuilder() + .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) + .withNewSecret() + .withSecretName(sslSecretsName) + .endSecret() + .build() + val sslVolumeMount = new VolumeMountBuilder() + .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) + .withReadOnly(true) + .withMountPath(sslSecretsDirectory) + .build() + val allSecrets = keyStoreSecret ++ + storePasswordSecret ++ + storeKeyPasswordSecret ++ + keyPemSecret ++ + certPemSecret + val sslSecret = kubernetesClient.secrets().createNew() + .withNewMetadata() + .withName(sslSecretsName) + .endMetadata() + .withData(allSecrets.asJava) + .withType("Opaque") + .done() + kubernetesResourceCleaner.registerOrUpdateResource(sslSecret) + val allSslEnvs = keyStorePathEnv ++ + storePasswordLocationEnv ++ + storeKeyPasswordEnv ++ + storeTypeEnv ++ + keyPemLocationEnv ++ + Array(useSslEnv) ++ + certPemLocationEnv + val (driverSubmitClientTrustManager, driverSubmitClientSslContext) = + buildSslConnectionConfiguration(sslConfigurationParameters) + DriverSubmitSslConfiguration( + true, + allSslEnvs.toArray, + Some(sslVolume), + Some(sslVolumeMount), + Some(sslSecret), + driverSubmitClientTrustManager, + driverSubmitClientSslContext) + } else { + DriverSubmitSslConfiguration( + false, + Array[EnvVar](), + None, + None, + None, + None, + SSLContext.getDefault) + } + } + + private def resolveFilePathEnv( + isLocal: Boolean, + envName: String, + secretName: String, + maybeFile: Option[File]): Option[EnvVar] = { + maybeFile.map(file => { + val pemPath = if (isLocal) { + s"$sslSecretsDirectory/$secretName" + } else { + file.getAbsolutePath + } + new EnvVarBuilder() + .withName(envName) + .withValue(pemPath) + .build() + }) + } + + private def resolveFileToSecretMapping( + isLocal: Boolean, + secretName: String, + secretFile: Option[File], + secretType: String): Map[String, String] = { + secretFile.filter(_ => isLocal).map(file => { + if (!file.isFile) { + throw new SparkException(s"$secretType specified at ${file.getAbsolutePath} is not" + + s" a file or does not exist.") + } + val keyStoreBytes = Files.toByteArray(file) + (secretName, BaseEncoding.base64().encode(keyStoreBytes)) + }).toMap + } + + private def parseSslConfigurationParameters(): DriverSubmitSslConfigurationParameters = { + val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE) + val maybeTrustStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE) + val maybeKeyPem = sparkConf.get(DRIVER_SUBMIT_SSL_KEY_PEM) + val maybeDriverSubmitServerCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM) + val maybeDriverSubmitClientCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM) + 
validatePemsDoNotConflictWithStores( + maybeKeyStore, + maybeTrustStore, + maybeKeyPem, + maybeDriverSubmitServerCertPem, + maybeDriverSubmitClientCertPem) + val resolvedSparkConf = sparkConf.clone() + val (isLocalKeyStore, resolvedKeyStore) = resolveLocalFile(maybeKeyStore, "keyStore") + resolvedKeyStore.foreach { + resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, _) + } + val (isLocalDriverSubmitServerCertPem, resolvedDriverSubmitServerCertPem) = + resolveLocalFile(maybeDriverSubmitServerCertPem, "server cert PEM") + val (isLocalKeyPem, resolvedKeyPem) = resolveLocalFile(maybeKeyPem, "key PEM") + maybeTrustStore.foreach { trustStore => + require(KubernetesFileUtils.isUriLocalFile(trustStore), s"Invalid trustStore URI" + + s" $trustStore; trustStore URI for submit server must have no scheme, or scheme file://") + resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE, + Utils.resolveURI(trustStore).getPath) + } + val driverSubmitClientCertPem = maybeDriverSubmitClientCertPem.map { driverSubmitClientCert => + require(KubernetesFileUtils.isUriLocalFile(driverSubmitClientCert), + "Invalid client certificate PEM URI $driverSubmitClientCert: client certificate URI must" + + " have no scheme, or scheme file://") + Utils.resolveURI(driverSubmitClientCert).getPath + } + val securityManager = new SparkSecurityManager(resolvedSparkConf) + val storeBasedSslOptions = securityManager.getSSLOptions(DRIVER_SUBMIT_SSL_NAMESPACE) + DriverSubmitSslConfigurationParameters( + storeBasedSslOptions, + isLocalKeyStore, + resolvedKeyPem.map(new File(_)), + isLocalKeyPem, + resolvedDriverSubmitServerCertPem.map(new File(_)), + isLocalDriverSubmitServerCertPem, + driverSubmitClientCertPem.map(new File(_))) + } + + private def resolveLocalFile(file: Option[String], + fileType: String): (Boolean, Option[String]) = { + file.map { f => + require(isValidSslFileScheme(f), s"Invalid $fileType URI $f, $fileType URI" + + s" for submit server must have scheme file:// or local:// (no scheme defaults to file://") + val isLocal = KubernetesFileUtils.isUriLocalFile(f) + (isLocal, Option.apply(Utils.resolveURI(f).getPath)) + }.getOrElse(false, None) + } + + private def validatePemsDoNotConflictWithStores( + maybeKeyStore: Option[String], + maybeTrustStore: Option[String], + maybeKeyPem: Option[String], + maybeDriverSubmitServerCertPem: Option[String], + maybeSubmitClientCertPem: Option[String]) = { + maybeKeyPem.orElse(maybeDriverSubmitServerCertPem).foreach { _ => + require(maybeKeyStore.isEmpty, + "Cannot specify server PEM files and key store files; must specify only one or the other.") + } + maybeKeyPem.foreach { _ => + require(maybeDriverSubmitServerCertPem.isDefined, + "When specifying the key PEM file, the server certificate PEM file must also be provided.") + } + maybeDriverSubmitServerCertPem.foreach { _ => + require(maybeKeyPem.isDefined, + "When specifying the server certificate PEM file, the key PEM file must also be provided.") + } + maybeTrustStore.foreach { _ => + require(maybeSubmitClientCertPem.isEmpty, + "Cannot specify client cert file and truststore file; must specify only one or the other.") + } + } + + private def isValidSslFileScheme(rawUri: String): Boolean = { + val resolvedScheme = Option.apply(Utils.resolveURI(rawUri).getScheme).getOrElse("file") + resolvedScheme == "file" || resolvedScheme == "local" + } + + private def buildSslConnectionConfiguration( + sslConfigurationParameters: DriverSubmitSslConfigurationParameters) + : (Option[X509TrustManager], SSLContext) = { + val 
maybeTrustStore = sslConfigurationParameters.submissionClientCertPem.map { certPem => + PemsToKeyStoreConverter.convertCertPemToTrustStore( + certPem, + sslConfigurationParameters.storeBasedSslOptions.trustStoreType) + }.orElse(sslConfigurationParameters.storeBasedSslOptions.trustStore.map { trustStoreFile => + if (!trustStoreFile.isFile) { + throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + + s" does not exist or is not a file.") + } + val trustStore = KeyStore.getInstance( + sslConfigurationParameters + .storeBasedSslOptions + .trustStoreType + .getOrElse(KeyStore.getDefaultType)) + Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => + val trustStorePassword = sslConfigurationParameters + .storeBasedSslOptions + .trustStorePassword + .map(_.toCharArray) + .orNull + trustStore.load(trustStoreStream, trustStorePassword) + } + trustStore + }) + maybeTrustStore.map { trustStore => + val trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm) + trustManagerFactory.init(trustStore) + val trustManagers = trustManagerFactory.getTrustManagers + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(null, trustManagers, SECURE_RANDOM) + (Option.apply(trustManagers(0).asInstanceOf[X509TrustManager]), sslContext) + }.getOrElse((Option.empty[X509TrustManager], SSLContext.getDefault)) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 89369b30694ee..554ed17ff25c4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -22,33 +22,62 @@ import com.google.common.base.Charsets import com.google.common.io.Files import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -private[spark] object KubernetesClientBuilder { - private val API_SERVER_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) - private val CA_CERT_FILE = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) +private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) { + private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) + private val SERVICE_ACCOUNT_CA_CERT = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) + private val oauthTokenFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN) + private val caCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE) + private val clientKeyFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE) + private val clientCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE) /** - * Creates a {@link KubernetesClient}, expecting to be from - * within the context of a pod. When doing so, credentials files - * are picked up from canonical locations, as they are injected - * into the pod's disk space. + * Creates a {@link KubernetesClient}, expecting to be from within the context of a pod. When + * doing so, service account token files can be picked up from canonical locations. 
*/ - def buildFromWithinPod( - kubernetesNamespace: String): DefaultKubernetesClient = { - var clientConfigBuilder = new ConfigBuilder() + def buildFromWithinPod(): DefaultKubernetesClient = { + val baseClientConfigBuilder = new ConfigBuilder() .withApiVersion("v1") .withMasterUrl(KUBERNETES_MASTER_INTERNAL_URL) - .withNamespace(kubernetesNamespace) + .withNamespace(namespace) - if (CA_CERT_FILE.isFile) { - clientConfigBuilder = clientConfigBuilder.withCaCertFile(CA_CERT_FILE.getAbsolutePath) - } + val configBuilder = oauthTokenFile + .orElse(caCertFile) + .orElse(clientKeyFile) + .orElse(clientCertFile) + .map { _ => + var mountedAuthConfigBuilder = baseClientConfigBuilder + oauthTokenFile.foreach { tokenFilePath => + val tokenFile = new File(tokenFilePath) + mountedAuthConfigBuilder = mountedAuthConfigBuilder + .withOauthToken(Files.toString(tokenFile, Charsets.UTF_8)) + } + caCertFile.foreach { caFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withCaCertFile(caFile) + } + clientKeyFile.foreach { keyFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientKeyFile(keyFile) + } + clientCertFile.foreach { certFile => + mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientCertFile(certFile) + } + mountedAuthConfigBuilder + }.getOrElse { + var serviceAccountConfigBuilder = baseClientConfigBuilder + if (SERVICE_ACCOUNT_CA_CERT.isFile) { + serviceAccountConfigBuilder = serviceAccountConfigBuilder.withCaCertFile( + SERVICE_ACCOUNT_CA_CERT.getAbsolutePath) + } - if (API_SERVER_TOKEN.isFile) { - clientConfigBuilder = clientConfigBuilder.withOauthToken( - Files.toString(API_SERVER_TOKEN, Charsets.UTF_8)) + if (SERVICE_ACCOUNT_TOKEN.isFile) { + serviceAccountConfigBuilder = serviceAccountConfigBuilder.withOauthToken( + Files.toString(SERVICE_ACCOUNT_TOKEN, Charsets.UTF_8)) + } + serviceAccountConfigBuilder } - new DefaultKubernetesClient(clientConfigBuilder.build) + new DefaultKubernetesClient(configBuilder.build) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala deleted file mode 100644 index 4c031fcba91ab..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.deploy.kubernetes - -import java.io.FileInputStream -import java.security.{KeyStore, SecureRandom} -import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} - -import com.google.common.base.Charsets -import com.google.common.io.{BaseEncoding, Files} -import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, Secret, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} -import io.fabric8.kubernetes.client.KubernetesClient -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions} -import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.util.Utils - -private[spark] case class SslConfiguration( - sslOptions: SSLOptions, - isKeyStoreLocalFile: Boolean, - sslPodEnvVars: Array[EnvVar], - sslPodVolumes: Array[Volume], - sslPodVolumeMounts: Array[VolumeMount], - sslSecrets: Array[Secret], - driverSubmitClientTrustManager: Option[X509TrustManager], - driverSubmitClientSslContext: SSLContext) - -private[spark] class SslConfigurationProvider( - sparkConf: SparkConf, - kubernetesAppId: String, - kubernetesClient: KubernetesClient, - kubernetesResourceCleaner: KubernetesResourceCleaner) { - private val SECURE_RANDOM = new SecureRandom() - private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" - private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl" - - def getSslConfiguration(): SslConfiguration = { - val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() - if (driverSubmitSslOptions.enabled) { - val sslSecretsMap = mutable.HashMap[String, String]() - val sslEnvs = mutable.Buffer[EnvVar]() - val secrets = mutable.Buffer[Secret]() - driverSubmitSslOptions.keyStore.foreach(store => { - val resolvedKeyStoreFile = if (isKeyStoreLocalFile) { - if (!store.isFile) { - throw new SparkException(s"KeyStore specified at $store is not a file or" + - s" does not exist.") - } - val keyStoreBytes = Files.toByteArray(store) - val keyStoreBase64 = BaseEncoding.base64().encode(keyStoreBytes) - sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64) - s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME" - } else { - store.getAbsolutePath - } - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_KEYSTORE_FILE) - .withValue(resolvedKeyStoreFile) - .build() - }) - driverSubmitSslOptions.keyStorePassword.foreach(password => { - val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)) - sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64) - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE) - .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME") - .build() - }) - driverSubmitSslOptions.keyPassword.foreach(password => { - val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8)) - sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64) - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE) - .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME") - .build() - }) - driverSubmitSslOptions.keyStoreType.foreach(storeType => { - sslEnvs += new EnvVarBuilder() - .withName(ENV_SUBMISSION_KEYSTORE_TYPE) - .withValue(storeType) - .build() - }) - sslEnvs 
+= new EnvVarBuilder() - .withName(ENV_SUBMISSION_USE_SSL) - .withValue("true") - .build() - val sslVolume = new VolumeBuilder() - .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) - .withNewSecret() - .withSecretName(sslSecretsName) - .endSecret() - .build() - val sslVolumeMount = new VolumeMountBuilder() - .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME) - .withReadOnly(true) - .withMountPath(sslSecretsDirectory) - .build() - val sslSecrets = kubernetesClient.secrets().createNew() - .withNewMetadata() - .withName(sslSecretsName) - .endMetadata() - .withData(sslSecretsMap.asJava) - .withType("Opaque") - .done() - kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets) - secrets += sslSecrets - val (driverSubmitClientTrustManager, driverSubmitClientSslContext) = - buildSslConnectionConfiguration(driverSubmitSslOptions) - SslConfiguration( - driverSubmitSslOptions, - isKeyStoreLocalFile, - sslEnvs.toArray, - Array(sslVolume), - Array(sslVolumeMount), - secrets.toArray, - driverSubmitClientTrustManager, - driverSubmitClientSslContext) - } else { - SslConfiguration( - driverSubmitSslOptions, - isKeyStoreLocalFile, - Array[EnvVar](), - Array[Volume](), - Array[VolumeMount](), - Array[Secret](), - None, - SSLContext.getDefault) - } - } - - private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = { - val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE) - val resolvedSparkConf = sparkConf.clone() - val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => { - val keyStoreURI = Utils.resolveURI(keyStore) - val isProvidedKeyStoreLocal = keyStoreURI.getScheme match { - case "file" | null => true - case "local" => false - case _ => throw new SparkException(s"Invalid KeyStore URI $keyStore; keyStore URI" + - " for submit server must have scheme file:// or local:// (no scheme defaults" + - " to file://)") - } - (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath)) - }).getOrElse((false, Option.empty[String])) - resolvedKeyStore.foreach { - resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _) - } - sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore => - val trustStoreURI = Utils.resolveURI(trustStore) - trustStoreURI.getScheme match { - case "file" | null => - resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, trustStoreURI.getPath) - case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" + - " for submit server must have no scheme, or scheme file://") - } - } - val securityManager = new SparkSecurityManager(resolvedSparkConf) - (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore) - } - - private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions): - (Option[X509TrustManager], SSLContext) = { - driverSubmitSslOptions.trustStore.map(trustStoreFile => { - val trustManagerFactory = TrustManagerFactory.getInstance( - TrustManagerFactory.getDefaultAlgorithm) - val trustStore = KeyStore.getInstance( - driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)) - if (!trustStoreFile.isFile) { - throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" + - s" does not exist or is not a file.") - } - Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream => - driverSubmitSslOptions.trustStorePassword match { - case Some(password) => - trustStore.load(trustStoreStream, password.toCharArray) - case None => trustStore.load(trustStoreStream, null) - } - } - trustManagerFactory.init(trustStore) 
- val trustManagers = trustManagerFactory.getTrustManagers - val sslContext = SSLContext.getInstance("TLSv1.2") - sslContext.init(null, trustManagers, SECURE_RANDOM) - (Option.apply(trustManagers(0).asInstanceOf[X509TrustManager]), sslContext) - }).getOrElse((Option.empty[X509TrustManager], SSLContext.getDefault)) - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index dc61ad4025f0f..3328809e186e4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -27,221 +27,265 @@ package object config { private[spark] val KUBERNETES_NAMESPACE = ConfigBuilder("spark.kubernetes.namespace") - .doc(""" - | The namespace that will be used for running the driver and - | executor pods. When using spark-submit in cluster mode, - | this can also be passed to spark-submit via the - | --kubernetes-namespace command line argument. - """.stripMargin) + .doc("The namespace that will be used for running the driver and executor pods. When using" + + " spark-submit in cluster mode, this can also be passed to spark-submit via the" + + " --kubernetes-namespace command line argument.") .stringConf .createWithDefault("default") private[spark] val DRIVER_DOCKER_IMAGE = ConfigBuilder("spark.kubernetes.driver.docker.image") - .doc(""" - | Docker image to use for the driver. Specify this using the - | standard Docker tag format. - """.stripMargin) + .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.") .stringConf .createWithDefault(s"spark-driver:$sparkVersion") private[spark] val EXECUTOR_DOCKER_IMAGE = ConfigBuilder("spark.kubernetes.executor.docker.image") - .doc(""" - | Docker image to use for the executors. Specify this using - | the standard Docker tag format. - """.stripMargin) + .doc("Docker image to use for the executors. Specify this using the standard Docker tag" + + " format.") .stringConf .createWithDefault(s"spark-executor:$sparkVersion") - private[spark] val KUBERNETES_CA_CERT_FILE = - ConfigBuilder("spark.kubernetes.submit.caCertFile") - .doc(""" - | CA cert file for connecting to Kubernetes over SSL. This - | file should be located on the submitting machine's disk. - """.stripMargin) + private val APISERVER_SUBMIT_CONF_PREFIX = "spark.kubernetes.authenticate.submission" + private val APISERVER_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" + + private[spark] val KUBERNETES_SUBMIT_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.caCertFile") + .doc("Path to the CA cert file for connecting to Kubernetes over SSL when creating" + + " Kubernetes resources for the driver. This file should be located on the submitting" + + " machine's disk.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SUBMIT_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientKeyFile") + .doc("Path to the client key file for authenticating against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. 
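As a rough illustration (not part of this patch), the namespace and image options documented above might be set on the submission side as follows; only the property keys come from config.scala, while the master URL, namespace and image tags are placeholders.

```scala
import org.apache.spark.SparkConf

object ImageAndNamespaceExample {
  // Placeholder master URL and image tags; the keys are defined in config.scala above.
  val conf: SparkConf = new SparkConf()
    .setMaster("k8s://https://kubernetes.example.com:443")
    .set("spark.kubernetes.namespace", "default")
    .set("spark.kubernetes.driver.docker.image", "spark-driver:latest")
    .set("spark.kubernetes.executor.docker.image", "spark-executor:latest")
}
```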
This file should be" + " located on the submitting machine's disk.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SUBMIT_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.clientCertFile") + .doc("Path to the client cert file for authenticating against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. This file should be" + + " located on the submitting machine's disk.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_SUBMIT_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_SUBMIT_CONF_PREFIX.oauthToken") + .doc("OAuth token to use when authenticating against the Kubernetes API server" + + " when initially creating Kubernetes resources for the driver. Note that unlike the other" + + " authentication options, this should be the exact string value of the token to use for" + + " the authentication.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.caCertFile") + .doc("Path to the CA cert file for connecting to Kubernetes over TLS from the driver pod" + + " when requesting executors. This file should be located on the submitting machine's disk" + + " and will be uploaded to the driver pod.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientKeyFile") + .doc("Path to the client key file for authenticating against the Kubernetes API server from" + + " the driver pod when requesting executors. This file should be located on the submitting" + + " machine's disk, and will be uploaded to the driver pod.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.clientCertFile") + .doc("Path to the client cert file for authenticating against the Kubernetes API server" + + " from the driver pod when requesting executors. This file should be located on the" + + " submitting machine's disk, and will be uploaded to the driver pod.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.oauthToken") + .doc("OAuth token to use when authenticating against the Kubernetes API server from the" + + " driver pod when requesting executors. Note that unlike the other authentication options," + + " this should be the exact string value of the token to use for the authentication. This" + + " token value is mounted as a secret on the driver pod.") + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile") + .doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" + + " against Kubernetes.") .stringConf .createOptional - private[spark] val KUBERNETES_CLIENT_KEY_FILE = - ConfigBuilder("spark.kubernetes.submit.clientKeyFile") - .doc(""" - | Client key file for authenticating against the Kubernetes - | API server. This file should be located on the submitting - | machine's disk. 
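The split between submission-time and driver-pod credentials introduced above is easier to see in a concrete configuration. A minimal sketch with placeholder paths (only the property keys are from this patch): the `submission.*` values are read once from the submitting machine to create the driver's Kubernetes resources, while the `driver.*` values are uploaded to the driver pod so it can request executors; the internal `mounted.*` variants are then set by the submission server to point at the credential files it writes onto the pod's disk.

```scala
import org.apache.spark.SparkConf

object KubernetesAuthExample {
  // Hypothetical credential paths; the keys come from config.scala above.
  val conf: SparkConf = new SparkConf()
    // Used by spark-submit when creating the driver pod and its resources.
    .set("spark.kubernetes.authenticate.submission.caCertFile", "/etc/ssl/k8s/ca.crt")
    .set("spark.kubernetes.authenticate.submission.clientKeyFile", "/etc/ssl/k8s/client.key")
    .set("spark.kubernetes.authenticate.submission.clientCertFile", "/etc/ssl/k8s/client.crt")
    // Uploaded to the driver pod and used from inside the cluster to request executors.
    .set("spark.kubernetes.authenticate.driver.caCertFile", "/etc/ssl/k8s/ca.crt")
    .set("spark.kubernetes.authenticate.driver.clientKeyFile", "/etc/ssl/k8s/client.key")
    .set("spark.kubernetes.authenticate.driver.clientCertFile", "/etc/ssl/k8s/client.crt")
}
```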
- """.stripMargin) + private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile") + .doc("Path on the driver pod's disk containing the client key file to use when" + + " authenticating against Kubernetes.") + .internal() .stringConf .createOptional - private[spark] val KUBERNETES_CLIENT_CERT_FILE = - ConfigBuilder("spark.kubernetes.submit.clientCertFile") - .doc(""" - | Client cert file for authenticating against the - | Kubernetes API server. This file should be located on - | the submitting machine's disk. - """.stripMargin) + private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile") + .doc("Path on the driver pod's disk containing the client cert file to use when" + + " authenticating against Kubernetes.") + .internal() + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN = + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile") + .doc("Path on the driver pod's disk containing the OAuth token file to use when" + + " authenticating against Kubernetes.") + .internal() .stringConf .createOptional private[spark] val KUBERNETES_SERVICE_ACCOUNT_NAME = - ConfigBuilder("spark.kubernetes.submit.serviceAccountName") - .doc(""" - | Service account that is used when running the driver pod. - | The driver pod uses this service account when requesting - | executor pods from the API server. - """.stripMargin) + ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.serviceAccountName") + .doc("Service account that is used when running the driver pod. The driver pod uses" + + " this service account when requesting executor pods from the API server. If specific" + + " credentials are given for the driver pod to use, the driver will favor" + + " using those credentials instead.") .stringConf - .createWithDefault("default") + .createOptional // Note that while we set a default for this when we start up the // scheduler, the specific default value is dynamically determined // based on the executor memory. private[spark] val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.kubernetes.executor.memoryOverhead") - .doc(""" - | The amount of off-heap memory (in megabytes) to be - | allocated per executor. This is memory that accounts for - | things like VM overheads, interned strings, other native - | overheads, etc. This tends to grow with the executor size - | (typically 6-10%). - """.stripMargin) + .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This" + + " is memory that accounts for things like VM overheads, interned strings, other native" + + " overheads, etc. This tends to grow with the executor size. (typically 6-10%).") .bytesConf(ByteUnit.MiB) .createOptional private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.kubernetes.driver.memoryOverhead") - .doc(""" - | The amount of off-heap memory (in megabytes) to be - | allocated for the driver and the driver submission server. - | This is memory that accounts for things like VM overheads, - | interned strings, other native overheads, etc. This tends - | to grow with the driver's memory size (typically 6-10%). - """.stripMargin) + .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the" + + " driver submission server. This is memory that accounts for things like VM overheads," + + " interned strings, other native overheads, etc. 
This tends to grow with the driver's" + + " memory size (typically 6-10%).") .bytesConf(ByteUnit.MiB) .createOptional private[spark] val KUBERNETES_DRIVER_LABELS = ConfigBuilder("spark.kubernetes.driver.labels") - .doc(""" - | Custom labels that will be added to the driver pod. - | This should be a comma-separated list of label key-value - | pairs, where each label is in the format key=value. Note - | that Spark also adds its own labels to the driver pod - | for bookkeeping purposes. - """.stripMargin) + .doc("Custom labels that will be added to the driver pod. This should be a comma-separated" + + " list of label key-value pairs, where each label is in the format key=value. Note that" + + " Spark also adds its own labels to the driver pod for bookkeeping purposes.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_ANNOTATIONS = ConfigBuilder("spark.kubernetes.driver.annotations") - .doc(""" - | Custom annotations that will be added to the driver pod. - | This should be a comma-separated list of annotation key-value - | pairs, where each annotation is in the format key=value. - """.stripMargin) + .doc("Custom annotations that will be added to the driver pod. This should be a" + + " comma-separated list of annotation key-value pairs, where each annotation is in the" + + " format key=value.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT = - ConfigBuilder("spark.kubernetes.driverSubmitTimeout") - .doc(""" - | Time to wait for the driver process to start running - | before aborting its execution. - """.stripMargin) + ConfigBuilder("spark.kubernetes.driverSubmissionTimeout") + .doc("Time to wait for the driver process to start running before aborting its execution.") .timeConf(TimeUnit.SECONDS) .createWithDefault(60L) - private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE = - ConfigBuilder("spark.ssl.kubernetes.submit.keyStore") - .doc(""" - | KeyStore file for the driver submission server listening - | on SSL. Can be pre-mounted on the driver container - | or uploaded from the submitting client. - """.stripMargin) + private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE = + ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyStore") + .doc("KeyStore file for the driver submission server listening on SSL. Can be pre-mounted" + + " on the driver container or uploaded from the submitting client.") .stringConf .createOptional - private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE = - ConfigBuilder("spark.ssl.kubernetes.submit.trustStore") - .doc(""" - | TrustStore containing certificates for communicating - | to the driver submission server over SSL. - """.stripMargin) + private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE = + ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.trustStore") + .doc("TrustStore containing certificates for communicating to the driver submission server" + + " over SSL.") .stringConf .createOptional private[spark] val DRIVER_SUBMIT_SSL_ENABLED = - ConfigBuilder("spark.ssl.kubernetes.submit.enabled") - .doc(""" - | Whether or not to use SSL when sending the - | application dependencies to the driver pod. 
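The two memoryOverhead options above get a dynamic default derived from the container's memory, using the MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN constants added in constants.scala further down. The exact rule is not spelled out in this hunk; the sketch below assumes the usual Spark convention of max(10% of memory, 384 MiB).

```scala
object MemoryOverheadExample {
  private val MemoryOverheadFactor = 0.10  // MEMORY_OVERHEAD_FACTOR in constants.scala
  private val MemoryOverheadMinMiB = 384L  // MEMORY_OVERHEAD_MIN in constants.scala

  /** Assumed default off-heap overhead for a container with the given memory, in MiB. */
  def defaultOverheadMiB(memoryMiB: Long): Long =
    math.max((MemoryOverheadFactor * memoryMiB).toLong, MemoryOverheadMinMiB)

  def main(args: Array[String]): Unit = {
    println(defaultOverheadMiB(1024))  // 384 -- the floor applies for small containers
    println(defaultOverheadMiB(8192))  // 819 -- roughly 10% for larger ones
  }
}
```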
- | - """.stripMargin) + ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.enabled") + .doc("Whether or not to use SSL when sending the application dependencies to the driver pod.") .booleanConf .createWithDefault(false) + private[spark] val DRIVER_SUBMIT_SSL_KEY_PEM = + ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyPem") + .doc("Key PEM file that the driver submission server will use when setting up TLS" + + " connections. Can be pre-mounted on the driver pod's disk or uploaded from the" + + " submitting client's machine.") + .stringConf + .createOptional + + private[spark] val DRIVER_SUBMIT_SSL_SERVER_CERT_PEM = + ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.serverCertPem") + .doc("Certificate PEM file that is associated with the key PEM file" + + " the submission server uses to set up TLS connections. Can be pre-mounted" + + " on the driver pod's disk or uploaded from the submitting client's machine.") + .stringConf + .createOptional + + private[spark] val DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM = + ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.clientCertPem") + .doc("Certificate pem file that the submission client uses to connect to the submission" + + " server over TLS. This should often be the same as the server certificate, but can be" + + " different if the submission client will contact the driver through a proxy instead of" + + " the driver service directly.") + .stringConf + .createOptional + private[spark] val KUBERNETES_DRIVER_SERVICE_NAME = ConfigBuilder("spark.kubernetes.driver.service.name") - .doc(""" - | Kubernetes service that exposes the driver pod - | for external access. - """.stripMargin) + .doc("Kubernetes service that exposes the driver pod for external access.") .internal() .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY = ConfigBuilder("spark.kubernetes.driver.submissionServerMemory") - .doc(""" - | The amount of memory to allocate for the driver submission server. - """.stripMargin) + .doc("The amount of memory to allocate for the driver submission server.") .bytesConf(ByteUnit.MiB) .createWithDefaultString("256m") private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT = ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort") - .doc(""" - | Whether to expose the driver Web UI port as a service NodePort. Turned off by default - | because NodePort is a limited resource. Use alternatives such as Ingress if possible. - """.stripMargin) + .doc("Whether to expose the driver Web UI port as a service NodePort. Turned off by default" + + " because NodePort is a limited resource. Use alternatives if possible.") .booleanConf .createWithDefault(false) private[spark] val KUBERNETES_DRIVER_POD_NAME = ConfigBuilder("spark.kubernetes.driver.pod.name") - .doc(""" - | Name of the driver pod. - """.stripMargin) + .doc("Name of the driver pod.") .internal() .stringConf .createOptional private[spark] val DRIVER_SERVICE_MANAGER_TYPE = ConfigBuilder("spark.kubernetes.driver.serviceManagerType") - .doc(s""" - | A tag indicating which class to use for creating the - | Kubernetes service and determining its URI for the submission - | client. 
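With the PEM options added above, TLS for the driver submission server can be configured without building a keystore by hand. A minimal sketch with placeholder paths; the keys are the spark.ssl.kubernetes.driversubmitserver.* options defined in this patch. Reusing one certificate for serverCertPem and clientCertPem mirrors the PEM integration test later in this diff; a separate clientCertPem is only needed when the client reaches the driver through a proxy.

```scala
import org.apache.spark.SparkConf

object DriverSubmitServerTlsExample {
  // Placeholder file paths; the keys come from config.scala above.
  val conf: SparkConf = new SparkConf()
    .set("spark.ssl.kubernetes.driversubmitserver.enabled", "true")
    .set("spark.ssl.kubernetes.driversubmitserver.keyPem", "file:///tmp/submit-tls/key.pem")
    .set("spark.ssl.kubernetes.driversubmitserver.serverCertPem", "file:///tmp/submit-tls/cert.pem")
    .set("spark.ssl.kubernetes.driversubmitserver.clientCertPem", "file:///tmp/submit-tls/cert.pem")
}
```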
- """.stripMargin) + .doc("A tag indicating which class to use for creating the Kubernetes service and" + + " determining its URI for the submission client.") .stringConf .createWithDefault(NodePortUrisDriverServiceManager.TYPE) private[spark] val WAIT_FOR_APP_COMPLETION = - ConfigBuilder("spark.kubernetes.submit.waitAppCompletion") - .doc( - """ - | In cluster mode, whether to wait for the application to finish before exiting the - | launcher process. - """.stripMargin) + ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the" + + " launcher process.") .booleanConf .createWithDefault(true) private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.kubernetes.report.interval") - .doc( - """ - | Interval between reports of the current app status in cluster mode. - """.stripMargin) + .doc("Interval between reports of the current app status in cluster mode.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("1s") } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 4af065758e674..0e5fada302421 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -24,7 +24,8 @@ package object constants { private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id" // Secrets - private[spark] val DRIVER_CONTAINER_SECRETS_BASE_DIR = "/var/run/secrets/spark-submission" + private[spark] val DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR = + "/var/run/secrets/spark-submission" private[spark] val SUBMISSION_APP_SECRET_NAME = "spark-submission-server-secret" private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret" private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume" @@ -35,6 +36,8 @@ package object constants { private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore" private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl" private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets" + private[spark] val SUBMISSION_SSL_KEY_PEM_SECRET_NAME = "spark-submission-server-key-pem" + private[spark] val SUBMISSION_SSL_CERT_PEM_SECRET_NAME = "spark-submission-server-cert-pem" // Default and fixed ports private[spark] val SUBMISSION_SERVER_PORT = 7077 @@ -56,6 +59,8 @@ package object constants { private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE = "SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE" private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE" + private[spark] val ENV_SUBMISSION_KEY_PEM_FILE = "SPARK_SUBMISSION_KEY_PEM_FILE" + private[spark] val ENV_SUBMISSION_CERT_PEM_FILE = "SPARK_SUBMISSION_CERT_PEM_FILE" private[spark] val ENV_SUBMISSION_USE_SSL = "SPARK_SUBMISSION_USE_SSL" private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT" private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL" @@ -73,7 +78,7 @@ package object constants { // Miscellaneous private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" - private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit" + private[spark] val DRIVER_SUBMIT_SSL_NAMESPACE = "kubernetes.driversubmitserver" private[spark] val 
KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 private[spark] val MEMORY_OVERHEAD_MIN = 384L diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala index 0d2d1a1c6f5e3..1ea44109c5f5e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala @@ -20,14 +20,21 @@ import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import org.apache.spark.SPARK_VERSION +case class KubernetesCredentials( + oauthToken: Option[String], + caCertDataBase64: Option[String], + clientKeyDataBase64: Option[String], + clientCertDataBase64: Option[String]) + case class KubernetesCreateSubmissionRequest( - appResource: AppResource, - mainClass: String, - appArgs: Array[String], - sparkProperties: Map[String, String], - secret: String, - uploadedJarsBase64Contents: TarGzippedData, - uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest { + appResource: AppResource, + mainClass: String, + appArgs: Array[String], + sparkProperties: Map[String, String], + secret: String, + driverPodKubernetesCredentials: KubernetesCredentials, + uploadedJarsBase64Contents: TarGzippedData, + uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest { message = "create" clientSparkVersion = SPARK_VERSION } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 5952acc0d5916..4ca01b2f6bd38 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest.kubernetes import java.io.{File, FileOutputStream, StringReader} import java.net.URI import java.nio.file.Paths +import java.security.SecureRandom import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger import javax.servlet.http.{HttpServletRequest, HttpServletResponse} @@ -26,12 +27,15 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import com.google.common.base.Charsets import com.google.common.io.{BaseEncoding, ByteStreams, Files} import org.apache.commons.codec.binary.Base64 +import org.apache.commons.lang3.RandomStringUtils import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions} +import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.rest._ +import org.apache.spark.internal.config.OptionalConfigEntry import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} private case class KubernetesSparkRestServerArguments( @@ -42,7 +46,9 @@ private case class 
KubernetesSparkRestServerArguments( keyStoreFile: Option[String] = None, keyStorePasswordFile: Option[String] = None, keyStoreType: Option[String] = None, - keyPasswordFile: Option[String] = None) { + keyPasswordFile: Option[String] = None, + keyPemFile: Option[String] = None, + certPemFile: Option[String] = None) { def validate(): KubernetesSparkRestServerArguments = { require(host.isDefined, "Hostname not set via --hostname.") require(port.isDefined, "Port not set via --port") @@ -81,6 +87,12 @@ private object KubernetesSparkRestServerArguments { case "--keystore-key-password-file" :: value :: tail => args = tail resolvedArguments.copy(keyPasswordFile = Some(value)) + case "--key-pem-file" :: value :: tail => + args = tail + resolvedArguments.copy(keyPemFile = Some(value)) + case "--cert-pem-file" :: value :: tail => + args = tail + resolvedArguments.copy(certPemFile = Some(value)) // TODO polish usage message case Nil => resolvedArguments case unknown => throw new IllegalStateException(s"Unknown argument(s) found: $unknown") @@ -152,6 +164,7 @@ private[spark] class KubernetesSparkRestServer( appArgs, sparkProperties, secret, + driverPodKubernetesCredentials, uploadedJars, uploadedFiles) => val decodedSecret = Base64.decodeBase64(secret) @@ -214,6 +227,8 @@ private[spark] class KubernetesSparkRestServer( } else { resolvedSparkProperties.remove("spark.files") } + resolvedSparkProperties ++= writeKubernetesCredentials( + driverPodKubernetesCredentials, tempDir) val command = new ArrayBuffer[String] command += javaExecutable @@ -280,6 +295,48 @@ private[spark] class KubernetesSparkRestServer( CompressionUtils.unpackAndWriteCompressedFiles(files, workingDir) } + private def writeKubernetesCredentials( + kubernetesCredentials: KubernetesCredentials, + rootTempDir: File): Map[String, String] = { + val resolvedDirectory = new File(rootTempDir, "kubernetes-credentials") + if (!resolvedDirectory.mkdir()) { + throw new IllegalStateException(s"Failed to create credentials dir at " + + resolvedDirectory.getAbsolutePath) + } + val oauthTokenFile = writeRawStringCredentialAndGetConf("oauth-token.txt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, kubernetesCredentials.oauthToken) + val caCertFile = writeBase64CredentialAndGetConf("ca.crt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, kubernetesCredentials.caCertDataBase64) + val clientKeyFile = writeBase64CredentialAndGetConf("key.key", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, kubernetesCredentials.clientKeyDataBase64) + val clientCertFile = writeBase64CredentialAndGetConf("cert.crt", resolvedDirectory, + KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, kubernetesCredentials.clientCertDataBase64) + (oauthTokenFile ++ caCertFile ++ clientKeyFile ++ clientCertFile).toMap + } + + private def writeRawStringCredentialAndGetConf( + fileName: String, + dir: File, + conf: OptionalConfigEntry[String], + credential: Option[String]): Option[(String, String)] = { + credential.map { cred => + val credentialFile = new File(dir, fileName) + Files.write(cred, credentialFile, Charsets.UTF_8) + (conf.key, credentialFile.getAbsolutePath) + } + } + + private def writeBase64CredentialAndGetConf( + fileName: String, + dir: File, + conf: OptionalConfigEntry[String], + credential: Option[String]): Option[(String, String)] = { + credential.map { cred => + val credentialFile = new File(dir, fileName) + Files.write(BaseEncoding.base64().decode(cred), credentialFile) + (conf.key, credentialFile.getAbsolutePath) + } + } /** * 
Retrieve the path on the driver container where the main app resource is, and what value it @@ -330,26 +387,43 @@ private[spark] class KubernetesSparkRestServer( private[spark] object KubernetesSparkRestServer { private val barrier = new CountDownLatch(1) + private val SECURE_RANDOM = new SecureRandom() def main(args: Array[String]): Unit = { val parsedArguments = KubernetesSparkRestServerArguments.fromArgsArray(args) val secretFile = new File(parsedArguments.secretFile.get) - if (!secretFile.isFile) { - throw new IllegalArgumentException(s"Secret file specified by --secret-file" + - " is not a file, or does not exist.") - } + require(secretFile.isFile, "Secret file specified by --secret-file is not a file, or" + + " does not exist.") val sslOptions = if (parsedArguments.useSsl) { - val keyStorePassword = parsedArguments - .keyStorePasswordFile - .map(new File(_)) - .map(Files.toString(_, Charsets.UTF_8)) + validateSslOptions(parsedArguments) val keyPassword = parsedArguments .keyPasswordFile .map(new File(_)) .map(Files.toString(_, Charsets.UTF_8)) + // If key password isn't set but we're using PEM files, generate a password + .orElse(parsedArguments.keyPemFile.map(_ => randomPassword())) + val keyStorePassword = parsedArguments + .keyStorePasswordFile + .map(new File(_)) + .map(Files.toString(_, Charsets.UTF_8)) + // If keystore password isn't set but we're using PEM files, generate a password + .orElse(parsedArguments.keyPemFile.map(_ => randomPassword())) + val resolvedKeyStore = parsedArguments.keyStoreFile.map(new File(_)).orElse( + parsedArguments.keyPemFile.map(keyPemFile => { + parsedArguments.certPemFile.map(certPemFile => { + PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile( + new File(keyPemFile), + new File(certPemFile), + "provided-key", + keyStorePassword, + keyPassword, + parsedArguments.keyStoreType) + }) + }).getOrElse(throw new SparkException("When providing PEM files to set up TLS for the" + + " submission server, both the key and the certificate must be specified."))) new SSLOptions( enabled = true, - keyStore = parsedArguments.keyStoreFile.map(new File(_)), + keyStore = resolvedKeyStore, keyStoreType = parsedArguments.keyStoreType, keyStorePassword = keyStorePassword, keyPassword = keyPassword) @@ -378,5 +452,25 @@ private[spark] object KubernetesSparkRestServer { barrier.await() System.exit(exitCode.get()) } + + private def validateSslOptions(parsedArguments: KubernetesSparkRestServerArguments): Unit = { + parsedArguments.keyStoreFile.foreach { _ => + require(parsedArguments.keyPemFile.orElse(parsedArguments.certPemFile).isEmpty, + "Cannot provide both key/cert PEM files and a keyStore file; select one or the other" + + " for configuring SSL.") + } + parsedArguments.keyPemFile.foreach { _ => + require(parsedArguments.certPemFile.isDefined, + "When providing the key PEM file, the certificate PEM file must also be provided.") + } + parsedArguments.certPemFile.foreach { _ => + require(parsedArguments.keyPemFile.isDefined, + "When providing the certificate PEM file, the key PEM file must also be provided.") + } + } + + private def randomPassword(): String = { + RandomStringUtils.random(1024, 0, Integer.MAX_VALUE, false, false, null, SECURE_RANDOM) + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala index fa8362677f38f..1416476824793 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala @@ -39,8 +39,8 @@ private[spark] class NodePortUrisDriverServiceManager extends DriverServiceManag val urlScheme = if (sparkConf.get(DRIVER_SUBMIT_SSL_ENABLED)) { "https" } else { - logWarning("Submitting application details, application secret, and local" + - " jars to the cluster over an insecure connection. You should configure SSL" + + logWarning("Submitting application details, application secret, Kubernetes credentials," + + " and local jars to the cluster over an insecure connection. You should configure SSL" + " to secure this step.") "http" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala new file mode 100644 index 0000000000000..e5c43560eccb4 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import java.io.{File, FileInputStream, FileOutputStream, InputStreamReader} +import java.nio.file.Paths +import java.security.{KeyStore, PrivateKey} +import java.security.cert.Certificate +import java.util.UUID + +import com.google.common.base.Charsets +import org.bouncycastle.asn1.pkcs.PrivateKeyInfo +import org.bouncycastle.cert.X509CertificateHolder +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter +import org.bouncycastle.openssl.{PEMKeyPair, PEMParser} +import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter +import scala.collection.mutable + +import org.apache.spark.SparkException +import org.apache.spark.util.Utils + +private[spark] object PemsToKeyStoreConverter { + + /** + * Loads the given key-cert pair into a temporary keystore file. Returns the File pointing + * to where the keyStore was written to disk. 
+ */ + def convertPemsToTempKeyStoreFile( + keyPemFile: File, + certPemFile: File, + keyAlias: String, + keyStorePassword: Option[String], + keyPassword: Option[String], + keyStoreType: Option[String]): File = { + require(keyPemFile.isFile, s"Key PEM file provided at ${keyPemFile.getAbsolutePath}" + + " does not exist or is not a file.") + require(certPemFile.isFile, s"Cert PEM file provided at ${certPemFile.getAbsolutePath}" + + " does not exist or is not a file.") + val privateKey = parsePrivateKeyFromPemFile(keyPemFile) + val certificates = parseCertificatesFromPemFile(certPemFile) + val resolvedKeyStoreType = keyStoreType.getOrElse(KeyStore.getDefaultType) + val keyStore = KeyStore.getInstance(resolvedKeyStoreType) + keyStore.load(null, null) + keyStore.setKeyEntry( + keyAlias, + privateKey, + keyPassword.map(_.toCharArray).orNull, + certificates) + val keyStoreOutputPath = Paths.get(s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType") + Utils.tryWithResource(new FileOutputStream(keyStoreOutputPath.toFile)) { storeStream => + keyStore.store(storeStream, keyStorePassword.map(_.toCharArray).orNull) + } + keyStoreOutputPath.toFile + } + + def convertCertPemToTrustStore( + certPemFile: File, + trustStoreType: Option[String]): KeyStore = { + require(certPemFile.isFile, s"Cert PEM file provided at ${certPemFile.getAbsolutePath}" + + " does not exist or is not a file.") + val trustStore = KeyStore.getInstance(trustStoreType.getOrElse(KeyStore.getDefaultType)) + trustStore.load(null, null) + parseCertificatesFromPemFile(certPemFile).zipWithIndex.foreach { case (cert, index) => + trustStore.setCertificateEntry(s"certificate-$index", cert) + } + trustStore + } + + private def withPemParsedFromFile[T](pemFile: File)(f: (PEMParser => T)): T = { + Utils.tryWithResource(new FileInputStream(pemFile)) { pemStream => + Utils.tryWithResource(new InputStreamReader(pemStream, Charsets.UTF_8)) { pemReader => + Utils.tryWithResource(new PEMParser(pemReader))(f) + } + } + } + + private def parsePrivateKeyFromPemFile(keyPemFile: File): PrivateKey = { + withPemParsedFromFile(keyPemFile) { keyPemParser => + val converter = new JcaPEMKeyConverter + keyPemParser.readObject() match { + case privateKey: PrivateKeyInfo => + converter.getPrivateKey(privateKey) + case keyPair: PEMKeyPair => + converter.getPrivateKey(keyPair.getPrivateKeyInfo) + case _ => + throw new SparkException(s"Key file provided at ${keyPemFile.getAbsolutePath}" + + s" is not a key pair or private key PEM file.") + } + } + } + + private def parseCertificatesFromPemFile(certPemFile: File): Array[Certificate] = { + withPemParsedFromFile(certPemFile) { certPemParser => + val certificates = mutable.Buffer[Certificate]() + var pemObject = certPemParser.readObject() + while (pemObject != null) { + pemObject match { + case certificate: X509CertificateHolder => + val converter = new JcaX509CertificateConverter + certificates += converter.getCertificate(certificate) + case _ => + } + pemObject = certPemParser.readObject() + } + if (certificates.isEmpty) { + throw new SparkException(s"No certificates found in ${certPemFile.getAbsolutePath}") + } + certificates.toArray + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 90907ff83ed84..234829a541c30 100644 --- 
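A short usage sketch for the converter above, not part of this patch: it loads one private key and its certificate chain into a freshly created keystore and returns the File the store was written to. The example sits in the same package only because the object is private[spark]; the paths, alias and passwords are placeholders.

```scala
package org.apache.spark.deploy.rest.kubernetes

import java.io.File

object PemConversionExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical PEM files, e.g. produced by openssl or by the test SSLUtils helper.
    val keyStoreFile: File = PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
      keyPemFile = new File("/tmp/tls/key.pem"),
      certPemFile = new File("/tmp/tls/cert.pem"),
      keyAlias = "provided-key",
      keyStorePassword = Some("keystore-password"),
      keyPassword = Some("key-password"),
      keyStoreType = None)  // None falls back to KeyStore.getDefaultType
    println(s"Keystore written to ${keyStoreFile.getAbsolutePath}")
  }
}
```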
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -16,17 +16,14 @@ */ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.UUID -import java.util.concurrent.Executors import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import com.google.common.util.concurrent.ThreadFactoryBuilder import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder} import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder} +import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.rpc.RpcEndpointAddress @@ -76,8 +73,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) - private val kubernetesClient = KubernetesClientBuilder - .buildFromWithinPod(kubernetesNamespace) + private val kubernetesClient = new KubernetesClientBuilder(conf, kubernetesNamespace) + .buildFromWithinPod() private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace). diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile index 3bf6b50ff69c1..1f35e7e5eb209 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -42,6 +42,8 @@ CMD SSL_ARGS="" && \ if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_TYPE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-type $SPARK_SUBMISSION_KEYSTORE_TYPE"; fi && \ if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-password-file $SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"; fi && \ if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \ + if ! [ -z ${SPARK_SUBMISSION_KEY_PEM_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --key-pem-file $SPARK_SUBMISSION_KEY_PEM_FILE"; fi && \ + if ! 
[ -z ${SPARK_SUBMISSION_CERT_PEM_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --cert-pem-file $SPARK_SUBMISSION_CERT_PEM_FILE"; fi && \ exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \ --hostname $HOSTNAME \ --port $SPARK_SUBMISSION_SERVER_PORT \ diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index 03c713b6bc068..048d2cc86e185 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -106,10 +106,6 @@ - - org.bouncycastle - bcpkix-jdk15on - diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 6aa1c1fee0d47..0e55e64fd1d77 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.kubernetes.integrationtest import java.io.File import java.nio.file.Paths import java.util.UUID +import java.util.concurrent.TimeUnit import com.google.common.base.Charsets import com.google.common.collect.ImmutableList @@ -54,6 +55,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { private val HELPER_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs-helpers") .toFile .listFiles()(0) + private val SUBMITTER_LOCAL_MAIN_APP_RESOURCE = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}" + private val CONTAINER_LOCAL_MAIN_APP_RESOURCE = s"local:///opt/spark/examples/" + + s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}" + private val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" + + s"integration-tests-jars/${HELPER_JAR_FILE.getName}" private val TEST_EXISTENCE_FILE = Paths.get("test-data", "input.txt").toFile private val TEST_EXISTENCE_FILE_CONTENTS = Files.toString(TEST_EXISTENCE_FILE, Charsets.UTF_8) @@ -66,8 +72,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { private val NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "") private var minikubeKubernetesClient: KubernetesClient = _ private var clientConfig: Config = _ - private var keyStoreFile: File = _ - private var trustStoreFile: File = _ + private var sparkConf: SparkConf = _ override def beforeAll(): Unit = { Minikube.startMinikube() @@ -79,13 +84,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .done() minikubeKubernetesClient = Minikube.getKubernetesClient.inNamespace(NAMESPACE) clientConfig = minikubeKubernetesClient.getConfiguration - val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair( - Minikube.getMinikubeIp, - "changeit", - "changeit", - "changeit") - keyStoreFile = keyStore - trustStoreFile = trustStore } before { @@ -100,6 +98,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { || servicesList.getItems == null || servicesList.getItems.isEmpty) } + sparkConf = new SparkConf(true) + .setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443") + .set(KUBERNETES_SUBMIT_CA_CERT_FILE, clientConfig.getCaCertFile) + .set(KUBERNETES_SUBMIT_CLIENT_KEY_FILE, 
clientConfig.getClientKeyFile) + .set(KUBERNETES_SUBMIT_CLIENT_CERT_FILE, clientConfig.getClientCertFile) + .set(KUBERNETES_NAMESPACE, NAMESPACE) + .set(DRIVER_DOCKER_IMAGE, "spark-driver:latest") + .set(EXECUTOR_DOCKER_IMAGE, "spark-executor:latest") + .setJars(Seq(HELPER_JAR_FILE.getAbsolutePath)) + .set("spark.executor.memory", "500m") + .set("spark.executor.cores", "1") + .set("spark.executors.instances", "1") + .set("spark.app.name", "spark-pi") + .set("spark.ui.enabled", "true") + .set("spark.testing", "false") + .set(WAIT_FOR_APP_COMPLETION, false) } after { @@ -112,7 +126,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .delete }) // spark-submit sets system properties so we have to clear them - new SparkConf(true).getAll.map(_._1).foreach { System.clearProperty } + new SparkConf(true) + .getAll.map(_._1) + .filter(_ != "spark.docker.test.persistMinikube") + .foreach { System.clearProperty } } override def afterAll(): Unit = { @@ -156,31 +173,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { } test("Run a simple example") { - // We'll make assertions based on spark rest api, so we need to turn on - // spark.ui.enabled explicitly since the scalatest-maven-plugin would set it - // to false by default. - val sparkConf = new SparkConf(true) - .setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443") - .set("spark.kubernetes.submit.caCertFile", clientConfig.getCaCertFile) - .set("spark.kubernetes.submit.clientKeyFile", clientConfig.getClientKeyFile) - .set("spark.kubernetes.submit.clientCertFile", clientConfig.getClientCertFile) - .set("spark.kubernetes.namespace", NAMESPACE) - .set("spark.kubernetes.driver.docker.image", "spark-driver:latest") - .set("spark.kubernetes.executor.docker.image", "spark-executor:latest") - .set("spark.jars", HELPER_JAR_FILE.getAbsolutePath) - .set("spark.executor.memory", "500m") - .set("spark.executor.cores", "1") - .set("spark.executors.instances", "1") - .set("spark.app.name", "spark-pi") - .set("spark.ui.enabled", "true") - .set("spark.testing", "false") - .set("spark.kubernetes.submit.waitAppCompletion", "false") - val mainAppResource = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}" - new Client( sparkConf = sparkConf, mainClass = SPARK_PI_MAIN_CLASS, - mainAppResource = mainAppResource, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) @@ -199,64 +195,38 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--class", SPARK_PI_MAIN_CLASS, "--conf", "spark.ui.enabled=true", "--conf", "spark.testing=false", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", + "--conf", s"${KUBERNETES_SUBMIT_CA_CERT_FILE.key}=${clientConfig.getCaCertFile}", + "--conf", s"${KUBERNETES_SUBMIT_CLIENT_KEY_FILE.key}=${clientConfig.getClientKeyFile}", + "--conf", s"${KUBERNETES_SUBMIT_CLIENT_CERT_FILE.key}=${clientConfig.getClientCertFile}", + "--conf", s"${EXECUTOR_DOCKER_IMAGE.key}=spark-executor:latest", + 
"--conf", s"${DRIVER_DOCKER_IMAGE.key}=spark-driver:latest", + "--conf", s"${WAIT_FOR_APP_COMPLETION.key}=false", EXAMPLES_JAR_FILE.getAbsolutePath) SparkSubmit.main(args) val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) } - test("Run using spark-submit with the examples jar on the docker image") { - val args = Array( - "--master", s"k8s://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", s"local:///opt/spark/examples/integration-tests-jars/${HELPER_JAR_FILE.getName}", - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - s"local:///opt/spark/examples/integration-tests-jars/${EXAMPLES_JAR_FILE.getName}") - SparkSubmit.main(args) + test("Run with the examples jar on the docker image") { + sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = CONTAINER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) } test("Run with custom labels and annotations") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value", - "--conf", "spark.kubernetes.driver.annotations=" + - "annotation1=annotation1value," + - "annotation2=annotation2value", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + sparkConf.set(KUBERNETES_DRIVER_LABELS, "label1=label1value,label2=label2value") + sparkConf.set(KUBERNETES_DRIVER_ANNOTATIONS, "annotation1=annotation1value," + + "annotation2=annotation2value") + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val driverPodMetadata = minikubeKubernetesClient .pods .withLabel("spark-app-name", "spark-pi") @@ -283,57 +253,41 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { } test("Enable SSL on the driver submit server") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - 
"--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.ssl.kubernetes.submit.enabled=true", - "--conf", "spark.ssl.kubernetes.submit.keyStore=" + - s"file://${keyStoreFile.getAbsolutePath}", - "--conf", "spark.ssl.kubernetes.submit.keyStorePassword=changeit", - "--conf", "spark.ssl.kubernetes.submit.keyPassword=changeit", - "--conf", "spark.ssl.kubernetes.submit.trustStore=" + - s"file://${trustStoreFile.getAbsolutePath}", - "--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair( + Minikube.getMinikubeIp, + "changeit", + "changeit", + "changeit") + sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}") + sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyStorePassword", "changeit") + sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyPassword", "changeit") + sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE, + s"file://${trustStoreFile.getAbsolutePath}") + sparkConf.set("spark.ssl.kubernetes.driversubmitserver.trustStorePassword", "changeit") + sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() + } + + test("Enable SSL on the driver submit server using PEM files") { + val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp) + sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}") + sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}") + sparkConf.set(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM, s"file://${certPem.getAbsolutePath}") + sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() } test("Added files should exist on the driver.") { - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-file-existence-test", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--files", TEST_EXISTENCE_FILE.getAbsolutePath, - "--class", FILE_EXISTENCE_MAIN_CLASS, - "--conf", "spark.ui.enabled=false", - "--conf", "spark.testing=true", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", 
"spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - EXAMPLES_JAR_FILE.getAbsolutePath, - TEST_EXISTENCE_FILE.getName, - TEST_EXISTENCE_FILE_CONTENTS) + sparkConf.set("spark.files", TEST_EXISTENCE_FILE.getAbsolutePath) + sparkConf.setAppName("spark-file-existence-test") val podCompletedFuture = SettableFuture.create[Boolean] val watch = new Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = { @@ -364,8 +318,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .pods .withLabel("spark-app-name", "spark-file-existence-test") .watch(watch)) { _ => - SparkSubmit.main(args) - assert(podCompletedFuture.get, "Failed to run driver pod") + new Client( + sparkConf = sparkConf, + mainClass = FILE_EXISTENCE_MAIN_CLASS, + mainAppResource = CONTAINER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array(TEST_EXISTENCE_FILE.getName, TEST_EXISTENCE_FILE_CONTENTS)).run() + assert(podCompletedFuture.get(60, TimeUnit.SECONDS), "Failed to run driver pod") val driverPod = minikubeKubernetesClient .pods .withLabel("spark-app-name", "spark-file-existence-test") @@ -386,27 +344,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { Utils.tryWithResource(minikubeKubernetesClient.services() .withLabel("spark-app-name", "spark-pi") .watch(externalUriProviderWatch)) { _ => - val args = Array( - "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", - "--deploy-mode", "cluster", - "--kubernetes-namespace", NAMESPACE, - "--name", "spark-pi", - "--executor-memory", "512m", - "--executor-cores", "1", - "--num-executors", "1", - "--jars", HELPER_JAR_FILE.getAbsolutePath, - "--class", SPARK_PI_MAIN_CLASS, - "--conf", "spark.ui.enabled=true", - "--conf", "spark.testing=false", - "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", - "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", - "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", - "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", - "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", - "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - "--conf", s"${DRIVER_SERVICE_MANAGER_TYPE.key}=${ExternalSuppliedUrisDriverServiceManager.TYPE}", - EXAMPLES_JAR_FILE.getAbsolutePath) - SparkSubmit.main(args) + sparkConf.set(DRIVER_SERVICE_MANAGER_TYPE, ExternalSuppliedUrisDriverServiceManager.TYPE) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") expectationsForStaticAllocation(sparkMetricsService) assert(externalUriProviderWatch.annotationSet.get) @@ -425,4 +368,17 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "Resolved URI annotation not set on driver service.") } } + + test("Mount the Kubernetes credentials onto the driver pod") { + sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, clientConfig.getCaCertFile) + sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE, clientConfig.getClientKeyFile) + sparkConf.set(KUBERNETES_DRIVER_CLIENT_CERT_FILE, clientConfig.getClientCertFile) + new Client( + sparkConf = sparkConf, + mainClass = SPARK_PI_MAIN_CLASS, + mainAppResource = 
SUBMITTER_LOCAL_MAIN_APP_RESOURCE, + appArgs = Array.empty[String]).run() + val sparkMetricsService = getSparkMetricsService("spark-pi") + expectationsForStaticAllocation(sparkMetricsService) + } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala index bde7b43226660..2078e0585e8f0 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala @@ -16,15 +16,18 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest.sslutil -import java.io.{File, FileOutputStream} +import java.io.{File, FileOutputStream, OutputStreamWriter} import java.math.BigInteger import java.nio.file.Files -import java.security.{KeyPairGenerator, KeyStore, SecureRandom} +import java.security.cert.X509Certificate +import java.security.{KeyPair, KeyPairGenerator, KeyStore, SecureRandom} import java.util.{Calendar, Random} import javax.security.auth.x500.X500Principal +import com.google.common.base.Charsets import org.bouncycastle.asn1.x509.{Extension, GeneralName, GeneralNames} import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3CertificateBuilder} +import org.bouncycastle.openssl.jcajce.JcaPEMWriter import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder import org.apache.spark.util.Utils @@ -39,6 +42,58 @@ private[spark] object SSLUtils { val keyPairGenerator = KeyPairGenerator.getInstance("RSA") keyPairGenerator.initialize(512) val keyPair = keyPairGenerator.generateKeyPair() + val certificate = generateCertificate(ipAddress, keyPair) + val keyStore = KeyStore.getInstance("JKS") + keyStore.load(null, null) + keyStore.setKeyEntry("key", keyPair.getPrivate, + keyPassword.toCharArray, Array(certificate)) + val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile + tempDir.deleteOnExit() + val keyStoreFile = new File(tempDir, "keyStore.jks") + Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { + keyStore.store(_, keyStorePassword.toCharArray) + } + val trustStore = KeyStore.getInstance("JKS") + trustStore.load(null, null) + trustStore.setCertificateEntry("key", certificate) + val trustStoreFile = new File(tempDir, "trustStore.jks") + Utils.tryWithResource(new FileOutputStream(trustStoreFile)) { + trustStore.store(_, trustStorePassword.toCharArray) + } + (keyStoreFile, trustStoreFile) + } + + def generateKeyCertPemPair(ipAddress: String): (File, File) = { + val keyPairGenerator = KeyPairGenerator.getInstance("RSA") + keyPairGenerator.initialize(512) + val keyPair = keyPairGenerator.generateKeyPair() + val certificate = generateCertificate(ipAddress, keyPair) + val tempDir = Files.createTempDirectory("temp-ssl-pems").toFile + tempDir.deleteOnExit() + val keyPemFile = new File(tempDir, "key.pem") + val certPemFile = new File(tempDir, "cert.pem") + Utils.tryWithResource(new FileOutputStream(keyPemFile)) { keyPemStream => + Utils.tryWithResource( + new OutputStreamWriter(keyPemStream, Charsets.UTF_8)) { streamWriter => + Utils.tryWithResource( + new JcaPEMWriter(streamWriter)) { pemWriter => + pemWriter.writeObject(keyPair.getPrivate) + } + } + } + Utils.tryWithResource(new FileOutputStream(certPemFile)) { 
keyPemStream => + Utils.tryWithResource( + new OutputStreamWriter(keyPemStream, Charsets.UTF_8)) { streamWriter => + Utils.tryWithResource( + new JcaPEMWriter(streamWriter)) { pemWriter => + pemWriter.writeObject(certificate) + } + } + } + (keyPemFile, certPemFile) + } + + private def generateCertificate(ipAddress: String, keyPair: KeyPair): X509Certificate = { val selfPrincipal = new X500Principal(s"cn=$ipAddress") val currentDate = Calendar.getInstance val validForOneHundredYears = Calendar.getInstance @@ -56,25 +111,6 @@ private[spark] object SSLUtils { .setSecureRandom(new SecureRandom()) .build(keyPair.getPrivate) val bcCertificate = certificateBuilder.build(signer) - val jcaCertificate = new JcaX509CertificateConverter().getCertificate(bcCertificate) - val keyStore = KeyStore.getInstance("JKS") - keyStore.load(null, null) - keyStore.setKeyEntry("key", keyPair.getPrivate, - keyPassword.toCharArray, Array(jcaCertificate)) - val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile() - tempDir.deleteOnExit() - val keyStoreFile = new File(tempDir, "keyStore.jks") - Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { - keyStore.store(_, keyStorePassword.toCharArray) - } - val trustStore = KeyStore.getInstance("JKS") - trustStore.load(null, null) - trustStore.setCertificateEntry("key", jcaCertificate) - val trustStoreFile = new File(tempDir, "trustStore.jks") - Utils.tryWithResource(new FileOutputStream(trustStoreFile)) { - trustStore.store(_, trustStorePassword.toCharArray) - } - (keyStoreFile, trustStoreFile) + new JcaX509CertificateConverter().getCertificate(bcCertificate) } - }
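For reviewers who want to exercise the new PEM code path by hand, the sketch below (illustrative only, not part of the patch) shows how the `SSLUtils.generateKeyCertPemPair` helper and the PEM-related submit-SSL settings fit together. It mirrors the "Enable SSL on the driver submit server using PEM files" test added above and assumes the same `sparkConf`, config constants, and `Client` constructor introduced in this diff:

    // Generate a throwaway key/certificate PEM pair for the minikube IP.
    val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
    // Point the driver submit server at the generated key and certificate,
    // reusing the same certificate for both the client and server sides.
    sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
    sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}")
    sparkConf.set(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM, s"file://${certPem.getAbsolutePath}")
    sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true)
    // Submit SparkPi through the in-process Client rather than spark-submit.
    new Client(
      sparkConf = sparkConf,
      mainClass = SPARK_PI_MAIN_CLASS,
      mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
      appArgs = Array.empty[String]).run()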