
[SPARK-17834][SQL]Fetch the earliest offsets manually in KafkaSource instead of counting on KafkaConsumer #15397

Closed
wants to merge 3 commits

Conversation

zsxwing
Member

@zsxwing zsxwing commented Oct 7, 2016

What changes were proposed in this pull request?

Because `KafkaConsumer.poll(0)` may update the partition offsets, this PR just calls `seekToBeginning` to manually set the earliest offsets for the KafkaSource initial offsets.

How was this patch tested?

Existing tests.
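To illustrate why relying on `poll(0)` is fragile, here is a minimal in-memory sketch (a hypothetical mock, not the real kafka-clients API) of the semantics involved: `poll(0)` resets an unknown position according to `auto.offset.reset`, which may land on the *latest* offset, while `seekToBeginning` deterministically pins positions to the earliest offsets.

```python
# Hypothetical in-memory stand-in for KafkaConsumer, illustrating the
# semantics this PR relies on (not the real kafka-clients API).
class MockConsumer:
    def __init__(self, earliest, latest, auto_offset_reset="latest"):
        self.earliest = dict(earliest)   # partition -> earliest offset
        self.latest = dict(latest)       # partition -> latest offset
        self.auto_offset_reset = auto_offset_reset
        self.positions = {}

    def poll(self, timeout_ms):
        # poll(0) joins the group and, for partitions with no committed
        # position, resets per auto.offset.reset -- possibly to *latest*.
        for tp in self.earliest:
            if tp not in self.positions:
                self.positions[tp] = (
                    self.latest[tp] if self.auto_offset_reset == "latest"
                    else self.earliest[tp])

    def assignment(self):
        return set(self.earliest)

    def seek_to_beginning(self, partitions):
        # Deterministically pin each position to the earliest offset.
        for tp in partitions:
            self.positions[tp] = self.earliest[tp]

    def position(self, tp):
        return self.positions[tp]


consumer = MockConsumer(earliest={"t-0": 5}, latest={"t-0": 100})
consumer.poll(0)                       # position may jump to latest (100)
consumer.seek_to_beginning(consumer.assignment())
print(consumer.position("t-0"))        # 5: earliest, regardless of poll
```

The fix in this PR follows the same shape: `poll(0)` is still used to obtain the assignment, but the initial positions are then set explicitly via `seekToBeginning` rather than trusted from the poll.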

@zsxwing
Member Author

zsxwing commented Oct 7, 2016

/cc @tdas @koeninger

@SparkQA

SparkQA commented Oct 8, 2016

Test build #66543 has finished for PR 15397 at commit 95a0c96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

How is this going to work with assign? It seems like it's just avoiding the problem, not fixing it.

@zsxwing
Member Author

zsxwing commented Oct 10, 2016

How is this going to work with assign? It seems like it's just avoiding the problem, not fixing it.

We can seek to the offsets provided by the user.
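A sketch of what "seek to the offsets provided by the user" could look like (hypothetical helper and stub consumer, for illustration only): partitions the user specified get an explicit `seek`, and the remainder fall back to `seekToBeginning`.

```python
# Hypothetical stub consumer; the real implementation would call the
# corresponding KafkaConsumer methods.
class StubConsumer:
    def __init__(self, earliest):
        self.earliest = dict(earliest)  # partition -> earliest offset
        self.positions = {}

    def seek(self, tp, offset):
        self.positions[tp] = offset

    def seek_to_beginning(self, partitions):
        for tp in partitions:
            self.positions[tp] = self.earliest[tp]


def set_initial_positions(consumer, assigned, user_offsets):
    # Explicitly seek partitions the user gave offsets for ...
    for tp in assigned:
        if tp in user_offsets:
            consumer.seek(tp, user_offsets[tp])
    # ... and pin the rest to the earliest offsets.
    consumer.seek_to_beginning(
        [tp for tp in assigned if tp not in user_offsets])


consumer = StubConsumer(earliest={"t-0": 0, "t-1": 0})
set_initial_positions(consumer, ["t-0", "t-1"], {"t-0": 42})
print(consumer.positions)   # {'t-0': 42, 't-1': 0}
```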

@koeninger
Contributor

Look at the poll/seek implementation in the DStream's subscribe and subscribe pattern when user offsets are provided, i.e. the problem that triggered this ticket to begin with. You're going to have to solve the same problem there with the structured stream, unless the structured stream somehow wants to limit assigning specific partitions only to the assign strategy, which eliminates lots of valid use cases.


@zsxwing
Member Author

zsxwing commented Oct 12, 2016

@koeninger sorry for the delay. Right now KafkaSource doesn't support an external group id, so we don't need to worry about how to fetch committed offsets. Are there any other cases I'm missing?

@SparkQA

SparkQA commented Oct 12, 2016

Test build #66849 has finished for PR 15397 at commit 9578555.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

My main point is that whoever implements SPARK-17812 is going to have to deal with the issue shown in SPARK-17782, which means much of this patch is going to need to be changed anyway.

But it's not just about the external group id. Committed offsets would actually make the issue in SPARK-17782 less of a problem, because they would take precedence over auto.offset.reset.

@zsxwing
Member Author

zsxwing commented Oct 13, 2016

My main point is that whoever implements SPARK-17812 is going to have to deal with the issue shown in SPARK-17782, which means much of this patch is going to need to be changed anyway.

@koeninger I agree that this patch will be changed. However, this PR does fix a known issue for the currently supported features, and there are no user-facing changes. Considering that 2.0.2 may come out soon and I don't think SPARK-17812 will be done by then, I would like to merge this to fix the issue for 2.0.2. What do you think?

Contributor

@koeninger koeninger left a comment


Did another once over, couple more minor things.

If the plan is to wait for SPARK-17812 to fix up the other stuff I was concerned about, that's ok with me, but I really hope it doesn't slip past another release. To reiterate, I'm fine with doing that work.

@@ -256,8 +269,6 @@ private[kafka010] case class KafkaSource(
*/
private def fetchNewPartitionEarliestOffsets(
newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
// Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()
Contributor


Is there a reason not to pause all partitions here as well?

@@ -270,7 +281,7 @@ private[kafka010] case class KafkaSource(
// So we need to ignore them
partitions.contains(p)
}.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got offsets for new partitions: $partitionToOffsets")
+    logDebug(s"Got earliest offsets for new partitions: $partitionToOffsets")
partitionToOffsets
Contributor


nit: different variable name partitionToOffsets vs partitionOffsets for what is essentially the same thing

@SparkQA

SparkQA commented Oct 13, 2016

Test build #66908 has finished for PR 15397 at commit 7986f18.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@koeninger
Contributor

LGTM, thanks for talking it through

@zsxwing
Member Author

zsxwing commented Oct 13, 2016

Thanks! Merging to master and 2.0.

asfgit pushed a commit that referenced this pull request Oct 13, 2016
… instead of counting on KafkaConsumer

## What changes were proposed in this pull request?

Because `KafkaConsumer.poll(0)` may update the partition offsets, this PR just calls `seekToBeginning` to manually set the earliest offsets for the KafkaSource initial offsets.

## How was this patch tested?

Existing tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15397 from zsxwing/SPARK-17834.

(cherry picked from commit 08eac35)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
@asfgit asfgit closed this in 08eac35 Oct 13, 2016
@zsxwing zsxwing deleted the SPARK-17834 branch October 13, 2016 20:52
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017