[SPARK-17731][SQL][STREAMING] Metrics for structured streaming
## What changes were proposed in this pull request?

Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing

Specifically, this PR adds the following public API changes.

### New APIs
- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later); a usage sketch follows this list

- `StreamingQueryStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by all the sources
  - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources
  - ~~outputRate~~ - *Does not work with whole-stage codegen*
  - latency - Current average latency between the data being available in source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger
    - latencies - getOffset, getBatch, full trigger, wal writes
    - timestamps - trigger start, finish, after getOffset, after getBatch
    - numRows - input, output, state total/updated rows for aggregations

- `SourceStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by the source
  - processingRate - Current rate (rows/sec) at which the query is processing data from the source
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger

- Python API for `StreamingQuery.status()`
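
For illustration, here is a minimal Scala sketch (not part of this PR's diff) of reading the new status API, assuming `query` is a hypothetical already-running `StreamingQuery`; note that the low-level trigger map is exposed as `triggerDetails` in the code:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.streaming.StreamingQuery

// Hedged sketch: print the headline metrics of a hypothetical running query.
def printStatus(query: StreamingQuery): Unit = {
  val status = query.status                       // StreamingQueryStatus
  println(s"input rate:      ${status.inputRate} rows/sec")
  println(s"processing rate: ${status.processingRate} rows/sec")
  println(s"latency:         ${status.latency}")  // Option[Double], in ms
  // triggerDetails is a java.util.Map[String, String] of low-level trigger metrics
  status.triggerDetails.asScala.foreach { case (k, v) => println(s"  $k = $v") }
}
```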

### Breaking changes to existing APIs
**Existing direct public-facing APIs**
- Deprecated the direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus` (see the migration sketch below).
  - Branch 2.0 should have them deprecated; master should have them removed.
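
A hedged migration sketch, again assuming a hypothetical running `query: StreamingQuery`:

```scala
// Old direct accessors (deprecated in branch-2.0, removed in master):
//   query.sourceStatuses
//   query.sinkStatus
// New accessors through the status object added by this PR:
val sources = query.status.sourceStatuses   // Array[SourceStatus]
val sink = query.status.sinkStatus          // SinkStatus
println(sources.map(_.description).mkString(", "))
println(sink.description)
```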

**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus`
   - Earlier, `StreamingQueryInfo` was used only in the advanced listener API, but now it is also used in the direct public-facing API (`StreamingQuery.status`).

- Field `queryInfo` in the listener events `QueryStarted`, `QueryProgress`, and `QueryTerminated` was renamed to `queryStatus`, with return type `StreamingQueryStatus` (see the listener sketch below).

- Field `offsetDesc` in `SourceStatus` was changed from `Option[String]` to `String`.

- For `SourceStatus` and `SinkStatus`, the constructors were made `private` instead of `private[sql]` to make them more Java-safe. Instead, `private[sql]` `apply()` methods were added on the `SourceStatus`/`SinkStatus` companion objects, which are harder to use accidentally from Java.
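
For the listener API, a sketch of how the renamed field might be consumed (an illustration based on the event and field names described above, not code from the diff; assumes `spark` is an active `SparkSession`):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hedged sketch: report the processing rate on every progress event.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStarted): Unit = {}
  override def onQueryProgress(event: QueryProgress): Unit = {
    // queryStatus replaces the old queryInfo field and returns a StreamingQueryStatus
    val status = event.queryStatus
    println(s"${status.name}: ${status.processingRate} rows/sec")
  }
  override def onQueryTerminated(event: QueryTerminated): Unit = {}
})
```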

## How was this patch tested?

Old and new unit tests.
- Rate calculation and other internal logic of StreamMetrics are tested by StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite.
- Source-specific tests for making sure input rows are counted are in the source-specific test suites.
- Additional tests cover minor additions in LocalTableScanExec, StateStore, etc.

Metrics were also manually tested using the Ganglia sink.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#15307 from tdas/SPARK-17731.
tdas authored and uzadude committed Jan 27, 2017
1 parent 83873bd commit 234c3dd
Showing 27 changed files with 1,758 additions and 273 deletions.
@@ -264,6 +264,33 @@ class KafkaSourceSuite extends KafkaSourceTest {
    testUnsupportedConfig("kafka.auto.offset.reset", "latest")
  }

  test("input row metrics") {
    val topic = newTopic()
    testUtils.createTopic(topic, partitions = 5)
    testUtils.sendMessages(topic, Array("-1"))
    require(testUtils.getLatestOffsets(Set(topic)).size === 5)

    val kafka = spark
      .readStream
      .format("kafka")
      .option("subscribe", topic)
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    val mapped = kafka.map(kv => kv._2.toInt + 1)
    testStream(mapped)(
      makeSureGetOffsetCalled,
      AddKafkaData(Set(topic), 1, 2, 3),
      CheckAnswer(2, 3, 4),
      AssertOnLastQueryStatus { status =>
        assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
        assert(status.sourceStatuses(0).processingRate > 0.0)
      }
    )
  }

  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

  private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = {
13 changes: 13 additions & 0 deletions project/MimaExcludes.scala
@@ -56,6 +56,19 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),

// [SPARK-17731][SQL][Streaming] Metrics for structured streaming
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceStatus.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.SourceStatus.offsetDesc"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SinkStatus.this"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryInfo"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.queryInfo"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),

// [SPARK-17338][SQL] add global temp view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"),
301 changes: 301 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -189,6 +189,304 @@ def resetTerminated(self):
        self._jsqm.resetTerminated()


class StreamingQueryStatus(object):
    """A class used to report information about the progress of a StreamingQuery.
    .. note:: Experimental
    .. versionadded:: 2.1
    """

    def __init__(self, jsqs):
        self._jsqs = jsqs

    def __str__(self):
        """
        Pretty string of this query status.
        >>> print(sqs)
        StreamingQueryStatus:
        Query name: query
        Query id: 1
        Status timestamp: 123
        Input rate: 15.5 rows/sec
        Processing rate 23.5 rows/sec
        Latency: 345.0 ms
        Trigger details:
        isDataPresentInTrigger: true
        isTriggerActive: true
        latency.getBatch.total: 20
        latency.getOffset.total: 10
        numRows.input.total: 100
        triggerId: 5
        Source statuses [1 source]:
        Source 1: MySource1
        Available offset: #0
        Input rate: 15.5 rows/sec
        Processing rate: 23.5 rows/sec
        Trigger details:
        numRows.input.source: 100
        latency.getOffset.source: 10
        latency.getBatch.source: 20
        Sink status: MySink
        Committed offsets: [#1, -]
        """
        return self._jsqs.toString()

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def name(self):
        """
        Name of the query. This name is unique across all active queries.
        >>> sqs.name
        u'query'
        """
        return self._jsqs.name()

    @property
    @since(2.1)
    def id(self):
        """
        Id of the query. This id is unique across all queries that have been started in
        the current process.
        >>> int(sqs.id)
        1
        """
        return self._jsqs.id()

    @property
    @since(2.1)
    def timestamp(self):
        """
        Timestamp (ms) of when this query was generated.
        >>> int(sqs.timestamp)
        123
        """
        return self._jsqs.timestamp()

    @property
    @since(2.1)
    def inputRate(self):
        """
        Current total rate (rows/sec) at which data is being generated by all the sources.
        >>> sqs.inputRate
        15.5
        """
        return self._jsqs.inputRate()

    @property
    @since(2.1)
    def processingRate(self):
        """
        Current rate (rows/sec) at which the query is processing data from all the sources.
        >>> sqs.processingRate
        23.5
        """
        return self._jsqs.processingRate()

    @property
    @since(2.1)
    def latency(self):
        """
        Current average latency between the data being available in source and the sink
        writing the corresponding output.
        >>> sqs.latency
        345.0
        """
        if (self._jsqs.latency().nonEmpty()):
            return self._jsqs.latency().get()
        else:
            return None

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def sourceStatuses(self):
        """
        Current statuses of the sources as a list.
        >>> len(sqs.sourceStatuses)
        1
        >>> sqs.sourceStatuses[0].description
        u'MySource1'
        """
        return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def sinkStatus(self):
        """
        Current status of the sink.
        >>> sqs.sinkStatus.description
        u'MySink'
        """
        return SinkStatus(self._jsqs.sinkStatus())

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def triggerDetails(self):
        """
        Low-level details of the currently active trigger (e.g. number of rows processed
        in trigger, latency of intermediate steps, etc.).
        If no trigger is currently active, then it will have details of the last completed trigger.
        >>> sqs.triggerDetails
        {u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
        u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
        u'isDataPresentInTrigger': u'true'}
        """
        return self._jsqs.triggerDetails()


class SourceStatus(object):
    """
    Status and metrics of a streaming Source.
    .. note:: Experimental
    .. versionadded:: 2.1
    """

    def __init__(self, jss):
        self._jss = jss

    def __str__(self):
        """
        Pretty string of this source status.
        >>> print(sqs.sourceStatuses[0])
        SourceStatus: MySource1
        Available offset: #0
        Input rate: 15.5 rows/sec
        Processing rate: 23.5 rows/sec
        Trigger details:
        numRows.input.source: 100
        latency.getOffset.source: 10
        latency.getBatch.source: 20
        """
        return self._jss.toString()

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def description(self):
        """
        Description of the source corresponding to this status.
        >>> sqs.sourceStatuses[0].description
        u'MySource1'
        """
        return self._jss.description()

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def offsetDesc(self):
        """
        Description of the current offset if known.
        >>> sqs.sourceStatuses[0].offsetDesc
        u'#0'
        """
        return self._jss.offsetDesc()

    @property
    @since(2.1)
    def inputRate(self):
        """
        Current rate (rows/sec) at which data is being generated by the source.
        >>> sqs.sourceStatuses[0].inputRate
        15.5
        """
        return self._jss.inputRate()

    @property
    @since(2.1)
    def processingRate(self):
        """
        Current rate (rows/sec) at which the query is processing data from the source.
        >>> sqs.sourceStatuses[0].processingRate
        23.5
        """
        return self._jss.processingRate()

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def triggerDetails(self):
        """
        Low-level details of the currently active trigger (e.g. number of rows processed
        in trigger, latency of intermediate steps, etc.).
        If no trigger is currently active, then it will have details of the last completed trigger.
        >>> sqs.sourceStatuses[0].triggerDetails
        {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
        u'latency.getBatch.source': u'20'}
        """
        return self._jss.triggerDetails()


class SinkStatus(object):
    """
    Status and metrics of a streaming Sink.
    .. note:: Experimental
    .. versionadded:: 2.1
    """

    def __init__(self, jss):
        self._jss = jss

    def __str__(self):
        """
        Pretty string of this sink status.
        >>> print(sqs.sinkStatus)
        SinkStatus: MySink
        Committed offsets: [#1, -]
        """
        return self._jss.toString()

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def description(self):
        """
        Description of the sink corresponding to this status.
        >>> sqs.sinkStatus.description
        u'MySink'
        """
        return self._jss.description()

    @property
    @ignore_unicode_prefix
    @since(2.1)
    def offsetDesc(self):
        """
        Description of the current offsets up to which data has been written by the sink.
        >>> sqs.sinkStatus.offsetDesc
        u'[#1, -]'
        """
        return self._jss.offsetDesc()


class Trigger(object):
    """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
@@ -753,11 +1051,14 @@ def _test():
    globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
    globs['df'] = \
        globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
    globs['sqs'] = StreamingQueryStatus(
        spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.streaming, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['spark'].stop()

    if failure_count:
        exit(-1)

@@ -164,6 +164,33 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
    ret
  }

  /**
   * Returns a Seq containing the leaves in this tree.
   */
  def collectLeaves(): Seq[BaseType] = {
    this.collect { case p if p.children.isEmpty => p }
  }

  /**
   * Finds and returns the first [[TreeNode]] of the tree for which the given partial function
   * is defined (pre-order), and applies the partial function to it.