[SPARK-18838][core] Add separate listener queues to LiveListenerBus. #19211
Conversation
This change modifies the live listener bus so that all listeners are added to queues; each queue has its own thread to dispatch events, making it possible to separate slow listeners from other, more performance-sensitive ones.

The public API has not changed: all listeners added with the existing `addListener` method, which after this change mostly means all user-defined listeners, end up in a default queue. Internally, there's an API allowing listeners to be added to specific queues, and that API is used to separate the internal Spark listeners into 3 categories: application status listeners (e.g. UI), executor management (e.g. dynamic allocation), and the event log.

The queueing logic, while abstracted away in a separate class, is kept as hidden from consumers as possible. Aside from choosing their queue, there's no code change needed to take advantage of queues.

Metrics were also simplified a little: the live bus now keeps track of metrics per queue instead of per individual listener. This is mostly to make integration with the existing metrics code in `ListenerBus` easier, without having to refactor that code; it can be done later if queue-level metrics are not enough.

Test coverage relies on existing tests; a few tests had to be tweaked because they relied on `LiveListenerBus.postToAll` being synchronous, and this change makes that method asynchronous. Other tests were simplified not to use the asynchronous `LiveListenerBus`.
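The queue-per-category dispatch described above can be sketched as a minimal, self-contained toy. Everything here (`MiniAsyncQueue`, string events, the poison-pill shutdown) is illustrative only and is not Spark's actual `AsyncEventQueue` API; the point is just that each queue owns its own dispatch thread, so a slow listener only delays its own queue:

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.mutable.ArrayBuffer

// Toy async queue: one dispatch thread drains events to this queue's
// listeners. Uses a poison-pill string to shut down, which is fine for a
// sketch but would need a dedicated sentinel type in real code.
class MiniAsyncQueue(val name: String) {
  private val POISON_PILL = "__STOP__"
  private val events = new LinkedBlockingQueue[String]()
  private val listeners = new ArrayBuffer[String => Unit]()
  private val stopped = new AtomicBoolean(false)

  private val dispatchThread = new Thread(s"listener-bus-$name") {
    setDaemon(true)
    override def run(): Unit = {
      var event = events.take()
      while (event != POISON_PILL) {
        listeners.synchronized { listeners.foreach(l => l(event)) }
        event = events.take()
      }
    }
  }

  def addListener(l: String => Unit): Unit =
    listeners.synchronized { listeners += l }

  def post(event: String): Unit =
    if (!stopped.get()) events.put(event)

  def start(): Unit = dispatchThread.start()

  def stop(): Unit = if (stopped.compareAndSet(false, true)) {
    events.put(POISON_PILL) // already-queued events drain first (FIFO)
    dispatchThread.join()
  }
}
```

With one such queue per category ("appStatus", "executorMgmt", "eventLog"), posting fans out to all queues but each is drained independently:

```scala
val counter = new AtomicInteger(0)
val q = new MiniAsyncQueue("appStatus")
q.addListener(_ => counter.incrementAndGet())
q.start()
(1 to 3).foreach(i => q.post(s"event-$i"))
q.stop() // joins the dispatch thread, so all 3 events are processed
```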
Test build #81693 has finished for PR 19211 at commit
Also fix a problem where the queue size metric might be out of date.
This change makes the event queue implement SparkListenerBus and inherit all the metrics and dispatching behavior, making the change easier on the scheduler and also restoring per-listener metrics.
Test build #81724 has started for PR 19211 at commit
Test build #81725 has started for PR 19211 at commit
Test build #81726 has started for PR 19211 at commit
(Never mind the test failures; I killed the obsolete builds.)
retest this please
Test build #81728 has finished for PR 19211 at commit
Test build #81744 has finished for PR 19211 at commit
I think this is ready now, pinging some people: @squito @JoshRosen @cloud-fan
Test build #81795 has finished for PR 19211 at commit
I did a read-through of your code and didn't find any glaring issues, but I'd like to go through it a bit more before passing judgment. Overall a nice new feature, though.
```diff
@@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
    * the scheduling task.
    */
   def start(): Unit = {
-    listenerBus.addListener(listener)
+    listenerBus.addToQueue(listener, LiveListenerBus.EXECUTOR_MGMT_QUEUE)
```
Will we have more queues in the future? If it's not too many, maybe we can use dedicated methods, like `addToMGMTQueue`?
```scala
 * Start an asynchronous thread to dispatch events to the underlying listeners.
 *
 * @param sc Used to stop the SparkContext in case a listener fails.
 * @param metrics Used to report listener performance metrics.
```
there is no `metrics` param.
```scala
    processingEvent = false
  }
}

private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
```
can we have something like

```scala
private val defaultQueue = ...
private val executorManagementQueue = ...
...
```

I think the number of queues will be small.
I can add specific methods for each internal queue, but I'd like to keep the internal management of queues more generic. One of the ideas in #18253 was to allow filtering of which events are enqueued at all (e.g. don't enqueue `SparkListenerBlockUpdated` because it's not written to the event logs, reducing the load on the event log queue). Leaving the internal management more generic would allow that to be implemented more easily and cleanly later (instead of `addToEventLogQueue`, you'd have an `addCustomQueue` method taking a subclass of `AsyncEventQueue`).
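The generic, name-keyed queue management being discussed here can be sketched roughly like this. The names (`ToyBus`, `ToyQueue`) and the listener type are made up for illustration; this is not Spark's actual implementation, only the find-or-create shape behind an `addToQueue`-style method:

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConverters._

// Toy stand-in for AsyncEventQueue: just a named bag of listeners.
class ToyQueue(val name: String) {
  val listeners = new CopyOnWriteArrayList[AnyRef]()
}

class ToyBus {
  // Queues are looked up (or created) by name, so a custom queue
  // subclass could be slotted in later without adding new methods.
  private val queues = new CopyOnWriteArrayList[ToyQueue]()

  def addToQueue(listener: AnyRef, queue: String): Unit = synchronized {
    queues.asScala.find(_.name == queue) match {
      case Some(q) => q.listeners.add(listener)
      case None =>
        val q = new ToyQueue(queue)
        q.listeners.add(listener)
        queues.add(q)
    }
  }

  def queueNames: Seq[String] = queues.asScala.map(_.name).toList
}
```

Because a queue is created lazily on the first `addToQueue` call for its name, listeners registered before the bus starts simply land in a queue that gets started later, which is consistent with the point below that adding listeners before start is expected.

```scala
val bus = new ToyBus
bus.addToQueue("listenerA", "appStatus")
bus.addToQueue("listenerB", "appStatus")
bus.addToQueue("listenerC", "eventLog")
// two distinct queues exist, created on demand
```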
Test build #81826 has finished for PR 19211 at commit
```scala
case None =>
  val newQueue = new AsyncEventQueue(queue, conf, metrics)
  newQueue.addListener(listener)
  if (started.get() && !stopped.get()) {
```
`stopped.get` is always false here.
```scala
 */
private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized {
  if (stopped.get()) {
    throw new IllegalStateException("LiveListenerBus is stopped.")
```
shall we also check `!started.get`?
No. It's ok and actually expected to add listeners before the bus is started.
```scala
/** Post an event to all queues. */
def post(event: SparkListenerEvent): Unit = {
  if (!stopped.get()) {
```
shall we add an assert that this bus is started?
No, as the class javadoc says, it's ok to send events before the bus is started.
Test build #81884 has finished for PR 19211 at commit
```diff
-private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
-
-  self =>
+private[spark] class LiveListenerBus(conf: SparkConf) {
 
   import LiveListenerBus._
 
   private var sparkContext: SparkContext = _
```
maybe I missed it but I don't see this being set anywhere now
Ah, this should be set in `start()`; probably deleted in one of the code moves.
Test build #81895 has finished for PR 19211 at commit
```scala
  queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
}

private[spark] def listeners: JList[SparkListenerInterface] = {
```
where do we use this method?
Used by tests.
```scala
}

private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {
```
let's add a comment to say it's for testing only.
```scala
private[scheduler] val APP_STATUS_QUEUE = "appStatus"

private[scheduler] val EXECUTOR_MGMT_QUEUE = "executorMgmt"
```
`MGMT` is hard to understand, how about `MANAGEMENT`?
That's a pretty common abbreviation for "management", but sure.
```scala
// After listener bus has stopped, posting events should not increment counter
bus.stop()
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 5)
assert(bus.metrics.numEventsPosted.getCount === 5)

// Make sure per-listener-class timers were created:
```
do we still have this?
oh seems we don't need it anymore.
```diff
@@ -575,8 +575,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
 
   test("getActive and getActiveOrCreate") {
-    require(StreamingContext.getActive().isEmpty, "context exists from before")
-    sc = new SparkContext(conf)
```
why this change?
Because the test stops the underlying streaming context and thus the spark context, and you can't add listeners to the bus after that. Thus, it was failing.
```diff
-private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
-
-  self =>
+private[spark] class LiveListenerBus(conf: SparkConf) {
```
is it still valid to call `LiveListenerBus.postToAll`? If not, shall we override `postToAll` and throw an exception?
also `LiveListenerBus.addListener` seems invalid too.
`LiveListenerBus.postToAll` doesn't exist anymore, and neither does `LiveListenerBus.addListener`. But I'm not sure what you're asking.
oh sorry, I missed that `LiveListenerBus` doesn't extend `SparkListenerBus` anymore.
LGTM except some minor comments
```scala
    this.sc = sc
    dispatchThread.start()
  } else {
    throw new IllegalStateException(s"$name already started!")
```
Is it better to be a no-op here, or a warning log instead?
Seems fine to me -- it's only called in LiveListenerBus, where we guarantee this is true. Seems better to fail fast if it's messed up.
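The fail-fast start guard under discussion boils down to an atomic compare-and-set: exactly one caller flips `started` from false to true, and every other caller gets the `IllegalStateException`. A hedged sketch (the class name `GuardedQueue` is made up; only the guard pattern reflects the code above):

```scala
import java.util.concurrent.atomic.AtomicBoolean

class GuardedQueue(name: String) {
  private val started = new AtomicBoolean(false)

  // compareAndSet makes check-and-flip atomic, so a double start()
  // is detected reliably even with concurrent callers.
  def start(): Unit = {
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException(s"$name already started!")
    }
    // ... start the dispatch thread here ...
  }
}
```

A plain `if (started.get()) throw ... ; started.set(true)` would leave a window where two threads both pass the check, which is why the single CAS is preferable here.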
LGTM. Do you know how much better this makes it? E.g., if we had an existing case where things go haywire in dynamic allocation because of this, we could check whether, after this change, the only dropped events are in the event log queue, hopefully. Seems like the right change regardless, but it would be good to know the effect.
I don't really have a readily available cluster that can easily put that much pressure on the listener bus; I tried with the one I have and monitored the listener bus metrics, and found pretty much what I expected (slightly more latency in the event log queue, but not enough in my tests to actually cause event drops).
Test build #81940 has finished for PR 19211 at commit
thanks, merging to master!
Hi, @vanzin and @cloud-fan.
The following assert is failing in some cases; let me fix it and send a follow-up PR:
Closes apache#19211 from vanzin/SPARK-18838.
Author: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit c6ff59a)
Cloudera ID: CDH-51951
(cherry picked from commit 6cb9e4d)
(cherry picked from commit 8cbc8656344412f3b2db39367f7def9f6a03512a)