
[SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats. #6082

Closed · wants to merge 6 commits
Conversation

@zzvara
Contributor

zzvara commented May 12, 2015

Added faster RM-heartbeats on pending container allocations with multiplicative back-off.
Also updated the related documentation.

zzvara added 2 commits May 12, 2015 15:16, both titled: "Added faster RM-heartbeats on pending container allocations with multiplicative back-off. Also updated related documentation."
@AmplabJenkins

Can one of the admins verify this patch?

@@ -74,6 +74,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
<td>5000</td>
<td>
The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
To avoid the application master to be expired by late reporting, if a higher value is provided, the interval will be set to the half of the expiry interval in YARN's configuration <code>(yarn.am.liveness-monitor.expiry-interval-ms / 2)</code>.
Contributor

Do you mind breaking these long lines into multiple lines?

I'd also rephrase this: "The value is capped at half the value of YARN's configuration for the expiry interval (yarn.am.liveness-monitor.expiry-interval-ms)."

Contributor Author


Okay.
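To make the capping rule in the doc change above concrete, here is a minimal sketch; the variable names and the 120-second expiry value are assumptions for illustration, not Spark's actual code:

```
// Hypothetical values, for illustration only.
val configuredHeartbeatMs = 5000L    // spark.yarn.scheduler.heartbeat.interval-ms
val yarnExpiryIntervalMs  = 120000L  // yarn.am.liveness-monitor.expiry-interval-ms

// Cap the configured value at half the AM expiry interval, so a large setting
// cannot cause the ResourceManager to expire the application master.
val effectiveHeartbeatMs = math.min(configuredHeartbeatMs, yarnExpiryIntervalMs / 2)
```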

@vanzin
Contributor

vanzin commented May 12, 2015

Looks sane, mostly style nits. If you don't mind, we should deprecate spark.yarn.scheduler.heartbeat.interval-ms (add an entry in SparkConf for that) and rename it so that it drops the unit from the name.

@zzvara
Contributor Author

zzvara commented May 12, 2015

Okay, I will follow your guidance.

currentAllocationInterval =
math.min(heartbeatInterval,currentAllocationInterval * 2)
logDebug(s"Number of pending allocations is ${numPendingAllocate}. " +
"Sleeping for " + currentAllocationInterval)
Contributor


This should be indented two spaces after the start of the preceding line.

@zzvara
Contributor Author

zzvara commented May 13, 2015

Would anyone verify this patch?

@sryza
Contributor

sryza commented May 13, 2015

jenkins, test this please

@tgravescs
Contributor

So do we really want the backoff here? It complicates the logic and makes it unpredictable as to when exactly it heartbeats. It also becomes useless very quickly. If I just get into a queue that doesn't have space immediately, then I stop asking sooner, but I still need containers.

It seems to me that if we just have one interval for when we want containers and one for when we don't, it should be good. If after that we still run into problems, we can revisit it. Also, having YARN tell us when to back off would be best: https://issues.apache.org/jira/browse/YARN-3630

@sryza
Contributor

sryza commented May 13, 2015

@tgravescs that was going to be my feedback as well. However, what about situations where someone requests more containers than YARN can allocate? I think this is a pretty reasonable thing to do, because it's fairly cumbersome for users to check the size of the YARN pool they're running in and then triangulate with their --executor-cores and --executor-memory to pick the exact number of executors they're allowed. We would end up always heartbeating at the fast rate.

@zzvara
Contributor Author

zzvara commented May 13, 2015

I've opened that issue for YARN, but it's not good practice to rely on that.

Multiplicative back-off is a very old practice; it is not hard to predict and it reduces congestion nicely. There are more effective models in network rate-limiting, but this one is simple and effective. We just can't heartbeat every 200 ms: when our first heartbeat fails to get containers, there is only a slightly better chance that the next one will succeed. Also, consider a contested ResourceManager with thousands of Spark jobs. But, yeah, we want to provide faster start-up for jobs on clusters with a lot of free resources, so we start at 200 ms.
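For reference, a simplified sketch of the multiplicative back-off being described, with assumed standalone names rather than the exact PR code; the stored interval is capped here, which also sidesteps the overflow discussed further down in this thread:

```
val initialAllocationIntervalMs = 200L   // fast interval while containers are pending
val heartbeatIntervalMs = 3000L          // regular AM-RM heartbeat interval

var nextAllocationIntervalMs = initialAllocationIntervalMs

def nextSleepMs(pendingAllocations: Int): Long =
  if (pendingAllocations > 0) {
    // Heartbeat quickly while requests are outstanding, doubling the wait each time.
    val sleep = nextAllocationIntervalMs
    nextAllocationIntervalMs = math.min(heartbeatIntervalMs, nextAllocationIntervalMs * 2)
    sleep
  } else {
    // Nothing pending: reset the back-off and fall back to the regular interval.
    nextAllocationIntervalMs = initialAllocationIntervalMs
    heartbeatIntervalMs
  }
```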

@tgravescs
Contributor

Normally I would expect us to grab most of it up front and then run for a while without needing more, then perhaps iterate if dynamic allocation is on and we go to a different stage. It seems like a person should know pretty quickly after initial testing what their queue limits are, but that's a valid point.

It just seems this kind of limits the usefulness of it. If the cluster is idle it does let me get stuff quicker, but if the cluster is idle then just decrease your heartbeat anyway. I think having just the 2 configs works the same in that case. If it's idle with lots of free resources, I get them and then go to the slower heartbeat.
If the cluster or queue is at all busy you might be delayed, and then I'm back to asking slower and this change does me no good. Personally I would set the heartbeat for when we need containers at 2 or 3 seconds and leave the other at 5 seconds. Making it 200 ms for the entire time would be too much load on the RM, at least for our clusters.

Also, if the cluster is idle with lots of resources, you should get them the first time before the sleep even happens, right? We call allocate before launching the reporter thread, which will do an allocate that should get a response from the first one before doing the sleep.

Have you run this under various conditions to see what actually happens?

@sryza
Contributor

sryza commented May 13, 2015

"If the cluster is idle then just decrease your heartbeat anyway"

I think this is part of what we're trying to automate. I don't think a user (or cluster administrator) should need to think about this at all. From their perspective, they just want to get containers as fast as possible without spamming the RM.

@tgravescs
Contributor

I agree it should be automated or have a reasonable default (5 seconds is definitely high), but I think there are so many different configs and possible setups that it is very hard to do based on the current YARN RM. How fast you get containers is a factor of too many things: NM heartbeats, size of cluster, load on cluster, etc.
We could just change the default to 1 second like MR and have folks like myself change our default to be higher for our clusters.
I'm not necessarily against this; my question is how useful it is in the current form, or could it be better? For instance, have we run any tests with this? Does using the backoff differ at all from having 2 configs, setting one to 1 second and the other (for when I don't need containers) to 3 or 5 seconds? The difference there is that in 1400 ms I heartbeat 3 times compared to 1 time with a 1-second interval. How does that affect the overall job time? In this case I'm assuming we are mostly concerned with small jobs, as a few seconds in large jobs shouldn't show up. It would be nice to see some numbers on that. If my job is delayed even a few seconds, then this doesn't have any effect at all. So if I run a simple test being second in the queue where the first app is asking for containers, I assume this does nothing? Could it be better to have the 2 configs, where it heartbeats faster when I need containers but still doesn't overwhelm the RM?

If there is a difference, great, let's go with this; if not, is it really necessary, or is there a better option? Then the question comes down to RM load. If we do end up heartbeating every 1 second, does that hurt the RM? This again is going to be very cluster dependent. I would guess on most small and medium clusters it's fine. Folks with larger clusters can configure it up slightly.
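To make the 1400 ms comparison above concrete, a small sketch (assumed helper and values: a 200 ms back-off start versus a fixed 1-second interval) that counts heartbeats in the first 1400 ms:

```
// Cumulative heartbeat times: back-off sleeps of 200, 400, 800 ms land at
// 200, 600 and 1400 ms (3 heartbeats); a fixed 1000 ms interval lands only
// at 1000 ms (1 heartbeat) within the same window.
def heartbeatsWithin(windowMs: Long, sleeps: Seq[Long]): Int =
  sleeps.scanLeft(0L)(_ + _).tail.count(_ <= windowMs)

heartbeatsWithin(1400L, Seq(200L, 400L, 800L, 1600L))  // 3
heartbeatsWithin(1400L, Seq.fill(4)(1000L))            // 1
```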

@zzvara
Contributor Author

zzvara commented May 14, 2015

@tgravescs Of course it differs from a constant heartbeat interval. In any case, do you wish to have an adaptive solution, or the one they've implemented in MR? I can easily set up a scenario for you where a 1-second heartbeat interval will stress the RM and every user will suffer. I really don't see your point here. Multiplicative back-off is a very simple and adaptive solution. What you are proposing is not adaptive to the cluster, and thus not adaptive to the user, because in practice the user sits on the cluster.

Also, if the cluster is idle with lots of resources, you should get them the first time before the sleep even happens, right? We call allocate before launching the reporter thread, which will do an allocate that should get a response from the first one before doing the sleep.

No. It is not all about free resources; the RM has its own logic that needs time to complete. The 200 ms could be tuned, but back-off is essential here.

@zzvara
Contributor Author

zzvara commented May 14, 2015

Can we test this, please?

@srowen
Member

srowen commented May 14, 2015

ok to test

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/33043/
Test FAILed.

@zzvara
Contributor Author

zzvara commented May 19, 2015

Oops, what happened there? I only changed one character.

@srowen
Member

srowen commented May 19, 2015

Jenkins, retest this please.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@SparkQA

SparkQA commented May 19, 2015

Test build #33080 has started for PR 6082 at commit a1d2101.

@SparkQA

SparkQA commented May 19, 2015

Test build #33080 has finished for PR 6082 at commit a1d2101.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/33080/
Test PASSed.

@tgravescs
Contributor

@srowen Based on the recent conversations, are you ok with this going in?

@srowen
Member

srowen commented May 19, 2015

Yeah, I defer to your judgment @tgravescs. I suppose it wasn't obvious to me that this is a win in whatever the normal case is, and your test indicated it wasn't in your case. So I'm a little uncomfortable with the logic that it should go in because it helps in theory, and a test must be bad if it disagrees. @ehnalis it's not true that this can't do any harm, as you even say. Yes, you can tune away the harm in that type of case, but then you've put in another lever to know about in order to get it tuned. I can appreciate the argument that adaptiveness is likely to be better in more cases than it's worse, even at defaults. I know start-up time is an issue. I don't object to merging to master, so proceed if everyone is comfortable with the logic and defaults.

@tgravescs
Contributor

+1 and merging this.

asfgit closed this in 3ddf051 on May 20, 2015
@andrewor14
Contributor

This caused the following (benign) exception on the AM:

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "Reporter" java.lang.IllegalArgumentException: timeout value is negative
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:356)

@vanzin
Contributor

vanzin commented May 21, 2015

Is it benign? Won't that cause the reporter thread to stop? IllegalArgumentException is not caught in the block that calls Thread.sleep.

@andrewor14
Contributor

No, I thought it was benign at first because this stops a "reporter thread", but apparently this thread does more than just reporting? I have filed SPARK-7775.

@andrewor14
Contributor

I actually don't see how sleepInterval would ever be negative.

val sleepInterval =
  if (numPendingAllocate > 0) {
    val currentAllocationInterval =
      math.min(heartbeatInterval, nextAllocationInterval)
    nextAllocationInterval *= 2
    currentAllocationInterval
  } else {
    nextAllocationInterval = initialAllocationInterval
    heartbeatInterval
  }

According to my logs:

2015-05-20 23:49:21,430 INFO  [main] yarn.ApplicationMaster (Logging.scala:logInfo(59)) -
Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals

It appears that heartbeatInterval and initialAllocationInterval are both >= 0, and this is the only place where we set nextAllocationInterval. At the very least we know it went into the if case when this happened.

@vanzin
Contributor

vanzin commented May 21, 2015

Hmmm...

nextAllocationInterval *= 2

With enough iterations this will overflow and become negative, and then math.min will choose that value...

@andrewor14
Contributor

Ah... overflow. That's probably it. It seems that we need a cap on the interval.
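For illustration, a minimal sketch (reusing the names from the snippet above) of one way to cap the doubling so the sleep can never go negative; this is a hedged sketch, not necessarily the exact change made in #6305:

```
val initialAllocationInterval = 200L
val heartbeatInterval = 3000L
var nextAllocationInterval = initialAllocationInterval

def sleepInterval(numPendingAllocate: Int): Long =
  if (numPendingAllocate > 0) {
    val currentAllocationInterval =
      math.min(heartbeatInterval, nextAllocationInterval)
    // Double the capped value rather than the raw accumulator, so the stored
    // interval stays at most 2 * heartbeatInterval and can never overflow.
    nextAllocationInterval = currentAllocationInterval * 2
    currentAllocationInterval
  } else {
    nextAllocationInterval = initialAllocationInterval
    heartbeatInterval
  }
```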

@andrewor14
Contributor

Fix @ #6305.

asfgit pushed a commit that referenced this pull request May 21, 2015
```
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "Reporter" java.lang.IllegalArgumentException: timeout value is negative
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:356)
```
This kills the reporter thread. This is caused by #6082 (merged into master branch only).

Author: Andrew Or <andrew@databricks.com>

Closes #6305 from andrewor14/yarn-negative-sleep and squashes the following commits:

b970770 [Andrew Or] Use existing cap
56d6e5e [Andrew Or] Avoid negative sleep
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
Added faster RM-heartbeats on pending container allocations with multiplicative back-off.
Also updated related documentations.

Author: ehnalis <zoltan.zvara@gmail.com>

Closes apache#6082 from ehnalis/yarn and squashes the following commits:

a1d2101 [ehnalis] MIss-spell fixed.
90f8ba4 [ehnalis] Changed default HB values.
6120295 [ehnalis] Removed the bug, when allocation heartbeat would not start from initial value.
08bac63 [ehnalis] Refined style, grammar, removed duplicated code.
073d283 [ehnalis] [SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats.
d4408c9 [ehnalis] [SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats.
jeanlyn also mirrored the #6305 fix commit to jeanlyn/spark on May 28, 2015, and pushed the same pair of commits again on Jun 12, 2015; nemccarthy pushed the same two commits to nemccarthy/spark on Jun 19, 2015.