Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Update Spark CLI for extraJavaOptions and Fix SubmitBuilder tests #370

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions cli/dcos-spark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,7 @@ type SparkCommand struct {
}

func (cmd *SparkCommand) runSubmit(a *kingpin.Application, e *kingpin.ParseElement, c *kingpin.ParseContext) error {
marathonConfig, err := fetchMarathonConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW it may be necessary to make dispatcher additions in order for this to be removable? I don't remember for sure.

Effectively the dispatcher could grab these values from its environment and include them in the task automatically, instead of requiring the CLI to pass them in from the outside.

if err != nil {
return err
}

jsonPayload, err := buildSubmitJson(cmd, marathonConfig)
jsonPayload, err := buildSubmitJson(cmd)
if err != nil {
return err
}
Expand Down
78 changes: 1 addition & 77 deletions cli/dcos-spark/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func GetRandomStringSecret() (string, error) {
}

// KERBEROS
func SetupKerberos(args *sparkArgs, marathonConfig map[string]interface{}) error {
func SetupKerberos(args *sparkArgs) error {
kerberosPrincipal := getPrincipal(args)
if kerberosPrincipal != "" {
err := validateKerberosInputs(args)
Expand All @@ -169,9 +169,6 @@ func SetupKerberos(args *sparkArgs, marathonConfig map[string]interface{}) error
// re-assign if the user used spark.yarn.principal
args.properties["spark.yarn.principal"] = args.kerberosPrincipal

// krb5.conf forwarding of environment variables:
forwardEnvironmentVariablesFromMarathonConfig(args, marathonConfig)

// mesos secrets
err = setupKerberosSecretsConfigs(args)
if err != nil {
Expand Down Expand Up @@ -205,79 +202,6 @@ func validateKerberosInputs(args *sparkArgs) error {
return nil
}

// propertyGiven builds a lookup closure over the supplied Marathon app JSON.
// The closure reports whether a string value is present at the given path in
// the JSON tree, along with the value itself when found.
func propertyGiven(marathonJson map[string]interface{}) func(path []string) (bool, string) {
	return func(path []string) (bool, string) {
		value, err := getStringFromTree(marathonJson, path)
		if err != nil && value == "" {
			return false, value
		}
		return true, value
	}
}

// addEnvvarToDriverAndExecutor forwards an environment variable to both the
// driver and the executors by setting the corresponding Spark properties.
// Existing user-provided values are never overwritten.
func addEnvvarToDriverAndExecutor(args *sparkArgs, key, value string) {
	propNames := []string{
		fmt.Sprintf("spark.mesos.driverEnv.%s", key),
		fmt.Sprintf("spark.executorEnv.%s", key),
	}
	for _, propName := range propNames {
		if _, exists := args.properties[propName]; !exists {
			args.properties[propName] = value
		}
	}
}

// forwardEnvironmentVariablesFromMarathonConfig copies krb5.conf-related settings
// from the dispatcher's Marathon environment into the driver and executor
// environments.
//
// We allow the user to set SPARK_SECURITY_KERBEROS_KDC_HOSTNAME,
// SPARK_SECURITY_KERBEROS_KDC_PORT, and SPARK_SECURITY_KERBEROS_REALM; these
// values will be used to template a krb5.conf. If the user also sets
// SPARK_MESOS_KRB5_CONF_BASE64 the blob wins and the three individual settings
// are ignored, with a warning logged so the override is visible.
func forwardEnvironmentVariablesFromMarathonConfig(args *sparkArgs, marathonJson map[string]interface{}) {
	propertyChecker := propertyGiven(marathonJson)

	// The three KDC settings share identical forwarding logic, so handle them in
	// one loop (previously three copy-pasted blocks) and count how many were set
	// so an incomplete configuration can be flagged below.
	kdcSettings := []struct{ label, key string }{
		{"hostname", SPARK_KDC_HOSTNAME_KEY},
		{"port", SPARK_KDC_PORT_KEY},
		{"realm", SPARK_KERBEROS_REALM_KEY},
	}
	kdcPropCount := 0
	for _, setting := range kdcSettings {
		given, value := propertyChecker([]string{"app", "env", setting.key})
		if given {
			client.PrintMessage("Using KDC %s '%s' from dispatcher env:%s", setting.label, value, setting.key)
			addEnvvarToDriverAndExecutor(args, setting.key, value)
			kdcPropCount += 1
		}
	}

	// Templating krb5.conf needs all three values; a partial set is likely a
	// misconfiguration, so warn rather than fail.
	if kdcPropCount > 0 && kdcPropCount != 3 {
		client.PrintMessage(
			"WARNING: Missing some of the 3 dispatcher environment variables (%s, %s, %s) "+
				"required for templating krb5.conf",
			SPARK_KDC_HOSTNAME_KEY, SPARK_KDC_PORT_KEY, SPARK_KERBEROS_REALM_KEY)
	}

	// A pre-encoded krb5.conf blob takes precedence over the individual settings.
	given, value := propertyChecker([]string{"app", "env", SPARK_KERBEROS_KRB5_BLOB})
	if given {
		if kdcPropCount > 0 {
			client.PrintMessage(
				"WARNING: Found base64-encoded krb5.conf in dispatcher env:%s, ignoring %s, %s, and %s",
				SPARK_KERBEROS_KRB5_BLOB, SPARK_KDC_HOSTNAME_KEY, SPARK_KDC_PORT_KEY, SPARK_KERBEROS_REALM_KEY)
		}
		addEnvvarToDriverAndExecutor(args, SPARK_KERBEROS_KRB5_BLOB, value)
	} else if kdcPropCount == 0 {
		client.PrintMessage("No KDC krb5.conf parameters were found in the dispatcher Marathon configuration")
	}
}

func addConfigForKerberosKeytabs(args *sparkArgs, secretPath, property string) {
keytabEnvVarPlaceholder := "DCOS_SPARK_KERBEROS_KEYTAB"
for _, taskType := range TASK_TYPES {
Expand Down
130 changes: 55 additions & 75 deletions cli/dcos-spark/submit_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type sparkVal struct {
func (f *sparkVal) flag(section *kingpin.Application) *kingpin.Clause {
return section.Flag(f.flagName, fmt.Sprintf("%s (%s)", f.desc, f.propName))
}

func newSparkVal(flagName, propName, desc string) *sparkVal {
return &sparkVal{flagName, propName, desc, "", false}
}
Expand Down Expand Up @@ -197,6 +198,10 @@ Args:
val.flag(submit).StringVar(&val.s)
args.stringVals = append(args.stringVals, val)

val = newSparkVal("executor-java-options", "spark.executor.extraJavaOptions", "Extra Java options to pass to the executors.")
val.flag(submit).StringVar(&val.s)
args.stringVals = append(args.stringVals, val)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we need to add this new option. The user can specify this with "--conf spark.executor.extraJavaOptions".

val = newSparkVal("driver-library-path", "spark.driver.extraLibraryPath", "Extra library path entries to pass to the driver.")
val.flag(submit).StringVar(&val.s)
args.stringVals = append(args.stringVals, val)
Expand Down Expand Up @@ -282,7 +287,7 @@ func parseApplicationFile(args *sparkArgs) error {

func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string) {

// collapse two or more spaces to one.
// collapse two or more spaces to one.
argsCompacted := collapseSpacesPattern.ReplaceAllString(argsStr, " ")
// clean up any instances of shell-style escaped newlines: "arg1\\narg2" => "arg1 arg2"
argsCleaned := strings.TrimSpace(backslashNewlinePattern.ReplaceAllLiteralString(argsCompacted, " "))
Expand All @@ -292,6 +297,7 @@ func cleanUpSubmitArgs(argsStr string, boolVals []*sparkVal) ([]string, []string
argsEquals := make([]string, 0)
appFlags := make([]string, 0)
i := 0
inQuotes := false
ARGLOOP:
for i < len(args) {
arg := args[i]
Expand Down Expand Up @@ -322,11 +328,16 @@ ARGLOOP:
}
break
}
// Parse Spark configuration:
// join this arg to the next arg if...:
// 1. we're not at the last arg in the array
// 2. we start with "--"
// 3. we don't already contain "=" (already joined)
// 4. we aren't a boolean value (no val to join)


// if this is a configuration flag like --conf or --driver-java-options that doesn't have a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: "--driver-driver-options" should be "--driver-java-options"

// '=' for assignment.
if i < len(args)-1 && strings.HasPrefix(arg, "--") && !strings.Contains(arg, "=") {
// check for boolean:
for _, boolVal := range boolVals {
Expand All @@ -336,12 +347,36 @@ ARGLOOP:
continue ARGLOOP
}
}
// merge this --key against the following val to get --key=val
argsEquals = append(argsEquals, arg+"="+args[i+1])

// if this is the beginning of a string of args e.g. '-Djava.option=setting -Djava.paramter=nonsense'
// we want to remove the leading single quote. Also remove internal quotes when the arg == --conf or some
// other named configuration
// e.g.: next = spark.driver.extraJavaOptions='-Djava.something=somethingelse
// arg = --conf
arg = strings.TrimPrefix(arg, "'")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite follow why we need to trim the prefix here and then check why next also has the prefix.

next := args[i + 1]
if strings.HasPrefix(next, "'") { // e.g. --driver-java-options '-Djava.config=setting... <-- next
inQuotes = true
}
next = strings.Replace(next, "'", "", -1) // remove internal quotes
argsEquals = append(argsEquals, arg + "=" + next)
i += 2
} else if strings.HasSuffix(arg, "'") { // attach the final arg to the string of args without the quote
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about escaped quotes? \'?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added the test:

	   java_options = []string{"-Djava.thirdConfig=\\'thirdSetting\\'"}
	   inputArgs = "--driver-java-option='-Djava.thirdConfig=\\'thirdSetting\\'' --conf spark.cores.max=8"
	   suite.testLongArgInternal(inputArgs, java_options)

and get the following stacktrace:

panic: runtime error: index out of range [recovered]
	panic: runtime error: index out of range

goroutine 52 [running]:
testing.tRunner.func1(0xc4200780d0)
	/usr/local/go/src/testing/testing.go:622 +0x29d
panic(0x1423780, 0x169ff80)
	/usr/local/go/src/runtime/panic.go:489 +0x2cf
_/Users/elezar/src/mesosphere/spark-build/cli/dcos-spark.cleanUpSubmitArgs(0x14a744a, 0x53, 0xc4204a6b28, 0x1, 0x1, 0xc420055958, 0x10b70be, 0x16a0710, 0x0, 0x0, ...)
	/Users/elezar/src/mesosphere/spark-build/cli/dcos-spark/submit_builder.go:367 +0x3d8
_/Users/elezar/src/mesosphere/spark-build/cli/dcos-spark.(*CliTestSuite).testLongArgInternal(0xc420195260, 0x14a744a, 0x53, 0xc420055a78, 0x1, 0x1)
	/Users/elezar/src/mesosphere/spark-build/cli/dcos-spark/submit_builder_test.go:80 +0x7e
_/Users/elezar/src/mesosphere/spark-build/cli/dcos-spark.(*CliTestSuite).TestStringLongArgs(0xc420195260)
	/Users/elezar/src/mesosphere/spark-build/cli/dcos-spark/submit_builder_test.go:113 +0x333
reflect.Value.call(0xc4204b4720, 0xc4204a6260, 0x13, 0x148fb5d, 0x4, 0xc420045f80, 0x1, 0x1, 0xc420032ec0, 0x148e620, ...)
	/usr/local/go/src/reflect/value.go:434 +0x91f
reflect.Value.Call(0xc4204b4720, 0xc4204a6260, 0x13, 0xc420032f80, 0x1, 0x1, 0x13d46c6, 0x12, 0x0)
	/usr/local/go/src/reflect/value.go:302 +0xa4
github.com/stretchr/testify/suite.Run.func2(0xc4200780d0)
	/Users/elezar/go/src/github.com/stretchr/testify/suite/suite.go:102 +0x25f
testing.tRunner(0xc4200780d0, 0xc420380e00)
	/usr/local/go/src/testing/testing.go:657 +0x96
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:697 +0x2ca
--driver-java-option='-Djava.thirdConfig=\'thirdSetting\'' --conf spark.cores.max=8exit status 2
FAIL	_/Users/elezar/src/mesosphere/spark-build/cli/dcos-spark	0.029s
Error: Tests failed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the test:

inputArgs := "--driver-java-option='-Djava.thirdConfig=assetting --conf spark.cores.max=8"
java_options := []string{"-Djava.thirdConfig=thirdSetting"}
suite.testLongArgInternal(inputArgs, java_options)

Yields:

--- FAIL: TestCliTestSuite (0.00s)
    --- FAIL: TestCliTestSuite/TestIncorrectStrings (0.00s)
    	submit_builder_test.go:87: Expected to find -Djava.thirdConfig=thirdSetting at index 0. Found --driver-java-option=-Djava.thirdConfig=assetting instead

I would expect a relevant error code instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following test case:

inputArgs := "--driver-java-option='-Djava.thirdConfig=\"a setting with a space\"' --conf spark.cores.max=8"
javaOptions := []string{"-Djava.thirdConfig=\"a setting with a space\""}
suite.testLongArgInternal(inputArgs, javaOptions)

Yields:

--- FAIL: TestCliTestSuite (0.00s)
    --- FAIL: TestCliTestSuite/TestIncorrectStrings (0.00s)
    	submit_builder_test.go:82: Failed to parse --driver-java-option='-Djava.thirdConfig="a setting with a space"' --conf spark.cores.max=8, should have 2 args, got 5
    	submit_builder_test.go:87: Expected to find -Djava.thirdConfig="a setting with a space" at index 0. Found --driver-java-option=-Djava.thirdConfig="a instead

I would expect the spaces in the argument to be maintained.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option here is misspelled ("--driver-java-option"), missing an "s" at the end. So, this may be a different issue.

inQuotes = false // has suffix means we're out of the quotes
arg = strings.TrimSuffix(arg, "'")
argsEquals[len(argsEquals) - 1] = argsEquals[len(argsEquals) - 1] + " " + arg
i += 1
} else {
// already joined or at the end, pass through:
argsEquals = append(argsEquals, arg)
cleanedArg := strings.Replace(arg, "'", "", -1)
if inQuotes { // join this arg to the last one because it's all in quotes
argsEquals[len(argsEquals) - 1] = argsEquals[len(argsEquals) - 1] + " " + cleanedArg
} else {
if strings.Contains(arg, "'") { // e.g. --driver-java-options='-Djava.firstConfig=firstSetting
inQuotes = true
}
// already joined or at the end, pass through
argsEquals = append(argsEquals, cleanedArg)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can document somewhere that we expect single quotes to be used when a value has spaces in it.

i += 1
}
}
Expand Down Expand Up @@ -383,33 +418,7 @@ func getValsFromPropertiesFile(path string) map[string]string {
return vals
}

// fetchMarathonConfig retrieves the dispatcher's app definition from Marathon
// and returns it as a decoded JSON tree. Callers use it to extract values such
// as the docker image and HDFS config URL.
func fetchMarathonConfig() (map[string]interface{}, error) {
	// fetch the spark task definition from Marathon, extract the docker image and HDFS config url:
	url := client.CreateServiceURL("replaceme", "")
	url.Path = fmt.Sprintf("/marathon/v2/apps/%s", config.ServiceName)

	responseBytes, err := client.CheckHTTPResponse(
		client.HTTPQuery(client.CreateHTTPURLRequest("GET", url, nil, "", "")))

	responseJson := make(map[string]interface{})
	// BUG FIX: the HTTP error was previously discarded — it was assigned to err
	// but immediately overwritten by json.Unmarshal's result, so request
	// failures were silently hidden from the caller.
	if err != nil {
		return responseJson, err
	}

	if err := json.Unmarshal(responseBytes, &responseJson); err != nil {
		return responseJson, err
	}

	if config.Verbose {
		client.PrintMessage("Response from Marathon lookup of task '%s':", config.ServiceName)
		prettyJson, err := json.MarshalIndent(responseJson, "", " ")
		if err != nil {
			// Fatalf exits the process; a pretty-print failure here indicates a
			// programming error rather than a recoverable condition.
			log.Fatalf("Failed to prettify json (%s): %s", err, responseJson)
		} else {
			client.PrintMessage("%s\n", string(prettyJson))
		}
	}
	return responseJson, nil
}

func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (string, error) {
func buildSubmitJson(cmd *SparkCommand) (string, error) {
// first, import any values in the provided properties file (space separated "key val")
// then map applicable envvars
// then parse all -Dprop.key=propVal, and all --conf prop.key=propVal
Expand Down Expand Up @@ -483,59 +492,30 @@ func buildSubmitJson(cmd *SparkCommand, marathonConfig map[string]interface{}) (
args.properties["spark.app.name"] = args.mainClass
}

// driver image
var imageSource string
// driver image: use provided value
_, contains := args.properties["spark.mesos.executor.docker.image"]
if contains {
imageSource = "Spark config: spark.mesos.executor.docker.image"
} else {
if cmd.submitDockerImage == "" {
dispatcher_image, err := getStringFromTree(marathonConfig, []string{"app", "container", "docker", "image"})
if err != nil {
return "", err
}
args.properties["spark.mesos.executor.docker.image"] = dispatcher_image
imageSource = "dispatcher: container.docker.image"
} else {
args.properties["spark.mesos.executor.docker.image"] = cmd.submitDockerImage
imageSource = "flag: --docker-image"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing this section will change the existing behavior: if a special Docker image was provided when installing the Dispatcher, that same image will also be used for Drivers and/or Executors (see the logic below). Unless we make some changes in the Dispatcher.

if !contains && cmd.submitDockerImage != "" {
args.properties["spark.mesos.executor.docker.image"] = cmd.submitDockerImage
}

_, contains = args.properties["spark.mesos.executor.docker.forcePullImage"]
if contains {
client.PrintMessage("Using image '%s' for the driver (from %s)",
args.properties["spark.mesos.executor.docker.image"], imageSource)
} else {
client.PrintMessage("Using image '%s' for the driver and the executors (from %s).",
args.properties["spark.mesos.executor.docker.image"], imageSource)
client.PrintMessage("To disable this image on executors, set "+
"spark.mesos.executor.docker.forcePullImage=false")
if !contains {
client.PrintMessage("Enabling forcePullImage by default. " +
"To disable this, set spark.mesos.executor.docker.forcePullImage=false")
args.properties["spark.mesos.executor.docker.forcePullImage"] = "true"
}

// Get the DCOS_SPACE from the marathon app
dispatcherID, err := getStringFromTree(marathonConfig, []string{"app", "id"})
if err != nil {
client.PrintMessage("Failed to get Dispatcher app id from Marathon app definition: %s", err)
return "", err
}
client.PrintVerbose("Setting DCOS_SPACE to %s", dispatcherID)
appendToProperty("spark.mesos.driver.labels", fmt.Sprintf("DCOS_SPACE:%s", dispatcherID),
args)
appendToProperty("spark.mesos.task.labels", fmt.Sprintf("DCOS_SPACE:%s", dispatcherID),
args)

// HDFS config
hdfs_config_url, err := getStringFromTree(marathonConfig, []string{"app", "labels", "SPARK_HDFS_CONFIG_URL"})
if err == nil && len(hdfs_config_url) != 0 { // fail silently: it's normal for this to be unset
hdfs_config_url = strings.TrimRight(hdfs_config_url, "/")
appendToProperty("spark.mesos.uris",
fmt.Sprintf("%s/hdfs-site.xml,%s/core-site.xml", hdfs_config_url, hdfs_config_url), args)
// Get the DCOS_SPACE from the service name
dcosSpace := config.ServiceName
if !strings.HasPrefix(dcosSpace, "/") {
dcosSpace = "/" + dcosSpace
}
client.PrintVerbose("Setting DCOS_SPACE to %s", dcosSpace)
appendToProperty("spark.mesos.driver.labels", fmt.Sprintf("DCOS_SPACE:%s", dcosSpace), args)
appendToProperty("spark.mesos.task.labels", fmt.Sprintf("DCOS_SPACE:%s", dcosSpace), args)

// kerberos configuration:
err = SetupKerberos(args, marathonConfig)
err = SetupKerberos(args)
if err != nil {
return "", err
}
Expand Down
Loading