[SPARK-4194] [core] Make SparkContext initialization exception-safe. #5335
Conversation
This fixes the thread leak. I also changed the unit test to keep track of allocated contexts and make sure they're closed after tests are run; this is needed since some tests use this pattern: `val sc = createContext(); doSomethingThatMayThrow(); sc.stop()`
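For illustration, a minimal sketch of why that pattern leaks and how tracking allocated contexts covers it; `ContextTracking`, `createContext`, and `stopLeakedContexts` are hypothetical names, not the actual suite code:

```scala
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}

object ContextTracking {
  // Every context a test creates is remembered here...
  private val allocated = mutable.Set[SparkContext]()

  def createContext(name: String): SparkContext = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName(name))
    allocated += sc
    sc
  }

  // ...so a suite-level afterEach/afterAll hook can stop whatever a failed
  // test left behind. If doSomethingThatMayThrow() throws, the test never
  // reaches sc.stop(), but this hook still closes the context.
  def stopLeakedContexts(): Unit = {
    allocated.foreach(_.stop())
    allocated.clear()
  }
}
```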
SparkContext has a very long constructor, where multiple things are initialized, multiple threads are spawned, and multiple opportunities for exceptions to be thrown exist. If one of these happens at an inopportune time, lots of garbage tends to stick around.

This patch re-organizes SparkContext so that its internal state is initialized in a big "try" block. The fields keeping state are now completely private to SparkContext, and are "vars", because Scala doesn't allow you to initialize a val later. The existing API is kept by turning vals into defs (which works because Scala guarantees the same binary interface for those).

On top of that, a few things in other areas were changed to avoid more things leaking:

- Executor was changed to explicitly wait for the heartbeat thread to stop. LocalBackend was changed to wait for the "StopExecutor" message to be received, since otherwise there could be a race between that message arriving and the actor system being shut down.
- ConnectionManager could possibly hang during shutdown, because an interrupt at the wrong moment could cause the selector thread to still call select and then wait forever. So the selector is also woken up so that this situation is avoided.
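As a rough illustration of the constructor pattern described above (private vars, def accessors, one big try block), here is a minimal hypothetical class; it is not the actual SparkContext code and all names are made up:

```scala
// Hypothetical example of the pattern; not the actual SparkContext code.
class Service(settings: Map[String, String]) {

  // State has to be vars: Scala offers no way to assign a val after its declaration.
  private var _worker: Thread = _
  private var _started: Boolean = false

  // Former public vals become defs; a val compiles to a getter method, so the
  // binary interface seen by callers stays the same.
  def worker: Thread = _worker
  def started: Boolean = _started

  try {
    _worker = new Thread(new Runnable {
      override def run(): Unit = Thread.sleep(Long.MaxValue)
    }, "service-worker")
    _worker.setDaemon(true)
    _worker.start()

    if (!settings.contains("required.setting")) {
      throw new IllegalArgumentException("missing required.setting")
    }
    _started = true
  } catch {
    case e: Throwable =>
      // Clean up whatever was already initialized so a failed constructor
      // does not leak threads or other resources.
      stop()
      throw e
  }

  def stop(): Unit = {
    if (_worker != null) {
      _worker.interrupt()
      _worker = null
    }
    _started = false
  }
}
```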
Note the PR contains the commits from #5311. I hope that once that is pushed, GitHub will figure things out. (If not, I'll rebase manually.) So if you want to skip those changes, just look at the last commit in the list. With both of these PRs, I was able to run the core/ unit tests and verify that:
I tried to run MiMA checks locally and they look OK, but let's see what Jenkins says.
Test build #29621 has finished for PR 5335 at commit
Conflicts:
    core/src/main/scala/org/apache/spark/SparkContext.scala
    core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
Oops, borked merge, fixing...
Test build #29684 has finished for PR 5335 at commit
Jenkins, retest this please.
Test build #29685 has finished for PR 5335 at commit
Conflicts:
    core/src/main/scala/org/apache/spark/SparkContext.scala
Test build #29686 has finished for PR 5335 at commit
Test build #29692 has finished for PR 5335 at commit
Conflicts:
    core/src/main/scala/org/apache/spark/SparkContext.scala
    core/src/main/scala/org/apache/spark/executor/Executor.scala
    core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
Test build #29764 has finished for PR 5335 at commit
Ping?
I tend to trust your hand on this. This is a big change and it's hard to match up the existing logic with the new logic in the diff, though it looks like that was the intent and effect from spot-checking some elements. That the tests have passed several times is a good sign. The reorganization is significantly positive since the fields, and their initialization, are clearly grouped now.

One minor style thing: why the

I think the additional changes look reasonable, like using a Java

I favor this, though weakly, on the grounds that I'm mostly relying on tests for correctness. The intent is sound.

@rxin @pwendell do you have any thoughts on this one?
That's actually used in lots of places in Spark. It's used when some variable/field name conflicts with a
OK right, and that's true of all of them here.
ping @JoshRosen - I think he's proposed this exact change to me in the past.
See the description of PR #3121 for my previous discussion of this. If we want to avoid introducing
That approach would be a lot more complicated. The first reason why it would be complicated is that you'd need an uber-constructor in SparkContext that takes all the initialized internal values. Unless there's some fancy Scala feature I'm not aware of, that in itself is scary as hell, and would mean the other constructors would be similarly ugly in that they'd have to call the companion object. It would also cause (even more) duplication of the declaration of these things, since they'd have to be declared in the companion object's method too. Finally, it would complicate

So while I would love to simplify the code in SparkContext, the alternative suggestion, as far as I can see, does nothing towards that. And that's why I chose private vars. It's not optimal, and I really wish Scala would allow me to initialize a val after its declaration, like Java does. But it's the easiest approach, and it doesn't expose any mutable SparkContext state that wasn't already exposed before.
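For contrast, a minimal sketch of the rough shape the companion-object alternative would take (hypothetical code, not from either PR): a factory method does the fallible work and hands everything to a constructor that only assigns vals, i.e. the "uber-constructor" objected to above:

```scala
// Hypothetical sketch of the companion-object alternative; not from either PR.
class Service private (
    val requiredSetting: String,
    val worker: Thread
    /* ...in SparkContext this list would grow to dozens of parameters... */)

object Service {
  def apply(settings: Map[String, String]): Service = {
    val requiredSetting = settings.getOrElse("required.setting",
      throw new IllegalArgumentException("missing required.setting"))
    val worker = new Thread(new Runnable { override def run(): Unit = () }, "service-worker")
    worker.start()
    // If anything throws between worker.start() and this call, cleanup still
    // has to be written by hand here; the factory alone does not remove that problem.
    new Service(requiredSetting, worker)
  }
}
```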
I also feel that the current approach makes more sense than Josh's alternative. Changes to SparkContext get a lot of scrutiny during code review, so clear documentation, IMO, is sufficient to ensure this is followed correctly (famous last words). I didn't have time to dive into this to make sure there are no correctness issues, but the broad approach looks good to me... I think it's worth fixing this up.
Seems like there's support for this change, though it needs a rebase, @vanzin. Any objections to proceeding after that?
Conflicts:
    core/src/main/scala/org/apache/spark/executor/Executor.scala
    core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
Test build #30260 has finished for PR 5335 at commit
Test build #30293 has finished for PR 5335 at commit
SparkContext has a very long constructor, where multiple things are
initialized, multiple threads are spawned, and multiple opportunities
for exceptions to be thrown exist. If one of these happens at an
inopportune time, lots of garbage tends to stick around.

This patch re-organizes SparkContext so that its internal state is
initialized in a big "try" block. The fields keeping state are now
completely private to SparkContext, and are "vars", because Scala
doesn't allow you to initialize a val later. The existing API
is kept by turning vals into defs (which works because Scala guarantees
the same binary interface for those).

On top of that, a few things in other areas were changed to avoid more
things leaking:

- Executor was changed to explicitly wait for the heartbeat thread to
  stop. LocalBackend was changed to wait for the "StopExecutor"
  message to be received, since otherwise there could be a race
  between that message arriving and the actor system being shut down.

- ConnectionManager could possibly hang during shutdown, because an
  interrupt at the wrong moment could cause the selector thread to
  still call select and then wait forever. So also wake up the
  selector so that this situation is avoided.
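A minimal sketch (illustrative only, not the actual Executor or ConnectionManager code) of the two shutdown ideas above: explicitly join the background thread so stop() does not return while it is still running, and wake the selector so a thread blocked in select() notices the stop flag instead of waiting forever:

```scala
import java.nio.channels.Selector

// Illustrative selector loop; not the actual ConnectionManager code.
class SelectorLoop {
  @volatile private var stopped = false
  private val selector = Selector.open()

  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      while (!stopped) {
        selector.select()   // blocks until a channel is ready or wakeup() is called
        // ... handle selected keys ...
      }
    }
  }, "selector-loop")
  thread.start()

  def stop(): Unit = {
    stopped = true
    selector.wakeup()   // unblocks a thread stuck in select() during shutdown
    thread.join()       // explicitly wait for the loop thread to exit
    selector.close()
  }
}
```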