
[SPARK-17782][STREAMING][KAFKA] eliminate race condition of poll twice #15387

Closed
wants to merge 2 commits

Conversation

koeninger
Contributor

What changes were proposed in this pull request?

Kafka consumers can't subscribe or maintain heartbeat without polling, but polling ordinarily consumes messages and adjusts position. We don't want this on the driver, so we poll with a timeout of 0 and pause all topicpartitions.

Some consumer strategies that seek to particular positions have to poll first, but they weren't pausing immediately thereafter. Thus, there was a race condition where the second poll() in the DStream start method might actually adjust consumer position.

This patch eliminates (or at least drastically reduces the chance of) the race condition by pausing in the relevant consumer strategies, and asserts on startup that no messages were consumed.
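The poll-then-pause pattern described above can be sketched as follows (an illustrative Scala sketch against the Kafka 0.10 consumer API; `setupDriverConsumer` is a hypothetical name, not the actual Spark method):

```scala
import org.apache.kafka.clients.consumer.Consumer

// Hypothetical sketch of the driver-side setup, not the actual Spark code.
// poll(0) is needed so the consumer subscribes, heartbeats, and gets an
// assignment, but it may prefetch data; pausing every assigned partition
// immediately afterwards keeps later poll(0) calls from returning records
// or moving the consumer's position.
def setupDriverConsumer[K, V](consumer: Consumer[K, V]): Unit = {
  consumer.poll(0)                      // establish assignment / heartbeat
  consumer.pause(consumer.assignment()) // stop any further consumption
}
```

The race being fixed is that some consumer strategies did the poll but not the pause, leaving a window in which a later poll(0) could actually consume records.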

How was this patch tested?

I reliably reproduced the intermittent test failure by inserting a thread.sleep directly before returning from SubscribePattern. The suggested fix eliminated the failure.

@SparkQA

SparkQA commented Oct 7, 2016

Test build #66477 has finished for PR 15387 at commit 1fc5863.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 7, 2016

Test build #66479 has finished for PR 15387 at commit aca55de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -223,7 +223,7 @@ private[spark] class DirectKafkaInputDStream[K, V](

override def start(): Unit = {
val c = consumer
-    c.poll(0)
+    assert(c.poll(0).isEmpty, "Driver shouldn't consume messages; pause if you poll during setup")
@tdas
Contributor

Is this poll(0) guaranteed to not return any record if the previous poll(0) is paused immediately? Is there a race condition possible where the first poll(0) (inside consumer strategy) manages to actually fetch records internally before it is paused, which is then returned by this poll(0) (inside DStream)?

@koeninger
Contributor Author

I'm not going to say anything is impossible, which is the point of the assert. If it does somehow happen, it will be at start, so it should be obvious.

The whole poll(0)/pause thing is a gross hack, but it's what was suggested by the Kafka project dev list.


@zsxwing
Member

zsxwing commented Oct 7, 2016

but polling ordinarily consumes messages and adjusts position.

Even if enable.auto.commit is false? The doc says the position is "automatically set as the last committed offset", so I'd guess setting enable.auto.commit to false would prevent positions from updating.

     * On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
     * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
     * offset for the subscribed list of partitions

@koeninger
Contributor Author

I set auto-commit to false and still recreated the test failure.

That makes sense to me: the consumer position should still be getting updated in memory even if it isn't saved to storage anywhere.

At any rate, there are valid (albeit ill-advised, in my opinion) reasons to turn on auto-commit, so I'm not sure it matters.
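What Cody is describing can be sketched roughly like this (illustrative only; assumes a consumer that already has `tp` assigned and a reachable broker — none of this is code from the PR):

```scala
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Hypothetical demonstration: even with enable.auto.commit=false,
// position(tp) can advance after a poll that returns records, because
// the position lives in consumer memory; auto-commit only decides
// whether that position gets written back to Kafka.
def positionMoved[K, V](consumer: Consumer[K, V], tp: TopicPartition): Boolean = {
  val before = consumer.position(tp)
  consumer.poll(100) // may consume records and advance the in-memory position
  consumer.position(tp) > before
}
```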


@zsxwing
Member

zsxwing commented Oct 7, 2016

@koeninger If poll(0) returns a non-empty result, how about just subtracting the offsets back using the result size? Then we don't need to count on poll(0) not updating the offsets.

@zsxwing
Member

zsxwing commented Oct 7, 2016

#15397 is the fix for structured streaming.

@koeninger
Contributor Author

You don't want poll consuming messages; it's not just about offset correctness, the driver shouldn't be spending time or bandwidth doing that. What is the substantive concern with this solution, beyond the general ugliness inherent in the consumer? The chance that anyone out there has a custom ConsumerStrategy that will be impacted by this, and won't have the sophistication to deal with it, is pretty much nil.


@koeninger
Contributor Author

Poll also isn't going to return you just the messages for a single TopicPartition, so to do what you're suggesting you'd have to go through all the messages and do additional processing, not just use the result size.


@koeninger
Contributor Author

If the concern is TD's comment,
"Future calls to {@link #poll(long)} will not return any records from these partitions until they have been resumed using {@link #resume(Collection)}."
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1353

During the original implementation I had verified that calling pause kills the internal message buffer, which is one of the complications leading to a cached consumer per partition.

I really don't think it's going to happen, but the assert is in there for paranoia, and to be explicit about the conditions.

@koeninger
Contributor Author

Let me know if you guys like that alternative PR better.

@zsxwing
Member

zsxwing commented Oct 10, 2016

During the original implementation I had verified that calling pause kills the internal message buffer, which is one of the complications leading to a cached consumer per partition.

I observed the same behavior during my debugging. I found that the first poll(0) will always send a request to prefetch the data. Pausing partitions just prevents the second poll(0) from returning anything here (the data has already been fetched): https://github.com/apache/kafka/blob/0.10.0.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L527

You dont want poll consuming messages, its not just about offset
correctness, the driver shouldnt be spending time or bandwidth doing that.

I think you have agreed that avoiding this is impossible via the current KafkaConsumer APIs as well.

However, the unknown thing to me is whether the first poll(0) will return something. I saw that the first poll(0) always sends the fetch request, but I'm not sure whether it's possible for the response to be processed within that first poll(0). If that could happen, pausing partitions would not help, since it's called after the first poll(0). In addition, since this is unclear in the javadoc, it could change in the future. That's why I decided to manually seek to the beginning in #15397.
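The seek-based alternative described here could look roughly like this (an illustrative sketch, not the actual #15397 code; `resetPositions` and the offsets map are hypothetical):

```scala
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Hypothetical sketch: record the intended starting offset for each
// partition before polling, then seek back afterwards, so that any
// records a poll happened to consume cannot shift the starting
// position the stream will read from.
def resetPositions[K, V](
    consumer: Consumer[K, V],
    startingOffsets: Map[TopicPartition, Long]): Unit = {
  startingOffsets.foreach { case (tp, offset) =>
    consumer.seek(tp, offset)
  }
}
```

This avoids depending on the undocumented interaction between pause and an in-flight fetch, at the cost of explicitly tracking offsets.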

@koeninger
Contributor Author

If you're worried about it then accept the alternative PR I linked.


ghost pushed a commit to dbtsai/spark that referenced this pull request Oct 12, 2016
…of poll twice

## What changes were proposed in this pull request?

Alternative approach to apache#15387

Author: cody koeninger <cody@koeninger.org>

Closes apache#15401 from koeninger/SPARK-17782-alt.
asfgit pushed a commit that referenced this pull request Oct 12, 2016
…of poll twice

(cherry picked from commit f9a56a1)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
@koeninger koeninger closed this Oct 12, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…of poll twice