diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 7d83909075806..83ae57b7f1516 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -21,7 +21,7 @@ import akka.actor.Actor import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId import org.apache.spark.scheduler.TaskScheduler -import org.apache.spark.util.{ActorLogReceive, ThreadStackTrace} +import org.apache.spark.util.ActorLogReceive /** * A heartbeat from executors to the driver. This is a shared message used by several internal @@ -35,12 +35,7 @@ private[spark] case class Heartbeat( private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean) /** - * A thread dump sent from executors to the driver. - */ -private[spark] case class ThreadDump(executorId: String, threadStackTraces: Array[ThreadStackTrace]) - -/** - * Lives in the driver to receive heartbeats from executors. + * Lives in the driver to receive heartbeats from executors. */ private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor with ActorLogReceive with Logging { @@ -48,9 +43,7 @@ private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) override def receiveWithLogging = { case Heartbeat(executorId, taskMetrics, blockManagerId) => val response = HeartbeatResponse( - ! scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) + !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)) sender ! 
response - case ThreadDump(executorId, stackTraces: Array[ThreadStackTrace]) => - scheduler.executorThreadDumpReceived(executorId, stackTraces) } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0d6ba4176ec5f..19c0c9cc5ad72 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -21,7 +21,7 @@ import scala.language.implicitConversions import java.io._ import java.net.URI -import java.util.{Arrays, Properties, Timer, TimerTask, UUID} +import java.util.{Arrays, Properties, UUID} import java.util.concurrent.atomic.AtomicInteger import java.util.UUID.randomUUID import scala.collection.{Map, Set} @@ -40,6 +40,7 @@ import akka.actor.Props import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.executor.TriggerThreadDump import org.apache.spark.input.WholeTextFileInputFormat import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ @@ -50,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.JobProgressListener -import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils} +import org.apache.spark.util._ /** * Main entry point for Spark functionality. 
A SparkContext represents the connection to a Spark @@ -241,18 +242,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { // the bound port to the cluster manager properly ui.foreach(_.bind()) - // If we are not running in local mode, then start a new timer thread for capturing driver thread - // dumps for display in the web UI (in local mode, this is handled by the local Executor): - private val threadDumpTimer = new Timer("Driver thread dump timer", true) - if (!isLocal && conf.getBoolean("spark.executor.sendThreadDumps", true)) { - val threadDumpInterval = conf.getInt("spark.executor.heartbeatInterval", 10000) - threadDumpTimer.scheduleAtFixedRate(new TimerTask { - override def run(): Unit = { - listenerBus.post(SparkListenerExecutorThreadDump("", Utils.getThreadDump())) - } - }, threadDumpInterval, threadDumpInterval) - } - /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf) @@ -362,6 +351,18 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { override protected def childValue(parent: Properties): Properties = new Properties(parent) } + /** Called by the web UI to obtain executor thread dumps */ + private[spark] def getExecutorThreadDump(executorId: String): Array[ThreadStackTrace] = { + if (executorId == "") { + Utils.getThreadDump() + } else { + val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get + val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem) + AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump(), actorRef, + AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)) + } + } + private[spark] def getLocalProperties: Properties = localProperties.get() private[spark] def setLocalProperties(props: Properties) { @@ -971,7 +972,6 @@ class SparkContext(config: SparkConf) 
extends SparkStatusAPI with Logging { def stop() { postApplicationEnd() ui.foreach(_.stop()) - threadDumpTimer.cancel() // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 697154d762d41..3711824a40cfc 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -131,7 +131,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Create a new ActorSystem using driver's Spark properties to run the backend. val driverConf = new SparkConf().setAll(props) val (actorSystem, boundPort) = AkkaUtils.createActorSystem( - "sparkExecutor", hostname, port, driverConf, new SecurityManager(driverConf)) + SparkEnv.executorActorSystemName, + hostname, port, driverConf, new SecurityManager(driverConf)) // set it val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a1682832711c4..a306e54c8f555 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.control.NonFatal -import akka.actor.ActorSystem +import akka.actor.{Props, ActorSystem} import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil @@ -92,6 +92,10 @@ private[spark] class Executor( } } + // Create an actor for receiving RPCs from the driver + private val executorActor = env.actorSystem.actorOf( + Props(new ExecutorActor(executorId)), 
"ExecutorActor") + // Create our ClassLoader // do this after SparkEnv creation so can access the SecurityManager private val urlClassLoader = createClassLoader() @@ -128,6 +132,7 @@ private[spark] class Executor( def stop() { env.metricsSystem.report() + env.actorSystem.stop(executorActor) isStopped = true threadPool.shutdown() if (!isLocal) { @@ -355,17 +360,13 @@ private[spark] class Executor( val retryAttempts = AkkaUtils.numRetries(conf) val retryIntervalMs = AkkaUtils.retryWaitMs(conf) val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem) - val threadDumpsEnabled = conf.getBoolean("spark.executor.sendThreadDumps", true) val t = new Thread() { override def run() { // Sleep a random interval so the heartbeats don't end up in sync Thread.sleep(interval + (math.random * interval).asInstanceOf[Int]) + while (!isStopped) { - if (threadDumpsEnabled) { - // Send the thread-dump as a fire-and-forget, best-effort message: - heartbeatReceiverRef ! ThreadDump(executorId, Utils.getThreadDump()) - } val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() for (taskRunner <- runningTasks.values()) { if (!taskRunner.attemptedTask.isEmpty) { @@ -384,6 +385,7 @@ private[spark] class Executor( } } } + val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) try { val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 73a7f6a2ba977..f81fa6d8089fc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -39,7 +39,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD import org.apache.spark.storage._ 
-import org.apache.spark.util._ +import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils} import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat /** @@ -174,13 +174,6 @@ class DAGScheduler( timeout.duration).asInstanceOf[Boolean] } - /** - * Called by the TaskScheduler when a thread dump is received from an executor. - */ - def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) { - listenerBus.post(SparkListenerExecutorThreadDump(execId, stackTraces)) - } - // Called by TaskScheduler when an executor fails. def executorLost(execId: String) { eventProcessActor ! ExecutorLost(execId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 71e72fe2b97f3..86afe3bd5265f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -26,7 +26,7 @@ import org.apache.spark.{Logging, TaskEndReason} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{Distribution, Utils, ThreadStackTrace} +import org.apache.spark.util.{Distribution, Utils} @DeveloperApi sealed trait SparkListenerEvent @@ -77,12 +77,6 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan @DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent -@DeveloperApi -case class SparkListenerExecutorThreadDump( - execId: String, - threadStackTraces: Array[ThreadStackTrace]) - extends SparkListenerEvent - /** * Periodic updates from executors. * @param execId executor id @@ -178,11 +172,6 @@ trait SparkListener { */ def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { } - /** - * Called when the driver receives thread dumps from an executor in a heartbeat. 
- */ - def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) {} - /** * Called when the driver receives task metrics from an executor in a heartbeat. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 11b4f4e44ae51..e79ffd7a3587d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -70,8 +70,6 @@ private[spark] trait SparkListenerBus extends Logging { foreachListener(_.onApplicationEnd(applicationEnd)) case metricsUpdate: SparkListenerExecutorMetricsUpdate => foreachListener(_.onExecutorMetricsUpdate(metricsUpdate)) - case threadDump: SparkListenerExecutorThreadDump => - foreachListener(_.onExecutorThreadDump(threadDump)) case SparkListenerShutdown => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 58057c77f07b7..a129a434c9a1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.ThreadStackTrace /** * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl. @@ -66,12 +65,7 @@ private[spark] trait TaskScheduler { * indicating that the block manager should re-register. */ def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], - blockManagerId: BlockManagerId): Boolean - - /** - * Called when a thread dump has been received from an executor. 
- */ - def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) + blockManagerId: BlockManagerId): Boolean /** * Get an application ID associated with the job. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 7c66a23af616d..2b39c7fc872da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -31,9 +31,10 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.util.{ThreadStackTrace, Utils} +import org.apache.spark.util.Utils import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId +import akka.actor.Props /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. 
@@ -341,10 +342,6 @@ private[spark] class TaskSchedulerImpl( dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) } - override def executorThreadDumpReceived(execId: String, stackTraces: Array[ThreadStackTrace]) { - dagScheduler.executorThreadDumpReceived(execId, stackTraces) - } - def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) { taskSetManager.handleTaskGettingResult(tid) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index c69af69187b3e..58b78f041cd85 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -47,7 +47,7 @@ private[spark] class LocalActor( private var freeCores = totalCores - private val localExecutorId = "" + private val localExecutorId = "localhost" private val localExecutorHostname = "localhost" val executor = new Executor( diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index d08e1419e3e41..b63c7f191155c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -88,6 +88,10 @@ class BlockManagerMaster( askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId)) } + def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = { + askDriverWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId)) + } + /** * Remove a block from the slaves that have it. This can only be used to remove * blocks that the driver knows about. 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 5e375a2553979..f8f1c8ae1dc83 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -86,6 +86,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case GetPeers(blockManagerId) => sender ! getPeers(blockManagerId) + case GetActorSystemHostPortForExecutor(executorId) => + sender ! getActorSystemHostPortForExecutor(executorId) + case GetMemoryStatus => sender ! memoryStatus @@ -412,6 +415,17 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus Seq.empty } } + + private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = { + for ( + blockManagerId <- blockManagerIdByExecutor.get(executorId); + info <- blockManagerInfo.get(blockManagerId); + host <- info.slaveActor.path.address.host; + port <- info.slaveActor.path.address.port + ) yield { + (host, port) + } + } } @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 291ddfcc113ac..3f32099d08cc9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -92,6 +92,8 @@ private[spark] object BlockManagerMessages { case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster + case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster + case class RemoveExecutor(execId: String) extends ToBlockManagerMaster case object StopBlockManagerMaster extends ToBlockManagerMaster diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala 
b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 6cd16e67824b6..edd6d1fef623b 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -28,7 +28,8 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { val executorsListener = parent.executorsListener val jobProgressListener = parent.jobProgressListener - val threadDumpEnabled = parent.conf.getBoolean("spark.executor.sendThreadDumps", true) + val threadDumpEnabled = parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true) + val sc = parent.sc attachPage(new ExecutorsPage(this, threadDumpEnabled)) if (threadDumpEnabled) { diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala index 64e7f1d2a4a8c..006d0c51532cd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ThreadDumpPage.scala @@ -19,20 +19,23 @@ package org.apache.spark.ui.exec import javax.servlet.http.HttpServletRequest +import scala.util.Try import scala.xml.{Text, Node} import org.apache.spark.ui.{UIUtils, WebUIPage} class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { + private val sc = parent.sc + def render(request: HttpServletRequest): Seq[Node] = { val executorId = Option(request.getParameter("executorId")).getOrElse { return Text(s"Missing executorId parameter") } - val maybeThreadDump = parent.jobProgressListener synchronized { - parent.jobProgressListener.executorIdToLastThreadDump.get(executorId) - } - val content = maybeThreadDump.map { case (time, threadDump) => + val time = System.currentTimeMillis() + val maybeThreadDump = Try(sc.get.getExecutorThreadDump(executorId)) + + val content = maybeThreadDump.map { threadDump => val dumpRows = threadDump.map { 
thread =>
@@ -62,7 +65,7 @@ class ThreadDumpPage(parent: ExecutorsTab) extends WebUIPage("threadDump") { }
{dumpRows}
- }.getOrElse(Text("No thread dump to display")) + }.getOrElse(Text("Error fetching thread dump")) UIUtils.headerSparkPage(s"Thread dump for executor $executorId", content, parent) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index a1a0b1c0d7fee..b5207360510dd 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -26,7 +26,6 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId import org.apache.spark.ui.jobs.UIData._ -import org.apache.spark.util.ThreadStackTrace /** * :: DeveloperApi :: @@ -66,9 +65,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val executorIdToBlockManagerId = HashMap[String, BlockManagerId]() - /** Map entries are (last update timestamp, thread dump) pairs */ - val executorIdToLastThreadDump = HashMap[String, (Long, Array[ThreadStackTrace])]() - var schedulingMode: Option[SchedulingMode] = None def blockManagerIds = executorIdToBlockManagerId.values.toSeq @@ -269,11 +265,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.executorRunTime += timeDelta } - override def onExecutorThreadDump(executorThreadDump: SparkListenerExecutorThreadDump) { - val timeAndThreadDump = (System.currentTimeMillis(), executorThreadDump.threadStackTraces) - executorIdToLastThreadDump.put(executorThreadDump.execId, timeAndThreadDump) - } - override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) { val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala 
b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index f41c8d0315cb3..625f490192c4d 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -201,4 +201,18 @@ private[spark] object AkkaUtils extends Logging { logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } + + def makeExecutorRef( + name: String, + conf: SparkConf, + host: String, + port: Int, + actorSystem: ActorSystem): ActorRef = { + val executorActorSystemName = SparkEnv.executorActorSystemName + Utils.checkHost(host, "Expected hostname") + val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name" + val timeout = AkkaUtils.lookupTimeout(conf) + logInfo(s"Connecting to $name: $url") + Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) + } } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8bd8e9412cd67..5b2e7d3a7edb9 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -74,7 +74,6 @@ private[spark] object JsonProtocol { // These aren't used, but keeps compiler happy case SparkListenerShutdown => JNothing case SparkListenerExecutorMetricsUpdate(_, _) => JNothing - case SparkListenerExecutorThreadDump(_, _) => JNothing } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index bde6e9b87f4f8..a2e4f712db55b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import 
org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{CallSite, ThreadStackTrace} +import org.apache.spark.util.CallSite import org.apache.spark.executor.TaskMetrics class BuggyDAGEventProcessActor extends Actor { @@ -83,7 +83,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def stop() = {} override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true - override def executorThreadDumpReceived(execId: String, threadDump: Array[ThreadStackTrace]) {} override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) @@ -374,8 +373,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true - override def executorThreadDumpReceived(execId: String, threadDump: Array[ThreadStackTrace]) { - } } val noKillScheduler = new DAGScheduler( sc, diff --git a/docs/configuration.md b/docs/configuration.md index d4c80bcc0fc91..4e8e3769b600f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -394,6 +394,13 @@ Apart from these, the following properties are also available, and may be useful Allows stages and corresponding jobs to be killed from the web ui. + + spark.ui.threadDumpsEnabled + true + + Allows executor and driver thread dumps to be collected and viewed from the web ui. + + spark.eventLog.enabled false @@ -655,13 +662,6 @@ Apart from these, the following properties are also available, and may be useful the driver know that the executor is still alive and update it with metrics for in-progress tasks. 
- - spark.executor.sendThreadDumps - true - If set to true, executors will periodically send thread dumps to the driver for display - in the web UI. The frequency of these dumps is controlled by - spark.executor.heartbeatInterval./td> - #### Networking