[HUDI-8726] s3/gcs incremental source should stick to checkpoint v1 #12688
Source code changes look good to me.
While I review the test code, reminding @yihua to review the logic and certify.
A few minor comments.
public class CheckpointUtils {

  public static final Set<String> DATASOURCES_MUST_USE_CKP_V1 = new HashSet<>(Arrays.asList(
can we name this "DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2"
done
      "6, org.apache.hudi.utilities.sources.AnotherSource, false",
      // Disallowed sources should return false even with version >= 8
      "8, org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource, false",
      "8, org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource, false"
can we also add MockS3EventsHoodieIncrSource
done
@@ -49,14 +59,15 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
     throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
   }

-  public static boolean targetCheckpointV2(int writeTableVersion) {
-    return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode();
+  public static boolean targetCheckpointV2(int writeTableVersion, String sourceClassName) {
let's rename this to shouldTargetCheckpointV2
done
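For readers following the thread, here is a minimal, self-contained sketch of the gating rule discussed above. It is not the PR's implementation: the class name `CheckpointGateSketch` and the constant `TABLE_VERSION_EIGHT` are hypothetical, the set contents are assumed from the disallowed sources listed in the test cases, and the set uses the name proposed in review.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for CheckpointUtils; illustrates the rule only.
final class CheckpointGateSketch {
  // Assumption: 8 corresponds to HoodieTableVersion.EIGHT.versionCode().
  static final int TABLE_VERSION_EIGHT = 8;

  // Assumption: contents taken from the disallowed sources named in the test cases.
  static final Set<String> DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2 =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
          "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource",
          "org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource")));

  // V2 is targeted only when the table version is new enough AND the source
  // is not one of the sources that must stay on checkpoint V1.
  static boolean shouldTargetCheckpointV2(int writeTableVersion, String sourceClassName) {
    return writeTableVersion >= TABLE_VERSION_EIGHT
        && !DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(sourceClassName);
  }

  public static void main(String[] args) {
    String s3 = "org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource";
    String other = "org.apache.hudi.utilities.sources.AnotherSource";
    System.out.println(shouldTargetCheckpointV2(8, s3));    // false: disallowed source
    System.out.println(shouldTargetCheckpointV2(8, other)); // true: version 8, allowed source
    System.out.println(shouldTargetCheckpointV2(6, other)); // false: version below 8
  }
}
```

Keeping the exclusion list in one set means a source can be added or removed without touching the version check.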
   }

   // TODO(yihua): for checkpoint translation, handle cases where the checkpoint is not exactly the
   // instant or completion time
   public static StreamerCheckpointV2 convertToCheckpointV2ForCommitTime(
-      Checkpoint checkpoint, HoodieTableMetaClient metaClient) {
+      Checkpoint checkpoint, HoodieTableMetaClient metaClient, TimelineUtils.HollowCommitHandling handlingMode) {
minor: rename this to hollowCommitHandlingMode
done
@@ -188,7 +188,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext,
   private def translateCheckpoint(commitTime: String): String = {
     if (writeTableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
       CheckpointUtils.convertToCheckpointV2ForCommitTime(
-        new StreamerCheckpointV1(commitTime), metaClient).getCheckpointKey
+        new StreamerCheckpointV1(commitTime), metaClient, hollowCommitHandling).getCheckpointKey
same here. naming -> hollowCommitHandlingMode
done
 * This class simulates different checkpoint and data fetch scenarios to test the checkpoint handling
 * and data ingestion behavior of the StreamSync class.
 */
public class MockS3EventsHoodieIncrSource extends S3EventsHoodieIncrSource {
can we write a similar one for GCS?
done
@@ -149,32 +155,6 @@ void testFetchNextBatchFromSource(Boolean useRowWriter, Boolean hasTransformer,
         HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue());
   }

-  @ParameterizedTest
-  @MethodSource("getCheckpointToResumeCases")
-  void testGetCheckpointToResume(HoodieStreamer.Config cfg, HoodieCommitMetadata commitMetadata, Option<String> expectedResumeCheckpoint) throws IOException {
Why was this removed? Did you move it elsewhere?
yes, check hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamerCheckpointUtils.java
hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
@Davis-Zhang-Onehouse: there are some CI failures. Can you follow up?
Fixed the checkstyle issue; will ping once CI is all green and this is ready to merge. Thanks!
Pure refactoring of the test classes: moving some methods to the parent class so that subsequent commits can use them, and extracting methods into a new class so that later changes can consume them.
To test the ingestion flow e2e and ensure we consume the right checkpoint version and write out the right checkpoint version, we split the coverage into 2 parts:
- Test anything interacting with the S3/GCS incremental source. This is done by introducing a dummy class as an injected dependency so we can run validations and trigger the ingestion code covering the e2e behavior.
- Test the S3/GCS incremental source itself. This is covered by the existing unit tests against the class, which already exercise the relevant code.
Previously, for EMPTY_ROW_SET_NONE_NULL_CKP to return different values, we created a separate BiFunction for each hard-coded value. Now the return value is parameterized via RETURN_CHECKPOINT_KEY. Also moved some constant members to the appropriate util class.
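A rough sketch of the parameterization described in this commit message, under simplifying assumptions: the real mock source presumably returns a richer batch/checkpoint type, whereas this example reduces the result to a plain `String`. The class name `ParameterizedMockSketch`, the key string, and the use of a `Map` for props are all hypothetical; only the names EMPTY_ROW_SET_NONE_NULL_CKP and RETURN_CHECKPOINT_KEY come from the message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration; not the PR's test utility.
final class ParameterizedMockSketch {
  // Assumption: the key string is invented for this example.
  static final String RETURN_CHECKPOINT_KEY = "mock.source.return.checkpoint";

  // A single reusable function: (last checkpoint, props) -> checkpoint to return.
  // The returned value is read from props instead of being hard-coded per test.
  static final BiFunction<String, Map<String, String>, String> EMPTY_ROW_SET_NONE_NULL_CKP =
      (lastCheckpoint, props) -> props.get(RETURN_CHECKPOINT_KEY);

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(RETURN_CHECKPOINT_KEY, "ckp-1");
    // The same function yields whatever checkpoint the test configured.
    System.out.println(EMPTY_ROW_SET_NONE_NULL_CKP.apply(null, props));
  }
}
```

With this shape, each test case only sets a different value under the key rather than defining another function.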
Renaming and minor unit test changes. In the next PR we will introduce MockGcs source coverage, just like the MockS3 source.
Reviewers, please review commit by commit; reading the commit messages will save you a great amount of time.
Change Logs
Fixed the issue and significantly increased test coverage for checkpoints, including coverage that was missing when the related code was introduced.
How the checkpoint is consumed
How it is produced (from the deepest call stack to the caller at a higher level)
Impact
As the title states.
Risk level (write none, low medium or high below)
low
Documentation Update
none
Contributor's checklist
Apart from basic unit tests, we need e2e functional tests of the S3/GCS sources. The only option I can think of is adding integration test (IT) coverage.