Skip to content

Commit

Permalink
More changes
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Jan 17, 2018
1 parent 1335a6d commit b8381ef
Showing 1 changed file with 129 additions and 45 deletions.
174 changes: 129 additions & 45 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1052,10 +1052,13 @@ output mode.

### Join Operations
Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
as well as another streaming Dataset/DataFrame. In this section we will explore what type of joins
(i.e. inner, outer, etc.) are supported in the above cases. Note that in all the supported join
types, the result of the join with a streaming Dataset/DataFrame will be the exactly the same as
if it was with a static Dataset/DataFrame containing the same data in the stream.
as well as another streaming Dataset/DataFrame. The result of the streaming join is generated
incrementally, similar to the results of streaming aggregations in the previous section. In this
section we will explore what type of joins (i.e. inner, outer, etc.) are supported in the above
cases. Note that in all the supported join types, the result of the join with a streaming
Dataset/DataFrame will be the exactly the same as if it was with a static Dataset/DataFrame
containing the same data in the stream.


#### Stream-static joins

Expand Down Expand Up @@ -1099,9 +1102,8 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat
</div>

Note that stream-static joins are not stateful, so no state management is necessary.
However, a few types of stream-static outer join are not supported as the incomplete view of
all data in a stream makes it infeasible to calculate the results correctly.
These are discussed at the end of this section.
However, a few types of stream-static outer joins are not yet supported.
These are listed at the [end of this Join section](#support-matrix-for-joins-in-streaming-queries).

#### Stream-stream Joins
In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
Expand All @@ -1126,9 +1128,13 @@ In other words, you will have to do the following additional steps in the join.
(similar to streaming aggregations)

1. Define a constraint on event-time across the two inputs such that the engine can figure out when
old rows of one input is not going to be required for matches with the other input. This constraint
can either be a time range condition (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),
or equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
old rows of one input is not going to be required (i.e. will not satisfy the time constraint) for
matches with the other input. This constraint can be defined in one of the two ways.

1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),

1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).

Let’s understand this with an example.

Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
Expand Down Expand Up @@ -1164,8 +1170,8 @@ impressionsWithWatermark.join(
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""
))
""")
)

{% endhighlight %}

Expand All @@ -1188,8 +1194,8 @@ impressionsWithWatermark.join(
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour "
));
"clickTime <= impressionTime + interval 1 hour ")
);

{% endhighlight %}

Expand All @@ -1214,8 +1220,8 @@ impressionsWithWatermark.join(
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""
))
""")
)

{% endhighlight %}

Expand All @@ -1226,13 +1232,67 @@ impressionsWithWatermark.join(
While the watermark + event-time constraints is optional for inner joins, for left and right outer
joins they must be specified. This is because for generating the NULL results in outer join, the
engine must know when an input row is not going to match with anything in future. Hence, the
watermark + event-time constraints must be specified for generating correct results.
watermark + event-time constraints must be specified for generating correct results. Therefore,
a query with outer-join will look quite like the ad-monetization example earlier, except that
there will be an additional parameter specifying it to be an outer-join.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}

impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter"
)

{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour "),
"leftOuter" // can be "inner", "leftOuter", "rightOuter"
);

{% endhighlight %}


</div>
<div data-lang="python" markdown="1">

{% highlight python %}
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter"
)

{% endhighlight %}

</div>
</div>

However, note that the outer NULL results will be generated with a delay (depends on the specified
watermark delay and the time range condition) because the engine has to wait for that long to ensure
there were no matches and there will be no more matches in future.

##### Support matrix for joins on streaming data
##### Support matrix for joins in streaming queries

