Skip to content

Commit

Permalink
Address more
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Dec 2, 2016
1 parent f7b8755 commit be3737f
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class StreamExecution(

private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay

private val noDataEventInterval = sparkSession.sessionState.conf.streamingNoDataEventInterval
private val noDataProgressEventInterval =
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval

/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
Expand Down Expand Up @@ -199,7 +200,7 @@ class StreamExecution(
SparkSession.setActiveSession(sparkSession)

// The time at which we last reported a progress event that had no input data
var noDataEventTimestamp = Long.MinValue
var lastNoDataProgressEventTime = Long.MinValue

triggerExecutor.execute(() => {
startTrigger()
Expand All @@ -225,12 +226,12 @@ class StreamExecution(
finishTrigger(dataAvailable)
if (dataAvailable) {
// Reset lastNoDataProgressEventTime if we processed any data
noDataEventTimestamp = Long.MinValue
lastNoDataProgressEventTime = Long.MinValue
postEvent(new QueryProgressEvent(lastProgress))
} else {
val now = triggerClock.getTimeMillis()
if (now - noDataEventInterval >= noDataEventTimestamp) {
noDataEventTimestamp = now
if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
lastNoDataProgressEventTime = now
postEvent(new QueryProgressEvent(lastProgress))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(10L)

val STREAMING_NO_DATA_EVENT_INTERVAL =
SQLConfigBuilder("spark.sql.streaming.noDataEventInterval")
val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval")
.internal()
.doc("How long to wait between two progress events when there is no data")
.timeConf(TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -691,7 +691,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)

def streamingNoDataEventInterval: Long = getConf(STREAMING_NO_DATA_EVENT_INTERVAL)
def streamingNoDataProgressEventInterval: Long =
getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)

def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {

test("only one progress event per interval when no data") {
// This test will start a query but not push any data, and then check if we push too many events
withSQLConf(SQLConf.STREAMING_NO_DATA_EVENT_INTERVAL.key -> "100ms") {
withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100ms") {
@volatile var numProgressEvent = 0
val listener = new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {}
Expand Down Expand Up @@ -228,7 +228,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
testStream(input.toDS)(actions: _*)
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// 11 is the maximum possible number of events.
assert(numProgressEvent >= 1 && numProgressEvent <= 11)
assert(numProgressEvent > 1 && numProgressEvent <= 11)
} finally {
spark.streams.removeListener(listener)
}
Expand Down

0 comments on commit be3737f

Please sign in to comment.