[SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races. #5560

Closed · wants to merge 3 commits
Changes from 1 commit
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.SignalLogger
import org.apache.spark.util.{SignalLogger, Utils}

/**
* A web server that renders SparkUIs of completed applications.
@@ -194,9 +194,7 @@ object HistoryServer extends Logging {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()

Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
override def run(): Unit = server.stop()
})
Utils.addShutdownHook { () => server.stop() }

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,6 +28,7 @@ import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.Utils
import org.apache.spark.util.logging.FileAppender

/**
@@ -61,20 +62,15 @@ private[deploy] class ExecutorRunner(

// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
private var shutdownHook: Thread = null
private var shutdownHook: AnyRef = null

private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
}

/**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
workerThread = null
state = ExecutorState.KILLED
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
Utils.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
}
}

private def addShutdownHook(): Thread = {
val shutdownHook = new Thread("delete Spark local dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
DiskBlockManager.this.doStop()
}
private def addShutdownHook(): AnyRef = {
Utils.addShutdownHook { () =>
logDebug("Shutdown hook called")
DiskBlockManager.this.doStop()
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
shutdownHook
}

/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
// Remove the shutdown hook. It causes memory leaks if we leave it around.
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
Utils.removeShutdownHook(shutdownHook)
doStop()
}

core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager(

private def addShutdownHook() {
tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
case e: Exception =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
Utils.addShutdownHook { () =>
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
case e: Exception =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
client.close()
}
})
client.close()
}
}
}
127 changes: 109 additions & 18 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection

@@ -30,7 +30,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
import scala.util.Try
import scala.util.{Failure, Success, Try}
import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.io.{ByteStreams, Files}
@@ -64,9 +64,15 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()

val DEFAULT_SHUTDOWN_PRIORITY = 100

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null


private val shutdownHooks = new SparkShutdownHookManager()
shutdownHooks.install()

/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -176,18 +182,16 @@ private[spark] object Utils extends Logging {
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

// Add a shutdown hook to delete the temp dirs when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
try {
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
addShutdownHook { () =>
logDebug("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
try {
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
}
})
}

// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
@@ -613,7 +617,7 @@ private[spark] object Utils extends Logging {
}
Utils.setupSecureURLConnection(uc, securityMgr)

val timeoutMs =
val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
@@ -1172,7 +1176,7 @@ private[spark] object Utils extends Logging {
/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
*
*
* NOTE: This method is to be called by the spark-started JVM process.
*/
def tryOrExit(block: => Unit) {
@@ -1185,11 +1189,11 @@ private[spark] object Utils extends Logging {
}

/**
* Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
* Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
* exception
*
* NOTE: This method is to be called by the driver-side components to avoid stopping the
* user-started JVM process completely; in contrast, tryOrExit is to be called in the
*
* NOTE: This method is to be called by the driver-side components to avoid stopping the
* user-started JVM process completely; in contrast, tryOrExit is to be called in the
* spark-started JVM process.
*/
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
@@ -2132,6 +2136,93 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}

/**
* Adds a shutdown hook with default priority.
*/
def addShutdownHook(hook: () => Unit): AnyRef = {

Member:
It looked weird to return AnyRef, but I suppose that while callers need a handle to the hook, they don't need to be promised anything about what it is.

addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
}

/**
* Adds a shutdown hook with the given priority. Hooks with higher priority values run
* first.
*/
def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}

/**
* Remove a previously installed shutdown hook.
*/
def removeShutdownHook(ref: AnyRef): Boolean = {
shutdownHooks.remove(ref)
}

}
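
As the review comment above notes, the handle returned by addShutdownHook is deliberately opaque: callers only ever pass it back to removeShutdownHook. A minimal usage sketch of that pattern follows; the TempDataHolder class and its cleanup logic are made up for illustration and are not part of this patch.

import org.apache.spark.util.Utils

// Hypothetical component showing the intended calling pattern.
class TempDataHolder {

  // Keep the opaque reference only so the hook can be unregistered later.
  private val hookRef: AnyRef = Utils.addShutdownHook { () =>
    cleanup()
  }

  private def cleanup(): Unit = {
    // release whatever this component holds
  }

  def stop(): Unit = {
    // Unregister first so a normal stop() does not also run the hook at JVM exit.
    Utils.removeShutdownHook(hookRef)
    cleanup()
  }
}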

private [util] class SparkShutdownHookManager {

private val hooks = new PriorityQueue[SparkShutdownHook]()
private var shuttingDown = false

Member:
Can we use this to reimplement or get rid of Utils.inShutdown()? I'd love to manage to refactor that as part of this effort.

Contributor Author:
Potentially, but the semantics are slightly different. inShutdown returns whether the JVM is running shutdown hooks; this variable tracks whether the Spark shutdown hooks are running right now. So, if for example the HDFS hook is running, this would still be false, but inShutdown would return true.

Contributor Author:
(Actually the HDFS example is a bad one since these hooks would run first, but you get the idea.)

Member:
Yeah, 3 of the 4 uses of inShutdown look like they could be replaced easily without risk. But one controls calling System.exit in an uncaught exception handler, which apparently deadlocks when actually in JVM shutdown. Darn. Unless you have a bright idea about working around that I don't know if we can co-opt and replace use of Utils.inShutdown
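
The distinction in the thread above is between two different questions: Utils.inShutdown() asks whether the JVM has begun running shutdown hooks at all, while the shuttingDown flag only becomes true once SparkShutdownHookManager.runAll() starts. A rough, self-contained sketch of a JVM-level probe is shown below for comparison; it is an approximation for illustration, not the code in this diff.

object JvmShutdownProbe {
  // Approximation of a JVM-wide "has shutdown started?" check. It becomes true
  // as soon as the JVM starts running any shutdown hooks, including hooks that
  // Spark did not register, unlike the manager's shuttingDown flag.
  def jvmIsShuttingDown(): Boolean = {
    try {
      val probe = new Thread() {
        override def run(): Unit = {}
      }
      // Hook registration is rejected once shutdown is already in progress.
      Runtime.getRuntime.addShutdownHook(probe)
      Runtime.getRuntime.removeShutdownHook(probe)
      false
    } catch {
      case _: IllegalStateException => true
    }
  }
}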


/**
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
* have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
* the best.
*/
def install(): Unit = {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
}
Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
// SHUTDOWN_HOOK_PRIORITY is a static field, so Field.get takes null
val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get(null)
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
.invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))

case Failure(_) =>
Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
}
}

def runAll(): Unit = synchronized {
shuttingDown = true
while (!hooks.isEmpty()) {
Utils.logUncaughtExceptions(hooks.poll().run())
}
}

def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
checkState()
val hookRef = new SparkShutdownHook(priority, hook)
hooks.add(hookRef)
hookRef
}

def remove(ref: AnyRef): Boolean = synchronized {
checkState()
hooks.remove(ref)
}

private def checkState(): Unit = {
if (shuttingDown) {
throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
}
}

}

private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
extends Comparable[SparkShutdownHook] {

override def compareTo(other: SparkShutdownHook): Int = {
other.priority - priority
}

def run(): Unit = hook()

}
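
One subtlety worth noting: java.util.PriorityQueue is a min-heap, so the reversed comparison in compareTo above (other.priority - priority) is what makes hooks with larger priority values poll, and therefore run, first. A standalone sketch of that ordering follows; the object name and priority values are made up, and Integer.compare is used here only to sidestep overflow concerns with extreme values.

import java.util.PriorityQueue

object HookOrderingSketch {

  // Stand-in for SparkShutdownHook: a larger priority sorts "smaller",
  // so the min-heap polls it first.
  private class Entry(val priority: Int, val label: String) extends Comparable[Entry] {
    override def compareTo(other: Entry): Int = Integer.compare(other.priority, priority)
  }

  def main(args: Array[String]): Unit = {
    val queue = new PriorityQueue[Entry]()
    queue.add(new Entry(100, "default priority"))
    queue.add(new Entry(130, "runs first"))
    queue.add(new Entry(50, "runs last"))
    while (!queue.isEmpty()) {
      println(queue.poll().label) // prints: runs first, default priority, runs last
    }
  }
}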

32 changes: 24 additions & 8 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -17,14 +17,16 @@

package org.apache.spark.util

import scala.util.Random

import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
import java.text.DecimalFormatSymbols
import java.util.concurrent.TimeUnit
import java.util.Locale
import java.util.PriorityQueue

import scala.collection.mutable.ListBuffer
import scala.util.Random

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf

class UtilsSuite extends FunSuite with ResetSystemProperties {

test("timeConversion") {
// Test -1
assert(Utils.timeStringAsSeconds("-1") === -1)

// Test zero
assert(Utils.timeStringAsSeconds("0") === 0)

assert(Utils.timeStringAsSeconds("1") === 1)
assert(Utils.timeStringAsSeconds("1s") === 1)
assert(Utils.timeStringAsSeconds("1000ms") === 1)
@@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))

assert(Utils.timeStringAsMs("1") === 1)
assert(Utils.timeStringAsMs("1ms") === 1)
assert(Utils.timeStringAsMs("1000us") === 1)
@@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))

// Test invalid strings
intercept[NumberFormatException] {
Utils.timeStringAsMs("This breaks 600s")
@@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
Utils.timeStringAsMs("This 123s breaks")
}
}

test("bytesToString") {
assert(Utils.bytesToString(10) === "10.0 B")
assert(Utils.bytesToString(1500) === "1500.0 B")
@@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val newFileName = new File(testFileDir, testFileName)
assert(newFileName.isFile())
}

test("shutdown hook manager") {
val manager = new SparkShutdownHookManager()
val output = new ListBuffer[Int]()

val hook1 = manager.add(1, () => output += 1)
manager.add(3, () => output += 3)
manager.add(2, () => output += 2)
manager.add(4, () => output += 4)
manager.remove(hook1)

manager.runAll()
assert(output.toList === List(4, 3, 2))
}
}