Skip to content

Commit

Permalink
Fixed trigger details bug
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Oct 13, 2016
1 parent cafbeb7 commit 49da4b1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
require(currentTriggerStartTimestamp >= 0)
val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
reportTriggerDetail(FINISH_TIMESTAMP, currentTriggerFinishTimestamp)
reportTriggerDetail(STATUS_MESSAGE, "")
triggerDetails.remove(STATUS_MESSAGE)
reportTriggerDetail(IS_TRIGGER_ACTIVE, false)

// Report number of rows
Expand Down Expand Up @@ -190,6 +190,7 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
}

def stop(): Unit = synchronized {
triggerDetails.clear()
inputRates.valuesIterator.foreach { _.stop() }
processingRates.valuesIterator.foreach { _.stop() }
latency = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,35 @@ import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.ManualClock

class StreamMetricsSuite extends SparkFunSuite {
import StreamMetrics._

// To make === between double tolerate inexact values
implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)

test("rates and latencies - basic life cycle") {
test("rates, latencies, trigger details - basic life cycle") {
val sm = newStreamMetrics(source)
assert(sm.currentInputRate() === 0.0)
assert(sm.currentProcessingRate() === 0.0)
assert(sm.currentSourceInputRate(source) === 0.0)
assert(sm.currentSourceProcessingRate(source) === 0.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails().isEmpty)

// When trigger started, the rates should not change
// When trigger started, the rates should not change, but should return
// reported trigger details
sm.reportTriggerStarted(1)
sm.reportTriggerDetail("key", "value")
sm.reportSourceTriggerDetail(source, "key2", "value2")
assert(sm.currentInputRate() === 0.0)
assert(sm.currentProcessingRate() === 0.0)
assert(sm.currentSourceInputRate(source) === 0.0)
assert(sm.currentSourceProcessingRate(source) === 0.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails() ===
Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
START_TIMESTAMP -> "0", "key" -> "value"))
assert(sm.currentSourceTriggerDetails(source) ===
Map(TRIGGER_ID -> "1", "key2" -> "value2"))

// Finishing the trigger should calculate the rates, except input rate which needs
// to have another trigger interval
Expand All @@ -55,10 +65,24 @@ class StreamMetricsSuite extends SparkFunSuite {
assert(sm.currentSourceInputRate(source) === 0.0)
assert(sm.currentSourceProcessingRate(source) === 100.0)
assert(sm.currentLatency() === None)

// Another trigger should calculate the input rate
assert(sm.currentTriggerDetails() ===
Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
NUM_INPUT_ROWS -> "100", "key" -> "value"))
assert(sm.currentSourceTriggerDetails(source) ===
Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))

// After another trigger starts, the rates and latencies should not change until
// new rows are reported
clock.advance(1000)
sm.reportTriggerStarted(2)
assert(sm.currentInputRate() === 0.0)
assert(sm.currentProcessingRate() === 100.0)
assert(sm.currentSourceInputRate(source) === 0.0)
assert(sm.currentSourceProcessingRate(source) === 100.0)
assert(sm.currentLatency() === None)

// Reporting new rows should update the rates and latencies
sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
clock.advance(500)
sm.reportTriggerFinished()
Expand All @@ -75,6 +99,7 @@ class StreamMetricsSuite extends SparkFunSuite {
assert(sm.currentSourceInputRate(source) === 0.0)
assert(sm.currentSourceProcessingRate(source) === 0.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails().isEmpty)
}

test("rates and latencies - after trigger with no data") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
AssertOnQuery(_.status.triggerDetails.isEmpty),

StartStream(),
AddData(inputData, 0),
Expand Down

0 comments on commit 49da4b1

Please sign in to comment.