[SPARK-18838][CORE] Use separate executor service for each event listener #16291

Closed
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, ListenerEventExecutor, SystemClock, ThreadUtils, Utils}

 /**
  * An agent that dynamically allocates and removes executors based on the workload.

@@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
    * the scheduling task.
    */
   def start(): Unit = {
-    listenerBus.addListener(listener)
+    listenerBus.addListener(listener, ListenerEventExecutor.ExecutorAllocationManagerGroup)

     val scheduleTask = new Runnable() {
       override def run(): Unit = {
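These two hunks show the pattern repeated throughout the PR: every `addListener` call now names the listener group whose executor service should deliver its events. As a rough sketch of the idea (names and structure below are illustrative, not the PR's actual `ListenerEventExecutor`), the bus keeps one single-thread executor per group, so a slow listener can only delay other listeners in the same group:

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.collection.mutable

// Illustrative sketch only, not the PR's actual ListenerEventExecutor.
// Each named group owns a single-thread executor, so a slow listener can
// only delay other listeners registered in the same group.
object GroupedBusSketch {
  trait Listener { def onEvent(event: String): Unit }

  class GroupedListenerBus {
    private val groups =
      mutable.Map.empty[String, (ExecutorService, mutable.Buffer[Listener])]

    def addListener(listener: Listener, group: String): Unit = synchronized {
      val (_, registered) = groups.getOrElseUpdate(
        group, (Executors.newSingleThreadExecutor(), mutable.Buffer.empty[Listener]))
      registered += listener
    }

    // post() only enqueues: delivery happens on each group's own thread.
    def post(event: String): Unit = synchronized {
      for ((executor, registered) <- groups.values) {
        executor.submit(new Runnable {
          override def run(): Unit = registered.foreach(_.onEvent(event))
        })
      }
    }
  }
}
```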
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

@@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     this(sc, new SystemClock)
   }

-  sc.addSparkListener(this)
+  sc.listenerBus.addListener(this, ListenerEventExecutor.HeartBeatReceiverGroup)

   override val rpcEnv: RpcEnv = sc.env.rpcEnv
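HeartbeatReceiver drops the `addSparkListener` convenience call and registers directly on the bus with a dedicated group, so executor heartbeat events cannot queue behind slow user listeners. One plausible reading (an assumption, not shown in this diff) is that `addSparkListener` itself now delegates to the two-argument form with the default user group:

```scala
// Hypothetical convenience wrapper, assumed from the surrounding changes; the
// PR's actual SparkContext.addSparkListener body is not part of this diff.
// Reuses the GroupedListenerBus sketch shown earlier.
object AddSparkListenerSketch {
  import GroupedBusSketch._

  val DefaultUserEventListenerGroup = "default-user-event-listener" // assumed name

  class MiniContext {
    val listenerBus = new GroupedListenerBus

    // The one-argument form delegates to the grouped form with a default group.
    def addSparkListener(listener: Listener): Unit =
      listenerBus.addListener(listener, DefaultUserEventListenerGroup)
  }
}
```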
core/src/main/scala/org/apache/spark/SparkContext.scala (6 changes: 3 additions & 3 deletions)

@@ -532,7 +532,7 @@ class SparkContext(config: SparkConf) extends Logging {
         new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
           _conf, _hadoopConfiguration)
       logger.start()
-      listenerBus.addListener(logger)
+      listenerBus.addListener(logger, ListenerEventExecutor.EventLoggingGroup)
       Some(logger)
     } else {
       None

@@ -1874,7 +1874,7 @@ class SparkContext(config: SparkConf) extends Logging {
   def stop(): Unit = {
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(
-        s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
+        s"Cannot stop SparkContext within listener event executor thread")
     }
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.

@@ -2329,7 +2329,7 @@ class SparkContext(config: SparkConf) extends Logging {
             " parameter from breaking Spark's ability to find a valid constructor.")
         }
       }
-      listenerBus.addListener(listener)
+      listenerBus.addListener(listener, ListenerEventExecutor.DefaultUserEventListenerGroup)
       logInfo(s"Registered listener $className")
     }
   } catch {
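The last hunk is the path that instantiates classes named in `spark.extraListeners`; after this change those user listeners all land in `ListenerEventExecutor.DefaultUserEventListenerGroup`. Registration is unchanged from a user's point of view:

```scala
import org.apache.spark.SparkConf

// User-facing registration is unchanged: listeners named here are created by
// SparkContext and, after this PR, attached to the default user listener
// group. The class name below is a hypothetical placeholder.
val conf = new SparkConf()
  .setAppName("listener-demo")
  .set("spark.extraListeners", "com.example.MyMetricsListener")
```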
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala (157 changes: 5 additions & 152 deletions)

@@ -18,13 +18,12 @@
 package org.apache.spark.scheduler

 import java.util.concurrent._
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

 import scala.util.DynamicVariable

 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.config._
-import org.apache.spark.util.Utils
+

 /**
  * Asynchronously passes SparkListenerEvents to registered SparkListeners.

@@ -37,12 +36,9 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa

   self =>

-  import LiveListenerBus._
-
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
-  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+  override lazy val eventQueueSize = validateAndGetQueueSize()

   private def validateAndGetQueueSize(): Int = {
     val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)

@@ -52,102 +48,8 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
     queueSize
   }
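The capacity check survives the rewrite: the queue size is still read from the same setting, validated by `validateAndGetQueueSize()` (non-positive values are rejected), and now exposed as `eventQueueSize`, presumably for the per-group executors to consume. For example:

```scala
import org.apache.spark.SparkConf

// The queue capacity is still read from the existing setting; a value <= 0
// makes validateAndGetQueueSize() fail instead of silently misbehaving.
val conf = new SparkConf().set("spark.scheduler.listenerbus.eventqueue.size", "20000")
```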
-  // Indicate if `start()` is called
-  private val started = new AtomicBoolean(false)
-  // Indicate if `stop()` is called
-  private val stopped = new AtomicBoolean(false)
-
-  /** A counter for dropped events. It will be reset every time we log it. */
-  private val droppedEventsCounter = new AtomicLong(0L)
-
-  /** When `droppedEventsCounter` was logged last time in milliseconds. */
-  @volatile private var lastReportTimestamp = 0L
-
-  // Indicate if we are processing some event
-  // Guarded by `self`
-  private var processingEvent = false
-
-  private val logDroppedEvent = new AtomicBoolean(false)
-
-  // A counter that represents the number of events produced and consumed in the queue
-  private val eventLock = new Semaphore(0)
-
-  private val listenerThread = new Thread(name) {
-    setDaemon(true)
-    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
-      LiveListenerBus.withinListenerThread.withValue(true) {
-        while (true) {
-          eventLock.acquire()
-          self.synchronized {
-            processingEvent = true
-          }
-          try {
-            val event = eventQueue.poll
-            if (event == null) {
-              // Get out of the while loop and shutdown the daemon thread
-              if (!stopped.get) {
-                throw new IllegalStateException("Polling `null` from eventQueue means" +
-                  " the listener bus has been stopped. So `stopped` must be true")
-              }
-              return
-            }
-            postToAll(event)
-          } finally {
-            self.synchronized {
-              processingEvent = false
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Start sending events to attached listeners.
-   *
-   * This first sends out all buffered events posted before this listener bus has started, then
-   * listens for any additional events asynchronously while the listener bus is still running.
-   * This should only be called once.
-   *
-   */
-  def start(): Unit = {
-    if (started.compareAndSet(false, true)) {
-      listenerThread.start()
-    } else {
-      throw new IllegalStateException(s"$name already started!")
-    }
-  }
-
   def post(event: SparkListenerEvent): Unit = {
-    if (stopped.get) {
-      // Drop further events to make `listenerThread` exit ASAP
-      logError(s"$name has already stopped! Dropping event $event")
-      return
-    }
-    val eventAdded = eventQueue.offer(event)
-    if (eventAdded) {
-      eventLock.release()
-    } else {
-      onDropEvent(event)
-      droppedEventsCounter.incrementAndGet()
-    }
-
-    val droppedEvents = droppedEventsCounter.get
-    if (droppedEvents > 0) {
-      // Don't log too frequently
-      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
-        // There may be multiple threads trying to decrease droppedEventsCounter.
-        // Use "compareAndSet" to make sure only one thread can win.
-        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
-        // then that thread will update it.
-        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
-          val prevLastReportTimestamp = lastReportTimestamp
-          lastReportTimestamp = System.currentTimeMillis()
-          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
-            new java.util.Date(prevLastReportTimestamp))
-        }
-      }
-    }
+    postToAll(event)
   }
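With the shared queue, semaphore, and daemon thread gone, `post()` reduces to `postToAll(event)`, and the bounded-queue and drop-accounting logic presumably moves into the per-group executors. A hedged sketch of where that relocated logic could live (an assumption throughout; the PR's actual `ListenerEventExecutor` is not shown in this hunk):

```scala
import java.util.concurrent.{LinkedBlockingQueue, RejectedExecutionException,
  ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

// Sketch of where the removed drop-accounting plausibly lives now: a bounded
// single-thread executor per group that drops work (instead of blocking the
// posting thread) when its queue is full.
class BoundedEventExecutor(capacity: Int) {
  val droppedEvents = new AtomicLong(0L)
  private val pool = new ThreadPoolExecutor(
    1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable](capacity))

  def submit(task: Runnable): Unit = {
    try pool.execute(task)
    catch {
      // Same failure mode the old onDropEvent() warned about: a listener is
      // too slow to keep up with the rate at which events are posted.
      case _: RejectedExecutionException => droppedEvents.incrementAndGet()
    }
  }
}
```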

  /**
@@ -159,7 +61,8 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
   @throws(classOf[TimeoutException])
   def waitUntilEmpty(timeoutMillis: Long): Unit = {
     val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!queueIsEmpty) {
+
+    while (!isListenerBusEmpty) {
       if (System.currentTimeMillis > finishTime) {
         throw new TimeoutException(
           s"The event queue is not empty after $timeoutMillis milliseconds")

@@ -169,60 +72,10 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
       Thread.sleep(10)
     }
   }

-  /**
-   * For testing only. Return whether the listener daemon thread is still alive.
-   * Exposed for testing.
-   */
-  def listenerThreadIsAlive: Boolean = listenerThread.isAlive
-
-  /**
-   * Return whether the event queue is empty.
-   *
-   * The use of synchronized here guarantees that all events that once belonged to this queue
-   * have already been processed by all attached listeners, if this returns true.
-   */
-  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
-
-  /**
-   * Stop the listener bus. It will wait until the queued events have been processed, but drop the
-   * new events after stopping.
-   */
-  def stop(): Unit = {
-    if (!started.get()) {
-      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
-    }
-    if (stopped.compareAndSet(false, true)) {
-      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
-      // `stop` is called.
-      eventLock.release()
-      listenerThread.join()
-    } else {
-      // Keep quiet
-    }
-  }
-
-  /**
-   * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
-   * notified with the dropped events.
-   *
-   * Note: `onDropEvent` can be called in any thread.
-   */
-  def onDropEvent(event: SparkListenerEvent): Unit = {
-    if (logDroppedEvent.compareAndSet(false, true)) {
-      // Only log the following message once to avoid duplicated annoying logs.
-      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-        "This likely means one of the SparkListeners is too slow and cannot keep up with " +
-        "the rate at which tasks are being started by the scheduler.")
-    }
-  }
 }

 private[spark] object LiveListenerBus {
   // Allows for Context to check whether stop() call is made within listener thread
   val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
-
-  /** The thread name of Spark listener bus */
-  val name = "SparkListenerBus"
 }
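The companion object keeps `withinListenerThread` even though the dedicated listener thread is gone, because `SparkContext.stop()` still uses it to fail fast when called from inside a listener callback (see the updated error message earlier in this diff). A self-contained sketch of that mechanism, assuming each event executor wraps delivery in `withValue(true)` (wiring not shown in this diff):

```scala
import scala.util.DynamicVariable

// Sketch of the re-entrance guard: delivery runs under withValue(true), so a
// stop() issued from a listener callback is detected and rejected.
object WithinListenerThreadSketch {
  val withinListenerThread = new DynamicVariable[Boolean](false)

  // What an executor's delivery loop would do around each event.
  def dispatch(deliver: () => Unit): Unit =
    withinListenerThread.withValue(true) { deliver() }

  // What SparkContext.stop() checks before shutting the context down.
  def stop(): Unit = {
    if (withinListenerThread.value) {
      throw new IllegalStateException(
        "Cannot stop SparkContext within listener event executor thread")
    }
    // ... proceed with shutdown ...
  }

  def main(args: Array[String]): Unit = {
    dispatch { () =>
      try stop() catch { case e: IllegalStateException => println(e.getMessage) }
    }
  }
}
```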

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

@@ -82,7 +82,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
         currentLine = entry._1
         lineNumber = entry._2 + 1

-        postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
+        postToAllSync(JsonProtocol.sparkEventFromJson(parse(currentLine)))
       } catch {
         case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
           // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
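Replay switches to a synchronous variant: when the history server replays an event log, delivering on the calling thread keeps events ordered and guarantees every listener has seen every event by the time the loop finishes. A minimal toy model of the assumed sync/async split (`postToAllSync`'s real signature lives in the PR's bus hierarchy, not in this hunk):

```scala
import java.util.concurrent.Executors
import scala.collection.mutable

// Assumed contrast between the two delivery paths: the async path hops onto
// an executor and returns immediately, while the sync path used during replay
// delivers on the calling thread, in order, to completion.
class SyncAsyncBusSketch {
  private val listeners = mutable.Buffer.empty[String => Unit]
  private val executor = Executors.newSingleThreadExecutor()

  def addListener(l: String => Unit): Unit = listeners += l

  def postToAll(event: String): Unit =
    executor.submit(new Runnable {
      override def run(): Unit = listeners.foreach(_(event))
    })

  def postToAllSync(event: String): Unit = listeners.foreach(_(event))
}
```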
core/src/main/scala/org/apache/spark/ui/SparkUI.scala (2 changes: 1 addition & 1 deletion)

@@ -178,7 +178,7 @@ private[spark] object SparkUI {
       Utils.getContextOrSparkClassLoader).asScala
     listenerFactories.foreach { listenerFactory =>
       val listeners = listenerFactory.createListeners(conf, sparkUI)
-      listeners.foreach(listenerBus.addListener)
+      listeners.foreach(l => listenerBus.addListener(l))
     }
     sparkUI
   }
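This last change looks cosmetic but is forced by the new signature: once `addListener` takes a group parameter, the bare method reference no longer eta-expands to the one-argument function `foreach` expects. That the lambda compiles with a single argument also implies the group parameter was given a default value. A self-contained illustration with stand-in types (the real `addListener` takes a listener and a group name):

```scala
// Why the explicit lambda is needed once a second, defaulted parameter exists.
object EtaExpansionDemo {
  def addListener(listener: String, group: String = "default"): Unit =
    println(s"$listener -> $group")

  def main(args: Array[String]): Unit = {
    val listeners = Seq("a", "b")
    // listeners.foreach(addListener)      // no longer compiles: arity mismatch
    listeners.foreach(l => addListener(l)) // explicit lambda picks up the default
  }
}
```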