[SPARK-17346][SQL][test-maven]Add Kafka source for Structured Streaming (branch 2.0) #15367
Conversation
## What changes were proposed in this pull request?

This PR adds a new project `external/kafka-0-10-sql` for the Structured Streaming Kafka source. It's based on the design doc: https://docs.google.com/document/d/19t2rWe51x7tq2e5AOfrsM9qb8_m7BRuv9fel9i0PqR8/edit?usp=sharing

tdas did most of the work, and part of it was inspired by koeninger's work.

### Introduction

The Kafka source is a Structured Streaming data source that polls data from Kafka. The schema of the data read is as follows:

Column | Type
---- | ----
key | binary
value | binary
topic | string
partition | int
offset | long
timestamp | long
timestampType | int

The source can deal with deleted topics. However, the user should make sure no Spark job is processing the data when a topic is deleted.

### Configuration

The user can use `DataStreamReader.option` to set the following configurations.

Kafka Source's options | value | default | meaning
------ | ------- | ------- | -------
startingOffset | ["earliest", "latest"] | "latest" | The start point when a query is started, either "earliest", which is from the earliest offset, or "latest", which is just from the latest offset. Note: this only applies when a new streaming query is started; resuming will always pick up from where the query left off.
failOnDataLoss | [true, false] | true | Whether to fail the query when it's possible that data was lost (e.g., topics were deleted, or offsets are out of range). This may be a false alarm; you can disable it when it doesn't work as expected.
subscribe | A comma-separated list of topics | (none) | The topic list to subscribe to. Only one of the "subscribe" and "subscribePattern" options can be specified for the Kafka source.
subscribePattern | Java regex string | (none) | The pattern used to subscribe to topics. Only one of the "subscribe" and "subscribePattern" options can be specified for the Kafka source.
kafka.consumer.poll.timeoutMs | long | 512 | The timeout in milliseconds to poll data from Kafka in executors.
fetchOffset.numRetries | int | 3 | Number of times to retry before giving up fetching the latest Kafka offsets.
fetchOffset.retryIntervalMs | long | 10 | Milliseconds to wait before retrying to fetch Kafka offsets.

Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g. `stream.option("kafka.bootstrap.servers", "host:port")`.

### Usage

* Subscribe to 1 topic

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1")
  .load()
```

* Subscribe to multiple topics

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "topic1,topic2")
  .load()
```

* Subscribe to a pattern

```Scala
spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribePattern", "topic.*")
  .load()
```

## How was this patch tested?

The new unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: cody koeninger <cody@koeninger.org>

Closes #15102 from zsxwing/kafka-source.
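Since `key` and `value` arrive as binary per the schema above, a typical query casts them to strings before use. The following is an illustrative sketch only, not part of this PR; it assumes a running Kafka broker at `host:port` with a topic named `topic1`, an active `SparkSession` named `spark`, and uses the console sink for demonstration:

```Scala
// Sketch: read from Kafka with the options described above, cast the
// binary key/value columns to strings, and print batches to the console.
// Assumes `spark` is an existing SparkSession and a broker is reachable.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")  // Kafka's own config via "kafka." prefix
  .option("subscribe", "topic1")
  .option("startingOffset", "earliest")            // start from the earliest offset
  .option("failOnDataLoss", "false")               // tolerate deleted topics / out-of-range offsets
  .load()

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset")
  .writeStream
  .format("console")
  .start()

query.awaitTermination()
```

Note that `startingOffset` only takes effect on a fresh query; a query resumed from a checkpoint ignores it and continues from its stored offsets.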
cc @tdas

Test build #66408 has finished for PR 15367 at commit

…aven build

## What changes were proposed in this pull request?
Generate the sql test jar to fix the maven build

## How was this patch tested?
Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15368 from zsxwing/sql-test-jar.

Test build #66423 has finished for PR 15367 at commit

retest this please

Test build #66432 has finished for PR 15367 at commit

retest this please

Test build #66433 has finished for PR 15367 at commit
This is rather a lot of functionality to backport; our backport guide doesn't seem to quite line up with this (based on https://www.mail-archive.com/dev@spark.apache.org/msg10284.html / https://cwiki.apache.org/confluence/display/SPARK/Committers#Committers-PolicyonBackportingBugFixes), but @pwendell will know better.
@holdenk this is basically adding an external project for Structured Streaming. Without the Kafka source, it's hard for people to test Structured Streaming with their real data. Considering 2.0.2 will be out before 2.1.0, it's better to add this one to branch 2.0 in order to test it early. Since Structured Streaming is experimental, it won't break any production usage and should be safe.
We should definitely vet this PR carefully to make sure it's safe. One thing that is missing from that guide, but that I do believe is accepted practice, is more leeway when the feature is marked experimental (i.e., during Spark 1.0-1.3 I used to backport pretty much everything that went into SQL). Since I think many people are simply not using Structured Streaming due to the lack of Kafka support, I'd err on the side of more users sooner. If you think any part of this PR is unsafe with respect to stable components, though, please do raise those concerns.
```Scala
}

after {
  for (topic <- testUtils.getAllTopicsAndPartitionSize().toMap.keys) {
```
@zsxwing fix this.
@tdas we can just merge this one, then merge my other PR to 2.0. There should not be any conflicts.
@zsxwing Have you tested the maven build?
@marmbrus @zsxwing I agree it's experimental and we should have more flexibility here with backports. I also very much agree that Structured Streaming in its current state on 2.0 isn't usable, but I'm not super sure that backporting fixes is the best way to do this. Honestly, I spend most of my time focused on Python & ML (and I've only really been looking at Structured Streaming with those two hats on). I'm really cautious about the idea of a 2k+ line backport which hasn't even been released otherwise; I don't have any specific objections to the changes, it's just making me nervous. The fact that what's being backported seems to still be under development is also concerning, since doing this backport now puts us in a position of backporting more (not yet merged into mainline) fixes. Of course, if the people with the most experience in this area all agree that this backport is reasonable (and most of y'all [ @marmbrus @zsxwing @tdas but maybe missing @koeninger ] seem to already be on this PR, so I'll leave you to it), that is great; it would probably be good to follow up on the original backport mailing list thread and update the wiki as well.
Does backporting reduce the likelihood of change if user feedback indicates we got it wrong? My technical concerns were largely addressed; that's my big remaining organizational concern.
No, if we backport this I would plan to continue to backport changes (that are safe) until the next release. Either way this should not affect what goes into master.
Thanks! I'm going to merge this one since the concern from @koeninger is addressed.
…ing (branch 2.0)

## What changes were proposed in this pull request?
Backport 9293734 and b678e46 into branch 2.0. The only difference is the Spark version in the pom file.

## How was this patch tested?
Jenkins.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15367 from zsxwing/kafka-source-branch-2.0.