diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index 69baadcb27c28..5f6854afa0207 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -116,7 +116,7 @@ private[streaming] class FlumePollingReceiver( logDebug("Stored events with seq:" + seq) j += 1 } - logDebug("Sending ack for: " +seq) + logDebug("Sending ack for sequence number: " +seq) // Send an ack to Flume so that Flume discards the events from its channels. client.ack(seq) logDebug("Ack sent for sequence number: " + seq) @@ -124,7 +124,9 @@ private[streaming] class FlumePollingReceiver( case e: Exception => try { // Let Flume know that the events need to be pushed back into the channel. + logDebug("Sending nack for sequence number: " + seq) client.nack(seq) // If the agent is down, even this could fail and throw + logDebug("Nack sent for sequence number: " + seq) } catch { case e: Exception => logError( "Sending Nack also failed. A Flume agent is down.")