Redis sink flushes only rows that have more recent eventTimestamp #913
/retest
/test test-end-to-end
 * @param <Input>
 * @param <Output>
 */
public class BatchDoFnWithRedis<Input, Output> extends DoFn<Input, Output> {
There seems to be only one extension of this base class (RedisCustomIO.WriteDoFn). Is this base class necessary?
Just wanted to separate the Redis wrapper code from the business logic.
storage/connectors/redis/src/main/java/feast/storage/connectors/redis/writer/RedisCustomIO.java
/test test-end-to-end
/test test-end-to-end-batch
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: pyalex, woop
The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
* redis sink read then write
* fix load tests
* e2e
* fix
* specify feature ref
* move test up in order
* set project default
* some docs
* reorder e2e tests
* reorder e2e tests
* reorder e2e tests
What this PR does / why we need it:
Because Redis storage keeps only one row per entity, it was previously unpredictable which row would end up being written, especially in batch ingestion. This PR makes the sink read the existing value from Redis first and compare eventTimestamps: a row from the stream is written only if its eventTimestamp is more recent than the stored one. This guarantees that a more recent row is never overwritten by an older value, so Redis always holds the latest value for each entity.
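The read-then-compare rule can be illustrated with a minimal sketch. It is not the code from this PR: `LatestRowStore`, `Row`, and `writeIfNewer` are hypothetical names, and a plain in-memory map stands in for Redis. It assumes that a row with an equal eventTimestamp is skipped, matching the "more recent" wording, and it ignores concurrency between the read and the write.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the Redis store (not Feast's actual classes).
final class LatestRowStore {

  // Hypothetical row type: a payload plus its event timestamp in epoch milliseconds.
  static final class Row {
    final String value;
    final long eventTimestampMillis;

    Row(String value, long eventTimestampMillis) {
      this.value = value;
      this.eventTimestampMillis = eventTimestampMillis;
    }
  }

  // One row per entity key, mirroring "only one row per entity".
  private final Map<String, Row> rowsByEntityKey = new HashMap<>();

  // Reads the stored row first and writes the incoming row only if it is strictly newer.
  // Returns true when the incoming row was written.
  boolean writeIfNewer(String entityKey, Row incoming) {
    Row existing = rowsByEntityKey.get(entityKey);
    if (existing != null && incoming.eventTimestampMillis <= existing.eventTimestampMillis) {
      return false; // stored row is at least as recent, keep it
    }
    rowsByEntityKey.put(entityKey, incoming);
    return true;
  }

  // Returns the stored row for the entity, or null if none was written.
  Row get(String entityKey) {
    return rowsByEntityKey.get(entityKey);
  }
}
```

With this rule, the stored value no longer depends on the order in which rows arrive: whichever order a batch is processed in, the row with the greatest eventTimestamp is the one left in the store.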
Also, I refactored the tests for the Redis sink to remove duplicated code and use @Parametrized tests instead.
Which issue(s) this PR fixes:
Fixes #
Does this PR introduce a user-facing change?: