-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Snowpipe Streaming] Fix IndexOutOfBoundException thrown when offsets are not continous during schema-evolution #1037
base: master
Are you sure you want to change the base?
Conversation
hi @sfc-gh-mbobowski @sfc-gh-achyzy could you please review this quick small bug-fix ? [Please guide me on how to sign CLA for this repository, here's my emailID : swasnik@confluent.io. Thanks !] |
hi @sfc-gh-achyzy @sfc-gh-mbobowski @sfc-gh-gjachimko Requesting your reviews on this small bug-fix, thanks ! |
1 similar comment
hi @sfc-gh-achyzy @sfc-gh-mbobowski @sfc-gh-gjachimko Requesting your reviews on this small bug-fix, thanks ! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the contribution! Good catch, just two minor things and we should be good to go
...ain/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
Outdated
Show resolved
Hide resolved
Thank you @sfc-gh-dseweryn and @sfc-gh-akowalczyk for review and approval ! |
Apologies. |
Connector Exception stacktrace :
Due to this connector could not
evolve-schema
(even ifsnowflake.role.name
had correct permissions)Reproduction on ITs : #1036
What was the bug ?
This computation is not right.
When Offsets are continous, lets checkout
records
,offsets
andsinkRecords
objectssinkRecords : [SinkRecord<2000>, SinkRecord<2001> .... SinkRecord<2100>] (101 records)
records : [Map<2000>, .... Map<2100>]
offsets : [2000, ... 2100].
For
idx
= 10=> long originalSinkRecordIdx = 2010 - 2000 = 10\
If offsets are not continous (with gaps, like when using FilterSMT) ->
sinkRecords : [SinkRecord<2000>, SinkRecord<2002> .... SinkRecord<2100>] (51 records)
records : [Map<2000>, Map<2002>, .... Map<2100>]
offsets : [2000, 2002,... 2100].
For
idx
= 10=> long originalSinkRecordIdx = 2020 - 2000 = 20 (incorrect).
[we want SinkRecord at 10th index].
Pre-review checklist
snowflake.ingestion.method
.Yes
- Added end to end and Unit Tests.No
- Suggest why it is not param protected (This is a bug-fix)Urgency
This review is high priority