[SPARK-40876][SQL] Widening type promotions in Parquet readers #44368
Conversation
} else if (sparkType == DataTypes.DateType) {
  if ("CORRECTED".equals(datetimeRebaseMode)) {
    return new IntegerUpdater();
  } else {
    boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
    return new IntegerWithRebaseUpdater(failIfRebase);
  }
} else if (sparkType == DataTypes.TimestampNTZType) {
I think we should support timestamp LTZ as well, which is DataTypes.TimestampType
But this means the Parquet reader needs to know the session timezone.
BTW, how do we know this INT32 is a logical DATE in parquet?
I added a check to only allow reading INT32 with a date annotation.
I took a stab at Date->TimestampLTZ, but it's not trivial and we would need to discuss the expected behavior; I can follow up in a different PR if we decide we need it. It's easy to get wrong and I'd rather disallow it for now.
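For illustration, a minimal sketch of why Date -> TimestampLTZ depends on the session time zone (assuming an active `spark` session; the instants in the comments are what the casts resolve to):

```scala
// The same DATE maps to different instants depending on the session time
// zone, which is why the Parquet reader would need to know that zone to
// widen Date -> TimestampLTZ.
spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.sql("SELECT CAST(DATE'2020-01-01' AS TIMESTAMP)").collect()
// -> instant 2020-01-01T00:00:00Z

spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
spark.sql("SELECT CAST(DATE'2020-01-01' AS TIMESTAMP)").collect()
// -> instant 2020-01-01T08:00:00Z (midnight in Los Angeles)
```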
@@ -1070,17 +1070,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
    }
  }

  test("SPARK-35640: int as long should throw schema incompatible error") {
also cc @sunchao
Will this PR solve the problem described in SPARK-35461 too?
Yes, this will address SPARK-35461 - covered by this test case in particular: https://github.com/apache/spark/pull/44368/files#diff-a5cfd7285f9adf95b2aeea90aa57cc35d2b8c6bddaa0f4652172d30a264d3614R153
@cloud-fan Can you merge this PR?
} else if (sparkType == DataTypes.DateType) {
  if ("CORRECTED".equals(datetimeRebaseMode)) {
    return new IntegerUpdater();
  } else {
    boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
    return new IntegerWithRebaseUpdater(failIfRebase);
  }
} else if (sparkType == DataTypes.TimestampNTZType && isDateTypeMatched(descriptor)) {
  if ("CORRECTED".equals(datetimeRebaseMode)) {
See #44428; NTZ should never rebase.
nvm, this is a date, so we do need the rebase
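For context, a minimal sketch of the rebase in question, using Spark's internal `RebaseDateTime` helper (the day value is illustrative):

```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime

// Files written by legacy Spark versions store DATE values as day counts in
// the hybrid Julian calendar, while Spark 3+ uses the Proleptic Gregorian
// calendar. For ancient dates the two calendars disagree, so the raw INT32
// day count must be rebased before it can be widened to TimestampNTZ.
val julianDays = -719162  // a day count far before the 1582 calendar cutover
val gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays)
// gregorianDays differs from julianDays for dates before 1582-10-15.
```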
boolean needsUpcast = sparkType == LongType || (isDate && sparkType == TimestampNTZType) ||
  !DecimalType.is32BitDecimalType(sparkType);
boolean needsRebase = logicalTypeAnnotation instanceof DateLogicalTypeAnnotation &&
  !"CORRECTED".equals(datetimeRebaseMode);
ditto
parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
  new ParquetPrimitiveConverter(updater) {
    override def addInt(value: Int): Unit = {
      this.updater.set(DateTimeUtils.daysToMicros(dateRebaseFunc(value), ZoneOffset.UTC))
ditto
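A minimal sketch of what the `daysToMicros` call in the converter above computes (the day value is illustrative):

```scala
import java.time.ZoneOffset
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Widening DATE (days since epoch) to TimestampNTZ (microseconds since
// epoch) anchors the date at midnight UTC, so no time zone shift is applied.
val days = 19723  // 2024-01-01
val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC)
assert(micros == days.toLong * 24 * 60 * 60 * 1000 * 1000)
```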
thanks, merging to master!
There are 3 failed tests in the ANSI mode daily test.
@LuciferYang thanks for catching it! Does it block PR merging? We may need to wait a few days as it's the holiday season. If you can fix it, even better. We can revert this first if it affects other PRs.
@cloud-fan This is not a serious issue; only the ANSI mode daily test failed. I don't think we need to revert.
Just marking this: the issue has been fixed by #44481.
…cale in Parquet readers

### What changes were proposed in this pull request?
This is a follow-up from #44368 implementing an additional type promotion to decimals with larger precision and scale. As long as the precision increases by at least as much as the scale, the decimal values can be promoted without loss of precision: Decimal(6, 2) -> Decimal(8, 4): 1234.56 -> 1234.5600. The non-vectorized reader (parquet-mr) is already able to do this type promotion; this PR implements it for the vectorized reader.

### Why are the changes needed?
This allows reading multiple Parquet files that contain decimals with different precisions/scales.

### Does this PR introduce _any_ user-facing change?
Yes, the following now succeeds when using the vectorized Parquet reader:
```
Seq(20).toDF("a").select($"a".cast(DecimalType(4, 2)).as("a")).write.parquet(path)
spark.read.schema("a decimal(6, 4)").parquet(path).collect()
```
It failed before with the vectorized reader and succeeded with the non-vectorized reader.

### How was this patch tested?
Tests added to `ParquetWideningTypeSuite` to cover decimal promotion between decimals with different physical types: INT32, INT64, FIXED_LEN_BYTE_ARRAY.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44513 from johanl-db/SPARK-40876-parquet-type-promotion-decimal-scale.

Authored-by: Johan Lasperas <johan.lasperas@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
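A small sketch of the widening condition described above (`isWideningDecimal` is an illustrative helper, not Spark's internal API):

```scala
// Decimal(p1, s1) can be widened to Decimal(p2, s2) without precision loss
// when the scale does not shrink and the integral digits do not shrink,
// i.e. the precision increases by at least as much as the scale.
def isWideningDecimal(p1: Int, s1: Int, p2: Int, s2: Int): Boolean =
  s2 >= s1 && (p2 - s2) >= (p1 - s1)

assert(isWideningDecimal(6, 2, 8, 4))   // 1234.56 -> 1234.5600
assert(!isWideningDecimal(6, 2, 7, 4))  // would drop an integral digit
```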
…n Parquet vectorized reader

### What changes were proposed in this pull request?
This is a follow-up from #44368 and #44513, implementing an additional type promotion from integers to decimals in the Parquet vectorized reader, bringing it to parity with the non-vectorized reader in that regard.

### Why are the changes needed?
This allows reading Parquet files that have different schemas and mix decimals and integers - e.g. reading files containing either `Decimal(15, 2)` or `INT32` as `Decimal(15, 2)` - as long as the requested decimal type is large enough to accommodate the integer values without precision loss.

### Does this PR introduce _any_ user-facing change?
Yes, the following now succeeds when using the vectorized Parquet reader:
```
Seq(20).toDF("a").select($"a".cast(IntegerType).as("a")).write.parquet(path)
spark.read.schema("a decimal(12, 0)").parquet(path).collect()
```
It failed before with the vectorized reader and succeeded with the non-vectorized reader.

### How was this patch tested?
- Tests added to `ParquetWideningTypeSuite`
- Updated relevant `ParquetQuerySuite` test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44803 from johanl-db/SPARK-40876-widening-promotion-int-to-decimal.

Authored-by: Johan Lasperas <johan.lasperas@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
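Similarly, a sketch of the condition this promotion relies on (`intFitsInDecimal` is an illustrative helper, not Spark's internal check):

```scala
// A 32-bit integer fits in Decimal(p, s) without precision loss when the
// integral part can hold any Int value; Int.MaxValue = 2147483647 has 10
// digits, so at least 10 integral digits are required.
def intFitsInDecimal(precision: Int, scale: Int): Boolean =
  precision - scale >= 10

assert(intFitsInDecimal(15, 2))   // Decimal(15, 2) can hold any INT32
assert(!intFitsInDecimal(12, 4))  // only 8 integral digits
```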
### What changes were proposed in this pull request?
This PR aims to widen type promotions in `AvroDeserializer`. The following promotions are supported (Avro type -> Spark type):
- Int -> Long
- Int -> Double
- Float -> Double

### Why are the changes needed?
Similar to PR #44368 for the `Parquet` reader, we should enable type promotion/widening for the `Avro` deserializer.

### Does this PR introduce _any_ user-facing change?
Yes, reading Avro data with a wider schema becomes more convenient for users.

### How was this patch tested?
Pass GA and add a new test case.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47582 from wayneguow/SPARK-49082.

Authored-by: Wei Guo <guow93@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
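A usage sketch of the Avro widening described above (assumes an active `spark` session with `spark.implicits._` imported, the built-in Avro source on the classpath, and a writable `path`):

```scala
// Write an Avro file with an int column, then read it back as long.
Seq(1, 2, 3).toDF("a").write.format("avro").save(path)
spark.read.schema("a LONG").format("avro").load(path).collect()
```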
What changes were proposed in this pull request?
This change adds the following conversions to the vectorized and non-vectorized Parquet readers, corresponding to type promotions that are strictly widening without precision loss:
- Int -> Long
- Float -> Double
- Int -> Double
- Decimal with higher precision
- Date -> TimestampNTZ
Why are the changes needed?
These type promotions support two similar use cases:
1. Reading multiple Parquet files that were written with different schemas, where some files contain a narrower type than the requested read schema.
2. Reading Parquet files with a requested schema that is wider than the schema the files were written with.

The second use case in particular will enable widening the type of columns or fields in existing Delta tables.
Does this PR introduce any user-facing change?
The following fails before this change:
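The snippet itself was lost from this page; a representative reconstruction following the pattern of the examples in the follow-up PRs above (assumes `spark`, `spark.implicits._`, and a writable `path`):

```scala
Seq(0).toDF("a").write.parquet(path)
// Request a wider schema than the one the file was written with.
spark.read.schema("a LONG").parquet(path).collect()
```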
With the Int->Long promotion in both the vectorized and non-vectorized Parquet readers, it succeeds and produces correct results, without overflow or loss of precision. The same is true for Float->Double, Int->Double, Decimal with higher precision, and Date->TimestampNTZ.
How was this patch tested?
Added a new test suite `ParquetTypeWideningSuite` covering the promotions included in this change, in particular:
- Date -> TimestampNTZ with datetime rebase modes `LEGACY` and `CORRECTED`
- Decimal promotions across the Parquet physical types `INT32`, `INT64`, `BINARY`, `FIXED_LEN_BYTE_ARRAY`
Was this patch authored or co-authored using generative AI tooling?
No