This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Allow setting memory on the driver submission server. #161

Merged

Conversation


@mccheah mccheah commented Feb 28, 2017

Note that the setting here is in addition to setting spark.driver.memory. This is so that the memory request and memory limit on the driver can take into account both of these amounts.
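For orientation, here is a minimal sketch of the resulting sizing, assuming the pod's memory request is the sum of the two heaps and the limit additionally includes the memory overhead (variable names and values are illustrative, not the PR's exact code):

// Illustrative sizing only; names are hypothetical
val driverMemoryMb = 1024L            // spark.driver.memory
val submitServerMemoryMb = 256L       // submission server heap from this PR's new setting
val containerMemoryMb = driverMemoryMb + submitServerMemoryMb          // memory request
val overheadMb = math.max((0.10 * containerMemoryMb).toLong, 384L)     // non-heap overhead
val containerMemoryWithOverheadMb = containerMemoryMb + overheadMb     // memory limit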

@@ -156,6 +168,14 @@ package object config {
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
ConfigBuilder("spark.kubernetes.driver.submitServerMemory")
Submit server is another name for the REST server that receives the submit command? We might have some terminology inconsistencies for this service in docs/code at this point.

Author
I've been trying to refer to this as the submission server everywhere but we can audit at a later point.

| allocated for the driver. This is memory that accounts for
| things like VM overheads, interned strings, other native
| overheads, etc. This tends to grow with the driver's memory
| size (typically 6-10%).
This doc should say whether this overhead applies to (driver + submit server) or just the driver

.withNewResources()
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryLimitQuantity)
.endResources()
what were the defaults that we were relying on before?

Author
There was no reliable default; it probably just fell back to the defaults of the underlying container runtime.
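For reference, the snippet above sets the memory request and limit explicitly via the fabric8 kubernetes-client builder; a minimal, self-contained sketch under that assumption (quantities and the container name are illustrative, not the PR's exact values):

import io.fabric8.kubernetes.api.model.{ContainerBuilder, Quantity}

// Sketch only: explicit memory request (driver + submission server) and limit (request + overhead)
val driverMemoryQuantity = new Quantity("1280Mi")       // illustrative request
val driverMemoryLimitQuantity = new Quantity("1664Mi")  // illustrative limit
val driverContainer = new ContainerBuilder()
  .withName("spark-kubernetes-driver")                  // hypothetical container name
  .withNewResources()
    .addToRequests("memory", driverMemoryQuantity)
    .addToLimits("memory", driverMemoryLimitQuantity)
  .endResources()
  .build()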

</tr>
<tr>
<td><code>spark.kubernetes.driver.memoryOverhead</code></td>
<td>(driverMemory + driverSubmissionServerMemory) * 0.10, with minimum of 384 </td>
extra space right before td close tag
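As a concrete example of the documented default (numbers are illustrative): with 1g of driver memory and 256m of submission-server memory, the 10% term is below the floor, so the 384 MiB minimum applies:

val driverMemoryMb = 1024L
val driverSubmissionServerMemoryMb = 256L
val overheadMb = math.max(((driverMemoryMb + driverSubmissionServerMemoryMb) * 0.10).toLong, 384L)
// 1280 * 0.10 = 128 < 384, so overheadMb == 384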


ash211 commented Feb 28, 2017

This looks like it does indeed expose config values for the driver and submission server memory separately, and passes that config through to the JVMs and the k8s pod request.

For working around #146 I would set spark.kubernetes.driver.submissionServerMemory to a larger value and try again.

@foxish are you able to take a quick look?

Also curious if @ssuchter has seen any OOM issues with real-world Spark jobs, where files and jars are much larger than Spark's example jobs jar.


ash211 commented Mar 1, 2017

@mccheah Compile fails here:

[error] /home/user/git/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala:70: recursive value driverSubmitServerMemoryBytes needs type
[error]   private val driverSubmitServerMemoryBytes = Utils.byteStringAsBytes(driverSubmitServerMemoryBytes)
[error]                                                                       ^
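The error is the val referencing itself on its own right-hand side; the fix is presumably to convert the string-valued setting instead, along these lines (a sketch based on the config read shown later in the diff):

// Sketch of the fix: convert the configured memory string, not the val being defined
private val driverSubmitServerMemoryString = sparkConf.get(
  KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
  KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
private val driverSubmitServerMemoryBytes = Utils.byteStringAsBytes(driverSubmitServerMemoryString)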


ash211 commented Mar 1, 2017

OK, verified that this change does indeed make the submission server's heap configurable.

I tested by creating a 32 MB random file and attempting to submit it using --files:

$ dd if=/dev/urandom of=/tmp/32mb.random bs=1M count=32
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster --master k8s://10.0.20.108:6443 --conf spark.executor.instances=5 --conf spark.kubernetes.driver.docker.image=ash211/testrepo:driver-latest$V --conf spark.kubernetes.executor.docker.image=ash211/testrepo:executor-latest$V --files /tmp/32mb.random ./examples/jars/spark-examples_2.11-2.2.0-SNAPSHOT.jar 1000

which shows this failure in the driver pod's logs, indicating that the heap filled up:

2017-03-01 01:46:05 INFO  KubernetesSparkRestServer:54 - Started REST server for submitting applications on port 7077
2017-03-01 01:46:36 WARN  ServletHandler:667 - Error for /v1/submissions/create
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:596)
	at java.lang.StringBuilder.append(StringBuilder.java:190)
	at scala.collection.mutable.StringBuilder.appendAll(StringBuilder.scala:249)
	at scala.io.BufferedSource.mkString(BufferedSource.scala:97)
	at org.apache.spark.deploy.rest.SubmitRequestServlet.doPost(RestSubmissionServer.scala:282)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
	at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
	at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
	at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
	at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
	at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
	at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
	at org.spark_project.jetty.server.Server.handle(Server.java:499)
	at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:311)
	at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
	at org.spark_project.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
	at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
	at org.spark_project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
	at java.lang.Thread.run(Thread.java:745)
2017-03-01 01:46:36 WARN  HttpParser:1355 - badMessage: java.lang.IllegalStateException: too much data after closed for HttpChannelOverHttp@6a3142fe{r=2,c=false,a=IDLE,uri=}
2017-03-01 01:46:36 WARN  ServletHandler:667 - Error for /v1/submissions/create
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:596)
	at java.lang.StringBuilder.append(StringBuilder.java:190)
	at scala.collection.mutable.StringBuilder.appendAll(StringBuilder.scala:249)
	at scala.io.BufferedSource.mkString(BufferedSource.scala:97)
	at org.apache.spark.deploy.rest.SubmitRequestServlet.doPost(RestSubmissionServer.scala:282)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
	at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)
	at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)
	at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
	at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
	at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
	at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
	at org.spark_project.jetty.server.Server.handle(Server.java:499)
	at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:311)
	at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
	at org.spark_project.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
	at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
	at org.spark_project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
	at java.lang.Thread.run(Thread.java:745)

But running with the new, higher memory setting --conf spark.kubernetes.driver.submissionServerMemory=512m, like this:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster --master k8s://10.0.20.108:6443 --conf spark.executor.instances=5 --conf spark.kubernetes.driver.docker.image=ash211/testrepo:driver-latest$V --conf spark.kubernetes.executor.docker.image=ash211/testrepo:executor-latest$V --conf spark.kubernetes.driver.submissionServerMemory=512m --files /tmp/32mb.random ./examples/jars/spark-examples_2.11-2.2.0-SNAPSHOT.jar 1000

causes the job to complete successfully.

So I think that confirms that this config does what we expect.

And as a side note, it's kind of crazy that there is enough copying and base64 inflation in the current implementation that a 256mb heap can't handle uploading files that are only 32mb in the default config.

file size | heap size | success
  32mb    |   512mb   |    Y
  32mb    |   256mb   |    N
  32mb    |   384mb   |    N
  32mb    |   448mb   |    N
  32mb    |   480mb   |    Y
  32mb    |   464mb   |    N
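Rough arithmetic suggests why the gap is so large; the copy count below is an assumption, not a measurement:

// Back-of-the-envelope sketch of the inflation
val fileBytes = 32L * 1024 * 1024            // 32 MiB raw file
val base64Bytes = ((fileBytes + 2) / 3) * 4  // ~42.7 MiB of base64 characters
val asStringBytes = base64Bytes * 2          // ~85 MiB once held as a UTF-16 Java String
// BufferedSource.mkString also accumulates into a StringBuilder, and JSON deserialization
// keeps its own copy, so a handful of such copies can exceed a 256 MiB heap.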

@@ -63,6 +63,19 @@ private[spark] class Client(
.map(_.split(","))
.getOrElse(Array.empty[String])

// Memory settings
private val driverMemory = sparkConf.get("spark.driver.memory", "1g")
Why not use sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)?

.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryBytes).toInt,
MEMORY_OVERHEAD_MIN))
private val driverContainerMemoryWithOverhead = driverContainerMemoryBytes + memoryOverheadBytes

Can we move the above calculation to config.scala?

Author
I'd rather leave config.scala for just the config values and keys - there should be no logic there.

@@ -60,7 +60,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
.getOrElse(
throw new SparkException("Must specify the driver pod name"))

private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
private val executorMemory = conf.get("spark.executor.memory", "1g")
Here we can also use conf.get(EXECUTOR_MEMORY)


ash211 commented Mar 2, 2017

@mccheah a couple of small changes on here, but I think this is basically there.

@mccheah mccheah force-pushed the configurable-driver-memory branch from a0d6165 to 7321a3e on March 3, 2017 00:19
…te-incremental' into configurable-driver-memory

ash211 commented Mar 3, 2017

Any further comments, @lins05?


ash211 commented Mar 3, 2017

Want to make sure this gets in before the code freeze today.


foxish commented Mar 3, 2017

Looking at this now.

private val driverSubmitServerMemoryString = sparkConf.get(
KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
private val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb
Member
Why do we add the two here? Shouldn't we ideally take the max(...) since only one of them is running at a time, either the submit server or the driver code?

Author
Actually we don't shut down the submit server after it starts the application, so both will be running for the whole time.

Member
Saw @ash211 opened an issue. We should solve this in a better way after the alpha.

// Memory settings
private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
private val driverSubmitServerMemoryString = sparkConf.get(
Member
Why do we have separate driverSubmitServerMemoryMb and driverSubmitServerMemoryString and similarly for executors?

Author
The strings are used in the environment variables so that they can be passed directly to the JVMs' launch commands.

Member
It seems like we could achieve this using Utils.memoryStringToMb. Just a minor nit because it seems like repeated logic to fetch the same parameter in different ways.

Author
The trouble there is when we use SparkConf.get(config) it returns us a long, not a string - where the long is the number of megabytes pre-converted from the string value. I think the issue is that we want to go in the other direction; that is, to convert the numeric value we get from SparkConf.get into a memory string.
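In other words, the typed getter returns the value in MiB as a Long, while the launch command needs a memory string; a sketch of the two directions (the rebuilt-string form is an assumption, not necessarily what the PR does):

// Typed read: MiB as a Long
private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
// String form for the JVM launch command, either read raw (as in the diff above)
// or rebuilt from the number:
private val driverSubmitServerMemoryString = s"${driverSubmitServerMemoryMb}m"
// Utils.memoryStringToMb goes the other way: memory string -> MiB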

Member
Hmm.. I wonder if the Utils package has a helper for that. But in any case, it isn't a major concern. Merging. Thanks!

@@ -63,6 +63,7 @@ package object constants {
private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
Member
Where is this environment variable used? I didn't see any corresponding change to the Dockerfile launching the driver.


foxish commented Mar 3, 2017

LGTM except for the minor nit in the comment above.

@foxish foxish merged commit f6823f3 into k8s-support-alternate-incremental Mar 3, 2017
@foxish foxish deleted the configurable-driver-memory branch March 3, 2017 21:39

ash211 commented Mar 6, 2017

Tested this change with our internal app and verified that it can now complete the submission process by setting a large heap size for the REST service (2g suffices).

ash211 pushed a commit that referenced this pull request Mar 8, 2017
* Allow setting memory on the driver submission server.

* Address comments

* Address comments
foxish pushed a commit that referenced this pull request Jul 24, 2017
* Allow setting memory on the driver submission server.

* Address comments

* Address comments
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 25, 2019
…n-k8s#161)

* Allow setting memory on the driver submission server.

* Address comments

* Address comments

(cherry picked from commit f6823f3)
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019
…n-k8s#161)

* Allow setting memory on the driver submission server.

* Address comments

* Address comments