From 3ff2cbbce9b3f515e2070a97433ac5ded2451a7d Mon Sep 17 00:00:00 2001
From: Anirudh Ramanathan
Date: Tue, 7 Nov 2017 16:59:02 -0800
Subject: [PATCH] Spark Submit changes and test (#542)

* Spark Submit Unit tests

* Improvements

* Add missing options

* Added check for jar
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 10 +++++-----
 .../spark/deploy/SparkSubmitSuite.scala            | 26 ++++++++++++++++++++++++++
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 85b76013ba5f3..38b57c1a01279 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -483,20 +483,20 @@ object SparkSubmit extends CommandLineUtils {
         sysProp = "spark.kubernetes.namespace"),
 
       // Other options
-      OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
+      OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
-      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
         sysProp = "spark.files"),
       OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
       OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
         sysProp = "spark.jars"),
-      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
+      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
         sysProp = "spark.driver.memory"),
-      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
+      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
         sysProp = "spark.driver.cores"),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         sysProp = "spark.driver.supervise"),
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 6fa3a09b2ef1e..faaa60269686d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -381,6 +381,32 @@ class SparkSubmitSuite
     sysProps("spark.ui.enabled") should be ("false")
   }
 
+  test("handles k8s cluster mode") {
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "k8s://h:p",
+      "--executor-memory", "5g",
+      "--class", "org.SomeClass",
+      "--kubernetes-namespace", "foo",
+      "--driver-memory", "4g",
+      "--conf", "spark.kubernetes.driver.docker.image=bar",
+      "/home/thejar.jar",
+      "arg1")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
+
+    val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
+    childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
+    childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
+    childArgsMap.get("--arg") should be (Some("arg1"))
+    mainClass should be ("org.apache.spark.deploy.k8s.submit.Client")
+    classpath should have length (0)
+    sysProps("spark.executor.memory") should be ("5g")
+    sysProps("spark.driver.memory") should be ("4g")
+    sysProps("spark.kubernetes.namespace") should be ("foo")
+    sysProps("spark.kubernetes.driver.docker.image") should be ("bar")
+  }
+
   test("handles confs with flag equivalents") {
     val clArgs = Seq(
       "--deploy-mode", "cluster",