From 8c00289dc78dea11e9ce4aed4326e43cf2b64c01 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Tue, 15 Jul 2014 00:02:31 -0700 Subject: [PATCH] More debug messages --- .../spark/streaming/flume/FlumePollingInputDStream.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index 69baadcb27c28..5f6854afa0207 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -116,7 +116,7 @@ private[streaming] class FlumePollingReceiver( logDebug("Stored events with seq:" + seq) j += 1 } - logDebug("Sending ack for: " +seq) + logDebug("Sending ack for sequence number: " +seq) // Send an ack to Flume so that Flume discards the events from its channels. client.ack(seq) logDebug("Ack sent for sequence number: " + seq) @@ -124,7 +124,9 @@ private[streaming] class FlumePollingReceiver( case e: Exception => try { // Let Flume know that the events need to be pushed back into the channel. + logDebug("Sending nack for sequence number: " + seq) client.nack(seq) // If the agent is down, even this could fail and throw + logDebug("Nack sent for sequence number: " + seq) } catch { case e: Exception => logError( "Sending Nack also failed. A Flume agent is down.")