Introduce blocking submit to kubernetes by default #53
Conversation
@@ -63,6 +64,20 @@ private[spark] class Client(
private val driverLaunchTimeoutSecs = sparkConf.getTimeAsSeconds(
  "spark.kubernetes.driverLaunchTimeout", s"${DEFAULT_LAUNCH_TIMEOUT_SECONDS}s")

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder(
  "spark.kubernetes.submit.waitAppCompletion")
  .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
nit: "In cluster mode" => "In Kubernetes cluster mode"
@@ -24,7 +24,7 @@ import javax.net.ssl.X509TrustManager
import com.google.common.io.Files
import com.google.common.util.concurrent.{SettableFuture, ThreadFactoryBuilder}
import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.{Config, ConfigBuilder => KConfigBuilder, DefaultKubernetesClient, KubernetesClientException, Watch, Watcher}
KConfigBuilder is not intuitive, maybe K8SConfigBuilder?
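i.e. the import alias would become (just the suggested rename applied to the line above):

import io.fabric8.kubernetes.client.{Config, ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClientException, Watch, Watcher}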
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

private val fireAndForget: Boolean = !sparkConf.get(WAIT_FOR_APP_COMPLETION);
nit: remove the trailing semicolon
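Taken together, a sketch of how the two settings and the derived flag might read with the nit applied. The config keys and defaults match the snippets above; the doc strings are partly assumed, not necessarily the PR's exact wording:

import java.util.concurrent.TimeUnit
import org.apache.spark.internal.config.ConfigBuilder

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.kubernetes.submit.waitAppCompletion")
  .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
    "launcher process.")  // tail of the doc string is assumed
  .booleanConf
  .createWithDefault(true)

private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.kubernetes.report.interval")
  .doc("Interval between reports of the current application status.")  // assumed wording
  .timeConf(TimeUnit.MILLISECONDS)
  .createWithDefaultString("1s")

// Derived flag, without the trailing semicolon flagged above.
private val fireAndForget: Boolean = !sparkConf.get(WAIT_FOR_APP_COMPLETION)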
The string representation of the pod object isn't very readable on its own.
We should extract the important details like namespace/account, start/running time, service cluster ip, etc. and format them properly. As a reference, yarn cluster mode formats its status reports as a list of labeled key/value fields.
Agreed that the raw string representation is too verbose. The new formatting pulls the key fields out of the pod state into a readable summary.
We should look into using a Watch here instead. One way to use a Watch is to have its callback complete a SettableFuture (or count down a latch) when the pod reaches a terminal state, and block on that. Edit: Using the watch with the above approach removes any dependency on the Watch using a non-daemon thread pool.
private def formatPodState(pod: Pod): String = {

  val details = Seq[(String, String)](
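The diff excerpt is cut off here; purely for context, a rough sketch of how a formatter along these lines might assemble and render the details, assuming fabric8's Pod model (the field selection is illustrative, not necessarily what the PR ships):

import io.fabric8.kubernetes.api.model.Pod

private def formatPodState(pod: Pod): String = {
  // Illustrative selection of fields pulled out of the pod object.
  val details = Seq[(String, String)](
    ("pod name", pod.getMetadata.getName),
    ("namespace", pod.getMetadata.getNamespace),
    ("node name", pod.getSpec.getNodeName),
    ("start time", pod.getStatus.getStartTime),
    ("phase", pod.getStatus.getPhase))
  // One "key: value" pair per line.
  details.map { case (k, v) => s"$k: $v" }.mkString("\n")
}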
Fabric8's objects are Jackson-annotated so I wonder if we could just use a Jackson objectmapper and pass the object through it. I've been meaning to try this for error messaging as well.
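For what it's worth, a minimal sketch of that idea (a plain Jackson ObjectMapper pretty-printing the pod; whether the raw JSON is readable enough for logs is the open question):

import com.fasterxml.jackson.databind.ObjectMapper
import io.fabric8.kubernetes.api.model.Pod

private val mapper = new ObjectMapper()

// Fabric8's model classes are Jackson-annotated, so this serializes the full object graph.
private def podAsJson(pod: Pod): String =
  mapper.writerWithDefaultPrettyPrinter().writeValueAsString(pod)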
I considered the watch, but I want to make sure we get the periodic polling of the log message as well. Are you suggesting keeping a "last received state" through the watch, and having a separate timer that just logs that state every n seconds until the watch receives a terminal state? That would reduce load on the apiserver which would be nice.
That could work to support the periodic polling. The main advantage I see is staying away from having busy-waiting loops in the code. We should look to abstract away the busy waiting loop with constructs like timers and watches.
I like the idea of using a watch and caching the results. Like you said, we get the entire set of mutations and don't overload the apiserver. The change LGTM otherwise.
On the other hand, I'm also wondering if the watch implementation may cause us to miss events during the establishment/teardown of the persistent connection since we are getting events over an HTTP streaming interface. This could be a very rare case, but if it did happen, it would certainly give us unexpected results.
@foxish do you know of other places in the community where a service polls/watches for status of another pod and reacts on state changes? We're basically using the watch as a form of a message bus to receive events, and you're suggesting that the state change events are not delivered with exactly-once semantics across the pod's entire lifetime. So for this code to use watches we're going to need to tighten our understanding of event delivery guarantees. Otherwise, if the "pod completed" event gets missed, our blocking submit will block forever (until the user kills it). In the meantime, maybe polling the apiserver isn't that bad?
We would want to create the watch before creating the pod - you can create watches for things that don't exist, and then the watcher will receive events for the pod from the moment it is created.
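For example, with the fabric8 client the ordering would look roughly like this (kubernetesClient, driverPod, and watcher are assumed to already exist):

// Open the watch first so no events are missed, then create the pod it is watching.
val watch = kubernetesClient.pods()
  .withName(driverPod.getMetadata.getName)
  .watch(watcher)
val createdDriverPod = kubernetesClient.pods().create(driverPod)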
It turns out that polling would run into a similar problem. Suppose we poll every 10 seconds, since polling too frequently would certainly cause overload. But if two events occur between consecutive polls, we would still miss one of them.
+1. Reacting to events is always more efficient than polling. It seems to me like we're doing too much by combining the by-interval reporting with the events. Is there any reason we can't just stick to reporting the streamed events (SSE) to the client and get rid of the by-interval reporting entirely? Unless it's customary to always report status at some interval; in that case, I think @mccheah's suggestion is an ideal approach.
Anything else stopping this PR from being merged?
Are we in consensus that this should use a watch instead?
We're not guaranteed to see every intermediate state even with a watch, because there is a fixed window size of buffered events, and in case of a network partition we may miss some. I don't think there is even a requirement here to see every intermediate state, as long as we can guarantee that eventually we show the user the final state his/her job ends up in, or display an error in case the apiserver is unreachable. Yes, a watch and periodically printing out the state of the pod SGTM. If the apiserver becomes unreachable, we should also show that, as an unknown state.
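To make that concrete, a rough, self-contained sketch of a watcher that caches the latest pod seen, logs it on a fixed interval, and releases a latch on a terminal phase. Class and member names are illustrative, not the PR's actual implementation, and it assumes the Watcher interface of the fabric8 client version used here:

import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

// Caches the latest pod from the watch, logs it every intervalMs milliseconds,
// and counts down the latch once the pod reaches a terminal phase.
class PodStateLogger(podCompleted: CountDownLatch, intervalMs: Long) extends Watcher[Pod] {

  @volatile private var latestPod: Option[Pod] = None
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  scheduler.scheduleWithFixedDelay(new Runnable {
    override def run(): Unit = latestPod.foreach { pod =>
      println(s"Driver pod phase: ${pod.getStatus.getPhase}")
    }
  }, 0, intervalMs, TimeUnit.MILLISECONDS)

  override def eventReceived(action: Action, pod: Pod): Unit = {
    latestPod = Some(pod)
    val phase = pod.getStatus.getPhase
    if (phase == "Succeeded" || phase == "Failed") {
      scheduler.shutdown()
      podCompleted.countDown()
    }
  }

  override def onClose(cause: KubernetesClientException): Unit = {
    // If the apiserver becomes unreachable, surface that instead of hanging silently.
    println(s"Watch closed: ${Option(cause).map(_.getMessage).getOrElse("normal close")}")
    scheduler.shutdown()
    podCompleted.countDown()
  }
}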
var previousPhase: String = null

while (true) {
If we're going with polling then we can use a TimerTask or ScheduledExecutorService to fetch the status periodically, and block the main thread with a CountDownLatch. When the timer thread detects the pod is finished, count down the latch to unblock the main thread.
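A minimal sketch of that polling shape (kubernetesClient, driverPodName, and reportIntervalMillis are assumed to be in scope; the PR ultimately moves to a watch instead):

import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

val podCompletedLatch = new CountDownLatch(1)
val poller = Executors.newSingleThreadScheduledExecutor()

poller.scheduleWithFixedDelay(new Runnable {
  override def run(): Unit = {
    // Fetch the current pod, log its phase, and count down on a terminal phase.
    val pod = kubernetesClient.pods().withName(driverPodName).get()
    if (pod != null) {
      val phase = pod.getStatus.getPhase
      println(s"Driver pod phase: $phase")
      if (phase == "Succeeded" || phase == "Failed") {
        podCompletedLatch.countDown()
      }
    }
  }
}, 0, reportIntervalMillis, TimeUnit.MILLISECONDS)

podCompletedLatch.await()  // block the main thread until the pod finishes
poller.shutdownNow()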
Two new configuration settings:
- spark.kubernetes.submit.waitAppCompletion
- spark.kubernetes.report.interval

Finishing up last touches for switching to a Watch from polling, and also to accommodate the large Config change in a recently-merged PR. Should have revised code up for review later today.
The branch was force-pushed from 70146ae to a69b865.
@iyanuobidele @foxish @lins05 @mccheah ready for re-review now that I've moved to Watch-based monitoring. The periodic logging now prints the formatted pod state.
The continuous logging might be cut off awkwardly in fire-and-forget mode. We could disable the periodic logging in fire-and-forget mode and just report the changes in pod status.
I'll make that change today: turning off the periodic logging in fire-and-forget mode, since it cuts off as soon as the submitter is able to forget (i.e. once local jars are uploaded).
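The gating could be as simple as the following sketch (WAIT_FOR_APP_COMPLETION and REPORT_INTERVAL as above; scheduler and logCurrentPodState are an assumed ScheduledExecutorService and Runnable, not names from the PR):

import java.util.concurrent.TimeUnit

// Only schedule the periodic status log when the submitter intends to wait for completion.
val maybeReportIntervalMs: Option[Long] =
  if (sparkConf.get(WAIT_FOR_APP_COMPLETION)) Some(sparkConf.get(REPORT_INTERVAL)) else None

maybeReportIntervalMs.foreach { intervalMs =>
  scheduler.scheduleWithFixedDelay(logCurrentPodState, 0, intervalMs, TimeUnit.MILLISECONDS)
}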
Fire-and-forget mode is enabled with spark.kubernetes.submit.waitAppCompletion=false (default: true).
Logs when running with spark.kubernetes.submit.waitAppCompletion=false show the submitter exiting once the driver is launched; logs when running without that (defaults to true) show the pod state being reported until the application completes.
@mccheah anything else? This PR's been open for about a week so I'd like to merge pretty soon and only hold it up longer for things that are truly blocking merge and not for things that can be added post-merge.
 * logging will be disabled.
 */
private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
appId: String,
nit: four-space indentation from the leftmost margin here
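i.e. roughly (only the parameters visible in the diff are shown, so this is just a sketch of the suggested style):

private[kubernetes] class LoggingPodStatusWatcher(
    podCompletedFuture: CountDownLatch,
    appId: String)  // remaining parameters omitted; each continuation line indented four spaces from the margin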
Going to merge this - was going to refactor
* Introduce blocking submit to kubernetes by default
  Two new configuration settings:
  - spark.kubernetes.submit.waitAppCompletion
  - spark.kubernetes.report.interval
* Minor touchups
* More succinct logging for pod state
* Fix import order
* Switch to watch-based logging
* Spaces in comma-joined volumes, labels, and containers
* Use CountDownLatch instead of SettableFuture
* Match parallel ConfigBuilder style
* Disable logging in fire-and-forget mode
  Which is enabled with spark.kubernetes.submit.waitAppCompletion=false (default: true)
* Additional log line for when application is launched
* Minor wording changes
* More logging
* Drop log to DEBUG
* Document blocking submit calls
  #53 added these configs but didn't document them
* Update running-on-kubernetes.md
Fixes #46
Two new configuration settings, modeled off the equivalent spark.yarn... settings:
- spark.kubernetes.submit.waitAppCompletion (default: true)
- spark.kubernetes.report.interval (default: 1s)
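For example, a submitter that wants the old fire-and-forget behaviour could set the flags like this (a sketch using SparkConf; the same values can also be passed with --conf on spark-submit):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.submit.waitAppCompletion", "false")  // fire-and-forget
  .set("spark.kubernetes.report.interval", "5s")              // only relevant while waiting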