
Commit e2cc0be

johanl-db authored and longvu-db committed Aug 28, 2024
[Kernel] Add widening type conversions to Kernel default parquet reader (delta-io#3541)
## Description
Add a set of conversions to the default Parquet reader provided by Kernel to allow reading columns using a wider type than the actual type in the Parquet file. This will support the type widening table feature, see https://github.com/delta-io/delta/blob/master/protocol_rfcs/type-widening.md.

Conversions added:
- INT32 -> long
- FLOAT -> double
- decimal precision/scale increase
- DATE -> timestamp_ntz
- INT32 -> double
- integers -> decimal

## How was this patch tested?
Added tests covering all conversions in `ParquetFileReaderSuite`.

## Does this PR introduce _any_ user-facing changes?
This change alone doesn't allow reading Delta tables that use the type widening table feature; that feature is still unsupported. It does allow reading Delta tables that somehow contain Parquet files with types different from the table schema, but that should never happen for tables that don't support type widening.
1 parent 82cb055 commit e2cc0be
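For context, the primitive conversions listed in the description map onto plain Java widening conversions, which never lose information. A minimal standalone sketch, for illustration only (not part of this commit, class name hypothetical):

```java
// Illustration only: the primitive widenings listed above are exact in Java.
public class PrimitiveWideningSketch {
  public static void main(String[] args) {
    int i = Integer.MAX_VALUE;
    long asLong = i;      // INT32 -> long: always exact
    double asDouble = i;  // INT32 -> double: exact, a double's 53-bit significand holds any 32-bit int
    float f = 0.1f;
    double fAsDouble = f; // FLOAT -> double: exact, every float value is representable as a double
    System.out.println(asLong + " " + asDouble + " " + fAsDouble);
  }
}
```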

5 files changed, +208 −6 lines


‎kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultKernelUtils.java

+10

@@ -21,6 +21,7 @@
 import io.delta.kernel.types.DataType;
 import io.delta.kernel.types.StructType;
 import java.time.Instant;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
@@ -71,6 +72,15 @@ public static long millisToMicros(long millis) {
     return Math.multiplyExact(millis, DateTimeConstants.MICROS_PER_MILLIS);
   }
 
+  /**
+   * Converts a number of days since epoch (1970-01-01 00:00:00 UTC) to microseconds between epoch
+   * and start of the day in the given timezone.
+   */
+  public static long daysToMicros(int days, ZoneOffset timezone) {
+    long seconds = LocalDate.ofEpochDay(days).atStartOfDay(timezone).toEpochSecond();
+    return seconds * DateTimeConstants.MICROS_PER_SECOND;
+  }
+
   /**
    * Parses a TimestampNTZ string in UTC format, supporting milliseconds and microseconds, to
    * microseconds since the Unix epoch.
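A minimal standalone sketch of the arithmetic the new `daysToMicros` helper performs, for illustration only; the `MICROS_PER_SECOND` constant is assumed here, while the real helper uses `DateTimeConstants.MICROS_PER_SECOND`:

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

// Standalone sketch of the daysToMicros arithmetic added above (illustration only).
public class DaysToMicrosSketch {
  private static final long MICROS_PER_SECOND = 1_000_000L; // assumed constant

  static long daysToMicros(int days, ZoneOffset timezone) {
    long seconds = LocalDate.ofEpochDay(days).atStartOfDay(timezone).toEpochSecond();
    return seconds * MICROS_PER_SECOND;
  }

  public static void main(String[] args) {
    // Epoch day 18262 is 2020-01-01; at UTC that is 1577836800000000 micros,
    // the value the date -> timestamp_ntz test below expects.
    System.out.println(daysToMicros(18262, ZoneOffset.UTC)); // 1577836800000000
  }
}
```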

‎kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/DecimalColumnReader.java

+11 −4

@@ -28,6 +28,7 @@
 import io.delta.kernel.types.DecimalType;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.math.RoundingMode;
 import java.util.Arrays;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.api.Binary;
@@ -101,20 +102,23 @@ public abstract static class BaseDecimalColumnReader extends BasePrimitiveColumn
     // working state
     private BigDecimal[] values;
 
-    private final DataType dataType;
+    private final DecimalType decimalType;
+
     private final int scale;
     protected BigDecimal[] expandedDictionary;
 
     BaseDecimalColumnReader(DataType dataType, int precision, int scale, int initialBatchSize) {
       super(initialBatchSize);
       DecimalType decimalType = (DecimalType) dataType;
+      int scaleIncrease = decimalType.getScale() - scale;
+      int precisionIncrease = decimalType.getPrecision() - precision;
       checkArgument(
-          decimalType.getPrecision() == precision && decimalType.getScale() == scale,
+          scaleIncrease >= 0 && precisionIncrease >= scaleIncrease,
           String.format(
               "Found Delta type %s but Parquet type has precision=%s and scale=%s",
               decimalType, precision, scale));
       this.scale = scale;
-      this.dataType = dataType;
+      this.decimalType = decimalType;
       this.values = new BigDecimal[initialBatchSize];
     }
 
@@ -130,6 +134,9 @@ public boolean hasDictionarySupport() {
     protected void addDecimal(BigDecimal value) {
       resizeIfNeeded();
       this.nullability[currentRowIndex] = false;
+      if (decimalType.getScale() != scale) {
+        value = value.setScale(decimalType.getScale(), RoundingMode.UNNECESSARY);
+      }
       this.values[currentRowIndex] = value;
     }
 
@@ -140,7 +147,7 @@ public void addValueFromDictionary(int dictionaryId) {
 
     @Override
     public ColumnVector getDataColumnVector(int batchSize) {
-      ColumnVector vector = new DefaultDecimalVector(dataType, batchSize, values);
+      ColumnVector vector = new DefaultDecimalVector(decimalType, batchSize, values);
       // re-initialize the working space
       this.nullability = ParquetColumnReaders.initNullabilityVector(nullability.length);
       this.values = new BigDecimal[values.length];
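A standalone sketch of the decimal widening rule enforced above: rescaling is lossless only when precision grows at least as much as scale, which is why the reader can use `RoundingMode.UNNECESSARY`. Illustration only; the helper and class names are hypothetical:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Illustration (not part of the commit) of the widened-decimal check and rescaling above.
public class DecimalWideningSketch {
  // Mirrors the reader's guard: the Delta scale must not shrink, and precision must grow
  // at least as much as scale, otherwise some Parquet values cannot be represented.
  static void checkWidening(int parquetPrecision, int parquetScale,
                            int deltaPrecision, int deltaScale) {
    int scaleIncrease = deltaScale - parquetScale;
    int precisionIncrease = deltaPrecision - parquetPrecision;
    if (scaleIncrease < 0 || precisionIncrease < scaleIncrease) {
      throw new IllegalArgumentException("not a widening conversion");
    }
  }

  public static void main(String[] args) {
    // decimal(10,2) -> decimal(12,4): allowed, and the rescale is exact.
    checkWidening(10, 2, 12, 4);
    BigDecimal value = new BigDecimal("12345678.91"); // fits decimal(10,2)
    System.out.println(value.setScale(4, RoundingMode.UNNECESSARY)); // 12345678.9100

    // decimal(10,2) -> decimal(10,4): rejected, the same value would need precision 12.
    try {
      checkWidening(10, 2, 10, 4);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```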

‎kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java

+22 −1

@@ -267,7 +267,7 @@ public static class IntColumnReader extends BasePrimitiveColumnReader {
 
     IntColumnReader(DataType dataType, int initialBatchSize) {
       super(initialBatchSize);
-      checkArgument(dataType instanceof IntegerType || dataType instanceof DataType);
+      checkArgument(dataType instanceof IntegerType || dataType instanceof DateType);
       this.dataType = dataType;
       this.values = new int[initialBatchSize];
     }
@@ -315,6 +315,13 @@ public static class LongColumnReader extends BasePrimitiveColumnReader {
       this.values = new long[initialBatchSize];
     }
 
+    @Override
+    public void addInt(int value) {
+      resizeIfNeeded();
+      this.nullability[currentRowIndex] = false;
+      this.values[currentRowIndex] = value;
+    }
+
     @Override
     public void addLong(long value) {
       resizeIfNeeded();
@@ -388,6 +395,20 @@ public static class DoubleColumnReader extends BasePrimitiveColumnReader {
       this.values = new double[initialBatchSize];
     }
 
+    @Override
+    public void addInt(int value) {
+      resizeIfNeeded();
+      this.nullability[currentRowIndex] = false;
+      this.values[currentRowIndex] = value;
+    }
+
+    @Override
+    public void addFloat(float value) {
+      resizeIfNeeded();
+      this.nullability[currentRowIndex] = false;
+      this.values[currentRowIndex] = value;
+    }
+
     @Override
     public void addDouble(double value) {
       resizeIfNeeded();
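For reference, parquet-mr delivers INT32 and FLOAT column values through `PrimitiveConverter.addInt` and `PrimitiveConverter.addFloat`, so the widening happens at assignment time in the overridden methods above. A minimal sketch of that pattern, for illustration only (not the Kernel class):

```java
import org.apache.parquet.io.api.PrimitiveConverter;

// Minimal sketch (not the Kernel class): parquet-mr pushes INT32 values through addInt(),
// so a long-backed reader only needs to override addInt() and let Java widen the value.
public class WideningLongConverterSketch extends PrimitiveConverter {
  private final long[] values;
  private int current = 0;

  public WideningLongConverterSketch(int batchSize) {
    this.values = new long[batchSize];
  }

  @Override
  public void addInt(int value) {
    values[current++] = value; // int -> long widening, always exact
  }

  @Override
  public void addLong(long value) {
    values[current++] = value;
  }
}
```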

‎kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/TimestampConverters.java

+18

@@ -16,13 +16,15 @@
 package io.delta.kernel.defaults.internal.parquet;
 
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
 
 import io.delta.kernel.defaults.internal.DefaultKernelUtils;
 import io.delta.kernel.types.*;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.time.ZoneOffset;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.schema.*;
@@ -75,6 +77,10 @@ public static Converter createTimestampConverter(
         throw new UnsupportedOperationException(
             String.format("Unsupported Parquet TimeType unit=%s", timestamp.getUnit()));
       }
+    } else if (typeFromClient == TimestampNTZType.TIMESTAMP_NTZ
+        && primType.getPrimitiveTypeName() == INT32
+        && typeAnnotation instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
+      return new DateToTimestampNTZConverter(typeFromClient, initialBatchSize);
     } else {
       throw new RuntimeException(
           String.format("Unsupported timestamp column with Parquet type %s.", typeFromFile));
@@ -126,6 +132,18 @@ public void addLong(long value) {
     }
   }
 
+  public static class DateToTimestampNTZConverter extends ParquetColumnReaders.LongColumnReader {
+
+    DateToTimestampNTZConverter(DataType dataType, int initialBatchSize) {
+      super(validTimestampType(dataType), initialBatchSize);
+    }
+
+    @Override
+    public void addInt(int value) {
+      super.addLong(DefaultKernelUtils.daysToMicros(value, ZoneOffset.UTC));
+    }
+  }
+
   private static DataType validTimestampType(DataType dataType) {
     checkArgument(dataType instanceof TimestampType || dataType instanceof TimestampNTZType);
     return dataType;

‎kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala

+147 −1

@@ -16,11 +16,15 @@
 package io.delta.kernel.defaults.internal.parquet
 
 import java.math.BigDecimal
-import io.delta.golden.GoldenTableUtils.goldenTableFile
+import java.util.TimeZone
+
+import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath}
 import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow}
 import io.delta.kernel.test.VectorTestUtils
 import io.delta.kernel.types._
+import org.apache.spark.sql.internal.SQLConf
 import org.scalatest.funsuite.AnyFunSuite
+import org.apache.parquet.io.ParquetDecodingException
 
 class ParquetFileReaderSuite extends AnyFunSuite
   with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils {
@@ -88,6 +92,148 @@ class ParquetFileReaderSuite extends AnyFunSuite
     }
   }
 
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // Tests covering reading parquet values into a wider column type                               //
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  /**
+   * Test case for reading a column using a given type.
+   * @param columnName Column to read from the file
+   * @param toType Read type to use. May be different from the actual Parquet type.
+   * @param expectedExpr Expression returning the expected value for each row in the file.
+   */
+  case class TestCase(columnName: String, toType: DataType, expectedExpr: Int => Any)
+
+  private val supportedConversions: Seq[TestCase] = Seq(
+    // 'ByteType' column was generated with overflowing values, we need to call i.toByte to also
+    // wrap around here and generate the correct expected values.
+    TestCase("ByteType", ShortType.SHORT, i => if (i % 72 != 0) i.toByte.toShort else null),
+    TestCase("ByteType", IntegerType.INTEGER, i => if (i % 72 != 0) i.toByte.toInt else null),
+    TestCase("ByteType", LongType.LONG, i => if (i % 72 != 0) i.toByte.toLong else null),
+    TestCase("ByteType", DoubleType.DOUBLE, i => if (i % 72 != 0) i.toByte.toDouble else null),
+    TestCase("ShortType", IntegerType.INTEGER, i => if (i % 56 != 0) i else null),
+    TestCase("ShortType", LongType.LONG, i => if (i % 56 != 0) i.toLong else null),
+    TestCase("ShortType", DoubleType.DOUBLE, i => if (i % 56 != 0) i.toDouble else null),
+    TestCase("IntegerType", LongType.LONG, i => if (i % 23 != 0) i.toLong else null),
+    TestCase("IntegerType", DoubleType.DOUBLE, i => if (i % 23 != 0) i.toDouble else null),
+
+    TestCase("FloatType", DoubleType.DOUBLE,
+      i => if (i % 28 != 0) (i * 0.234).toFloat.toDouble else null),
+    TestCase("decimal", new DecimalType(12, 2),
+      i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 12352, 2) else null),
+    TestCase("decimal", new DecimalType(12, 4),
+      i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 1235200, 4) else null),
+    TestCase("decimal", new DecimalType(26, 10),
+      i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 12352, 2).setScale(10)
+      else null),
+    TestCase("IntegerType", new DecimalType(10, 0),
+      i => if (i % 23 != 0) new java.math.BigDecimal(i) else null),
+    TestCase("IntegerType", new DecimalType(16, 4),
+      i => if (i % 23 != 0) new java.math.BigDecimal(i).setScale(4) else null),
+    TestCase("LongType", new DecimalType(20, 0),
+      i => if (i % 25 != 0) new java.math.BigDecimal(i + 1) else null),
+    TestCase("LongType", new DecimalType(28, 6),
+      i => if (i % 25 != 0) new java.math.BigDecimal(i + 1).setScale(6) else null),
+
+    TestCase("BinaryType", StringType.STRING, i => if (i % 59 != 0) i.toString else null)
+  )
+
+  // The following conversions are supported by Kernel but not by Spark with parquet-mr.
+  // TODO: We should properly reject these conversions, a lot of them produce wrong results.
+  // Collecting them here to document the current behavior.
+  private val kernelOnlyConversions: Seq[TestCase] = Seq(
+    // These conversions will silently overflow.
+    TestCase("ShortType", ByteType.BYTE, i => if (i % 56 != 0) i.toByte else null),
+    TestCase("IntegerType", ByteType.BYTE, i => if (i % 23 != 0) i.toByte else null),
+    TestCase("IntegerType", ShortType.SHORT, i => if (i % 23 != 0) i.toShort else null),
+
+    // This is reading the unscaled decimal value as long which is wrong.
+    TestCase("decimal", LongType.LONG, i => if (i % 67 != 0) i.toLong * 12352 else null),
+
+    // The following conversions seem legit, although Spark rejects them.
+    TestCase("ByteType", DateType.DATE, i => if (i % 72 != 0) i.toByte.toInt else null),
+    TestCase("ShortType", DateType.DATE, i => if (i % 56 != 0) i else null),
+    TestCase("IntegerType", DateType.DATE, i => if (i % 23 != 0) i else null),
+    TestCase("StringType", BinaryType.BINARY, i => if (i % 57 != 0) i.toString.getBytes else null)
+  )
+
+  for (testCase <- supportedConversions ++ kernelOnlyConversions)
+    test(s"parquet supported conversion - ${testCase.columnName} -> ${testCase.toType.toString}") {
+      val inputLocation = goldenTablePath("parquet-all-types")
+      val readSchema = new StructType().add(testCase.columnName, testCase.toType)
+      val result = readParquetFilesUsingKernel(inputLocation, readSchema)
+      val expected = (0 until 200)
+        .map { i => TestRow(testCase.expectedExpr(i)) }
+      checkAnswer(result, expected)
+
+      if (!kernelOnlyConversions.contains(testCase)) {
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+          val sparkResult = readParquetFilesUsingSpark(inputLocation, readSchema)
+          checkAnswer(result, sparkResult)
+        }
+      }
+    }
+
+  test(s"parquet supported conversion - date -> timestamp_ntz") {
+    val timezones =
+      Seq("UTC", "Iceland", "PST", "America/Los_Angeles", "Etc/GMT+9", "Asia/Beirut", "JST")
+    for (fromTimezone <- timezones; toTimezone <- timezones) {
+      val inputLocation = goldenTablePath(s"data-reader-date-types-$fromTimezone")
+      TimeZone.setDefault(TimeZone.getTimeZone(toTimezone))
+
+      val readSchema = new StructType().add("date", TimestampNTZType.TIMESTAMP_NTZ)
+      val result = readParquetFilesUsingKernel(inputLocation, readSchema)
+      // 1577836800000000L -> 2020-01-01 00:00:00 UTC
+      checkAnswer(result, Seq(TestRow(1577836800000000L)))
+    }
+  }
+
+  def checkParquetReadError(inputLocation: String, readSchema: StructType): Unit = {
+    val ex = intercept[Throwable] {
+      readParquetFilesUsingKernel(inputLocation, readSchema)
+    }
+    // We don't properly reject conversions and the errors we get vary a lot; this checks the
+    // various error messages we may get as a result.
+    // TODO: Uniformize rejecting unsupported conversions.
+    assert(
+      ex.getMessage.contains("Can not read value") ||
+      ex.getMessage.contains("column with Parquet type") ||
+      ex.getMessage.contains("Unable to create Parquet converter for") ||
+      ex.getMessage.contains("Found Delta type Decimal") ||
+      ex.getMessage.contains("cannot be cast to")
+    )
+  }
+
+  for (column <- Seq("BooleanType", "ByteType", "ShortType", "IntegerType", "LongType",
+    "FloatType", "DoubleType", "StringType", "BinaryType")) {
+    test(s"parquet unsupported conversion from $column") {
+      val inputLocation = goldenTablePath("parquet-all-types")
+      val supportedTypes = (supportedConversions ++ kernelOnlyConversions)
+        .filter(_.columnName == column)
+        .map(_.toType)
+      val unsupportedTypes = ALL_TYPES
+        .filterNot(supportedTypes.contains)
+        .filterNot(_.getClass.getSimpleName == column)
+
+      for (toType <- unsupportedTypes) {
+        val readSchema = new StructType().add(column, toType)
+        withClue(s"Converting $column to $toType") {
+          checkParquetReadError(inputLocation, readSchema)
+        }
+      }
+    }
+  }
+
+  test(s"parquet unsupported conversion from decimal") {
+    val inputLocation = goldenTablePath("parquet-all-types")
+    // 'decimal' column is Decimal(10, 2) which fits into a long.
+    for (toType <- ALL_TYPES.filterNot(_ == LongType.LONG)) {
+      val readSchema = new StructType().add("decimal", toType)
+      withClue(s"Converting decimal to $toType") {
+        checkParquetReadError(inputLocation, readSchema)
+      }
+    }
+  }
+
   test("read subset of columns") {
     val tablePath = goldenTableFile("parquet-all-types").getAbsolutePath
     val readSchema = new StructType()
