-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-30285][CORE] Fix deadlock between LiveListenerBus#stop and AsyncEventQueue#removeListenerOnError #26924
Conversation
…emoveListenerOnError
Gentle ping @squito :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with your analysis of the issue. I do think this would fix it, but I'm wondering if there is a cleaner way. @vanzin any ideas?
one general thing -- I'd replace every use of "race condition" with deadlock in the PR description.
} | ||
|
||
@Override | ||
public void dead_$eq(boolean dead) { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these methods should actually be implemented, so anybody extending this in java gets the fix as well.
ideally we could do this somehow so this doesn't get exposed at all part of the api, but I can't think of a way to do that ...
if (bus.isInStop) { | ||
// If bus is in the progress of stop, we just mark the listener as dead instead of removing | ||
// via calling `bus.removeListener` to avoid race condition | ||
// dead listeners will be removed eventually in `bus.stop` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some grammar nits:
If we're in the middle of stopping the bus, we just mark the listener as dead,
instead of removing, to avoid a deadlock.
Dead listeners will be removed eventually in bus.stop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
ok to test |
Test build #115583 has finished for PR 26924 at commit
|
I don't like this approach because it exposes completely internal things in the public API. (Also, exposes Scala-isms in a Java class...) I'm almost convinced that we should just remove the But during shutdown the important event (application end) is posted on the same thread stopping the bus, so there's no race there, and that's the only event I'd be worried about. |
Thanks for your feedback! @squito @vanzin
Another concern is that the |
Test build #115608 has finished for PR 26924 at commit
|
Would you please update PR description according to your latest updates? |
retest this please. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not really sold on the unit test; it seems to be racy regardless of how you write it. If you happen to hit the original bug you'd fail the test, but I'm not sure how effective the test actually is in hitting that situation. But maybe that's enough...
val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted" | ||
test(s"SPARK-30285: Fix deadlock in AsyncEventQueue.removeListenerOnError: $suffix") { | ||
val conf = new SparkConf(false) | ||
.set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you really need this?
* else count SparkListenerJobEnd numbers | ||
*/ | ||
private class DelayInterruptingJobCounter( | ||
val throwInterruptedException: Boolean, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent more
}) | ||
stoppingThread.start() | ||
// Notify interrupting listener starts to work | ||
interruptingListener.sleep = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you trying to make sure listeners throw the exception after "stop()" is called? That's going to be hard, and your code isn't really guaranteeing that.
You could use a CountDownLatch
that you signal right before calling stop()
(in the thread) to unblock the listener; that will at least narrow the race down a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could check the stopped
status of bus
in the listener.
This would be better than using a CountDownLatch
, however, it can't get rid of racing completely. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CountDownLatch
always make things deterministic and it sounds better to me.
What do you mean by "it can't get rid of racing completely"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the PR description, to reproduce the original issue, we have to make sure:
- Holding the synchronized lock of
bus
in the stopping thread - Trying to acquire the synchronized lock of
bus
in the interrupting listener thread
But signal the listener starts to interrupt just before bus.stop
by a CountDownLatch
can't guarantee this 100%, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe, you should insert CountDownLatch
after bus.stop
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, checking the stoped status can't guarantee this. It's likely that the bus has already set the stoped status to true, but has not acquired the synchronized lock yet.
IIUC, you want to let interruptingListener
start to work once bus
has moved to stop
status and acquired the synchronized lock, right?
But how can bus
acquired the synchronized lock now? This fix has already removed the synchronized lock. The only thing you could do is to check bus
status now and I think it's enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got your point.
Now, there are two things.
- Without the fix, how the test would behave.
- With the fix, how to make sure that there is no deadlock when a listener is interrupted after bus.stop is called.
For (1), we can't avoid racing without changing the bus.stop
code (e.g. add a callback).
For (2), we at least have to expose the internal stoped
status of bus
, which maybe is not recommended.
So WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only focus on LiveListenerBus
may be impossible to workaround the difficulties you mentioned above. Maybe we should move to AsyncEventQueue
.
How about this way:
-
Add a method
status()
inAsyncEventQueue
for testing only; -
In
interruptingListener
, keep checkingAsyncEventQueue.status()
until it's stopped. So, whenAsyncEventQueue
is stopped, we're sure thatLiveListenerBus
has stopped too and acquired the lock(without fix).
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this would work. In AsyncEventQueue
, in fact, there is also a stoped
status that we could check.
But associating a listener with its AsyncEventQueue
would be another problem we have to resolve. Currently, it's encapsulated by bus.addToXXXQueue
inside the bus code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You guys are trying to fabricate a test that will not be testing what the actual code is doing when a real app is running. That's the problem.
To do that you'd need the stop()
code in the listener bus to wait holding a lock while the queues are being drained; and one of those queues need to run into the error that causes it to remove a bad listener. That's hard to do without inserting callbacks that don't exist into the code; and adding those callbacks would only be enabling the test, which is why that's questionably.
So you basically need this in the new stop()
:
def stop() {
// do some stop stuff here
testStartCallback()
// clear the queues here
testEndCallback()
}
The two callbacks are needed because otherwise there is no guarantee that what the queues do will happen before stop()
does its thing.
But really I don't see what really that test would be actually testing now that there is no synchronized block anymore.
Anything you do here without these callbacks will be racy, and thus may not hit the original issue. Also, without the synchronized block, there's nothing to cause a deadlock in the first place, so that's why I said the test isn't that great to begin with.
So I'd avoid trying to create a fancy test that isn't really testing the issue and just adding unneeded hooks into the main code. The current test is ok and as close as you'll get without the above callbacks; so either go with that, or just remove the test.
// Notify interrupting listener starts to work | ||
interruptingListener.sleep = false | ||
// Wait for bus to stop | ||
stoppingThread.join() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you're trying to detect a deadlock, shouldn't this have a timeout?
Test build #115628 has finished for PR 26924 at commit
|
Test build #115640 has finished for PR 26924 at commit
|
I don't think there's a way to write a proper test here without changing a bunch of things in the bus and queue code to expose internal hooks... and I don't think that's desirable. I guess the current test is good enough as an attempt to test this. But Jenkins seems to be hosed, so running the tests here will probably have to wait until after the holidays... |
retest this please |
Test build #115655 has finished for PR 26924 at commit
|
retest this please |
Test build #115663 has finished for PR 26924 at commit
|
Ok, thanks for taking care. I saw the "Jenkins looks hosed" discussion in the spark dev mail list. Let's wait until then. |
PySpark failure introduced by a mistake merge, which has been reverted just now. |
retest this please. |
Test build #115700 has finished for PR 26924 at commit
|
A little weird. The failed test can pass locally. |
retest this please. |
Test build #115798 has finished for PR 26924 at commit
|
retest this, please |
Test build #115804 has finished for PR 26924 at commit
|
retest this please |
Test build #116057 has finished for PR 26924 at commit
|
Merging to master / 2.4. |
…ncEventQueue#removeListenerOnError There is a deadlock between `LiveListenerBus#stop` and `AsyncEventQueue#removeListenerOnError`. We can reproduce as follows: 1. Post some events to `LiveListenerBus` 2. Call `LiveListenerBus#stop` and hold the synchronized lock of `bus`(https://github.com/apache/spark/blob/5e92301723464d0876b5a7eec59c15fed0c5b98c/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L229), waiting until all the events are processed by listeners, then remove all the queues 3. Event queue would drain out events by posting to its listeners. If a listener is interrupted, it will call `AsyncEventQueue#removeListenerOnError`, inside it will call `bus.removeListener`(https://github.com/apache/spark/blob/7b1b60c7583faca70aeab2659f06d4e491efa5c0/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala#L207), trying to acquire synchronized lock of bus, resulting in deadlock This PR removes the `synchronized` from `LiveListenerBus.stop` because underlying data structures themselves are thread-safe. To fix deadlock. No. New UT. Closes #26924 from wangshuo128/event-queue-race-condition. Authored-by: Wang Shuo <wangshuo128@gmail.com> Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 10cae04) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
FYI I had to resolve a trivial conflict and make a small scala 2.11-related change to the code in 2.4. |
Thanks a lot! |
What changes were proposed in this pull request?
There is a deadlock between
LiveListenerBus#stop
andAsyncEventQueue#removeListenerOnError
.We can reproduce as follows:
LiveListenerBus
LiveListenerBus#stop
and hold the synchronized lock ofbus
(spark/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
Line 229 in 5e92301
AsyncEventQueue#removeListenerOnError
, inside it will callbus.removeListener
(spark/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala
Line 207 in 7b1b60c
This PR removes the
synchronized
fromLiveListenerBus.stop
because underlying data structures themselves are thread-safe.Why are the changes needed?
To fix deadlock.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT.