Skip to content

Commit

Permalink
[SPARK-22778][KUBERNETES] Added the missing service metadata for Kube…
Browse files Browse the repository at this point in the history
…rnetesClusterManager

## What changes were proposed in this pull request?

This PR added the missing service metadata for `KubernetesClusterManager`. Without the metadata, the service loader couldn't load `KubernetesClusterManager`, and caused the driver to fail to create a `ExternalClusterManager`, as being reported in SPARK-22778. The PR also changed the `k8s:` prefix used to `k8s://`, which is what existing Spark on k8s users are familiar and used to.

## How was this patch tested?

Manual testing verified that the fix resolved the issue in SPARK-22778.

/cc vanzin felixcheung jiangxb1987

Author: Yinan Li <liyinan926@gmail.com>

Closes #19972 from liyinan926/fix-22778.
  • Loading branch information
liyinan926 authored and Marcelo Vanzin committed Dec 14, 2017
1 parent 6d99940 commit 2fe1633
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 10 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2757,7 +2757,7 @@ private[spark] object Utils extends Logging {

/**
* Check the validity of the given Kubernetes master URL and return the resolved URL. Prefix
* "k8s:" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* "k8s://" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* in canCreate to determine if the KubernetesClusterManager should be used.
*/
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
Expand All @@ -2770,7 +2770,7 @@ private[spark] object Utils extends Logging {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
return s"k8s:$resolvedURL"
return s"k8s://$resolvedURL"
}

val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
Expand All @@ -2789,7 +2789,7 @@ private[spark] object Utils extends Logging {
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)
}

return s"k8s:$resolvedURL"
s"k8s://$resolvedURL"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class SparkSubmitSuite
childArgsMap.get("--arg") should be (Some("arg1"))
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
classpath should have length (0)
conf.get("spark.master") should be ("k8s:https://host:port")
conf.get("spark.master") should be ("k8s://https://host:port")
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("spark")
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1148,16 +1148,16 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {

test("check Kubernetes master URL") {
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
assert(k8sMasterURLHttps === "k8s:https://host:port")
assert(k8sMasterURLHttps === "k8s://https://host:port")

val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
assert(k8sMasterURLHttp === "k8s:http://host:port")
assert(k8sMasterURLHttp === "k8s://http://host:port")

val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
assert(k8sMasterURLWithoutScheme === "k8s:https://127.0.0.1:8443")
assert(k8sMasterURLWithoutScheme === "k8s://https://127.0.0.1:8443")

val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1")
assert(k8sMasterURLWithoutScheme2 === "k8s:https://127.0.0.1")
assert(k8sMasterURLWithoutScheme2 === "k8s://https://127.0.0.1")

intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s:" prefix here.
val master = sparkConf.get("spark.master").substring("k8s:".length)
// We just need to get rid of the "k8s://" prefix here.
val master = sparkConf.get("spark.master").substring("k8s://".length)
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
Expand Down

0 comments on commit 2fe1633

Please sign in to comment.