From d5f24aae143f2a0dd82b676bae5e4ebe730f0462 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 7 Aug 2014 16:02:33 -0700 Subject: [PATCH] [SPARK-2678][Core] Backport PR #1801 to branch-1.0 Backport SPARK-2678 fix in PR #1801 to branch-1.0 Author: Cheng Lian Closes #1831 from liancheng/spark-2678-for-1.0 and squashes the following commits: cc59929 [Cheng Lian] Backported SPARK-2678 fix --- .../spark/deploy/SparkSubmitArguments.scala | 39 +++++++------------ .../spark/deploy/SparkSubmitSuite.scala | 12 ++++++ 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 57655aa4c32b1..4aab5a7ee51f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -202,8 +202,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { /** Fill in values by parsing user options. */ private def parseOpts(opts: Seq[String]): Unit = { - // Delineates parsing of Spark options from parsing of user options. - var inSparkOpts = true + val EQ_SEPARATED_OPT = """(--[^=]+)=(.+)""".r + parse(opts) def parse(opts: Seq[String]): Unit = opts match { @@ -297,33 +297,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { verbose = true parse(tail) + case EQ_SEPARATED_OPT(opt, value) :: tail => + parse(opt :: value :: tail) + + case value :: tail if value.startsWith("-") => + SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.") + case value :: tail => - if (inSparkOpts) { - value match { - // convert --foo=bar to --foo bar - case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 => - val parts = v.split("=") - parse(Seq(parts(0), parts(1)) ++ tail) - case v if v.startsWith("-") => - val errMessage = s"Unrecognized option '$value'." - SparkSubmit.printErrorAndExit(errMessage) - case v => - primaryResource = - if (!SparkSubmit.isShell(v)) { - Utils.resolveURI(v).toString - } else { - v - } - inSparkOpts = false - isPython = SparkSubmit.isPython(v) - parse(tail) - } + primaryResource = if (!SparkSubmit.isShell(value)) { + Utils.resolveURI(value).toString } else { - if (!value.isEmpty) { - childArgs += value - } - parse(tail) + value } + isPython = SparkSubmit.isPython(value) + childArgs ++= tail case Nil => } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 02427a4a83506..961f9a7e905db 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -106,6 +106,18 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { appArgs.childArgs should be (Seq("some", "--weird", "args")) } + test("handles arguments to user program with name collision") { + val clArgs = Seq( + "--name", "myApp", + "--class", "Foo", + "userjar.jar", + "--master", "local", + "some", + "--weird", "args") + val appArgs = new SparkSubmitArguments(clArgs) + appArgs.childArgs should be (Seq("--master", "local", "some", "--weird", "args")) + } + test("handles YARN cluster mode") { val clArgs = Seq( "--deploy-mode", "cluster",