[SPARK-26848][SQL][SS] Introduce new option to Kafka source: offset by timestamp (starting/ending) #23747
Conversation
Test build #102099 has finished for PR 23747 at commit
Test build #102100 has finished for PR 23747 at commit
Test build #102103 has finished for PR 23747 at commit
If a user uses a Kafka cluster running an old version that doesn't support the timestamp APIs, will their query fail?
We can just give it a try with Kafka 0.10.0. (I guess Kafka 0.9.x is not API compatible, so we don't need to care about it.) Let me install that version, test, and get back to you.
I realized I missed pushing the commit that adds the required Kafka version for this feature. Just pushed. Btw, I'm not sure we should worry much about end users running Spark 3.0.0 against Kafka 0.10.0.x (last bugfix release: Aug 2016). The fastest end users could put Spark 3.0.0 into production would be mid this year, so we'd be supposing that users brave enough to be early adopters of a newer Spark are still on a Kafka released 3 years ago, which feels a bit far from realistic.
Just ran the query with Kafka 0.10.0.
Test build #102210 has finished for PR 23747 at commit
Is anyone here involved in the Kafka community? I'm looking for Kafka documentation on offsetsForTimes, but unlike our habit of updating docs when introducing a new feature, the original PR for offsetsForTimes doesn't seem to touch any docs. (Not sure whether they keep docs outside of the main repo.) https://github.com/apache/kafka/pull/1215/files Are users expected to refer to the KIP page directly in this case?
Why not just link to e.g. https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map- ? The main Kafka documentation at https://kafka.apache.org/documentation/ has plenty of discussion of timestamps, imho.
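For readers following those links, here is a minimal sketch of what the consumer-side lookup looks like; the broker address, topic name, and timestamp are illustrative assumptions, not values from this PR:

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // assumed local broker
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)

val tp = new TopicPartition("topic1", 0) // hypothetical topic
consumer.assign(Collections.singletonList(tp))

// For each partition, offsetsForTimes returns the earliest offset whose record
// timestamp is >= the requested timestamp, or null if no such record exists.
// Per the javadoc, it fails against brokers older than 0.10.1.0.
val result = consumer.offsetsForTimes(
  Collections.singletonMap(tp, java.lang.Long.valueOf(1546300800000L)))
val oat = result.get(tp)
if (oat != null) consumer.seek(tp, oat.offset())
```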
On Wed, Feb 13, 2019 at 4:01 AM Gabor Somogyi wrote:

@gaborgsomogyi commented on this pull request, in docs/structured-streaming-kafka-integration.md:

@@ -310,6 +310,23 @@ The following configurations are optional:
 <table class="table">
 <tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
+<tr>
I agree that a too-detailed explanation is bad/redundant, and that's the reason I suggested compacted limitations.
Stating "The start point of timestamp when a query is started" is just not true, because a producer can overwrite this field under some circumstances (Spark is not the only producer). I would refer to the Kafka configuration and mention that this field depends on how Kafka is configured / how data is produced, without mentioning all the details. I think based on the field name, users can look up the KIP if needed.
Yeah, offsetsForTimes is not over-documented, but that's a Kafka question.
That's also a good suggestion.
Yeah, just linking to both seems fine.

On Wed, Feb 13, 2019 at 9:58 AM Gabor Somogyi wrote:

That's also a good suggestion. I meant something like: the timestamp is dependent on log.message.timestamp.type; for further details please see https://kafka.apache.org/documentation/... That said, either is fine for me.
OK. Got it. Thanks both of you for the nice recommendation.
Test build #102312 has finished for PR 23747 at commit
IMHO SPARK-23539 and this (SPARK-26848) are good options to include in Spark 3.0, given that other streaming frameworks support them and the features don't break existing behavior. This option would also help batch queries restrict the range by timestamp, which is more intuitive than raw offsets.
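As an illustration of that batch use case, here is a minimal sketch using the option names added by this PR; the broker address, topic, timestamps, and an active SparkSession `spark` are assumptions:

```scala
// Batch query bounded by timestamps on both ends: Spark passes each
// per-partition epoch-millis timestamp to Kafka to translate into an offset.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffsetsByTimestamp",
    """{"topic1": {"0": 1546300800000, "1": 1546300800000}}""")
  .option("endingOffsetsByTimestamp",
    """{"topic1": {"0": 1546387200000, "1": 1546387200000}}""")
  .load()
df.show()
```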
@@ -29,4 +29,13 @@ trait KafkaTest extends BeforeAndAfterAll {
   super.afterAll()
   CachedKafkaProducer.clear()
 }

 def waitForBiggerTimestamp(curTimestamp: Long): Long = {
AFAIK the timestamp field can be overridden in the ProducerRecord, can't it? It looks like this part generates a lot of waiting in the unit tests.
Ah yes, nice suggestion. Will address.
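To make the suggestion concrete, a sketch of pinning the record timestamp via the ProducerRecord constructor instead of waiting for the wall clock; the key, value, and timestamp are illustrative:

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// The five-argument constructor accepts an explicit timestamp, so a test can
// control record timestamps deterministically instead of sleeping.
val record = new ProducerRecord[String, String](
  "spark_26848_test_v1",                  // topic
  null,                                   // partition: let the partitioner decide
  java.lang.Long.valueOf(1546300800000L), // explicit timestamp (epoch millis)
  "key",
  "value")
```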
Test build #103140 has finished for PR 23747 at commit
Now I have a cluster and have started to evaluate this in depth; it may take some time...
@tdas @zsxwing @jose-torres Could we consider starting to review this one, to add this feature in 3.0.0?
@gaborgsomogyi, what is it that you're evaluating in depth? In general, this seems like a reasonable change. I'd make the docs more up-front about the fact that it's a pass-through feature: Spark doesn't do any interpretation or reasoning about the timestamp, it just passes it directly to Kafka for Kafka to translate into an offset. (Otherwise I worry people will get confused and think Spark is responsible for some translation problem they're facing.)
@@ -179,6 +179,56 @@ private[kafka010] class KafkaOffsetReader(
   KafkaSourceOffset(fetched)
 }

 def fetchSpecificTimestampBasedOffsets(topicTimestamps: Map[String, Long]): KafkaSourceOffset = {
Is there a way to share more code with fetchSpecificOffsets() here? I know a lot of the implementation is a bit different, but I don't like the idea of having complicated things like "workaround for KAFKA-7703" in multiple independent places.
I'll try to do it; if it turns out to be hard to achieve, I'll leave a comment here.
This option leverages `KafkaConsumer.offsetsForTimes`: please refer to https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map- for details.<p/>
Also, the meaning of `timestamp` here can vary according to the Kafka configuration (`log.message.timestamp.type`): please refer to https://kafka.apache.org/documentation/ for further details.<p/>
Note: This option requires Kafka 0.10.1.0 or higher.<p/>
Note2: `startingOffsetsByTimestamp` takes precedence over `startingOffsets`.<p/>
I wish there were a way to combine the options instead of having precedence logic, but I can't think of one.
Yeah... it might be possible if we expressed the timestamp in some custom format like ts:<timestamp>, tried to parse the value as a long for an offset, and fell back to the custom format for a timestamp. I already considered something like this, but it didn't feel good.
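For clarity, a tiny sketch of that rejected parse-with-fallback idea; all names here are hypothetical, not from the PR:

```scala
sealed trait StartingPosition
case class ByOffset(offset: Long) extends StartingPosition
case class ByTimestamp(timestamp: Long) extends StartingPosition

// Try the "ts:"-prefixed custom timestamp format first, otherwise treat the
// value as a raw offset.
def parsePosition(value: String): StartingPosition =
  if (value.startsWith("ts:")) ByTimestamp(value.stripPrefix("ts:").toLong)
  else ByOffset(value.toLong)
```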
Agreed.
val assignedTopics = partitions.asScala.map(_.topic())
assert(assignedTopics == topicTimestamps.keySet,
  "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
  s"topics. Specified: ${topicTimestamps.keySet} Assigned: $assignedTopics")
We should test that 0 and System.currentTimeMillis() work straightforwardly as timestamp offsets, so people can use them the same way as -2 and -1 in the non-timestamp case.
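A sketch of the first suggested check, assuming a local broker, an illustrative topic, and an active SparkSession:

```scala
// A starting timestamp of 0 should match the earliest available offset, so a
// batch read bounded only at the start should return the whole topic, just
// like startingOffsets = "earliest".
val earliestLike = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffsetsByTimestamp", """{"topic1": {"0": 0}}""")
  .load()
```

System.currentTimeMillis() needs more care: per the commit list below, the final implementation makes the query fail when a starting timestamp matches no offset, so "now" is not an exact analogue of -1/latest.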
Given that we want to state that Spark transparently passes the timestamp to Kafka without interpreting or reasoning about it, would we need to add tests that reason about timestamp semantics?
I'm never opposed to more tests :)
@jose-torres I also think it's a reasonable change, and that's the reason why I'll test it on a cluster with corner cases, etc... but only next Monday, because I'm on vacation...
Force-pushed 00026ac to 7b51425.
consumer.poll(0)
val partitions = consumer.assignment()

// Call `position` to wait until the potential offset request triggered by `poll(0)` is done.
This is actually not executed for fetchEarliestOffsets right now, but I don't feel it hurts much.
Never mind, just added a flag to handle it.
Due to dealing with functions that have long parameter names, the indentation may look a bit weird. Honestly, I'm not sure I can point out where the indentation is broken. Comments regarding indentation are welcome!
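For comparison, a minimal sketch of the usual Spark Scala convention for wrapped parameter lists; the signature is taken from the diff hunk above, with the body elided:

```scala
// 4-space continuation indent for wrapped parameters, 2-space indent for the body.
def fetchSpecificTimestampBasedOffsets(
    topicTimestamps: Map[String, Long]): KafkaSourceOffset = {
  ??? // implementation elided
}
```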
Test build #103571 has finished for PR 23747 at commit
I see. No problem, and thanks for reviewing even though the patch is not in an area familiar to you. I can wait for other reviewers who can decide to merge.
No. It provides another way to set the "offset": by timestamp. For now, end users need to set an exact offset number or set latest/earliest, and when they want to run the query starting from a specific point in time, they need to know the exact offset that was inserted at that time. While end users may be able to retrieve it from a CLI tool (not 100% sure, but given that Kafka exposes the API...), it's not convenient to retrieve the offset from Kafka for a given time point and set it as a Spark option. There's another benefit to this change: once users specify an offset as a Spark option, unless they also leave a comment describing where the offset came from, the offset number doesn't show the intention that they want to run from a specific point in time. After this patch, that intention can be represented very clearly in their Spark app.
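To illustrate that readability point with made-up values:

```scala
// A raw offset hides the intent behind the number...
val byRawOffset = """{"topic1": {"0": 42164}}"""
// ...while a timestamp states it directly: 1546300800000 is 2019-01-01T00:00:00Z.
val byTimestamp = """{"topic1": {"0": 1546300800000}}"""
// Passed as .option("startingOffsets", byRawOffset)
// or        .option("startingOffsetsByTimestamp", byTimestamp)
```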
Maybe we don't need to document the version requirement for both this (>= 0.10) and Kafka header support (>= 0.11). We already use a pretty high version of the Kafka client, so there's no significant change (code-side benefit) in dropping support for old versions.
Test build #110666 has finished for PR 23747 at commit
This now conflicts with part of the change in 88c8d5e. Need to rebase and fix.
…mestamp (starting/ending)

* Remove unnecessary code, and fix scalastyle
* Documentation
* Add note on Kafka version to support this feature
* Address review comment from gaborgsomogyi: added more information regarding timestamp on the doc
* Address review comment from gaborgsomogyi
* Address review comments from jose-torres
* Minor change: fetchEarliestOffset doesn't call unnecessary 'position'
* Address review comments from gaborgsomogyi
* Address review comment from gaborgsomogyi
* Replace ` with <code> tag
* Support per-partition timestamp
* Some cleaning up
* Make query fail when there's no matched offset for starting offset timestamp
Force-pushed 22b7dba to c8a4938.
Test build #110667 has finished for PR 23747 at commit
Test build #4877 has finished for PR 23747 at commit
retest this, please
Test build #111023 has finished for PR 23747 at commit
@HeartSaVioR is this good to go, following #23747 (comment)?
@gaborgsomogyi @koeninger any more thoughts on this one, or is it looking OK?
@srowen No further comment; after revisiting, my approval still stands.
Since the actual connector is 0.10+ compatible, it would take additional effort to drop < 1.0 support, so I don't see a benefit there.
It's a good idea.
Merged to master |
Thanks all for reviewing and merging! |
…-integration page

### What changes were proposed in this pull request?
Fix the disorder of the `structured-streaming-kafka-integration` page caused by #23747.

### Why are the changes needed?
A typo messed up the HTML page.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Locally tested with Jekyll (before/after screenshots were attached).

Closes #27098 from xuanyuanking/SPARK-30426.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…bing topic-partitions in Kafka source

### What changes were proposed in this pull request?
This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we decided to open the possibility to let end users set an individual timestamp per partition. But in many cases, specifying a timestamp represents the intention to go back to a specific timestamp and reprocess records, which should be applied to all topics and partitions.

This patch proposes to provide a way to set a global timestamp across the topic-partitions the source is subscribing to, so that end users can set all offsets by a specific timestamp easily. To make the timestamp easier to configure, the new options receive only "a" timestamp for the start/end.

New options introduced in this PR:

* startingTimestamp
* endingTimestamp

Both options receive the timestamp as a string. There are priorities among the options for starting/ending offsets, as we will have three options for start offsets and another three for end offsets. The priorities are as follows:

* starting offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets
* ending offsets: endingTimestamp -> endingOffsetsByTimestamp -> endingOffsets

### Why are the changes needed?
The existing option to specify a timestamp as an offset is quite verbose if there are a lot of partitions across topics. Suppose there are hundreds of partitions in a topic; the JSON would have to contain the same timestamp hundreds of times. The number of partitions can also change, which requires either:

* fixing the code, if the JSON is statically created
* introducing a dependency on the Kafka client and dealing with the Kafka API to craft the JSON programmatically

Neither approach is acceptable for an ad-hoc query; no one wants to write code more complicated than the query itself.

Flink [provides the option](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration) to specify a timestamp for all topic-partitions like this PR, and doesn't even provide the option to specify the timestamp per topic-partition. With this PR, end users are only required to provide a single timestamp value; there is no more complicated JSON format for end users to learn.

### Does this PR introduce _any_ user-facing change?
Yes, this PR introduces the two new options described in the section above. (Doc screenshots were attached.)

### How was this patch tested?
New UTs covering the new functionality. Also manually tested via simple batch & streaming queries.

Closes #32609 from HeartSaVioR/SPARK-29223-v2.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
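A sketch of how the follow-up's global options read in practice; the broker address and topic pattern are assumptions:

```scala
// One timestamp for every subscribed topic-partition: no per-partition JSON,
// and no need to know the partition layout up front.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "spark_26848_test_.*")
  .option("startingTimestamp", "1546300800000") // epoch millis, as a string
  .load()
```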
What changes were proposed in this pull request?
This patch introduces new options "startingOffsetsByTimestamp" and "endingOffsetsByTimestamp" to set a specific timestamp per topic (since we're unlikely to set a different value per partition), letting the source start reading from offsets whose timestamp is equal to or greater than the starting timestamp, and end reading at offsets whose timestamp is equal to or greater than the ending timestamp.
The new options are optional, of course, and take precedence over the existing offset options.
How was this patch tested?
New unit tests added. Also manually tested basic functionality with a Kafka 2.0.0 server: ran a query against topics spark_26848_test_v1 and spark_26848_test_2_v1 populated with string records whose number part marks whether they were produced after the given timestamp, and checked the result of df.show(). Note that endingOffsets (as well as endingOffsetsByTimestamp) are exclusive.