Limit the number of StreamingQueryListener.StreamProgressEvent when there is no data
zsxwing committed Dec 1, 2016
1 parent 2ab8551 commit 4ace2bd
Showing 3 changed files with 66 additions and 1 deletion.
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

@@ -63,6 +63,8 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
+  private val noDataReportInterval = sparkSession.sessionState.conf.streamingNoDataReportInterval
+
   /**
   * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
   */
@@ -196,6 +198,9 @@ class StreamExecution(
      // While active, repeatedly attempt to run batches.
      SparkSession.setActiveSession(sparkSession)
 
+      // The timestamp we report an event that has no input data
+      var noDataReportTimestamp = Long.MinValue
+
      triggerExecutor.execute(() => {
        startTrigger()
 
@@ -218,7 +223,15 @@
 
          // Report trigger as finished and construct progress object.
          finishTrigger(dataAvailable)
-          postEvent(new QueryProgressEvent(lastProgress))
+          if (dataAvailable) {
+            postEvent(new QueryProgressEvent(lastProgress))
+          } else {
+            val now = triggerClock.getTimeMillis()
+            if (now - noDataReportInterval >= noDataReportTimestamp) {
+              noDataReportTimestamp = now
+              postEvent(new QueryProgressEvent(lastProgress))
+            }
+          }
 
          if (dataAvailable) {
            // We'll increase currentBatchId after we complete processing current batch's data
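The throttle above is a plain timestamp comparison rather than a timer: when a trigger finishes with no new data, the progress event is posted only if at least noDataReportInterval milliseconds have elapsed since the last no-data event, and noDataReportTimestamp starts at Long.MinValue so the first such event always goes out. A minimal standalone sketch of the same pattern, with illustrative names (ProgressThrottle, maybeEmit) that are not part of Spark's API:

// A sketch only: ProgressThrottle and maybeEmit are illustrative names, not Spark API.
class ProgressThrottle(intervalMs: Long,
                       clock: () => Long = () => System.currentTimeMillis()) {
  // Time of the last event emitted while no data was available;
  // Long.MinValue ensures the very first no-data event is emitted.
  private var lastNoDataEventMs = Long.MinValue

  // Emit unconditionally when data was processed; otherwise at most
  // once per intervalMs, like the check in StreamExecution above.
  def maybeEmit(dataAvailable: Boolean)(emit: => Unit): Unit = {
    if (dataAvailable) {
      emit
    } else {
      val now = clock()
      if (now - intervalMs >= lastNoDataEventMs) {
        lastNoDataEventMs = now
        emit
      }
    }
  }
}

// Script-style usage: only the first call fires; the second is suppressed.
val throttle = new ProgressThrottle(intervalMs = 10000L)
throttle.maybeEmit(dataAvailable = false)(println("progress (no data)"))
throttle.maybeEmit(dataAvailable = false)(println("not printed within 10s"))

Note that, as in the diff, the timestamp is not reset when data arrives: a batch with data always posts an event, and a following no-data event is throttled only against the previous no-data event.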
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -603,6 +603,13 @@
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10L)
 
+  val STREAMING_NO_DATA_REPORT_INTERVAL =
+    SQLConfigBuilder("spark.sql.streaming.noDataReportInterval")
+      .internal()
+      .doc("How long to wait between two progress events when there is no data")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(10000L)
+
   val STREAMING_METRICS_ENABLED =
     SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
       .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
@@ -684,6 +691,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
 
+  def streamingNoDataReportInterval: Long = getConf(STREAMING_NO_DATA_REPORT_INTERVAL)
+
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
   def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
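Because the new entry is declared with timeConf(TimeUnit.MILLISECONDS), it accepts duration strings as well as raw millisecond values. A hedged usage sketch; the key is marked internal(), so treating it as a tunable is an assumption rather than a supported, documented knob, and the session setup here is illustrative:

import org.apache.spark.sql.SparkSession

// Illustrative local session; master and appName are placeholders.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("no-data-report-interval-demo")
  // Post at most one no-data progress event every 30 seconds
  // (the diff above defaults this to 10000 ms).
  .config("spark.sql.streaming.noDataReportInterval", "30s")
  .getOrCreate()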
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

@@ -31,6 +31,7 @@ import org.scalatest.PrivateMethodTester._
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.util.JsonProtocol
 
@@ -191,6 +192,48 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
+  test("noDataReportInterval") {
+    withSQLConf(SQLConf.STREAMING_NO_DATA_REPORT_INTERVAL.key -> "100ms") {
+      @volatile var progressEventCount = 0
+
+      val listener = new StreamingQueryListener {
+        override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+
+        override def onQueryProgress(event: QueryProgressEvent): Unit = {
+          progressEventCount += 1
+        }
+
+        override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+      }
+      spark.streams.addListener(listener)
+      try {
+        val clock = new StreamManualClock()
+        val actions = mutable.ArrayBuffer[StreamAction]()
+        actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+        actions += AssertOnQuery { _ =>
+          // It should report at least one progress
+          eventually(timeout(streamingTimeout)) {
+            assert(progressEventCount > 0)
+          }
+          true
+        }
+        for (_ <- 1 to 100) {
+          actions += AdvanceManualClock(10)
+          actions += AssertOnQuery { _ =>
+            // Sleep so that if the config `noDataReportInterval` doesn't work, it has enough time
+            // to report too many events.
+            Thread.sleep(10)
+            true
+          }
+        }
+        testStream(MemoryStream[Int].toDS)(actions: _*)
+        assert(progressEventCount <= 11)
+      } finally {
+        spark.streams.removeListener(listener)
+      }
+    }
+  }
+
   testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
     // query-event-logs-version-2.0.0.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.0.
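The bound asserted at the end of the test follows from clock arithmetic: 100 AdvanceManualClock(10) steps move the trigger clock 1000 ms, and with noDataReportInterval at 100 ms the throttle admits at most 1000 / 100 = 10 no-data events on top of the initial progress event, hence progressEventCount <= 11. A one-line check of that arithmetic:

// Back-of-the-envelope bound behind assert(progressEventCount <= 11).
val advances = 100   // AdvanceManualClock steps in the test
val stepMs = 10      // milliseconds advanced per step
val intervalMs = 100 // noDataReportInterval configured by the test
val maxEvents = 1 + (advances * stepMs) / intervalMs
assert(maxEvents == 11)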
