-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-2492][Streaming] kafkaReceiver minor changes to align with Kafka 0.8 #1420
Conversation
QA tests have started for PR 1420. This patch merges cleanly. |
QA results for PR 1420: |
QA tests have started for PR 1420. This patch merges cleanly. |
QA results for PR 1420: |
QA tests have started for PR 1420. This patch merges cleanly. |
QA results for PR 1420: |
Hi @tdas , would you mind taking a look at this? thanks a lot. |
@tdas This would bring the kafka receiver in alignment with kafka 0.8. If folks would like the old kafka 0.7 behavior resetOffset does the trick. It looks like there are some merge conflicts right now, otherwise I would give this +1. |
@reinvigorate Thanks for your review. |
@jerryshao Could you merge conflicts? I am taking a look in trying to understand the change. After that I will merge this change. |
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). | ||
* @param groupId The group id for this consumer. | ||
*/ | ||
def resetOffset(zkQuorum: String, groupId: String) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think it is good idea to expose have this utility function in our API that is very specific to a particular version of Kafka. If this functionality is required for some well-known version-specific Kafka issue, then I am guessing there will be known workarounds in the Kafka-world and we dont need to provide custom functionality for that. So I dont think we should add this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So maybe we can just remove this function to let user to delete the ZK metadata by themselves if they know it.
…ZK metadata by calling this API
Test build #23199 has started for PR 1420 at commit
|
Hi TD, I've rebased the code to the latest master. I agree with you that providing this utility function for Kafka in Spark Streaming is quite strange, IMO we can just delete this code. If people want to directly read data from the begin or end of partition with such trick, they can do it by themselves. What is your opinion. @reinvigorate , what is your opinion? |
Test build #23199 has finished for PR 1420 at commit
|
Test FAILed. |
Jenkins, retest this please. |
Test build #23201 has started for PR 1420 at commit
|
@jerryshao Yeah I think @tdas is spot on. |
Test build #23201 has finished for PR 1420 at commit
|
Test FAILed. |
Quite strange Jenkins fails to build this patch. |
Test build #23203 has started for PR 1420 at commit
|
Test build #23203 has finished for PR 1420 at commit
|
Test PASSed. |
@@ -17,6 +17,7 @@ | |||
|
|||
package org.apache.spark.streaming.kafka | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not necessary
Thank @reinvigorate. @jerryshao Could you please delete that method, and remove the ZKUtils import. Other than that this patch looks good to me. |
Test build #23214 has started for PR 1420 at commit
|
Thanks @tdas and @reinvigorate , I've removed this method as you suggested, please help to review :) |
LGTM. Will merge when tests pass. |
KafkaStreamSuite has passed. So I am merging this. |
Test build #23214 has finished for PR 1420 at commit
|
Test PASSed. |
…ka 0.8 Update the KafkaReceiver's behavior when auto.offset.reset is set. In Kafka 0.8, `auto.offset.reset` is a hint for out-range offset to seek to the beginning or end of the partition. While in the previous code `auto.offset.reset` is a enforcement to seek to the beginning or end immediately, this is different from Kafka 0.8 defined behavior. Also deleting extesting ZK metadata in Receiver when multiple consumers are launched will introduce issue as mentioned in [SPARK-2383](https://issues.apache.org/jira/browse/SPARK-2383). So Here we change to offer user to API to explicitly reset offset before create Kafka stream, while in the meantime keep the same behavior as Kafka 0.8 for parameter `auto.offset.reset`. @tdas, would you please review this PR? Thanks a lot. Author: jerryshao <saisai.shao@intel.com> Closes #1420 from jerryshao/kafka-fix and squashes the following commits: d6ae94d [jerryshao] Address the comment to remove the resetOffset() function de3a4c8 [jerryshao] Fix compile error 4a1c3f9 [jerryshao] Doc changes b2c1430 [jerryshao] Move offset reset to a helper function to let user explicitly delete ZK metadata by calling this API fac8fd6 [jerryshao] Changes to align with Kafka 0.8 (cherry picked from commit c8850a3) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Update the KafkaReceiver's behavior when auto.offset.reset is set.
In Kafka 0.8,
auto.offset.reset
is a hint for out-range offset to seek to the beginning or end of the partition. While in the previous codeauto.offset.reset
is a enforcement to seek to the beginning or end immediately, this is different from Kafka 0.8 defined behavior.Also deleting extesting ZK metadata in Receiver when multiple consumers are launched will introduce issue as mentioned in SPARK-2383.
So Here we change to offer user to API to explicitly reset offset before create Kafka stream, while in the meantime keep the same behavior as Kafka 0.8 for parameter
auto.offset.reset
.@tdas, would you please review this PR? Thanks a lot.