<table class ="table">
<tr>
Expand All @@ -1242,10 +1302,17 @@ watermark delay and the time range condition) because the engine has to wait for
<th></th>
</tr>
<tr>
<td rowspan="4" style="vertical-align: middle;">
Stream</td>
<td rowspan="4" style="vertical-align: middle;">
Static</td>
<td style="vertical-align: middle;">Static</td>
<td style="vertical-align: middle;">Static</td>
<td style="vertical-align: middle;">All types</td>
<td style="vertical-align: middle;">
Supported, since its not on streaming data even though it
can be present in a streaming query
</td>
</tr>
<tr>
<td rowspan="4" style="vertical-align: middle;">Stream</td>
<td rowspan="4" style="vertical-align: middle;">Static</td>
<td style="vertical-align: middle;">Inner</td>
<td style="vertical-align: middle;">Supported, not stateful</td>
</tr>
Expand All @@ -1262,10 +1329,8 @@ watermark delay and the time range condition) because the engine has to wait for
<td style="vertical-align: middle;">Not supported</td>
</tr>
<tr>
<td rowspan="4" style="vertical-align: middle;">
Static</td>
<td rowspan="4" style="vertical-align: middle;">
Stream</td>
<td rowspan="4" style="vertical-align: middle;">Static</td>
<td rowspan="4" style="vertical-align: middle;">Stream</td>
<td style="vertical-align: middle;">Inner</td>
<td style="vertical-align: middle;">Supported, not stateful</td>
</tr>
Expand All @@ -1282,20 +1347,27 @@ watermark delay and the time range condition) because the engine has to wait for
<td style="vertical-align: middle;">Not supported</td>
</tr>
<tr>
<td rowspan="4" style="vertical-align: middle;">
Stream</td>
<td rowspan="4" style="vertical-align: middle;">
Stream</td>
<td rowspan="4" style="vertical-align: middle;">Stream</td>
<td rowspan="4" style="vertical-align: middle;">Stream</td>
<td style="vertical-align: middle;">Inner</td>
<td style="vertical-align: middle;">Supported, optionally specify watermark on both sides + time constraints for state cleanup</td>
<td style="vertical-align: middle;">
Supported, optionally specify watermark on both sides +
time constraints for state cleanup<
</td>
</tr>
<tr>
<td style="vertical-align: middle;">Left Outer</td>
<td style="vertical-align: middle;">Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup</td>
<td style="vertical-align: middle;">
Conditionally supported, must specify watermark on right + time constraints for correct
results, optionally specify watermark on left for all state cleanup
</td>
</tr>
<tr>
<td style="vertical-align: middle;">Right Outer</td>
<td style="vertical-align: middle;">Conditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup</td>
<td style="vertical-align: middle;">
Conditionally supported, must specify watermark on left + time constraints for correct
results, optionally specify watermark on right for all state cleanup
</td>
</tr>
<tr>
<td style="vertical-align: middle;">Full Outer</td>
Expand All @@ -1313,7 +1385,15 @@ Additional details on supported joins:

- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`.

- As of Spark 2.3, joins can be used only when the query is in Append output mode. Other output modes are not yet supported.
- As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

- As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of
what cannot be used.

- Cannot use streaming aggregations before joins.

- Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode cannot before joins.



### Streaming Deduplication
Expand Down Expand Up @@ -1383,19 +1463,13 @@ Some of them are as follows.

- Limit and take first N rows are not supported on streaming Datasets.

- Distinct operations on streaming Datasets are not supported.
- Distinct operations on streaming Datasets are not supported. See the
<a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
for more details

- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

- Outer joins between a streaming and a static Datasets are conditionally supported.

+ Full outer join with a streaming Dataset is not supported

+ Left outer join with a streaming Dataset on the right is not supported

+ Right outer join with a streaming Dataset on the left is not supported

- Any kind of joins between two streaming Datasets is not yet supported.
- Few types of outer joins on streaming Datasets are not supported. See the

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

Expand Down Expand Up @@ -1503,6 +1577,15 @@ Here is the compatibility matrix.
Aggregations not allowed after <code>flatMapGroupsWithState</code>.
</td>
</tr>
<tr>
<td colspan="2" style="vertical-align: middle;">Queries with <code>joins</code></td>
<td style="vertical-align: middle;">Append</td>
<td style="vertical-align: middle;">
Update and Complete mode not supported yet. See the
<a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
for more details on what types of joins are supported.
</td>
</tr>
<tr>
<td colspan="2" style="vertical-align: middle;">Other queries</td>
<td style="vertical-align: middle;">Append, Update</td>
Expand Down Expand Up @@ -2369,6 +2452,7 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat

**Talks**

- Spark Summit 2017 Talk - [Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark](https://spark-summit.org/2017/events/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark/)
- Spark Summit Europe 2017 Talks -
- [Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark](https://spark-summit.org/2017/events/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark/)
- Spark Summit 2016 Talk - [A Deep Dive into Structured Streaming](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)

0 comments on commit b8381ef

Please sign in to comment.