[SPARK-18838][CORE] Introduce multiple queues in LiveListenerBus #18253
Conversation
@vanzin @cloud-fan Can I have a review on this new PR?
I'm busy, but I'll get to it eventually. You could at least write a proper commit summary in the meantime.
ok to test
Test build #78001 has finished for PR 18253 at commit
Ok I rebased & updated my commit message.
Test build #78111 has finished for PR 18253 at commit
Test build #78116 has finished for PR 18253 at commit
Test build #78117 has finished for PR 18253 at commit
I took a quick look and this does indeed look very much like work in progress. I also have a feeling that it's way over-engineered; there's a lot of base classes that are not that interesting, for example:
Changing "post" to "postToAll" as part of this change is also adding a lot of unnecessary noise. I'm not a fan of the current class hierarchy of the listener bus and I think that change makes sense, but at the same time it should be done separately since it's distracting here. I also saw methods that are not fully implemented in the code, so I assume you're still working on this. I'd also like to see better justification for your custom queue implementation. Have you identified the existing queue as a bottleneck?
The approach in #16291 had a lot of good things going for it, and mostly needed some cleanup (and to be modified to only change the live listener bus, and not the replay one). Your current approach seems a lot more complicated than that.
Test build #78128 has finished for PR 18253 at commit
Test build #78134 has finished for PR 18253 at commit
Test build #78136 has finished for PR 18253 at commit
Test build #78138 has finished for PR 18253 at commit
Test build #78139 has finished for PR 18253 at commit
Test build #78141 has finished for PR 18253 at commit
Test build #78146 has finished for PR 18253 at commit
Test build #78165 has finished for PR 18253 at commit
Test build #78167 has finished for PR 18253 at commit
Test build #78168 has finished for PR 18253 at commit
Test build #78169 has finished for PR 18253 at commit
Test build #78186 has finished for PR 18253 at commit
Test build #78205 has finished for PR 18253 at commit
Test build #78219 has finished for PR 18253 at commit
Test build #78225 has finished for PR 18253 at commit
Test build #78226 has finished for PR 18253 at commit
@vanzin Ok it is ready now.
I removed it.
I simplified it and added usages in the other commits. It is basically useful for holding the metrics, and I need a common way to add a group of dependent listeners to the LiveListenerBus and the ReplayBus.
100% agree. I removed it.
This implementation has 2 advantages: it is a 1 producer - 1 consumer queue, whereas the BlockingQueue is an n producers - m consumers one, so it uses much less synchronization. The other advantage (the main one) is that no object is created for each message added to the queue, so it produces a lot less garbage. The more independent queues we have, the more significant this becomes.
I changed it to 1 ms instead of 20 ms. This time is much less than the average processing time of the fastest listener (around 5 ms for the HeartbeatListener). It just forces the consumer thread to back off when the queue is empty, to give the producer thread a better chance of being scheduled. I can remove it if you want.
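For illustration, a minimal sketch of the single-producer/single-consumer ring buffer design described above (names and details are mine, not the PR's actual code): the backing array is pre-allocated, so no node object is created per message, and only two counters need cross-thread visibility.

```scala
import java.util.concurrent.atomic.AtomicLong

// Minimal SPSC ring buffer sketch: exactly one thread calls offer(),
// exactly one other thread calls poll().
class SpscRingBuffer[T <: AnyRef](capacity: Int) {
  private val buffer = new Array[AnyRef](capacity)
  private val head = new AtomicLong(0L) // next slot to read (owned by consumer)
  private val tail = new AtomicLong(0L) // next slot to write (owned by producer)

  // Producer side: returns false when full, mirroring the bus's drop-event path.
  def offer(elem: T): Boolean = {
    val t = tail.get
    if (t - head.get == capacity) false
    else {
      buffer((t % capacity).toInt) = elem
      tail.set(t + 1) // volatile write publishes the element to the consumer
      true
    }
  }

  // Consumer side: returns None when empty (caller may then yield or sleep).
  def poll(): Option[T] = {
    val h = head.get
    if (h == tail.get) None
    else {
      val elem = buffer((h % capacity).toInt).asInstanceOf[T]
      head.set(h + 1)
      Some(elem)
    }
  }
}
```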
Yes, but have you quantified how much you win with that? If the blocking queue approach has enough throughput for the listener bus, it's safer to use it.
Well, you could use an ArrayBlockingQueue. Here's a link with numbers: 4M ops per sec in the 1P-1C case looks plenty fast for Spark's needs.
I think if you really insist on going this route, you should use
Yes, I agree. But you get the synchronization too. I still agree that it should not have a big impact yet. But using an ArrayBlockingQueue does not simplify the code a lot. The current implementation is not complicated, not too verbose, and based on a simple pure Scala array. I do not think it has a huge complexity cost compared to the Java ArrayBlockingQueue. I changed the Thread.sleep to a Thread.yield to be less aggressive about descheduling the thread. Even with very few messages it should not consume too much CPU, and it will be much more reactive when messages are bursting.
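For contrast, a sketch of the ArrayBlockingQueue alternative under discussion (hypothetical surrounding code, not from either branch): it is also array-backed, so no per-element node is allocated, and its blocking `take()` removes the need for any sleep/yield in the consumer loop.

```scala
import java.util.concurrent.ArrayBlockingQueue
import org.apache.spark.scheduler.SparkListenerEvent

val queue = new ArrayBlockingQueue[SparkListenerEvent](10000)

// Producer side: offer() returns false instead of blocking when the queue
// is full, which maps onto the bus's "drop event" behavior.
// if (!queue.offer(event)) onDropEvent(event)

// Consumer side: take() parks the thread while the queue is empty, so no
// Thread.sleep or Thread.yield is needed.
val consumer = new Thread("listener-bus-consumer") {
  setDaemon(true)
  override def run(): Unit = {
    try {
      while (true) {
        val event = queue.take()
        // process(event) ...
      }
    } catch {
      case _: InterruptedException => // interrupted on shutdown
    }
  }
}
consumer.start()
```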
I started reviewing, but again, I noticed the same thing I commented on before. This is way over-engineered. You can do this in a much, much simpler way. There's no need to create all the different abstractions you're adding - the current listener abstraction is enough to achieve what is being proposed here.
All you need is to add a "queue name" parameter to the addListener method, and potentially an "event filter" parameter. Everything else is hidden in the listener implementation, and doesn't need to be exposed to any calling code.
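A sketch of the API shape being suggested here (the signature and defaults are my guesses, not code from either branch):

```scala
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerInterface}

// Hypothetical registration API: callers pick a queue by name and may pass
// an event filter; queue management stays hidden inside the bus.
trait SuggestedBusApi {
  def addListener(
      listener: SparkListenerInterface,
      queue: String = "default",
      filter: SparkListenerEvent => Boolean = _ => true): Unit
}

// Usage: the event-log listener gets its own queue with no new classes.
// bus.addListener(eventLogListener, queue = "eventLog")
```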
@@ -532,7 +533,10 @@ class SparkContext(config: SparkConf) extends Logging {
       new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
         _conf, _hadoopConfiguration)
     logger.start()
-    listenerBus.addListener(logger)
+    listenerBus.addProcessor(
I'm having a hard time finding the declaration of this method. I can't find it either in your code or in the existing master branch. Can you link to it?
In LiveListenerBus.scala line 86
@@ -2350,13 +2354,12 @@ class SparkContext(config: SparkConf) extends Logging {
     try {
       val listenerClassNames: Seq[String] =
         conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
-      for (className <- listenerClassNames) {
-        // Use reflection to find the right constructor
+      val extraListeners = listenerClassNames.map{ className =>
.map { className =>
You have a lot of style issues in your code - indentation, spacing, etc. Please read the style section in http://spark.apache.org/contributing.html and try to follow it.
Fixed.
I do not understand: the PR passes the Scala style tests. How can I still have style issues?
      listener
    }
    if (extraListeners.nonEmpty) {
      val group = new FixGroupOfListener(extraListeners, "extraListeners")
FixGroupOfListener is a bad class name. I'm not even sure what it's supposed to be, but the closest I can think of is ListenerGroup.
But perhaps this shouldn't be exposed at all. If you add a queue name to the listener registration method, you can hide this from callers altogether. That is, if I understood what this class is in the first place.
Then you wouldn't need addIsolatedListener either.
@vanzin are you suggesting making this a call to addProcessor? Or an addListener override? Just trying to understand the code at this stage.
First, I think that modifying the existing addListener method is a bad idea. It would impact a lot of code. We want to keep this method with its current behavior (add a listener to the "default" queue) and still be able to add listeners to other queues. I think that adding a String label and matching on it to determine the queue is quite error-prone. I prefer having a more constrained API.
For the FixGroupOfListener name, I can change it. But I have 2 kinds of listener groups:
FixGroupOfListener: for groups of inter-dependent listeners (like the UI listeners). I can rename it to ListenerImmutableGroup.
ModifiableGroupOfListener: for the "default" queue. I can rename it to ListenerGroup.
Are these names OK for you?
> First, I think that modifying the existing addListener method is a bad idea. It will impact a lot the code.

That's why overloaded methods exist.

> But I have 2 kind of group of listeners

I don't think there's really a distinction between the two types of groups you mention. The "UI group" is just a modifiable group that you don't modify after it's been created.
@@ -227,6 +169,7 @@ private[spark] class EventLoggingListener(
    * ".inprogress" suffix.
    */
   def stop(): Unit = {
+    flush()
Shouldn't be necessary (close() does it).
Done
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = {
  def log(event: SparkListenerEvent): Unit = {
    if (event.logEvent) {
Since you're adding an event filter, you could perform this check there...
To keep the current behavior, it is not simple to put this filtering (if (event.logEvent)) in the event filter. Indeed, I want to perform it only if the type of the event is not a "basic" type. It would mean making the EventFilter, which acts here as a "pre-filter" (discarding only the events that we do not want to log), a lot more complex.
import org.apache.spark.scheduler.bus.ListenerBusQueue.{FixGroupOfListener, ModifiableGroupOfListener}

// For generic message processor (like event logging)
private[scheduler] class ProcessorListenerBusQueue(
First, the name of this file is weird. But more importantly, why are these classes even necessary?
Why can't you have a single queue implementation that manages a group of listeners? Whether the group has a single listener or multiple shouldn't matter - the implementation can be the same.
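A sketch of that suggestion (my naming, not actual code from either branch): one queue type that always dispatches to a group, where a "single listener" is just a group of size one.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConverters._
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerInterface}

// One implementation covering every case: the queue drains events to a
// group of listeners, and a single-listener queue is a one-element group.
class GroupQueue(val name: String) {
  private val group = new CopyOnWriteArrayList[SparkListenerInterface]()

  def add(listener: SparkListenerInterface): Unit = group.add(listener)

  // Called by this queue's dispatch thread for each dequeued event.
  def dispatchAll(event: SparkListenerEvent): Unit = {
    group.asScala.foreach { listener =>
      // per-event-type dispatch, as SparkListenerBus.doPostEvent does:
      // doPostEvent(listener, event)
    }
  }
}
```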
For the name of the file, I can change it! Do you have a better name? I can even put the content of the file (the 2 concrete implementations) in the BusQueue.scala file.
I refactored this file a bit. Now I have only 2 implementations:
ProcessorBusQueue: the implementation for a generic processor (in which we do not dispatch by event type)
ListenerBusQueue: the implementation for a listener (with dispatch by event type)
I'm still confused about why you need 2 implementations. Why doesn't ListenerBusQueue work for everybody? And why shouldn't it?
I need more time to actually grok all this code, but like Wenchen suggested before, this is a big change and it would benefit from a more detailed explanation of exactly how you're organizing the hierarchy of listeners, groups, etc. Your PR description only explains which queues you created, but not any of the changes that were needed to achieve that.
If it makes it easier, you can create a README.md file with a longer explanation of how things are organized (for example, check common/network-common/src/main/java/org/apache/spark/network/crypto, where I added a README to explain details of what that whole body of code is doing).
Hi, just some comments as I try to understand the code.
  }
  logEvent(toLog)
  nbMessageProcessed = nbMessageProcessed + 1
  if (nbMessageProcessed == FLUSH_FREQUENCY) {
This should be >= FLUSH_FREQUENCY.
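Sketched, the suggested form (surrounding names taken from the diff above; the reset is my assumption):

```scala
// '>=' keeps the flush robust even if the counter could ever move past the
// threshold without hitting it exactly.
nbMessageProcessed += 1
if (nbMessageProcessed >= FLUSH_FREQUENCY) {
  flush()
  nbMessageProcessed = 0 // assumed reset; the actual patch may differ
}
```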
Done
 * This method is thread-safe and can be called in any thread.
 */
final override def addListener(listener: SparkListenerInterface): Unit = {
  startStopAddRemoveLock.lock()
this should probably be in a try/finally block, with unlock in the finally.
same for other lock/unlocks.
Done using Scala Try
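The requested pattern, sketched: try/finally guarantees the unlock even when the body throws (a scala.util.Try wrapper achieves the same only if the failure is re-thrown afterwards).

```scala
import java.util.concurrent.locks.ReentrantLock
import org.apache.spark.scheduler.SparkListenerInterface

class BusRegistration {
  private val startStopAddRemoveLock = new ReentrantLock()

  def addListener(listener: SparkListenerInterface): Unit = {
    startStopAddRemoveLock.lock()
    try {
      // ... mutate listener/queue state here ...
    } finally {
      startStopAddRemoveLock.unlock() // runs on both normal and exceptional exit
    }
  }
}
```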
} else {
  onDropEvent(event)
  throw new IllegalStateException("LiveListener bus already started!")
definitely want to unlock before this.
@@ -27,7 +27,12 @@ private[spark] trait SparkListenerBus

   protected override def doPostEvent(
       listener: SparkListenerInterface,
-      event: SparkListenerEvent): Unit = {
+      event: SparkListenerEvent): Unit = SparkListenerEventDispatcher.dispatch(listener, event)
why is this change necessary?
I have just extracted the dispatch method to be able to use it in GroupOfListenersBusQueue and in SingleListenerBusQueue (in the file ListenerBusQueueImpl.scala).
retest this please
@vanzin I simplified the code a lot. There is now only one implementation for the queue and one for the group of listeners. I removed the extra trait in the listener hierarchy too.
/**
 * Add a generic listener to an isolated pool.
 */
def addProcessor(processor: SparkListenerEvent => Unit,
what's the difference between addProcessor and addListener?
With addProcessor, you do not have to provide a SparkListenerInterface (with a method per message type), but just a generic function which handles SparkListenerEvent (the supertype of each event type). So when you do generic processing (see EventLoggingListener for example) it is very convenient, and as a cherry on top you avoid the horrible and costly dispatch function, which in this case (generic processing) is a burden.
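To illustrate the distinction (a sketch; the PR's exact signatures may differ):

```scala
import org.apache.spark.scheduler._

// addListener route: a SparkListenerInterface implementation with one
// callback per event type, so the bus must dispatch on the concrete class.
class MyStageListener extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    // react to exactly this event type
  }
}

// addProcessor route: one generic function over the supertype; no per-type
// dispatch is needed, which suits event logging, where every event gets the
// same serialization treatment.
val processor: SparkListenerEvent => Unit = { event =>
  // log(event): identical handling for every event type
}

// bus.addListener(new MyStageListener())  // dispatched per event type
// bus.addProcessor(processor, ...)        // generic, dispatch-free
```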
shall we do it in a separate PR?
It is just a small technical refactoring which came almost for free with the new queue object. It is also very convenient to be able to handle the asynchronous LiveListenerBus in the tests. I think that we can keep it in this PR.
The PR description is good for explaining the new behavior, but can you say more about the implementation? IMO, we just need to duplicate the event queue for each important listener, like the event logging listener, and non-important listeners can share one event queue as in the current behavior. Then each event queue is processed by an individual thread.
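A sketch of that shape (hypothetical names, not code from the PR): the bus fans each posted event out to every queue, and each queue is drained by its own thread.

```scala
import java.util.concurrent.LinkedBlockingQueue
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerInterface}

// Hypothetical: one bounded queue plus one dispatch thread per listener group.
class AsyncListenerQueue(name: String, listeners: Seq[SparkListenerInterface]) {
  private val events = new LinkedBlockingQueue[SparkListenerEvent](10000)

  def post(event: SparkListenerEvent): Unit = {
    if (!events.offer(event)) { /* onDropEvent(event) */ }
  }

  private val thread = new Thread(s"listener-bus-$name") {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (true) {
          val event = events.take()
          // listeners.foreach(l => doPostEvent(l, event))
        }
      } catch {
        case _: InterruptedException => // interrupted on stop()
      }
    }
  }
  thread.start()
}

// Posting fans out to all queues, so a slow consumer (e.g. event logging)
// no longer delays the others:
// queues.foreach(_.post(event))
```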
@cloud-fan PR description updated with some details on the implementation
I think
Agree that it still seems like there are too many moving parts here. I don't see a whole lot of difference between
Also to reinforce a previous comment, your code has a ton of style issues. It doesn't matter that checkstyle doesn't complain about them; you still have to follow the code conventions of the project or we'll be forever pointing out style issues in your code.
@cloud-fan
@vanzin I will do a pass to try to fix the code style issues.
I really dislike
I think part of the confusion here is that the current code is trying to both refactor the listener bus class hierarchy and introduce multiple queues. I don't doubt that there's benefit in taking a holistic look into this part of the class hierarchy; but it would be good to do that separately, both so that we can clearly see that the proposed hierarchy makes sense, and so that it's easier to review things. It's easier to wrap your head around the code if it's focused on one problem instead of two.
I pushed some code to my repo: https://github.com/vanzin/spark/tree/SPARK-18838, which is an attempt to do things the way I've been trying to explain. It tries to keep changes as local as possible to
It's just a p.o.c. so I cut a few corners (like metrics), and I only ran
@vanzin I pushed some comments on your code. I think that trying to keep the exact same class hierarchy leads to very complex code, with many drawbacks.
The LiveListenerBus can now manage multiple queues for different listeners. This will greatly increase its dequeuing rate. All the listeners are still added to the main queue, so the behavior is the same as before. In further commits some listeners will be moved to dedicated queues. ## How was this patch tested? unit tests + manual tests have been run on the cluster
You commented on my code, not on the idea. My code was hacked together quickly; it can be cleaned up a lot. Your comments don't prove that separating the refactoring of the listener bus hierarchy from the introduction of queues is impossible or undesirable.
The eventLoggingListener is now in a dedicated asynchronous queue. This listener could represent 50% of the event processing time of the standard queue. ## How was this patch tested? unit tests + manual tests have been run on the cluster
The ExecutorAllocationManager is now in a dedicated asynchronous queue. This listener suffers a lot of event drops; putting it in a dedicated queue greatly decreases the chance of drops. ## How was this patch tested? unit tests + manual tests have been run on the cluster
The UI event listeners are now in a dedicated asynchronous queue. This set of listeners could represent 40% of the event processing time, and calls from the GUI no longer block the listener bus. ## How was this patch tested? unit tests + manual tests have been run on the cluster
The extra listeners are now in a dedicated asynchronous queue, so they cannot interfere with the execution of the Spark internal listeners. ## How was this patch tested? unit tests + manual tests have been run on the cluster
The streaming listener, which is a bus too (for streaming events & listeners), is now in a dedicated asynchronous queue, so the streaming listeners run without impact from the other listeners. ## How was this patch tested? unit tests + manual tests have been run on the cluster
- wait on empty queue instead of looping
## What changes were proposed in this pull request?
In this PR the single queue of the LiveListenerBus was replaced by multiple independent queues.
The queue and its processing thread have been extracted from `LiveListenerBus` into a class `BusQueue`. The definition of most of the methods of `ListenerBus` has been extracted into a trait `WithListenerBus`. The `LiveListenerBus` implements it directly. It holds the "default" queue, associated with a group of listeners, and a list of queues. The method `addListener` of `WithListenerBus` has a new optional boolean parameter (default value false) to request an independent queue for this listener instead of the default one. This parameter is ignored in the default implementation (in `ListenerBus`).
A listener which is also a set of listeners has been added. It allows keeping the current behavior for a group of dependent listeners or the default queue. It handles the per-listener metrics.
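Sketched, the surface described above looks roughly like this (simplified; parameters other than the names mentioned in the description are my assumptions):

```scala
import org.apache.spark.scheduler.{SparkListenerEvent, SparkListenerInterface}

// Trait extracted from ListenerBus; the extra parameter is honored by
// LiveListenerBus and ignored by the synchronous default implementation.
trait WithListenerBus {
  def addListener(
      listener: SparkListenerInterface,
      separateQueue: Boolean = false): Unit
}

// BusQueue bundles a queue with the thread that drains it (extracted from
// LiveListenerBus); its internals are omitted here.
// class BusQueue(...) { def post(event: SparkListenerEvent): Unit = ... }

// LiveListenerBus holds the default queue plus a list of BusQueues, and also
// accepts generic processors over the event supertype (remaining parameters
// assumed):
// def addProcessor(processor: SparkListenerEvent => Unit, ...): Unit
// def removeProcessor(...): Unit
```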
The methods `addProcessor` and `removeProcessor` have been added to `LiveListenerBus` to be able to add message processing at the supertype `SparkListenerEvent`, in addition to the per-event-type processing of the listener interface.
## How was this patch tested?
unit tests + manual tests on the cluster