Add ThreadDumpCollector
roczei committed Oct 14, 2024 · commit fcf3b89 · 1 parent 38f067d
Showing 6 changed files with 200 additions and 0 deletions.
@@ -804,6 +804,7 @@ private[spark] object LogKeys {
case object TEMP_PATH extends LogKey
case object TEST_SIZE extends LogKey
case object THREAD extends LogKey
case object THREAD_DUMPS extends LogKey
case object THREAD_ID extends LogKey
case object THREAD_NAME extends LogKey
case object THREAD_POOL_KEEPALIVE_TIME extends LogKey
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -240,6 +240,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
private var _driverThreadDumpCollector: ThreadDumpCollector = _
private var _resources: immutable.Map[String, ResourceInformation] = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _
private var _plugins: Option[PluginContainer] = None
@@ -613,6 +614,15 @@
conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
_heartbeater.start()

// Create and start the thread dump collector for the Spark driver
if (_conf.get(DRIVER_THREAD_DUMP_COLLECTOR_ENABLED)) {
  _driverThreadDumpCollector = new ThreadDumpCollector(
    () => ThreadDumpCollector.saveThreadDumps(env),
    "driver-threadDumpCollector",
    conf.get(THREAD_DUMP_COLLECTOR_INTERVAL))
  _driverThreadDumpCollector.start()
}

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
@@ -2375,6 +2385,12 @@
  }
  _heartbeater = null
}
if (_conf.get(DRIVER_THREAD_DUMP_COLLECTOR_ENABLED) && _driverThreadDumpCollector != null) {
  Utils.tryLogNonFatalError {
    _driverThreadDumpCollector.stop()
  }
  _driverThreadDumpCollector = null
}
if (env != null && _heartbeatReceiver != null) {
  Utils.tryLogNonFatalError {
    env.rpcEnv.stop(_heartbeatReceiver)
102 changes: 102 additions & 0 deletions core/src/main/scala/org/apache/spark/ThreadDumpCollector.scala
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.nio.charset.StandardCharsets
import java.time.Instant
import java.util.concurrent.TimeUnit

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config.{THREAD_DUMP_COLLECTOR_DIR,
  THREAD_DUMP_COLLECTOR_OUTPUT_TYPE}
import org.apache.spark.util.{ThreadUtils, Utils}

/**
 * Creates a thread dump collector thread which will call the specified collectThreadDumps
 * function at intervals of intervalMs.
 *
 * @param collectThreadDumps the thread dump collector function to call.
 * @param name the thread name for the thread dump collector.
 * @param intervalMs the interval between stack trace collections.
 */
private[spark] class ThreadDumpCollector(
    collectThreadDumps: () => Unit,
    name: String,
    intervalMs: Long) extends Logging {
  // Single-threaded daemon executor that runs the periodic collection task
  private val threadDumpCollector = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)

  /** Schedules a task to collect the thread dumps */
  def start(): Unit = {
    val threadDumpCollectorTask = new Runnable() {
      override def run(): Unit = Utils.logUncaughtExceptions(collectThreadDumps())
    }
    threadDumpCollector.scheduleAtFixedRate(threadDumpCollectorTask, intervalMs, intervalMs,
      TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    threadDumpCollector.shutdown()
    threadDumpCollector.awaitTermination(10, TimeUnit.SECONDS)
  }
}

private[spark] object ThreadDumpCollector extends Logging {
  def saveThreadDumps(env: SparkEnv): Unit = {
    env.conf.get(THREAD_DUMP_COLLECTOR_OUTPUT_TYPE) match {
      case "LOG" => writeThreadDumpsToLog(env)
      case "FILE" => writeThreadDumpsToFile(env)
    }
  }

  /** Write all thread dumps to the log at WARN level */
  private def writeThreadDumpsToLog(env: SparkEnv): Unit = {
    logWarning(log"Thread dumps from ${MDC(LogKeys.EXECUTOR_ID, env.executorId)}:\n" +
      log"${MDC(LogKeys.THREAD_DUMPS, Utils.getThreadDump().map(_.toString).mkString)}")
  }

  /** Write all thread dumps to a file named <appId>-<executorId>-<epochSeconds>.txt */
  private def writeThreadDumpsToFile(env: SparkEnv): Unit = {
    val timestamp = Instant.now().getEpochSecond.toString
    val threadDumpFileName = env.conf.getAppId + "-" + env.executorId + "-" + timestamp + ".txt"
    val collectedThreadDump = Utils.getThreadDump().map(_.toString).mkString
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(env.conf)
    val rootDir = env.conf.get(THREAD_DUMP_COLLECTOR_DIR)
    val fileSystem: FileSystem = new Path(rootDir).getFileSystem(hadoopConf)
    val threadDumpFilePermissions = new FsPermission(Integer.parseInt("770", 8).toShort)
    val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, threadDumpFileName))
    try {
      val outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, true)
      fileSystem.setPermission(dfsLogFile, threadDumpFilePermissions)
      outputStream.write(collectedThreadDump.getBytes(StandardCharsets.UTF_8))
      outputStream.close()
    } catch {
      case e: Exception =>
        logError(log"Could not save thread dumps into file from executor " +
          log"${MDC(LogKeys.EXECUTOR_ID, env.executorId)}", e)
    }
  }
}
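For orientation, here is a minimal lifecycle sketch, not part of this commit: ThreadDumpCollector is private[spark], so it assumes code compiled into the org.apache.spark package, and a println callback stands in for ThreadDumpCollector.saveThreadDumps. The object name is hypothetical.

package org.apache.spark

import org.apache.spark.util.Utils

private[spark] object ThreadDumpCollectorExample {
  def main(args: Array[String]): Unit = {
    // Dump all thread stack traces to stdout every five seconds.
    val collector = new ThreadDumpCollector(
      () => println(Utils.getThreadDump().map(_.toString).mkString),
      "example-threadDumpCollector",
      intervalMs = 5000L)
    collector.start()     // the first collection fires after one full interval
    Thread.sleep(16000L)  // let roughly three collections run
    collector.stop()      // shuts down the daemon executor, waiting up to 10 s
  }
}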
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -89,6 +89,8 @@ private[spark] class Executor(
log"${LogMDC(OS_ARCH, System.getProperty("os.arch"))}")
logInfo(log"Java version ${LogMDC(JAVA_VERSION, System.getProperty("java.version"))}")

private var executorThreadDumpCollector: ThreadDumpCollector = _

private val executorShutdown = new AtomicBoolean(false)
val stopHookReference = ShutdownHookManager.addShutdownHook(
() => stop()
@@ -325,6 +327,15 @@

heartbeater.start()

// Create and start the thread dump collector for the Spark executor
if (conf.get(EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED)) {
  executorThreadDumpCollector = new ThreadDumpCollector(
    () => ThreadDumpCollector.saveThreadDumps(env),
    "executor-ThreadDumpCollector",
    conf.get(THREAD_DUMP_COLLECTOR_INTERVAL))
  executorThreadDumpCollector.start()
}

private val appStartTime = conf.getLong("spark.app.startTime", 0)

// To allow users to distribute plugins and their required files
@@ -445,6 +456,15 @@
  case NonFatal(e) =>
    logWarning("Unable to stop heartbeater", e)
}
try {
  if (conf.get(EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED) &&
      executorThreadDumpCollector != null) {
    executorThreadDumpCollector.stop()
  }
} catch {
  case NonFatal(e) =>
    logWarning("Unable to stop the executor thread dump collector", e)
}
ShuffleBlockPusher.stop()
if (threadPool != null) {
threadPool.shutdown()
39 changes: 39 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2801,4 +2801,43 @@ package object config {
.version("4.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val DRIVER_THREAD_DUMP_COLLECTOR_ENABLED = ConfigBuilder("spark.driver" +
".threadDumpCollector.enabled")
.doc("Whether to enable automatic thread dump collection for driver")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

private[spark] val EXECUTOR_THREAD_DUMP_COLLECTOR_ENABLED = ConfigBuilder("spark.executor" +
".threadDumpCollector.enabled")
.doc("Whether to enable automatic thread dump collection for each executor")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

private[spark] val THREAD_DUMP_COLLECTOR_INTERVAL =
ConfigBuilder("spark.threadDumpCollectorInterval")
.doc("The interval of time between two thread dump collection.")
.version("4.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(_ > 0, "Value should be positive")
.createWithDefaultString("10s")

private[spark] val THREAD_DUMP_COLLECTOR_DIR = ConfigBuilder("spark.threadDumpCollector.dir")
.doc("Set the default directory for saving the thread dump files.")
.version("4.0.0")
.stringConf
.createWithDefault("file:/tmp/spark-thread-dumps")

private[spark] val THREAD_DUMP_COLLECTOR_OUTPUT_TYPE =
ConfigBuilder("spark.threadDumpCollector.output.type")
.doc("Specifies the type of saving the thread dumps. Can be either LOG (the default) or " +
"FILE")
.version("4.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(Set("LOG", "FILE"))
.createWithDefault("LOG")

}
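Taken together, the new keys are set like any other Spark configuration. A minimal sketch, not part of this commit; the app name and HDFS path are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// Collect thread dumps on the driver and all executors every 30 seconds,
// writing them to files under a shared directory instead of the logs.
val conf = new SparkConf()
  .setAppName("thread-dump-demo")
  .set("spark.driver.threadDumpCollector.enabled", "true")
  .set("spark.executor.threadDumpCollector.enabled", "true")
  .set("spark.threadDumpCollectorInterval", "30s")        // default: 10s
  .set("spark.threadDumpCollector.output.type", "FILE")   // default: LOG
  .set("spark.threadDumpCollector.dir", "hdfs:///tmp/spark-thread-dumps")
val sc = new SparkContext(conf)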
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

class ThreadDumpCollectorSuite {

}
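The committed suite is an empty stub. A first test of the scheduling contract might look like the sketch below; it is hypothetical, and assumes Spark's SparkFunSuite base class is available on the test classpath:

import java.util.concurrent.atomic.AtomicInteger

class ThreadDumpCollectorSuite extends SparkFunSuite {

  test("collector invokes its callback at the configured interval") {
    val calls = new AtomicInteger(0)
    val collector = new ThreadDumpCollector(
      () => calls.incrementAndGet(),
      "test-threadDumpCollector",
      intervalMs = 100L)
    collector.start()
    try {
      Thread.sleep(1000L)  // several 100 ms intervals should elapse
    } finally {
      collector.stop()
    }
    assert(calls.get() >= 1)
  }
}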
