Skip to content

Commit

Permalink
Cleanup metric initialization a bit.
Browse files Browse the repository at this point in the history
Also fix a problem where the queue size metric might be out of date.
  • Loading branch information
Marcelo Vanzin committed Sep 13, 2017
1 parent 9fc30ba commit 6bee214
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.spark.util.Utils
* listeners; they are used internally by `LiveListenerBus`, and are tightly coupled to the
* lifecycle of that implementation.
*/
private class AsyncEventQueue(val name: String, conf: SparkConf)
private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
extends SparkListenerInterface
with Logging {

Expand Down Expand Up @@ -86,7 +86,14 @@ private class AsyncEventQueue(val name: String, conf: SparkConf)
private val started = new AtomicBoolean(false)
private val stopped = new AtomicBoolean(false)

private var droppedEvents: Counter = null
private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")

// Remove the queue size gauge first, in case it was created by a previous incarnation of
// this queue that was removed from the listener bus.
metrics.metricRegistry.remove(s"queue.$name.size")
metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
override def getValue: Int = taskQueue.size()
})

private val dispatchThread = new Thread(s"spark-listener-group-$name") {
setDaemon(true)
Expand Down Expand Up @@ -125,20 +132,9 @@ private class AsyncEventQueue(val name: String, conf: SparkConf)
* @param sc Used to stop the SparkContext in case a listener fails.
* @param metrics Used to report listener performance metrics.
*/
private[scheduler] def start(sc: SparkContext, metrics: LiveListenerBusMetrics): Unit = {
private[scheduler] def start(sc: SparkContext): Unit = {
if (started.compareAndSet(false, true)) {
this.sc = sc
this.droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")

// Avoid warnings in the logs if this queue is being re-created; it will reuse the same
// gauge as before.
val queueSizeGauge = s"queue.$name.size"
if (metrics.metricRegistry.getGauges().get(queueSizeGauge) == null) {
metrics.metricRegistry.register(queueSizeGauge, new Gauge[Int] {
override def getValue: Int = taskQueue.size()
})
}

dispatchThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
queue.addListener(listener)

case None =>
val newQueue = new AsyncEventQueue(queue, conf)
val newQueue = new AsyncEventQueue(queue, conf, metrics)
newQueue.addListener(listener)
if (started.get() && !stopped.get()) {
newQueue.start(sparkContext, metrics)
newQueue.start(sparkContext)
}
super.addListener(newQueue)
}
Expand Down Expand Up @@ -130,7 +130,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
throw new IllegalStateException("LiveListenerBus already started.")
}

queues.asScala.foreach(_.start(sc, metrics))
queues.asScala.foreach(_.start(sc))
metricsSystem.registerSource(metrics)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
bus.metrics.metricRegistry.counter("queue.default.numDroppedEvents").getCount
}

private def queueSize(bus: LiveListenerBus): Option[Int] = {
Option(bus.metrics.metricRegistry.getGauges().get("queue.default.size"))
.map(_.getValue().asInstanceOf[Int])
private def queueSize(bus: LiveListenerBus): Int = {
bus.metrics.metricRegistry.getGauges().get("queue.default.size").getValue().asInstanceOf[Int]
}

private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
Expand Down Expand Up @@ -76,7 +75,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Metrics are initially empty.
assert(bus.metrics.numEventsPosted.getCount === 0)
assert(numDroppedEvents(bus) === 0)
assert(queueSize(bus) === None)
assert(queueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 0)

// Post five events:
Expand All @@ -85,15 +84,15 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Five messages should be marked as received and queued, but no messages should be posted to
// listeners yet because the listener bus hasn't been started.
assert(bus.metrics.numEventsPosted.getCount === 5)
assert(queueSize(bus) === None)
assert(queueSize(bus) === 5)
assert(counter.count === 0)

// Starting listener bus should flush all buffered events
bus.start(mockSparkContext, mockMetricsSystem)
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
assert(queueSize(bus) === Some(0L))
assert(queueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 5)

// After listener bus has stopped, posting events should not increment counter
Expand Down Expand Up @@ -186,18 +185,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Post a message to the listener bus and wait for processing to begin:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
assert(queueSize(bus) === Some(0))
assert(queueSize(bus) === 0)
assert(numDroppedEvents(bus) === 0)

// If we post an additional message then it should remain in the queue because the listener is
// busy processing the first event:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
assert(queueSize(bus) === Some(1))
assert(queueSize(bus) === 1)
assert(numDroppedEvents(bus) === 0)

// The queue is now full, so any additional events posted to the listener will be dropped:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
assert(queueSize(bus) === Some(1))
assert(queueSize(bus) === 1)
assert(numDroppedEvents(bus) === 1)

// Allow the remaining events to be processed so we can stop the listener bus:
Expand Down

0 comments on commit 6bee214

Please sign in to comment.