[Spark] Drop Type widening feature: read Parquet footers to collect files to rewrite #3155
Conversation
```diff
@@ -323,8 +324,9 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
       reorgTableSpec = DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
     )(Nil)

-    reorg.run(table.spark)
-    numFilesToRewrite
+    val rows = reorg.run(table.spark)
```
Is this always guaranteed to return at least one row?
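As context for the question above, a minimal defensive sketch of turning the command's result into a count, assuming `reorg.run` returns a `Seq[Row]` with at most one metrics row and a hypothetical `num_affected_files` column (neither is confirmed by the diff):

```scala
import org.apache.spark.sql.Row

// Hedged sketch, not the PR's code: guard against an empty result instead
// of calling rows.head directly.
val rows: Seq[Row] = reorg.run(table.spark)
val numRewrittenFiles: Long = rows.headOption
  .map(_.getAs[Long]("num_affected_files")) // hypothetical metric column
  .getOrElse(0L)
```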
```diff
@@ -309,8 +309,9 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
    * @return Return the number of files rewritten.
    */
   private def rewriteFilesIfNeeded(): Long = {
-    val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot)
-    if (numFilesToRewrite == 0L) return 0L
+    if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) {
```
Is it guaranteed that there are no files to rewrite if the metadata is not present?
I guess if there is no metadata but there are still files to rewrite, the table is not fully readable anyway?
Yes, this is part of the feature spec: https://github.com/delta-io/delta/blob/master/protocol_rfcs/type-widening.md?plain=1#L125

The metadata must be preserved unless all files have been rewritten since the latest type change. This makes dropping the feature a lot more efficient in two cases:
- No type change was ever applied to the table: there is no need to go and check all Parquet footers.
- The DROP FEATURE command must be re-run after waiting for the retention period to pass: again, no need to read the footers.

Note: this was already the case before, since `numFilesRequiringRewrite` relied on the presence of type widening metadata.
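For readers following along, a rough sketch of what such a presence check can look like, assuming the `delta.typeChanges` column-metadata key from the spec linked above (the actual implementation is `TypeWideningMetadata.containsTypeWideningMetadata`):

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Sketch only: recursively look for the `delta.typeChanges` key that the
// type widening spec stores in column metadata after a type change.
def hasTypeWideningMetadata(dt: DataType): Boolean = dt match {
  case s: StructType => s.fields.exists { f =>
    f.metadata.contains("delta.typeChanges") || hasTypeWideningMetadata(f.dataType)
  }
  case a: ArrayType => hasTypeWideningMetadata(a.elementType)
  case m: MapType =>
    hasTypeWideningMetadata(m.keyType) || hasTypeWideningMetadata(m.valueType)
  case _ => false
}
```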
spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableHelper.scala
```scala
val footers = DeltaFileOperations.readParquetFootersInParallel(
  configuration,
  fileStatuses.toList,
  ignoreCorruptFiles)
```
So this param will allow it to ignore corrupt files? Should we instead never ignore corrupt files? It could potentially be dangerous: a corrupt file might recover later on, and if it was skipped during the rewrite, it would be unreadable under the table schema.
Any modification we make to a table is potentially unsafe if we ignore corrupt files.
Good catch, I updated it to always fail on corrupt files and added a test.
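For illustration, a standalone sketch of reading one footer with the standard parquet-hadoop API so that corruption surfaces as an exception rather than being skipped (the PR itself goes through `DeltaFileOperations.readParquetFootersInParallel`, now with corrupt files no longer ignored):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.util.HadoopInputFile

// Sketch: opening the reader parses the footer and throws on a corrupt or
// truncated file, so no file is silently skipped during DROP FEATURE.
def readFooterOrFail(conf: Configuration, path: Path): ParquetMetadata = {
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
  try reader.getFooter finally reader.close()
}
```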
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, thanks
What changes were proposed in this pull request?
The initial approach to identifying files that contain a type differing from the table schema, and that must therefore be rewritten before dropping the type widening table feature, was convoluted and turned out to be more brittle than intended.
This change switches instead to directly reading the file schema from the Parquet footer and rewriting all files that have a mismatching type.
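Conceptually, the new check reduces to comparing each file's footer schema against the schema the table expects, as in this illustrative sketch (the strict whole-schema comparison is a simplification of whatever per-column check the PR actually performs):

```scala
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.schema.MessageType

// Illustrative only: any schema mismatch marks the file for rewrite. The
// real logic would compare the types of the individual (widened) columns.
def needsRewrite(footer: ParquetMetadata, expected: MessageType): Boolean =
  footer.getFileMetaData.getSchema != expected
```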
Additional Context
In the initial approach, files are identified using their default row commit version (part of the row tracking feature) and matched against the type changes previously applied to the table and recorded in the table metadata: any file written before the latest type change may use a different type and must be rewritten.
This requires multiple pieces of information to be accurately tracked, notably each file's default row commit version and the version at which each type change was applied.
Any bug will likely lead to files not being correctly rewritten before removing the table feature, potentially leaving the table in an unreadable state. See the sketch below for the matching logic in question.
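A hedged sketch of that version-matching heuristic, with illustrative names (`defaultRowCommitVersion` comes from row tracking; `latestTypeChangeVersion` is an assumed input, not actual code from the repository):

```scala
// Sketch of the old heuristic this PR moves away from.
case class FileInfo(path: String, defaultRowCommitVersion: Option[Long])

def requiresRewrite(file: FileInfo, latestTypeChangeVersion: Long): Boolean =
  file.defaultRowCommitVersion match {
    // Written before the latest type change: may still use the old type.
    case Some(v) => v < latestTypeChangeVersion
    // No tracked version: be conservative and rewrite the file.
    case None => true
  }
```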
How was this patch tested?
Tests were added in a previous PR to cover CLONE and RESTORE: #3053
Tests are added and updated in this PR to cover rewriting files with different column types when removing the table feature.