-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-30553][DOCS] fix structured-streaming java example error #27268
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for making a PR, @bettermouse .
However, the Java example should be consistent with Scala.
If you want to change the example, please test and update Scala part together.
@dongjoon-hyun Thanks.but I do not think we should change Scala. After my update.JAVA and SCALA have same Physical Plan. |
I'm taking back my words. I'm reviewing this PR again. |
I saw your example in the JIRA.
|
@dongjoon-hyun I have checked it.The class JavaStructuredNetworkWordCountWindowed does not use API withWatermark. So there is no problem |
Oh, got it. Thank you for checking. |
Jenkins test this please |
Test build #116979 has finished for PR 27268 at commit
|
Thanks for the contribution, @bettermouse Could we have plan information for both current master and after the fix? It would be great to have it in PR description so that reviewers don't have to check it manually. And it would be better if we could respect the template of the PR. You can put "N/A" if the PR doesn't need to fill up the section. |
.groupBy( | ||
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"), | ||
words.col("word")) | ||
functions.window(wordsWatermark.col("timestamp"), "10 minutes", "5 minutes"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess functions.col("timestamp")
is exactly equivalent to the $"timestamp"
in Scala code example. Same applies to col("word").
Actually Java code example seems to be written a bit verbosely. import static org.apache.spark.sql.functions.*;
would shorten the code and remove functions.
all the places - actually Scala code example assume that the import is placed before.
In overall, if I understand correctly, col("timestamp")
would work too if the static import is placed correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HeartSaVioR Thank you for your help.
I think we would be better to address this and include the result in the section of "How was this patch tested?", instead of simply putting "N/A". |
@HeartSaVioR Sorry.This is my first PR.Can you help me understand what |
I assume you are now having two example queries "before the fix" vs "after the fix". Once you run the query and ingest some data, you'll see the query plan in SQL tab, as you've added like It would be enough to fill up the content and add below:
Please remove |
Thank you very much.I have corrected it according to your suggestion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@dongjoon-hyun Could you please review this again? window#13-T600000ms
denotes the field has event time metadata correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
Thank you so much for your first contribution, @bettermouse.
Thank you so much the good guide to help this PR, @HeartSaVioR !
Merged to master/2.4.
# What changes were proposed in this pull request? Fix structured-streaming java example error. ```java Dataset<Row> windowedCounts = words .withWatermark("timestamp", "10 minutes") .groupBy( functions.window(words.col("timestamp"), "10 minutes", "5 minutes"), words.col("word")) .count(); ``` It does not clean up old state.May cause OOM > Before the fix ```scala == Physical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter48e331f0 +- *(4) HashAggregate(keys=[window#13, word#4], functions=[count(1)], output=[window#13, word#4, count#12L]) +- StateStoreSave [window#13, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state, runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6, numPartitions = 1], Update, 1579530890886, 2 +- *(3) HashAggregate(keys=[window#13, word#4], functions=[merge_count(1)], output=[window#13, word#4, count#23L]) +- StateStoreRestore [window#13, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state, runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6, numPartitions = 1], 2 +- *(2) HashAggregate(keys=[window#13, word#4], functions=[merge_count(1)], output=[window#13, word#4, count#23L]) +- Exchange hashpartitioning(window#13, word#4, 1) +- *(1) HashAggregate(keys=[window#13, word#4], functions=[partial_count(1)], output=[window#13, word#4, count#23L]) +- *(1) Project [window#13, word#4] +- *(1) Filter (((isnotnull(timestamp#5) && isnotnull(window#13)) && (timestamp#5 >= window#13.start)) && (timestamp#5 < window#13.end)) +- *(1) Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms), List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms)], [window#13, word#4, timestamp#5-T600000ms] +- EventTimeWatermark timestamp#5: timestamp, interval 10 minutes +- LocalTableScan <empty>, [word#4, timestamp#5] ``` > After the fix ```scala == Physical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter1df12a96 +- *(4) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[count(1)], output=[window#8-T600000ms, word#4, count#12L]) +- StateStoreSave [window#13-T600000ms, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state, runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4, numPartitions = 1], Update, 1579529975342, 2 +- *(3) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[merge_count(1)], output=[window#13-T600000ms, word#4, count#23L]) +- StateStoreRestore [window#13-T600000ms, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state, runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4, numPartitions = 1], 2 +- *(2) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[merge_count(1)], output=[window#13-T600000ms, word#4, count#23L]) +- Exchange hashpartitioning(window#13-T600000ms, word#4, 1) +- *(1) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[partial_count(1)], output=[window#13-T600000ms, word#4, count#23L]) +- *(1) Project [window#13-T600000ms, word#4] +- *(1) Filter (((isnotnull(timestamp#5-T600000ms) && isnotnull(window#13-T600000ms)) && (timestamp#5-T600000ms >= window#13-T600000ms.start)) && (timestamp#5-T600000ms < window#13-T600000ms.end)) +- *(1) Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms), List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms)], [window#13-T600000ms, word#4, timestamp#5-T600000ms] +- EventTimeWatermark timestamp#5: timestamp, interval 10 minutes +- LocalTableScan <empty>, [word#4, timestamp#5] ``` ### Why are the changes needed? If we write the code according to the documentation.It does not clean up old state.May cause OOM ### Does this PR introduce any user-facing change? No ### How was this patch tested? ```java SparkSession spark = SparkSession.builder().appName("test").master("local[*]") .config("spark.sql.shuffle.partitions", 1) .getOrCreate(); Dataset<Row> lines = spark.readStream().format("socket") .option("host", "skynet") .option("includeTimestamp", true) .option("port", 8888).load(); Dataset<Row> words = lines.toDF("word", "timestamp"); Dataset<Row> windowedCounts = words .withWatermark("timestamp", "10 minutes") .groupBy( window(col("timestamp"), "10 minutes", "5 minutes"), col("word")) .count(); StreamingQuery start = windowedCounts.writeStream() .outputMode("update") .format("console").start(); start.awaitTermination(); ``` We can write an example like this.And input some date 1. see the matrics `stateOnCurrentVersionSizeBytes` in log.Is it increasing all the time? 2. see the Physical Plan.Whether it contains things like `HashAggregate(keys=[window#11-T10000ms, value#39]` 3. We can debug in `storeManager.remove(store, keyRow)`.Whether it will remove the old state. Closes #27268 from bettermouse/spark-30553. Authored-by: bettermouse <qq5375631> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit 3c4e619) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@bettermouse . You are added to the Apache Spark contributor group and SPARK-30553 is assigned to you. Congratulation! |
What changes were proposed in this pull request?
Fix structured-streaming java example error.
It does not clean up old state.May cause OOM
Why are the changes needed?
If we write the code according to the documentation.It does not clean up old state.May cause OOM
Does this PR introduce any user-facing change?
No
How was this patch tested?
We can write an example like this.And input some date
stateOnCurrentVersionSizeBytes
in log.Is it increasing all the time?HashAggregate(keys=[window#11-T10000ms, value#39]
storeManager.remove(store, keyRow)
.Whether it will remove the old state.