[SPARK-4772] Clear local copies of accumulators as soon as we're done with them #3570

Closed

nkronenfeld
Contributor

Accumulators keep thread-local copies of themselves. These copies were only cleared at the beginning of a task. This meant that (a) the memory they used was tied up until the next task ran on that thread, and (b) if a thread died, the memory it had used for accumulators was locked up forever on that worker.

This PR clears the thread-local copies of accumulators at the end of each task, in the task's finally block, to make sure they are cleaned up between tasks. It also stores them in a ThreadLocal object, so that if, for some reason, the thread dies, any memory they are using at the time should be freed up.
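
A minimal sketch of the pattern described above, under simplifying assumptions: `LocalAccums`, `TaskRunner`, and `runTask` are hypothetical names used only for illustration, and accumulator values are plain `Long`s rather than Spark's `Accumulable` type. This is not the code from the PR; it only shows a `ThreadLocal` map whose contents are cleared in a `finally` block.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a per-thread accumulator registry (not Spark's API).
object LocalAccums {
  // Each thread lazily gets its own empty map; nothing is shared across threads.
  private val local = new ThreadLocal[mutable.Map[Long, Long]] {
    override protected def initialValue(): mutable.Map[Long, Long] =
      mutable.Map.empty[Long, Long]
  }

  // Adds delta to the calling thread's copy of accumulator `id`.
  def add(id: Long, delta: Long): Unit = {
    val m = local.get()
    m(id) = m.getOrElse(id, 0L) + delta
  }

  // Immutable snapshot of the calling thread's accumulator values.
  def values: Map[Long, Long] = local.get().toMap

  // Drops the calling thread's copies.
  def clear(): Unit = local.get().clear()
}

// Hypothetical task wrapper (illustrative only).
object TaskRunner {
  // Runs the task body, returns the updates it produced, and always clears
  // the thread-local copies afterwards, even if the body throws.
  def runTask(body: () => Unit): Map[Long, Long] =
    try {
      body()
      LocalAccums.values
    } finally {
      LocalAccums.clear()
    }
}
```

Because the snapshot is taken before the `finally` block runs, the caller still receives the task's updates while the thread itself is left with an empty map.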

@SparkQA

SparkQA commented Dec 3, 2014

Test build #24074 has started for PR 3570 at commit 39a82f2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 3, 2014

Test build #24074 has finished for PR 3570 at commit 39a82f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24074/
Test PASSed.

@JoshRosen
Contributor

Hi @nkronenfeld,

Thanks for this PR. These sorts of resource leakage issues can be tricky to debug, so thanks for spotting this. It would be great to file a dedicated JIRA for the memory-leak reported here.

I find the current Accumulators object code to be difficult to understand, so I'd be open to a larger refactoring / reorganization of that code. To handle cleanup after thread death, I think we can make localAccums into a thread-local Map, since thread-locals should get GC'd when threads die. We still need to worry about threads staying in thread pools and being re-used, though, so we should ensure that the thread-local is cleared after each task. I think we should be able to do this by moving the Accumulators.clear() in Executor.scala from the start of the task into the finally block that handles task cleanup.
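
To illustrate the thread-pool concern, here is a small sketch, assuming a single pooled thread that runs two tasks back to back. `PoolReuseDemo`, `seen`, and `leftoverSeenBySecondTask` are hypothetical names, and the string stored in the thread-local stands in for accumulator state; none of this is taken from Spark.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical demo object (illustrative only).
object PoolReuseDemo {
  // Per-thread state; a fresh thread starts with an empty list.
  private val seen = new ThreadLocal[List[String]] {
    override protected def initialValue(): List[String] = Nil
  }

  // Submits two tasks to one pooled thread and returns what the second task
  // finds in the thread-local when it starts.
  def leftoverSeenBySecondTask(clearInFinally: Boolean): List[String] = {
    val pool = Executors.newFixedThreadPool(1)
    try {
      val first = new Callable[Unit] {
        def call(): Unit = {
          try {
            seen.set("task-1 state" :: seen.get())
          } finally {
            // Without this cleanup the state outlives the task.
            if (clearInFinally) seen.remove()
          }
        }
      }
      val second = new Callable[List[String]] {
        def call(): List[String] = seen.get()
      }
      pool.submit(first).get()
      pool.submit(second).get()
    } finally {
      pool.shutdown()
    }
  }
}
```

With `clearInFinally = false` the second task sees the first task's leftover entry, because the pooled thread is reused; with `clearInFinally = true` it starts from an empty list.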

…readLocal localAccums, and keeping clear(), but also calling it in tasks' finally block, rather than just at the beginning of the task.
@SparkQA

SparkQA commented Dec 5, 2014

Test build #24189 has started for PR 3570 at commit 537baad.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 5, 2014

Test build #24189 has finished for PR 3570 at commit 537baad.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24189/
Test FAILed.

…ncompatibility should be irrelevant, as it will only surface if a master is talking to a worker running a different version of Spark.
@nkronenfeld nkronenfeld changed the title Clear local copies of accumulators as soon as we're done with them [SPARK-4772] Clear local copies of accumulators as soon as we're done with them Dec 5, 2014
@SparkQA

SparkQA commented Dec 5, 2014

Test build #24195 has started for PR 3570 at commit b6c2180.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 5, 2014

Test build #24195 has finished for PR 3570 at commit b6c2180.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24195/
Test PASSed.

@JoshRosen
Contributor

The MiMa failure is surprising, since that class was marked as private and therefore shouldn't have been subject to compatibility checks. @ScrapCodes, do you know if this is a MiMa bug? Do we have a JIRA for this?

@nkronenfeld
Contributor Author

Should I back out the correction to the mima failure?

@JoshRosen
Contributor

No, I'd leave it. I just thought I'd mention it so that we eventually investigate. I'll finish reviewing this PR later this weekend.

@nkronenfeld
Contributor Author

Great. Outside the MiMa issue, I think it should be all set, unless I can figure out a way to unit test it. So far, my best methods of testing it involve instrumenting the code in ways I shouldn't check in.

@nkronenfeld
Contributor Author

oh, a note for when you're reviewing - I didn't move the clear call, I just added a second one; I saw no particular harm in leaving the old one there too, just in case, but I can't see it doing all that much anymore - it should always be a no-op now. I'd be happier removing it if, again, I could figure out a good unit test to make sure all was functioning properly when I did so. But I would be totally open to removing it in the interests of code cleanliness if you want.

@nkronenfeld
Contributor Author

Any word on this?

-  val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
+  val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
+  }
   var lastId: Long = 0
Contributor

Not related to your changes and I don't expect you to fix it, but this could be an AtomicInteger instead.

Contributor Author

you mean the lastId?
That should only ever get used on the client - it's only called from the constructor of an individual accumulator, and if someone is creating one of those on a worker, they're already in trouble - so it should be ok as is.

Contributor

Oh, I was just observing that this is only read through the newId() method and that it's effectively being used like an AtomicInteger. Just another example of how this particular part of the code is kind of old / out-of-sync with the style of the rest of the codebase. Don't worry about it; we can do a larger cleanup pass on this later.
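
A rough sketch of what the atomic-counter suggestion could look like. It uses `AtomicLong` rather than `AtomicInteger` only because `lastId` is declared as a `Long`; `AccumulatorIds` is a hypothetical name, and this is not the code in Spark.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical id generator (illustrative only, not Spark's Accumulators object).
object AccumulatorIds {
  private val lastId = new AtomicLong(0L)

  // Atomically increments the counter and returns the new value, so
  // concurrent callers never receive the same id.
  def newId(): Long = lastId.incrementAndGet()
}
```

The counter replaces a mutable `var` plus explicit synchronization with a single atomic operation.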

@JoshRosen
Contributor

This looks good to me. If you don't mind, could you update the pull request description to more accurately describe the change that we're actually committing? This is important because that description will become the actual commit message.

Also, it looks like the MiMa issue could have been caused by private object maybe having different semantics than we expected; we ran into a similar issue over at #3622.

@andrewor14
Contributor

LGTM, nice catch.

@kayousterhout
Contributor

To fix the MiMa problem, can you instead make Accumulators a private[spark] object? No one I've asked seems to understand what "private" even means in this context -- private[spark] describes the desired semantics (based on my understanding), and doing that also removes the need for the MiMa exception (or at least it did for #3622).
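
For readers unfamiliar with qualified access modifiers, here is a small sketch of what `private[spark]` expresses. The package `org.example.spark` and the names `Registry` and `Worker` are made up for illustration and are not Spark classes.

```scala
package org.example.spark {
  // Accessible from any code inside org.example.spark, including its
  // subpackages, but not from code outside that package.
  private[spark] object Registry {
    def size: Int = 0
  }

  package executor {
    // Lives in a subpackage of org.example.spark, so it may use Registry.
    object Worker {
      def registrySize: Int = Registry.size
    }
  }
}
```

Scala source outside `org.example.spark` that refers to `Registry` would be rejected by the compiler, which matches the "internal to the project" intent described in the comment.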

@nkronenfeld
Contributor Author

comment fixed. I'm trying to test the MiMa related changes to see if they work, and having problems running mima on my machine. I'll probably just push them in the suggested form to see if they pass on Jenkins.

…to get around false positive in mima tests
@SparkQA

SparkQA commented Dec 10, 2014

Test build #24274 has started for PR 3570 at commit a581f3f.

  • This patch merges cleanly.

@JoshRosen
Contributor

@kayousterhout I'm glad to see that changing it to private[spark] fixed things without requiring a MiMa exclude for the private -> private[spark] change. This is going to make it much easier to backport this into maintenance branches, since I won't have to move the MiMa exclude on each cherry-pick.

@SparkQA

SparkQA commented Dec 10, 2014

Test build #24274 has finished for PR 3570 at commit a581f3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24274/
Test PASSed.

@JoshRosen
Contributor

LGTM. Just in case you missed my earlier comment, are you still planning to update the PR description to reflect the actual changes vs. the ones you had planned?

@nkronenfeld
Contributor Author

I thought I'd done so, it looks like it lost my changes
I'll fix that asap

@nkronenfeld
Contributor Author

sorry, must have accidentally hit cancel instead of comment the first time. Should be set now.

@JoshRosen
Contributor

Thanks for updating the description. This looks good to me, so I'm going to merge this into master, branch-1.0, and branch-1.1 (and I'll tag it for a post-release backport into branch-1.2). Thanks again for this fix!

@asfgit asfgit closed this in 94b377f Dec 10, 2014
asfgit pushed a commit that referenced this pull request Dec 10, 2014
… with them

Accumulators keep thread-local copies of themselves.  These copies were only cleared at the beginning of a task.  This meant that (a) the memory they used was tied up until the next task ran on that thread, and (b) if a thread died, the memory it had used for accumulators was locked up forever on that worker.

This PR clears the thread-local copies of accumulators at the end of each task, in the task's finally block, to make sure they are cleaned up between tasks.  It also stores them in a ThreadLocal object, so that if, for some reason, the thread dies, any memory they are using at the time should be freed up.

Author: Nathan Kronenfeld <nkronenfeld@oculusinfo.com>

Closes #3570 from nkronenfeld/Accumulator-Improvements and squashes the following commits:

a581f3f [Nathan Kronenfeld] Change Accumulators to private[spark] instead of adding mima exclude to get around false positive in mima tests
b6c2180 [Nathan Kronenfeld] Include MiMa exclude as per build error instructions - this version incompatibility should be irrelevant, as it will only surface if a master is talking to a worker running a different version of Spark.
537baad [Nathan Kronenfeld] Fuller refactoring as intended, incorporating JR's suggestions for ThreadLocal localAccums, and keeping clear(), but also calling it in tasks' finally block, rather than just at the beginning of the task.
39a82f2 [Nathan Kronenfeld] Clear local copies of accumulators as soon as we're done with them

(cherry picked from commit 94b377f)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/Accumulators.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
asfgit pushed a commit that referenced this pull request Dec 10, 2014
asfgit pushed a commit that referenced this pull request Dec 17, 2014
@JoshRosen
Contributor

I've merged this into branch-1.2, so this fix will be included in Spark 1.2.1.
