From c097ddcb963bff611221ebc74da4e1f82c87b024 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 24 Dec 2014 09:46:28 +0800 Subject: [PATCH] Address the comments --- .../spark/streaming/StreamingSource.scala | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index ba9c29fe3d944..e4053c5747971 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -28,58 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { private val streamingListener = ssc.progressListener - private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, + private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T], defaultValue: T) { metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] { - override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue) + override def getValue: T = f(streamingListener).getOrElse(defaultValue) }) } // Gauge for number of network receivers - registerGauge("receivers", _.numReceivers, 0) + registerGauge("receivers", l => Option(l.numReceivers), 0) // Gauge for number of total completed batches - registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L) + registerGauge("totalCompletedBatches", l => Option(l.numTotalCompletedBatches), 0L) // Gauge for number of unprocessed batches - registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L) + registerGauge("unprocessedBatches", l => Option(l.numUnprocessedBatches), 0L) // Gauge for number of waiting batches - registerGauge("waitingBatches", _.waitingBatches.size, 0L) + registerGauge("waitingBatches", l => Option(l.waitingBatches.size), 0L) // Gauge for number of running batches - registerGauge("runningBatches", _.runningBatches.size, 0L) + registerGauge("runningBatches", l => Option(l.runningBatches.size), 0L) // Gauge for number of retained completed batches - registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0L) + registerGauge("retainedCompletedBatches", l => Option(l.retainedCompletedBatches.size), 0L) // Gauge for last completed batch, useful for monitoring the streaming job's running status, // displayed data -1 for any abnormal condition. registerGauge("lastCompletedBatch_submissionTime", - _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L) - registerGauge("lastCompletedBatch_processStartTime", - _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) - registerGauge("lastCompletedBatch_processEndTime", - _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) + _.lastCompletedBatch.map(_.submissionTime), -1L) + registerGauge("lastCompletedBatch_processingStartTime", + _.lastCompletedBatch.flatMap(_.processingStartTime), -1L) + registerGauge("lastCompletedBatch_processingEndTime", + _.lastCompletedBatch.flatMap(_.processingEndTime), -1L) // Gauge for last completed batch's delay information. - registerGauge("lastCompletedBatch_processTime", - _.lastCompletedBatch.flatMap(_.processingDelay).getOrElse(0L), -1L) + registerGauge("lastCompletedBatch_processingTime", + _.lastCompletedBatch.flatMap(_.processingDelay), -1L) registerGauge("lastCompletedBatch_schedulingDelay", - _.lastCompletedBatch.flatMap(_.schedulingDelay).getOrElse(0L), -1L) + _.lastCompletedBatch.flatMap(_.schedulingDelay), -1L) registerGauge("lastCompletedBatch_totalDelay", - _.lastCompletedBatch.flatMap(_.totalDelay).getOrElse(0L), -1L) + _.lastCompletedBatch.flatMap(_.totalDelay), -1L) // Gauge for last received batch, useful for monitoring the streaming job's running status, // displayed data -1 for any abnormal condition. registerGauge("lastReceivedBatch_submissionTime", - _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L) - registerGauge("lastReceivedBatch_processStartTime", - _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L) - registerGauge("lastReceivedBatch_processEndTime", - _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L) + _.lastCompletedBatch.map(_.submissionTime), -1L) + registerGauge("lastReceivedBatch_processingStartTime", + _.lastCompletedBatch.flatMap(_.processingStartTime), -1L) + registerGauge("lastReceivedBatch_processingEndTime", + _.lastCompletedBatch.flatMap(_.processingEndTime), -1L) // Gauge for last received batch records and total received batch records. - registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L) - registerGauge("totalReceivedBatchRecords", _.numTotalReceivedBatchRecords, 0L) + registerGauge("lastReceivedBatchRecords", l => Option(l.lastReceivedBatchRecords.values.sum), 0L) + registerGauge("totalReceivedBatchRecords", l => Option(l.numTotalReceivedBatchRecords), 0L) }