From f4660c5cb41d9b5ef737b38e7e38abf3b2f2e31c Mon Sep 17 00:00:00 2001
From: joyyoj
Date: Tue, 3 Jun 2014 21:15:11 +0800
Subject: [PATCH 1/2] [SPARK-1998] SparkFlumeEvent with body bigger than 1020 bytes are not read properly

---
 .../org/apache/spark/streaming/flume/FlumeInputDStream.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 5be33f1d5c428..ed35e34ad45ab 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -71,12 +71,12 @@ class SparkFlumeEvent() extends Externalizable {
     for (i <- 0 until numHeaders) {
       val keyLength = in.readInt()
       val keyBuff = new Array[Byte](keyLength)
-      in.read(keyBuff)
+      in.readFully(keyBuff)
       val key : String = Utils.deserialize(keyBuff)
 
       val valLength = in.readInt()
       val valBuff = new Array[Byte](valLength)
-      in.read(valBuff)
+      in.readFully(valBuff)
       val value : String = Utils.deserialize(valBuff)
 
       headers.put(key, value)

From 0a156c581c72ca6905da4a0b04334d892a599ee8 Mon Sep 17 00:00:00 2001
From: joyyoj
Date: Thu, 31 Jul 2014 23:39:53 +0800
Subject: [PATCH 2/2] SPARK-2381 Exit if the receiver job failed

---
 .../spark/streaming/scheduler/ReceiverTracker.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 5307fe189d717..0c563a4a9947e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -272,7 +272,15 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
 
     // Distribute the receivers and start them
     logInfo("Starting " + receivers.length + " receivers")
-    ssc.sparkContext.runJob(tempRDD, startReceiver)
+    try {
+      ssc.sparkContext.runJob(tempRDD, startReceiver)
+    } catch {
+      case e : Throwable =>
+        logError("receiver job crashed, detail:" + e.getStackTraceString)
+        // find a better way to stop the application ? make other stages depend on tempRdd ?
+        ssc.stop()
+        System.exit(1)
+    }
 
     logInfo("All of the receivers have been terminated")
   }
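
Note on [PATCH 1/2]: java.io.ObjectInput.read(byte[]) performs a single
underlying read and may return fewer bytes than the buffer length, while
readFully(byte[]) loops until the buffer is completely filled (or throws
EOFException). The sketch below is not part of the patch: ChunkedStream and
its 1020-byte cap are invented here to mimic the chunked reads reported in
SPARK-1998.

    import java.io.{ByteArrayInputStream, DataInputStream}

    // Hypothetical stream that returns at most 1020 bytes per read() call,
    // mimicking the behavior that truncated large SparkFlumeEvent bodies.
    class ChunkedStream(data: Array[Byte]) extends ByteArrayInputStream(data) {
      override def read(b: Array[Byte], off: Int, len: Int): Int =
        super.read(b, off, math.min(len, 1020))
    }

    object ReadFullyDemo {
      def main(args: Array[String]): Unit = {
        val payload = Array.fill[Byte](4096)(1)
        val buf = new Array[Byte](payload.length)

        // A bare read() stops after the first underlying call, so only the
        // head of buf is populated and the rest stays zeroed (corrupt event).
        val n = new DataInputStream(new ChunkedStream(payload)).read(buf)
        println(s"read() returned $n of ${payload.length} bytes")  // 1020 of 4096

        // readFully() keeps calling read() until all 4096 bytes are in buf.
        new DataInputStream(new ChunkedStream(payload)).readFully(buf)
        println("readFully() filled the whole buffer")
      }
    }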
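
Note on [PATCH 2/2]: SparkContext.runJob blocks until the job completes and
rethrows a job failure on the calling thread, which is what the new try/catch
relies on: if the receiver job fails, the exception now surfaces in
ReceiverTracker instead of leaving the application running with no receivers.
A minimal standalone sketch of the same pattern (the failing task, app name,
and local master are invented for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    object RunJobFailureDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("runjob-failure-demo"))
        val rdd = sc.parallelize(1 to 4, 2)
        try {
          // Every task throws, so runJob fails and the error surfaces here,
          // just as a crashed receiver job now surfaces in ReceiverTracker.
          sc.runJob(rdd, (it: Iterator[Int]) => {
            val sum = it.sum
            if (sum > 0) throw new RuntimeException("simulated receiver crash")
            sum
          })
        } catch {
          case e: Throwable =>
            // The patch logs the failure, stops the StreamingContext and
            // exits the JVM; this sketch just logs and shuts down.
            println("receiver job crashed, detail: " + e.getMessage)
            sc.stop()
        }
      }
    }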