Skip to content

Commit

Permalink
Change the code style and add totalProcessedRecords
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Dec 26, 2014
1 parent 44721a6 commit 00f5f7f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
private val streamingListener = ssc.progressListener

private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
defaultValue: T) {
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
})
defaultValue: T): Unit = {
registerGaugeWithOption[T](name,
(l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue)
}

private def registerGaugeWithOption[T](
Expand All @@ -50,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
// Gauge for number of total completed batches
registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)

// Gauge for number of total received records
registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)

// Gauge for number of total processed records
registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)

// Gauge for number of unprocessed batches
registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)

Expand Down Expand Up @@ -88,7 +93,6 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
registerGaugeWithOption("lastReceivedBatch_processingEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)

// Gauge for last received batch records and total received batch records.
registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L)
registerGauge("totalReceivedBatchRecords", _.numTotalReceivedBatchRecords, 0L)
// Gauge for last received batch records.
registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private val completedaBatchInfos = new Queue[BatchInfo]
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
private var totalReceivedBatchRecords = 0L

val batchDuration = ssc.graph.batchDuration.milliseconds

Expand Down Expand Up @@ -67,7 +68,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)

batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
totalReceivedBatchRecords += infos.map(_.numRecords).sum
totalReceivedRecords += infos.map(_.numRecords).sum
}
}

Expand All @@ -77,6 +78,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
totalCompletedBatches += 1L

batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
totalProcessedRecords += infos.map(_.numRecords).sum
}
}

def numReceivers = synchronized {
Expand All @@ -87,8 +92,12 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
totalCompletedBatches
}

def numTotalReceivedBatchRecords: Long = synchronized {
totalReceivedBatchRecords
def numTotalReceivedRecords: Long = synchronized {
totalReceivedRecords
}

def numTotalProcessedRecords: Long = synchronized {
totalProcessedRecords
}

def numUnprocessedBatches: Long = synchronized {
Expand Down

0 comments on commit 00f5f7f

Please sign in to comment.