Access the Driver Launcher Server over NodePort for app launch + submit jars #30
Conversation
containerPorts += new ContainerPortBuilder()
  .withContainerPort(portValue)
  .build()

private def getSubmitErrorMessage(
Moved this message construction into its own method as part of this PR since the indentation levels were growing confusing in the top-level method.
👍 that run() method was getting ginormous -- I'd be happy breaking it up even more
@@ -110,37 +108,41 @@ private[spark] class Client(
    .done()
  try {
    val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava
    val (servicePorts, containerPorts) = configurePorts()
    val containerPorts = configureContainerPorts()
    val driverLauncherServicePort = new ServicePortBuilder()
I think we should delay the creation of the service as much as possible, meaning we should wait until after the pod is submitted and running. Also, we should decide whether there are local jars being submitted or not; if there aren't, we don't need to expose the NodePort at all. WDYT?
Delaying the service creation makes sense, but I don't think we should branch on whether or not local jars are uploaded. The latter is mostly due to the complexity we would add. Right now our launch logic is pretty tightly coupled to KubernetesSparkRestServer, and even without the consideration of local jars we do a lot there to build the classpath and form the specific Java command. Presumably when we start to handle Python and other languages that class will need more functionality. The driver's Dockerfile assumes the usage of KubernetesSparkRestServer as well. To branch we would need to create a separate Dockerfile and launch setup and that doesn't seem worth the added complexity.
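For orientation, a rough sketch of the ordering discussed in this thread; waitUntilRunning, uploadJarsAndSubmit, and shrinkServiceToUiPort are hypothetical placeholders rather than methods from this PR, and the real logic in Client.run() is more involved.

// Illustrative sketch only: pod first, NodePort service only for the upload window,
// then the service is narrowed down to the UI port.
def submitFlowSketch(): Unit = {
  val pod = kubernetesClient.pods().create(driverPodSpec)            // 1. create the driver pod
  waitUntilRunning(pod)                                              // 2. hypothetical: block until the pod is Running
  val service = kubernetesClient.services().create(nodePortService)  // 3. expose the driver launcher over NodePort
  uploadJarsAndSubmit(service)                                       // 4. hypothetical: upload local jars, send the submit request
  shrinkServiceToUiPort()                                            // 5. hypothetical: drop the NodePort, keep only the Spark UI port
}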
  .filter(_.getName == DRIVER_LAUNCHER_SERVICE_PORT_NAME)
  .head
  .getNodePort
// NodePort is exposed on every node, so just pick one of them.
@foxish can we use the API server here? Is the API server also going to be guaranteed to be listening on the NodePort?
We don't need to use the APIServer proxy if we're using NodePort; it should be exposed directly on the NodeIP:port.
Sorry, I meant can we use the API server's host with the NodePort. Basically instead of selecting a node at random we can always pick to use the API server host.
The port should be exposed on all nodes and we can pick any at random. Using the Master's port seems less than ideal because we're trying to shift traffic away from it by not using the apiserver proxy.
Makes sense. I was mostly thinking in the context of when we introduce SSL. Mainly because the certificate issued by the server will now need to have its hostnames match all of the kubelets so the client can domain verify against any of them. I suppose if we document this clearly then we should be ok though.
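As a concrete illustration of "pick any node", a sketch using the fabric8 client; driverLauncherUrl is a hypothetical helper and the random shuffle is illustrative (the code in this PR simply takes the first node returned by the API).

import scala.collection.JavaConverters._
import scala.util.Random

import io.fabric8.kubernetes.client.KubernetesClient

def driverLauncherUrl(kubernetesClient: KubernetesClient, nodePort: Int): String = {
  // The NodePort is open on every node, so any node address reachable from the
  // submitting machine will do.
  val nodes = kubernetesClient.nodes().list().getItems.asScala
  val address = Random.shuffle(nodes).head.getStatus.getAddresses.asScala.head.getAddress
  s"http://$address:$nodePort"
}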
  .withName(UI_PORT_NAME)
  .withPort(uiPort)
  .withNewTargetPort(uiPort)
  .build
It's typical Spark style to always include parentheses when calling a function: .build -> .build(), same elsewhere.
- Expose the driver-submission service on NodePort and contact that as opposed to going through the API server proxy
- Restrict the ports that are exposed on the service to only the driver submission service when uploading content and then only the Spark UI after the job has started
@foxish @ash211 @erikerlandson As mentioned in my latest comment on #24, I think this implementation here is what we want to go with for now.
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
val submitRequest = buildSubmissionRequest()
val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava
Bad merge here, will fix
Took a pretty close look -- some small coding/style nits, but the thing I'm actually concerned about is the lack of SSL on the driver upload, and especially sending a secret in plaintext across the network.
What's the right way to bring that secret transfer back to being over SSL?
EDIT: now I see your other PR adding SSL..
val node = kubernetesClient.nodes.list.getItems.asScala.head
val nodeAddress = node.getStatus.getAddresses.asScala.head.getAddress
val url = s"http://$nodeAddress:$servicePort"
HttpClientUtil.createClient[KubernetesSparkRestApi](uri = url)
looks like there's no ssl here anymore -- I think we still want the spark-submit upload of jars to be encrypted though? especially because there's a secret being passed as part of the KubernetesCreateSubmissionRequest, and now that's in plaintext
Ah yes, saw that later... let's get that PR merged into this one and review the whole shebang together
kubernetesClient.services().withName(kubernetesAppId).edit()
  .editSpec()
  .withType("ClusterIP")
  .withPorts(uiServicePort)
ideally we could just remove the now-unnecessary port rather than resetting back to this state. An admin may have added additional ports in the meantime that this would now override?
Alternatively we can save the "additional app ports" feature for a separate PR since this is already a larger one
You can't say "remove this port" with the services API; the ports need to be entirely re-defined on every edit of the service.
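For context, a sketch of what that re-definition looks like with the fabric8 client used here, filling in the closing endSpec()/done() calls the quoted hunk cuts off; whatever is passed to withPorts becomes the entire port list.

kubernetesClient.services().withName(kubernetesAppId).edit()
  .editSpec()
    .withType("ClusterIP")
    .withPorts(uiServicePort)   // the complete desired set of ports; anything not restated here is dropped
  .endSpec()
  .done()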
  .map(_.toInt)
  .getOrElse(DEFAULT_UI_PORT))
(servicePorts, containerPorts)

private def configureContainerPorts(): Seq[ContainerPort] = {
name more like createContainerPortSpecs
val finalErrorMessage = s"$topLevelMessage\n" +
  s"$podStatusPhase\n" +
  s"$podStatusMessage\n\n$failedDriverContainerStatusString"
val finalErrorMessage: String = getSubmitErrorMessage(kubernetesClient, e)
maybe name this more like constructDriverSubmitErrorMessage -- getX makes me think there are no external calls being made, whereas this hits the apiserver again looking for more detailed diagnostics
uri = url,
sslSocketFactory = sslContext.getSocketFactory,
trustContext = trustManager)

private def getDriverLauncherClient(
constructDriverLauncherClient
adjustServicePort onFailure {
  case throwable: Throwable =>
    submitCompletedFuture.setException(throwable)
    kubernetesClient.services().delete(service)
do you need a try/catch around this also? We do this often enough and log a message that I wonder if we need a tryOrLogError wrapper we can put around this thing
There is a Utils.tryAndLogNonFatal, but I've actually had some problems with that method a while ago, particularly with nested closures. However, it was a while ago and I can't remember what the exact problem was. Might want to look into that a little more.
I don't think we need a try-catch here since the error will appear on the separate thread.
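A minimal sketch of the wrapper idea floated above, assuming a class that mixes in Spark's Logging trait; CleanupHelpers and tryOrLogError are hypothetical names, and Spark's own Utils.tryLogNonFatalError (used later in this PR) plays the same role.

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging

trait CleanupHelpers extends Logging {
  // Run a best-effort cleanup step, logging non-fatal errors instead of propagating them.
  def tryOrLogError(description: String)(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(e) => logError(s"Ignoring non-fatal error while $description", e)
    }
  }
}

Usage would then read like tryOrLogError("deleting the submission service") { kubernetesClient.services().delete(service) }.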
val submitCompletedFuture = SettableFuture.create[Boolean]
val secretDirectory = s"$SPARK_SUBMISSION_SECRET_BASE_DIR/$kubernetesAppId"

val submitPending = new AtomicBoolean(false)
is this addition because previously a modification of any sort to the pod would trigger the watcher and start a second jar upload?
More that I'm not sure if the implementation of watches can cause this method to be running twice simultaneously. If this method could be run again while a first invocation of the method has not completed and hence not set the future's status, then yes it would be possible that the upload logic is triggered twice.
Actually this doesn't even matter since some of the logic is asynchronous to this method's call (see our usage of futures) - so we certainly need this and we can't use synchronized to work around that.
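A small sketch of the guard being described; onDriverPodReady and submitApplicationToDriverServer are hypothetical names standing in for the watch callback and the upload logic.

import java.util.concurrent.atomic.AtomicBoolean

private val submitPending = new AtomicBoolean(false)

// compareAndSet makes the submission a one-shot action even if the watch fires
// multiple times or concurrently: only the first caller flips false -> true.
private def onDriverPodReady(): Unit = {
  if (submitPending.compareAndSet(false, true)) {
    submitApplicationToDriverServer()   // hypothetical: upload local jars, send the create-submission request
  }
}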
Also I think you could edit the PR to be named something more like "Upload local jars to driver via NodePort instead of ServerApi" so the content of the change is clearer from reading the PR description, not just that there was a change (pretty much all you get from generic revamp/refactor/edit titles).
Method nesting was getting confusing so pulled out the inner class and removed the extra method indirection from createDriverPod()
@aash addressed the comments. Used
In a subsequent PR, we should do #57.
Had a few more comments on here -- next step is to merge #49 into here
  .build(),
new ContainerPortBuilder()
  .withContainerPort(uiPort)
  .build())
I think this can be rewritten to:

private def buildContainerPorts(): Seq[ContainerPort] = {
  Seq(sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
      sparkConf.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT),
      DRIVER_LAUNCHER_SERVICE_INTERNAL_PORT,
      uiPort)
    .map(p => new ContainerPortBuilder().withContainerPort(p).build())
}
@@ -92,7 +92,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
  protected var totalExpectedExecutors = new AtomicInteger(0)

  private val driverUrl = RpcEndpointAddress(
    System.getenv(s"${convertToEnvMode(kubernetesDriverServiceName)}_SERVICE_HOST"),
does removing this also allow us to remove the convertToEnvMode method? I don't see it used elsewhere
Yeah we can remove that
@@ -92,7 +92,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
  protected var totalExpectedExecutors = new AtomicInteger(0)

  private val driverUrl = RpcEndpointAddress(
    System.getenv(s"${convertToEnvMode(kubernetesDriverServiceName)}_SERVICE_HOST"),
    sc.getConf.get("spark.driver.host"),
why set this using spark config instead of the k8s-provided envvar? does this get printed in logs / Spark UI and therefore we should set it?
We no longer create the service before creating the pod, which means this environment variable won't be set. Also, SERVICE_HOST refers to the service's IP, but we're no longer exposing the driver port through the service (this is different from the driver launcher server port).
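Roughly, the driver URL is then assembled purely from Spark configuration; a sketch, not the exact line in KubernetesClusterSchedulerBackend, with DEFAULT_DRIVER_PORT assumed to be the constant already used elsewhere in this PR.

import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// Host comes from spark.driver.host, set by the submission client, instead of the
// <SERVICE_NAME>_SERVICE_HOST env var that no longer exists at pod-creation time.
private val driverUrl = RpcEndpointAddress(
  sc.getConf.get("spark.driver.host"),
  sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString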
…te-incremental' into nodeport-upload
* Support SSL when setting up the driver. The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server.
* Clean up SSL secrets after finishing submission. We don't need to persist these after the pod has them mounted and is running already.
* Fix compilation error
* Revert image change
* Address comments
* Programmatically generate certificates for integration tests.
* Address comments
* Resolve merge conflicts
@@ -132,6 +132,24 @@ To specify a main application resource that is in the Docker image, and if it ha
    --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
    container:///home/applications/examples/example.jar

### Setting Up SSL For Submitting the Driver
There might be something we want to note here about certificates and hostname matching against the Kubelets.
--hostname $HOSTNAME \
--port $SPARK_DRIVER_LAUNCHER_SERVER_PORT \
--secret-file $SPARK_SUBMISSION_SECRET_LOCATION \
${SSL_ARGS}
What about creating a shell script for this? That would make it more readable and editor-friendly.
Considered this - we can follow up in a separate PR, but I appreciate the visibility of embedding the command directly in the Dockerfile, as opposed to the indirection where a user would look at the Dockerfile and then have to look at a separate script afterwards.
  }
} finally {
  if (!submitSucceeded) {
    Utils.tryLogNonFatalError({
Please remove the parens around the anonymous block
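In other words, for a single by-name block argument the braces alone suffice; illustrated here with the secret cleanup from the next hunk.

// Before: redundant parentheses around the block
Utils.tryLogNonFatalError({
  kubernetesClient.secrets().delete(submitServerSecret)
})

// After: braces alone
Utils.tryLogNonFatalError {
  kubernetesClient.secrets().delete(submitServerSecret)
}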
} finally {
  kubernetesClient.secrets().delete(secret)
  Utils.tryLogNonFatalError({
    kubernetesClient.secrets().delete(submitServerSecret)
ditto
    kubernetesClient.secrets().delete(submitServerSecret)
  })
  Utils.tryLogNonFatalError({
    kubernetesClient.secrets().delete(sslSecrets: _*)
ditto
s"file://${trustStoreFile.getAbsolutePath}",
"--conf", s"spark.ssl.kubernetes.driverlaunch.trustStorePassword=changeit",
EXAMPLES_JAR)
SparkSubmit.main(args)
We can call expectationsForStaticAllocation here to make some assertions.
Also, I think the naming of spark.ssl.kubernetes.driverlaunch.enabled is not as good as spark.kubernetes.driverlaunch.ssl.enabled. We should put it in the scope of spark.kubernetes.*.
We use spark.ssl to be consistent with the other SSL-related things in the Spark project.
We shouldn't call expectationsForStaticAllocation because this test solely checks that the app can be submitted over SSL at all. If there were test failures because we allocated the wrong number of executors, for example, the test would fail with a false negative.
I think this is in a good spot now -- @foxish any last thoughts before merging?
@ash211 Nothing from my end, merging.
…it jars (#30)

* Revamp ports and service setup for the driver.
  - Expose the driver-submission service on NodePort and contact that as opposed to going through the API server proxy
  - Restrict the ports that are exposed on the service to only the driver submission service when uploading content and then only the Spark UI after the job has started
* Move service creation down and more thorough error handling
* Fix missed merge conflict
* Add braces
* Fix bad merge
* Address comments and refactor run() more. Method nesting was getting confusing so pulled out the inner class and removed the extra method indirection from createDriverPod()
* Remove unused method
* Support SSL configuration for the driver application submission (#49)
  * Support SSL when setting up the driver. The user can provide a keyStore to load onto the driver pod and the driver pod will use that keyStore to set up SSL on its server.
  * Clean up SSL secrets after finishing submission. We don't need to persist these after the pod has them mounted and is running already.
  * Fix compilation error
  * Revert image change
  * Address comments
  * Programmatically generate certificates for integration tests.
  * Address comments
  * Resolve merge conflicts
* Fix bad merge
* Remove unnecessary braces
* Fix compiler error
- Expose the driver-submission service on NodePort and contact that as opposed to going through the API server proxy
- Restrict the ports that are exposed on the service to only the driver submission service when uploading content and then only the Spark UI after the job has started

Closes #24 and is a prerequisite for #28.