Skip to content

Commit

Permalink
Address the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Dec 24, 2014
1 parent 02dd44f commit c097ddc
Showing 1 changed file with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,58 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {

private val streamingListener = ssc.progressListener

private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T],
defaultValue: T) {
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
override def getValue: T = f(streamingListener).getOrElse(defaultValue)
})
}

// Gauge for number of network receivers
registerGauge("receivers", _.numReceivers, 0)
registerGauge("receivers", l => Option(l.numReceivers), 0)

// Gauge for number of total completed batches
registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
registerGauge("totalCompletedBatches", l => Option(l.numTotalCompletedBatches), 0L)

// Gauge for number of unprocessed batches
registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
registerGauge("unprocessedBatches", l => Option(l.numUnprocessedBatches), 0L)

// Gauge for number of waiting batches
registerGauge("waitingBatches", _.waitingBatches.size, 0L)
registerGauge("waitingBatches", l => Option(l.waitingBatches.size), 0L)

// Gauge for number of running batches
registerGauge("runningBatches", _.runningBatches.size, 0L)
registerGauge("runningBatches", l => Option(l.runningBatches.size), 0L)

// Gauge for number of retained completed batches
registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0L)
registerGauge("retainedCompletedBatches", l => Option(l.retainedCompletedBatches.size), 0L)

// Gauge for last completed batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
registerGauge("lastCompletedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
registerGauge("lastCompletedBatch_processStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
registerGauge("lastCompletedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
_.lastCompletedBatch.map(_.submissionTime), -1L)
registerGauge("lastCompletedBatch_processingStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
registerGauge("lastCompletedBatch_processingEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)

// Gauge for last completed batch's delay information.
registerGauge("lastCompletedBatch_processTime",
_.lastCompletedBatch.flatMap(_.processingDelay).getOrElse(0L), -1L)
registerGauge("lastCompletedBatch_processingTime",
_.lastCompletedBatch.flatMap(_.processingDelay), -1L)
registerGauge("lastCompletedBatch_schedulingDelay",
_.lastCompletedBatch.flatMap(_.schedulingDelay).getOrElse(0L), -1L)
_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
registerGauge("lastCompletedBatch_totalDelay",
_.lastCompletedBatch.flatMap(_.totalDelay).getOrElse(0L), -1L)
_.lastCompletedBatch.flatMap(_.totalDelay), -1L)

// Gauge for last received batch, useful for monitoring the streaming job's running status,
// displayed data -1 for any abnormal condition.
registerGauge("lastReceivedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
registerGauge("lastReceivedBatch_processStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
registerGauge("lastReceivedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
_.lastCompletedBatch.map(_.submissionTime), -1L)
registerGauge("lastReceivedBatch_processingStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
registerGauge("lastReceivedBatch_processingEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)

// Gauge for last received batch records and total received batch records.
registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L)
registerGauge("totalReceivedBatchRecords", _.numTotalReceivedBatchRecords, 0L)
registerGauge("lastReceivedBatchRecords", l => Option(l.lastReceivedBatchRecords.values.sum), 0L)
registerGauge("totalReceivedBatchRecords", l => Option(l.numTotalReceivedBatchRecords), 0L)
}

0 comments on commit c097ddc

Please sign in to comment.