Skip to content

Commit

Permalink
Expand StreamingSource to add more metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Nov 26, 2014
1 parent bf1a6aa commit c7a9376
Showing 1 changed file with 18 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
registerGauge("lastCompletedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)

// Gauge for last completed batch's delay information.
registerGauge("lastCompletedBatch_processTime",
_.lastCompletedBatch.flatMap(_.processingDelay).getOrElse(0L), -1L)
registerGauge("lastCompletedBatch_schedulingDelay",
_.lastCompletedBatch.flatMap(_.schedulingDelay).getOrElse(0L), -1L)
registerGauge("lastCompletedBatch_totalDelay",
_.lastCompletedBatch.flatMap(_.totalDelay).getOrElse(0L), -1L)

// Gauge for last received batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
registerGauge("lastReceivedBatch_submissionTime",
Expand All @@ -70,4 +78,14 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
registerGauge("lastReceivedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)

// Gauge for last received batch records and total received batch records.
private var totalReceivedBatchRecords: Long = 0L
def getTotalReceivedBatchRecords(listener: StreamingJobProgressListener): Long = {
totalReceivedBatchRecords += listener.lastReceivedBatchRecords.values.sum
totalReceivedBatchRecords
}

registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L)
registerGauge("totalReceivedBatchRecords", getTotalReceivedBatchRecords, 0L)
}

0 comments on commit c7a9376

Please sign in to comment.