[SPARK-18838][CORE] Use separate executor service for each event listener #16291
Conversation
Test build #70172 has finished for PR 16291 at commit
cc @zsxwing - Please note that the PR is incomplete and there are some test failures. I just wanted some initial feedback on the design before investing more time in it.
Force-pushed from b4af82f to ed79578.
Test build #70173 has finished for PR 16291 at commit
Force-pushed from ed79578 to defd536.
Test build #70180 has finished for PR 16291 at commit
This was initially introduced by @kayousterhout.
Hmm... I took a quick look and I'm not sure I understand exactly what's going on. It seems you're wrapping each listener with a ... If that's the case, that sounds wrong. Each listener should process events serially, otherwise you risk getting into funny situations like a task end event being processed before the task start event for the same task.

I think this would benefit from a proper explanation of the changes being proposed, instead of a bunch of code. How will the listener bus work with the changes? Where will we have thread pools, where will we have message queues? Will each listener get its own dedicated thread, or will there be a limit? What kind of controls will there be to avoid memory pressure? Would it be worth it to allow certain listeners to have dedicated threads while others share the same one, like the current approach? That kind of thing. Can you write something up and post it to the bug?
@vanzin - Thanks for taking a look and sorry about putting my unfinished sloppy code out there. I updated the bug and the PR with the overall design idea, which hopefully will answer your questions.
You are right; that's why the executor service is single-threaded, which guarantees ordered processing of the events per listener.
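To make the ordering argument concrete, here is a minimal sketch of the idea (illustrative names, not the PR's actual ListenerEventExecutor code): because each listener's executor has exactly one thread, events posted for that listener run strictly in submission order, while different listeners advance independently of each other.

```scala
import java.util.concurrent.{ExecutorService, Executors}

// Minimal sketch: one single-threaded executor per listener.
class PerListenerDispatcher[E](handle: E => Unit) {
  private val executor: ExecutorService = Executors.newSingleThreadExecutor()

  // post() returns immediately; the event runs on the listener's dedicated
  // thread, strictly after all previously posted events for this listener.
  def post(event: E): Unit = executor.execute(new Runnable {
    override def run(): Unit = handle(event)
  })

  def stop(): Unit = executor.shutdown()
}
```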
Force-pushed from defd536 to c790861.
Test build #70287 has finished for PR 16291 at commit
Force-pushed from c790861 to d687aaf.
Test build #70289 has finished for PR 16291 at commit
Force-pushed from d687aaf to 20fbf2f.
Test build #70290 has finished for PR 16291 at commit
Test build #70298 has started for PR 16291 at commit
Jenkins test this please.
Test build #70299 has started for PR 16291 at commit
Force-pushed from 9b4968d to d331f1e.
Test build #70311 has finished for PR 16291 at commit
Force-pushed from d331f1e to fbcdcca.
Test build #70322 has finished for PR 16291 at commit
Force-pushed from fbcdcca to 6763827.
Test build #70325 has finished for PR 16291 at commit
Force-pushed from 6763827 to 66e4f12.
Test build #70345 has started for PR 16291 at commit
Jenkins retest this please.
Test build #70360 has finished for PR 16291 at commit
I'm not sure whether that change would be a problem, but I wonder if a system where listeners can "opt in" to this behavior wouldn't be better, e.g. ...
@markhamstra, @vanzin thanks for your comments!
That is true. If a listener is slow in processing the events, then it might not see the task start event whereas other listeners might already have processed the corresponding task end event. I am not sure why that would cause any problem, though. Each listener should, in theory, work independently of the others, and we should only guarantee ordered processing of the events within a listener. This is the only way I can think of to scale the driver for large workloads.
I think that would be overkill. Having a separate thread per listener simplifies the design. Also, for user-added listeners, it will be hard to decide whether we want a shared or a separate thread per listener. The only downside of having a separate thread per listener is an increased memory footprint in the worst case. But I would argue that since the average event processing latency is decreased by the multi-threaded event processor, the backlog on the event queues will clear faster, which might actually decrease the driver memory footprint in the average case.
If we were starting from nothing, then yes, it would be valid and advisable to design the Listener infrastructure using only this weaker guarantee. The issue, though, is that we are not starting from nothing, but rather from a system that currently offers a much stronger guarantee on the synchronized behavior of Listeners.

If it is the case that no Listeners currently rely on the stronger guarantee and thus could work completely correctly under the weaker guarantee of this PR, then we could make this change without much additional concern. But reaching that level of confidence in current Listeners is a difficult prerequisite -- strictly speaking, it's an impossible task. We could carefully work through all the internal behavior of Spark's Listeners to convince ourselves that they can work correctly under the new, weaker guarantee. At a bare minimum, we need to do that much before we can consider merging this PR -- but that's probably not enough. The problem is that Listeners aren't just internal to Spark. Users have also developed their own custom Listeners that either implement ...
And all of the above is still really only arguing about whether we could adopt this PR in essentially its present form. There are still questions of whether we should do this or maybe instead we should do something a little different or more. I can see some merit in Marcelo's "opt in" suggestion. If there is utility in having groups of Listeners that can rely upon synchronized behavior, then we should probably retain one or more threads running synchronized Listeners.

For example, if Listener A relies upon synchronization with Listeners B and C while D needs to synchronize with E, but F, G and H are all independent, then there are a couple of things we could do. First, the independent Listeners (F, G and H) can each run in its own thread, providing the scalable performance that this PR is aiming for. After that, we could either have one synchronized Listener thread for all the other Listeners, or we could have one thread for A, B and C and one thread for D and E. Whether we support only one synchronized Listener group/thread or multiple, we'd still need some mechanism for Listeners to select into a synchronized group or to indicate that they can and should be run independently on their own thread.
I don't think it's overkill, and it wouldn't really be that much of a change from your existing design. It also serves as a way of controlling how much extra memory you'll need for these things, since right now each listener would incur its own thread + message queue. For example, you could just have a tagging interface (if a listener implements "X" then it gets its own dedicated dispatch thread), or, if you want to get fancy, you could also define a "group" so that listeners in the same group share a dispatch thread.
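For illustration, the tagging/grouping idea could look roughly like the following sketch. The trait and group names here are hypothetical, not an API from this PR: listeners that return the same group name would share one dispatch thread (keeping today's synchronized ordering relative to each other), while a listener returning a unique group name effectively gets its own thread.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical opt-in trait: the bus would key its dispatch executors by
// this group name instead of creating one executor per listener.
trait ListenerGroup {
  // Default group keeps the current behavior: one shared, synchronized thread.
  def listenerGroup: String = "default-event-listener"
}

// A listener that opts into its own dedicated dispatch thread.
class TaskMetricsListener extends SparkListener with ListenerGroup {
  override def listenerGroup: String = "task-metrics-independent"

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Potentially slow work that should not delay other listener groups.
  }
}
```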
I agree with @markhamstra and @vanzin - having the ability to tag listeners into groups (default = spark listener group) and preserving the current synchronized behavior within a group would ensure backward compatibility at fairly minimal additional complexity.
Test build #70649 has finished for PR 16291 at commit
Force-pushed from 162001a to f2c777d.
jenkins test this please.
Test build #70650 has finished for PR 16291 at commit
Force-pushed from f2c777d to 9e5819d.
Test build #70651 has finished for PR 16291 at commit
Force-pushed from 9e5819d to 6494cfe.
Test build #70652 has finished for PR 16291 at commit
Force-pushed from 6494cfe to e09d66e.
Test build #70656 has started for PR 16291 at commit
Jenkins retest this please.
Test build #70660 has finished for PR 16291 at commit
Thanks guys for your input. I have made changes to support grouping of listeners to preserve the existing synchronization behavior. Currently, the ...

@rxin, @vanzin, @markhamstra, @mridulm
I did a first pass to check the logic, and as I explained in the comments, my main concern is that I think this change should only apply to LiveListenerBus. This asynchronous behavior is actually not desired in ReplayListenerBus.
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.LiveListenerBus

private class ListenerEventExecutor[L <: AnyRef] (listenerName: String, queueCapacity: Int)
Not a fan of the name, since you don't execute events. I prefer ListenerEventDispatcher.
Also, listenerName is more of a group name now, right? You mention "listener" in many places of this class; those should be replaced with "listener group".
Finally, not sure why you need the type parameter?
nit: no space before (
private class ListenerEventExecutor[L <: AnyRef] (listenerName: String, queueCapacity: Int)
    extends Logging {
  private val threadFactory = new ThreadFactoryBuilder().setDaemon(true)
ThreadUtils.namedThreadFactory.
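Assuming the existing helper in org.apache.spark.util.ThreadUtils is what is meant here (it already builds a daemon ThreadFactory with a name prefix), the factory construction would reduce to something like this sketch; the package and object names below are only illustrative, and the package declaration is needed because ThreadUtils is private to Spark:

```scala
package org.apache.spark.scheduler

import java.util.concurrent.Executors

import org.apache.spark.util.ThreadUtils

private[spark] object DispatcherThreadFactoryExample {
  // namedThreadFactory produces daemon threads with the given name prefix,
  // so there is no need to construct a ThreadFactoryBuilder by hand.
  private val threadFactory = ThreadUtils.namedThreadFactory("listener-group-default")
  val executorService = Executors.newSingleThreadExecutor(threadFactory)
}
```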
 * guarantee that we do not process any event before starting the event executor.
 */
private val isStarted = new AtomicBoolean(false)
private val lock = new ReentrantLock()
Minor, but you could achieve the same thing with synchronized/wait/notifyAll, with less code.
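For reference, a sketch of that suggestion: the same start gate expressed with plain synchronized/wait/notifyAll instead of an AtomicBoolean plus ReentrantLock (the class and method names here are illustrative).

```scala
// Illustrative start gate: dispatch threads block in awaitStarted() until
// start() flips the flag and wakes them up.
class StartGate {
  private var started = false

  def start(): Unit = synchronized {
    started = true
    notifyAll() // wake any dispatch threads waiting below
  }

  def awaitStarted(): Unit = synchronized {
    while (!started) {
      wait() // monitor is released while waiting and re-acquired on wake-up
    }
  }
}
```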
def stop(): Unit = {
  executorService.shutdown()
  executorService.awaitTermination(10, TimeUnit.SECONDS)
This changes the current behavior slightly. Right now the bus will wait until the listener thread stops: listenerThread.join(). Here you might return before things are really shut down. It's unlikely to happen, but if it does happen, it can lead to weird issues. Might be better to try shutdownNow() if this call fails, which at least makes a best effort to interrupt existing tasks that might be slow.
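A sketch of the suggested stop() shape, assuming the 10-second graceful wait is kept (names are illustrative, not the merged code): fall back to shutdownNow() when graceful shutdown does not complete, so slow listener tasks are at least interrupted rather than left running after stop() returns.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

object ExecutorShutdown {
  def stopExecutor(executorService: ExecutorService): Unit = {
    executorService.shutdown() // stop accepting new events, let queued ones drain
    if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
      // Best effort: interrupt in-flight listener calls and discard queued events.
      executorService.shutdownNow()
    }
  }
}
```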
} catch {
  case e: RejectedExecutionException =>
    droppedEventsCounter.incrementAndGet()
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
When measuring elapsed time it's generally better to use System.nanoTime since it's monotonic.
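As a sketch of that suggestion (field names are illustrative), the rate-limited dropped-event reporting could track the last report with System.nanoTime, which is monotonic and unaffected by wall-clock adjustments:

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

object DroppedEventReporter {
  private val droppedEventsCounter = new AtomicLong(0)
  @volatile private var lastReportNanos = System.nanoTime()

  def onDroppedEvent(): Unit = {
    droppedEventsCounter.incrementAndGet()
    val now = System.nanoTime()
    // Compare durations with nanoTime: it is monotonic, unlike currentTimeMillis.
    if (now - lastReportNanos >= TimeUnit.MINUTES.toNanos(1)) {
      lastReportNanos = now
      val dropped = droppedEventsCounter.getAndSet(0)
      println(s"Dropped $dropped listener events in the last minute")
    }
  }
}
```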
@@ -35,7 +35,7 @@ import org.apache.spark.util.ListenerBus
  * and StreamingQueryManager. So this bus will dispatch events to registered listeners for only
  * those queries that were started in the associated SparkSession.
  */
-class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
+class StreamingQueryListenerBus(val sparkListenerBus: LiveListenerBus)
Do you need to make this a val?
@@ -69,7 +69,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
   activeQueryRunIds.synchronized { activeQueryRunIds += s.runId }
   sparkListenerBus.post(s)
   // post to local listeners to trigger callbacks
-  postToAll(s)
+  postToAllSync(s)
Do you need this change? Seems like it wasn't a synchronous call before. If you need it, then postToAllSync isn't really for testing only, and you need to change the comment in that method.
Same below.
@@ -661,7 +661,7 @@ class StreamingContext private[streaming] (
   var shutdownHookRefToRemove: AnyRef = null
   if (LiveListenerBus.withinListenerThread.value) {
     throw new SparkException(
-      s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}")
+      "Cannot stop SparkContext within listener event executor thread")
StreamingContext was actually correct.
@@ -26,7 +26,7 @@ import org.apache.spark.util.ListenerBus
  * registers itself with Spark listener bus, so that it can receive WrappedStreamingListenerEvents,
  * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners.
  */
-private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
+private[streaming] class StreamingListenerBus(val sparkListenerBus: LiveListenerBus)
Similarly, is val necessary?
}

private[spark] object ListenerEventExecutor {
  val DefaultEventListenerGroup = "default-event-listener"
Spark's style is to use ALL_CAPS for constants.
@sitalkedia are you still working on this?
@tgravescs - I have not had time to work on this. Hopefully I will get to it sometime next week.
What's the current status of this PR?
What changes were proposed in this pull request?
Currently we are observing very high event processing delay in the driver's ListenerBus for large jobs with many tasks. Many critical components of the scheduler, like ExecutorAllocationManager and HeartbeatReceiver, depend on the ListenerBus events, and this delay might hurt job performance significantly or even fail the job. For example, a significant delay in receiving SparkListenerTaskStart might cause the ExecutorAllocationManager to mistakenly remove an executor which is not idle.

The problem is that the event processor in ListenerBus is a single thread which loops through all the listeners for each event and processes each event synchronously (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala#L94). This single-threaded processor often becomes the bottleneck for large jobs. Also, if one of the listeners is very slow, all the listeners pay the price of the delay incurred by the slow listener. In addition, a slow listener can cause events to be dropped from the event queue, which might be fatal to the job.

To solve the above problems, we propose to get rid of the shared event queue and the single-threaded event processor. Instead, each listener will have its own dedicated single-threaded executor service. Whenever an event is posted, it will be submitted to the executor service of each listener. The single-threaded executor service guarantees in-order processing of the events per listener. The queue used for the executor service will be bounded to guarantee that memory does not grow indefinitely. The downside of this approach is that a separate event queue per listener will increase the driver memory footprint.
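As a rough sketch of that proposal (illustrative class names, not the exact code in this PR), the per-listener dispatcher could pair a single worker thread with a bounded queue, so ordering is preserved per listener, a slow listener only delays itself, and per-listener memory stays capped:

```scala
import java.util.concurrent.{LinkedBlockingQueue, RejectedExecutionException, ThreadPoolExecutor, TimeUnit}

// One instance per listener: exactly one worker thread plus a bounded queue.
class BoundedListenerExecutor(queueCapacity: Int) {
  private val executor = new ThreadPoolExecutor(
    1, 1, 0L, TimeUnit.MILLISECONDS, // core = max = 1 thread => in-order processing
    new LinkedBlockingQueue[Runnable](queueCapacity))

  /** Submit an event handler; returns false if the queue is full and the event is dropped. */
  def post(handle: () => Unit): Boolean = {
    try {
      executor.execute(new Runnable { override def run(): Unit = handle() })
      true
    } catch {
      // The default AbortPolicy throws when the bounded queue is full.
      case _: RejectedExecutionException => false
    }
  }

  def stop(): Unit = executor.shutdown()
}
```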
How was this patch tested?
Tested by running a job on the cluster; the average event processing latency dropped from 2 minutes to 3 milliseconds.