diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 0c749716de953..d0e792e40c45a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -565,11 +565,8 @@ class StreamingContext private[streaming] ( state = StreamingContextState.ACTIVE StreamingContext.setActiveContext(this) } - shutdownHookRef = Utils.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY) { () => - logInfo("Invoking stop() from shutdown hook") - // Do not stop SparkContext, let its own shutdown hook stop it - stop(stopSparkContext = false) - } + shutdownHookRef = Utils.addShutdownHook( + StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) logInfo("StreamingContext started") case ACTIVE => logWarning("StreamingContext has already been started") @@ -659,6 +656,13 @@ class StreamingContext private[streaming] ( state = STOPPED } } + + private def stopOnShutdown(): Unit = { + val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false) + logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook") + // Do not stop SparkContext, let its own shutdown hook stop it + stop(stopSparkContext = false, stopGracefully = stopGracefully) + } } /**