Skip to content

Commit

Permalink
Added conf to specify graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed May 21, 2015
1 parent 4c18652 commit e3d5475
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,8 @@ class StreamingContext private[streaming] (
state = StreamingContextState.ACTIVE
StreamingContext.setActiveContext(this)
}
shutdownHookRef = Utils.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false)
}
shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
logInfo("StreamingContext started")
case ACTIVE =>
logWarning("StreamingContext has already been started")
Expand Down Expand Up @@ -659,6 +656,13 @@ class StreamingContext private[streaming] (
state = STOPPED
}
}

private def stopOnShutdown(): Unit = {
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
}

/**
Expand Down

0 comments on commit e3d5475

Please sign in to comment.