diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 5307fe189d717..0c563a4a9947e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -272,7 +272,15 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { // Distribute the receivers and start them logInfo("Starting " + receivers.length + " receivers") - ssc.sparkContext.runJob(tempRDD, startReceiver) + try { + ssc.sparkContext.runJob(tempRDD, startReceiver) + } catch { + case e : Throwable => + logError("receiver job crashed, detail:" + e.getStackTraceString) + // find a better way to stop the application ? make other stages depend on tempRdd ? + ssc.stop() + System.exit(1) + } logInfo("All of the receivers have been terminated") }