diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/internal/parquet/TestParquetBatchReader.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/internal/parquet/TestParquetBatchReader.java
deleted file mode 100644
index 5eac022e2b3..00000000000
--- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/internal/parquet/TestParquetBatchReader.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Copyright (2023) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults.internal.parquet;
-
-import java.io.File;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.net.URI;
-import java.util.*;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-import static io.delta.golden.GoldenTableUtils.goldenTableFile;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-
-import io.delta.kernel.data.*;
-import io.delta.kernel.types.*;
-import io.delta.kernel.utils.CloseableIterator;
-
-import io.delta.kernel.internal.util.Tuple2;
-import io.delta.kernel.internal.util.VectorUtils;
-
-import io.delta.kernel.defaults.utils.DefaultKernelTestUtils;
-
-import io.delta.kernel.defaults.internal.DefaultKernelUtils;
-
-public class TestParquetBatchReader {
-    /**
-     * Test reads data from a Parquet file with data of various combinations of data types supported
-     * by the Delta Lake table protocol.
-     */
-    private static final String ALL_TYPES_FILE =
-        Arrays.stream(goldenTableFile("parquet-all-types").listFiles())
-            .filter(file -> file.getName().endsWith(".parquet"))
-            .map(File::getAbsolutePath)
-            .findFirst()
-            .get();
-
-    private static final StructType ALL_TYPES_FILE_SCHEMA = new StructType()
-        .add("byteType", ByteType.BYTE)
-        .add("shortType", ShortType.SHORT)
-        .add("integerType", IntegerType.INTEGER)
-        .add("longType", LongType.LONG)
-        .add("floatType", FloatType.FLOAT)
-        .add("doubleType", DoubleType.DOUBLE)
-        .add("decimal", new DecimalType(10, 2))
-        .add("booleanType", BooleanType.BOOLEAN)
-        .add("stringType", StringType.STRING)
-        .add("binaryType", BinaryType.BINARY)
-        .add("dateType", DateType.DATE)
-        .add("timestampType", TimestampType.TIMESTAMP)
-        .add("nested_struct",
-            new StructType()
-                .add("aa", StringType.STRING)
-                .add("ac", new StructType().add("aca", IntegerType.INTEGER)))
-        .add("array_of_prims",
-            new ArrayType(IntegerType.INTEGER, true))
-        .add("array_of_arrays",
-            new ArrayType(new ArrayType(IntegerType.INTEGER, true), true))
-        .add("array_of_structs",
-            new ArrayType(new StructType().add("ab", LongType.LONG), true))
-        .add("map_of_prims", new MapType(IntegerType.INTEGER, LongType.LONG, true))
-        .add("map_of_rows", new MapType(
-            IntegerType.INTEGER,
-            new StructType().add("ab", LongType.LONG),
-            true))
-        .add("map_of_arrays", new MapType(
-            LongType.LONG,
-            new ArrayType(IntegerType.INTEGER, true),
-            true));
-
-    @Test
-    public void readAllTypesOfData()
-        throws Exception {
-        readAndVerify(ALL_TYPES_FILE_SCHEMA, 90 /* readBatchSize */);
-    }
-
-    @Test
-    public void readSubsetOfColumns()
-        throws Exception {
-        StructType readSchema = new StructType()
-            .add("byteType", ByteType.BYTE)
-            .add("booleanType", BooleanType.BOOLEAN)
-            .add("stringType", StringType.STRING)
-            .add("dateType", DateType.DATE)
-            .add("nested_struct",
-                new StructType()
-                    .add("aa", StringType.STRING)
-                    .add("ac", new StructType().add("aca", IntegerType.INTEGER)))
-            .add("array_of_prims",
-                new ArrayType(IntegerType.INTEGER, true));
-
-        readAndVerify(readSchema, 73 /* readBatchSize */);
-    }
-
-    @Test
-    public void readSubsetOfColumnsWithMissingColumnsInFile()
-        throws Exception {
-        StructType readSchema = new StructType()
-            .add("booleanType", BooleanType.BOOLEAN)
-            .add("integerType", IntegerType.INTEGER)
-            .add("missing_column_struct",
-                new StructType().add("ab", IntegerType.INTEGER))
-            .add("longType", LongType.LONG)
-            .add("missing_column_primitive", DateType.DATE)
-            .add("nested_struct",
-                new StructType()
-                    .add("aa", StringType.STRING)
-                    .add("ac", new StructType().add("aca", IntegerType.INTEGER))
-            );
-
-        readAndVerify(readSchema, 23 /* readBatchSize */);
-    }
-
-    @Test
-    public void requestRowIndices() throws IOException {
-        String path = DefaultKernelTestUtils.getTestResourceFilePath("parquet-basic-row-indexes");
-        File dir = new File(URI.create(path).getPath());
-        List<String> parquetFiles = Arrays.stream(Objects.requireNonNull(dir.listFiles()))
-            .filter(file -> file.getName().endsWith(".parquet"))
-            .map(File::getAbsolutePath)
-            .collect(Collectors.toList());
-
-        StructType readSchema = new StructType()
-            .add("id", LongType.LONG)
-            .add(StructField.METADATA_ROW_INDEX_COLUMN);
-
-        Configuration conf = new Configuration();
-        // Set the batch size small enough so there will be multiple batches
-        conf.setInt("delta.kernel.default.parquet.reader.batch-size", 2);
-        ParquetBatchReader reader = new ParquetBatchReader(conf);
-
-        for (String filePath : parquetFiles) {
-            try (CloseableIterator<ColumnarBatch> iter = reader.read(filePath, readSchema)) {
-                while (iter.hasNext()) {
-                    ColumnarBatch batch = iter.next();
-                    for (int i = 0; i < batch.getSize(); i++) {
-                        long id = batch.getColumnVector(0).getLong(i);
-                        long rowIndex = batch.getColumnVector(1).getLong(i);
-                        assertEquals(id % 10, rowIndex);
-                    }
-                }
-            }
-        }
-
-        // File with multiple row-groups [0, 20000) where rowIndex = id
-        String filePath = DefaultKernelTestUtils.getTestResourceFilePath(
-            "parquet/row_index_multiple_row_groups.parquet");
-        reader = new ParquetBatchReader(new Configuration());
-        try (CloseableIterator<ColumnarBatch> iter = reader.read(filePath, readSchema)) {
-            while (iter.hasNext()) {
-                ColumnarBatch batch = iter.next();
-                for (int i = 0; i < batch.getSize(); i++) {
-                    long id = batch.getColumnVector(0).getLong(i);
-                    long rowIndex = batch.getColumnVector(1).getLong(i);
-                    assertEquals(id, rowIndex);
-                }
-            }
-        }
-    }
-
-    private static Configuration newConf(Optional<Integer> batchSize) {
-        Configuration conf = new Configuration();
-        if (batchSize.isPresent()) {
-            conf.set("delta.kernel.default.parquet.reader.batch-size", batchSize.get().toString());
-        }
-        return conf;
-    }
-
-    private static void readAndVerify(StructType readSchema, int readBatchSize)
-        throws Exception {
-        ParquetBatchReader batchReader =
-            new ParquetBatchReader(newConf(Optional.of(readBatchSize)));
-        List<ColumnarBatch> batches =
-            readAsBatches(batchReader, ALL_TYPES_FILE, readSchema);
-
-        for (int rowId = 0; rowId < 200; rowId++) {
-            verifyRowFromAllTypesFile(readSchema, batches, rowId);
-        }
-    }
-
-    private static List<ColumnarBatch> readAsBatches(
-        ParquetBatchReader parquetReader,
-        String path,
-        StructType readSchema) throws Exception {
-        List<ColumnarBatch> batches = new ArrayList<>();
-        try (CloseableIterator<ColumnarBatch> dataIter = parquetReader.read(path, readSchema)) {
-            while (dataIter.hasNext()) {
-                batches.add(dataIter.next());
-            }
-        }
-        return batches;
-    }
-
-    private static void verifyRowFromAllTypesFile(
-        StructType readSchema,
-        List<ColumnarBatch> batches,
-        int rowId) {
-        Tuple2<ColumnarBatch, Integer> batchWithIdx = getBatchForRowId(batches, rowId);
-        int ordinal = 0;
-        for (StructField structField : readSchema.fields()) {
-            String name = structField.getName().toLowerCase();
-            ColumnVector vector = batchWithIdx._1.getColumnVector(ordinal);
-            switch (name) {
-                case "booleantype": {
-                    Boolean expValue = (rowId % 87 != 0) ? rowId % 2 == 0 : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.booleanValue(), vector.getBoolean(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "bytetype": {
-                    Byte expValue = (rowId % 72 != 0) ? (byte) rowId : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.byteValue(), vector.getByte(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "shorttype": {
-                    Short expValue = (rowId % 56 != 0) ? (short) rowId : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.shortValue(), vector.getShort(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "datetype": {
-                    Integer expValue = (rowId % 61 != 0) ?
-                        (int) Math.floorDiv(
-                            rowId * 20000000L,
-                            DefaultKernelUtils.DateTimeConstants.MILLIS_PER_DAY) : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.intValue(), vector.getInt(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "integertype": {
-                    Integer expValue = (rowId % 23 != 0) ? rowId : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.intValue(), vector.getInt(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "longtype": {
-                    Long expValue = (rowId % 25 != 0) ? rowId + 1L : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.longValue(), vector.getLong(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "floattype": {
-                    Float expValue = (rowId % 28 != 0) ? (rowId * 0.234f) : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.floatValue(), vector.getFloat(batchWithIdx._2), 0.02);
-                    }
-                    break;
-                }
-                case "doubletype": {
-                    Double expValue = (rowId % 54 != 0) ? (rowId * 234234.23d) : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.doubleValue(), vector.getDouble(batchWithIdx._2),
-                            0.02);
-                    }
-                    break;
-                }
-                case "stringtype": {
-                    String expValue = (rowId % 57 != 0) ? Integer.toString(rowId) : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue, vector.getString(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "binarytype": {
-                    byte[] expValue = (rowId % 59 != 0) ? Integer.toString(rowId).getBytes() : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertArrayEquals(expValue, vector.getBinary(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "timestamptype": {
-                    // Tests only for spark.sql.parquet.outputTimestampType = INT96, other formats
-                    // are tested in end-to-end tests in DeltaTableReadsSuite
-                    Long expValue = (rowId % 62 != 0) ? 23423523L * rowId * 1000 : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue.longValue(), vector.getLong(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "decimal": {
-                    BigDecimal expValue = (rowId % 67 != 0) ?
-                        // Value is rounded to scale=2 when written
-                        new BigDecimal(rowId * 123.52).setScale(2, RoundingMode.HALF_UP) : null;
-                    if (expValue == null) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                    } else {
-                        assertEquals(expValue, vector.getDecimal(batchWithIdx._2));
-                    }
-                    break;
-                }
-                case "nested_struct":
-                    validateNestedStructColumn(vector, batchWithIdx._2, rowId);
-                    break;
-                case "array_of_prims": {
-                    boolean expIsNull = rowId % 25 == 0;
-                    if (expIsNull) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                        assertNull(vector.getArray(batchWithIdx._2));
-                    } else if (rowId % 29 == 0) {
-                        checkArrayValue(vector.getArray(batchWithIdx._2), IntegerType.INTEGER,
-                            Collections.emptyList());
-                    } else {
-                        List<Integer> expArray = Arrays.asList(rowId, null, rowId + 1);
-                        checkArrayValue(vector.getArray(batchWithIdx._2), IntegerType.INTEGER,
-                            expArray);
-                    }
-                    break;
-                }
-                case "array_of_arrays":
-                    validateArrayOfArraysColumn(vector, batchWithIdx._2, rowId);
-                    break;
-                case "array_of_structs": {
-                    assertFalse(vector.isNullAt(batchWithIdx._2));
-                    ArrayValue arrayValue = vector.getArray(batchWithIdx._2);
-                    ColumnVector elementVector = arrayValue.getElements();
-                    assertEquals(2, arrayValue.getSize());
-                    assertEquals(2, elementVector.getSize());
-                    assertTrue(elementVector.getDataType() instanceof StructType);
-                    assertEquals(rowId, elementVector.getChild(0).getLong(0));
-                    assertTrue(elementVector.isNullAt(1));
-                    break;
-                }
-                case "map_of_prims": {
-                    boolean expIsNull = rowId % 28 == 0;
-                    if (expIsNull) {
-                        assertTrue(vector.isNullAt(batchWithIdx._2));
-                        assertNull(vector.getMap(batchWithIdx._2));
-                    } else if (rowId % 30 == 0) {
-                        checkMapValue(
-                            vector.getMap(batchWithIdx._2),
-                            IntegerType.INTEGER,
-                            LongType.LONG,
-                            Collections.emptyMap()
-                        );
-                    } else {
-                        Map<Integer, Long> expValue = new HashMap<Integer, Long>() {
-                            {
-                                put(rowId, (rowId % 29 == 0) ? null : (rowId + 2L));
-                                put((rowId % 27 != 0) ? (rowId + 2) : (rowId + 3), rowId + 9L);
-
-                            }
-                        };
-                        checkMapValue(
-                            vector.getMap(batchWithIdx._2),
-                            IntegerType.INTEGER,
-                            LongType.LONG,
-                            expValue
-                        );
-                    }
-                    break;
-                }
-                case "map_of_rows": {
-                    // Map(i + 1 -> (if (i % 10 == 0) Row((i*20).longValue()) else null))
-                    assertFalse(vector.isNullAt(batchWithIdx._2));
-                    MapValue mapValue = vector.getMap(batchWithIdx._2);
-                    Map<Integer, Row> actValue = VectorUtils.toJavaMap(mapValue);
-
-                    // entry 0: key = rowId
-                    Integer key0 = rowId + 1;
-                    boolean expValue0IsNull = rowId % 10 != 0;
-                    Row actValue0 = actValue.get(key0);
-                    if (expValue0IsNull) {
-                        assertNull(actValue0);
-                    } else {
-                        Long actValue0Member = actValue0.getLong(0);
-                        Long expValue0Member = rowId * 20L;
-                        assertEquals(expValue0Member, actValue0Member);
-                    }
-                    break;
-                }
-                case "map_of_arrays":
-                    validateMapOfArraysColumn(vector, batchWithIdx._2, rowId);
-                    break;
-                case "missing_column_primitive":
-                case "missing_column_struct": {
-                    assertTrue(vector.isNullAt(batchWithIdx._2));
-                    break;
-                }
-                default:
-                    throw new IllegalArgumentException("unknown column: " + name);
-            }
-            ordinal++;
-        }
-    }
-
-    private static void validateNestedStructColumn(
-        ColumnVector vector, int batchRowId, int tableRowId) {
-        boolean expNull = tableRowId % 63 == 0;
-        if (expNull) {
-            assertTrue(vector.isNullAt(batchRowId));
-            return;
-        }
-
-        boolean expAaValNull = tableRowId % 19 == 0;
-        boolean expAcValNull = tableRowId % 19 == 0 || tableRowId % 23 == 0;
-        final int aaColOrdinal = 0;
-        final int acColOrdinal = 1;
-
-        assertEquals(vector.getChild(aaColOrdinal).isNullAt(batchRowId), expAaValNull);
-        assertEquals(vector.getChild(acColOrdinal).isNullAt(batchRowId), expAcValNull);
-
-        if (!expAaValNull) {
-            String aaVal = vector.getChild(aaColOrdinal).getString(batchRowId);
-            assertEquals(Integer.toString(tableRowId), aaVal);
-        }
-        if (!expAcValNull) {
-            int actAcaVal = vector.getChild(acColOrdinal).getChild(0).getInt(batchRowId);
-            assertEquals(tableRowId, actAcaVal);
-        }
-    }
-
-    private static void validateArrayOfArraysColumn(
-        ColumnVector vector, int batchRowId, int tableRowId) {
-        boolean expIsNull = tableRowId % 8 == 0;
-        if (expIsNull) {
-            assertTrue(vector.isNullAt(batchRowId));
-            return;
-        }
-
-        List<Integer> singleElemArray = Arrays.asList(tableRowId);
-        List<Integer> doubleElemArray = Arrays.asList(tableRowId + 10, tableRowId + 20);
-        List<Integer> arrayWithNulls = Arrays.asList(null, tableRowId + 200);
-        List<Integer> singleElemNullArray = Collections.singletonList(null);
-        List<Integer> emptyArray = Collections.emptyList();
-
-        List<List<Integer>> expArray = null;
-        switch (tableRowId % 7) {
-            case 0:
-                expArray = Arrays.asList(singleElemArray, singleElemArray, arrayWithNulls);
-                break;
-            case 1:
-                expArray = Arrays.asList(singleElemArray, doubleElemArray, emptyArray);
-                break;
-            case 2:
-                expArray = Arrays.asList(arrayWithNulls);
-                break;
-            case 3:
-                expArray = Arrays.asList(singleElemNullArray);
-                break;
-            case 4:
-                expArray = Collections.singletonList(null);
-                break;
-            case 5:
-                expArray = Collections.singletonList(emptyArray);
-                break;
-            case 6:
-                expArray = Collections.emptyList();
-                break;
-        }
-        DataType expDataType = new ArrayType(IntegerType.INTEGER, true);
-        checkArrayValue(vector.getArray(batchRowId), expDataType, expArray);
-    }
-
-    private static void validateMapOfArraysColumn(
-        ColumnVector vector, int batchRowId, int tableRowId) {
-        boolean expIsNull = tableRowId % 30 == 0;
-        if (expIsNull) {
-            assertTrue(vector.isNullAt(batchRowId));
-            return;
-        }
-
-        final List<Integer> val1;
-        if (tableRowId % 4 == 0) {
-            val1 = Arrays.asList(tableRowId, null, tableRowId + 1);
-        } else {
-            val1 = Collections.emptyList();
-        }
-        final List<Integer> val2;
-        if (tableRowId % 7 == 0) {
-            val2 = Collections.emptyList();
-        } else {
-            val2 = Collections.singletonList(null);
-        }
-
-        Map<Long, List<Integer>> expMap = Collections.emptyMap();
-        if (tableRowId % 24 != 0) {
-            expMap = new HashMap<Long, List<Integer>>() {
-                {
-                    put((long) tableRowId, val1);
-                    put(tableRowId + 1L, val2);
-                }
-            };
-        }
-        checkMapValue(
-            vector.getMap(batchRowId),
-            LongType.LONG,
-            new ArrayType(IntegerType.INTEGER, true),
-            expMap
-        );
-    }
-
-    private static Tuple2<ColumnarBatch, Integer> getBatchForRowId(
-        List<ColumnarBatch> batches, int rowId) {
-        int indexStart = 0;
-        for (ColumnarBatch batch : batches) {
-            if (indexStart <= rowId && rowId < indexStart + batch.getSize()) {
-                return new Tuple2<>(batch, rowId - indexStart);
-            }
-            indexStart += batch.getSize();
-        }
-
-        throw new IllegalArgumentException("row id is not found: " + rowId);
-    }
-
-    private static void checkArrayValue(
-        ArrayValue arrayValue, DataType expDataType, List expList) {
-        int size = expList.size();
-        ColumnVector elementVector = arrayValue.getElements();
-        // Check the size is as expected and arrayValue.getSize == elementVector.getSize
-        assertEquals(size, arrayValue.getSize());
-        assertEquals(size, elementVector.getSize());
-        // Check the element vector has the correct data type
-        assertEquals(elementVector.getDataType(), expDataType);
-        // Check the elements are correct
-        assertEquals(expList, VectorUtils.toJavaList(arrayValue));
-        assertThrows(IllegalArgumentException.class,
-            () -> DefaultKernelTestUtils.getValueAsObject(elementVector, size + 1));
-    }
-
-    private static void checkMapValue(
-        MapValue mapValue, DataType keyDataType, DataType valueDataType, Map expMap) {
-        int size = expMap.size();
-        ColumnVector keyVector = mapValue.getKeys();
-        ColumnVector valueVector = mapValue.getValues();
-        // Check the size mapValue.getSize == keyVector.getSize == valueVector.getSize
-        assertEquals(size, mapValue.getSize());
-        assertEquals(size, keyVector.getSize());
-        assertEquals(size, valueVector.getSize());
-        // Check the key and value vector has the correct data type
-        assertEquals(keyVector.getDataType(), keyDataType);
-        assertEquals(valueVector.getDataType(), valueDataType);
-        // Check the elements are correct
-        assertEquals(expMap, VectorUtils.toJavaMap(mapValue));
-        assertThrows(IllegalArgumentException.class,
-            () -> DefaultKernelTestUtils.getValueAsObject(keyVector, size + 1));
-        assertThrows(IllegalArgumentException.class,
-            () -> DefaultKernelTestUtils.getValueAsObject(valueVector, size + 1));
-    }
-}
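The several hundred lines of hand-written per-value assertions deleted above are superseded by the oracle-style checks in the new ParquetFileReaderSuite later in this diff: the same file is read once with the Kernel default reader and once with Spark, and the resulting TestRow collections are compared. A minimal sketch of that pattern, assuming the ParquetSuiteBase helpers that appear further down (tableSchema, readParquetFilesUsingKernel, readParquetFilesUsingSpark, checkAnswer); the test name is illustrative only:

  test("kernel parquet reader matches Spark (sketch)") {
    val file = goldenTableFile("parquet-all-types").getAbsolutePath
    // Derive the read schema from the Parquet file instead of spelling it out by hand
    val readSchema = tableSchema(file)
    // Kernel's default Parquet reader, with Spark's reader as the oracle
    checkAnswer(
      readParquetFilesUsingKernel(file, readSchema),
      readParquetFilesUsingSpark(file, readSchema))
  }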
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ParquetBatchReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ParquetBatchReaderSuite.scala
deleted file mode 100644
index abfaf127d75..00000000000
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ParquetBatchReaderSuite.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (2021) The Delta Lake Project Authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.delta.kernel.defaults
-
-import java.io.File
-import java.math.BigDecimal
-
-import io.delta.golden.GoldenTableUtils.goldenTableFile
-import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader
-import io.delta.kernel.defaults.utils.{TestRow, TestUtils}
-import io.delta.kernel.types.{DecimalType, IntegerType, StructType}
-import org.apache.hadoop.conf.Configuration
-import org.scalatest.funsuite.AnyFunSuite
-
-class ParquetBatchReaderSuite extends AnyFunSuite with TestUtils {
-
-  def getSingleParquetFile(directory: File): String = {
-    val parquetFiles = directory.listFiles().filter(_.getName.endsWith(".parquet"))
-    assert(parquetFiles.size == 1)
-    parquetFiles.head.getAbsolutePath
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////
-  // Decimal type tests
-  //////////////////////////////////////////////////////////////////////////////////
-
-  private val DECIMAL_TYPES_DICT_FILE_V1 = getSingleParquetFile(
-    goldenTableFile("parquet-decimal-dictionaries-v1"))
-
-  private val DECIMAL_TYPES_DICT_FILE_V2 = getSingleParquetFile(
-    goldenTableFile("parquet-decimal-dictionaries-v2"))
-
-  test("decimals encoded using dictionary encoding ") {
-    val expectedResult = (0 until 1000000).map { i =>
-      TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2))
-    }
-
-    val readSchema = new StructType()
-      .add("id", IntegerType.INTEGER)
-      .add("col1", new DecimalType(9, 0)) // INT32: 1 <= precision <= 9
-      .add("col2", new DecimalType(12, 0)) // INT64: 10 <= precision <= 18
-      .add("col3", new DecimalType(25, 0)) // FIXED_LEN_BYTE_ARRAY
-
-    val batchReader = new ParquetBatchReader(new Configuration())
-    for (file <- Seq(DECIMAL_TYPES_DICT_FILE_V1, DECIMAL_TYPES_DICT_FILE_V2)) {
-      val batches = batchReader.read(file, readSchema)
-      val result = batches.toSeq.flatMap(_.getRows.toSeq)
-      checkAnswer(result, expectedResult)
-    }
-  }
-
-  private val LARGE_SCALE_DECIMAL_TYPES_FILE = getSingleParquetFile(
-    goldenTableFile("parquet-decimal-type"))
-
-  test("large scale decimal type file") {
-
-    def expand(n: BigDecimal): BigDecimal = {
-      n.scaleByPowerOfTen(5).add(n)
-    }
-
-    val expectedResult = (0 until 99998).map { i =>
-      if (i % 85 == 0) {
-        val n = BigDecimal.valueOf(i)
-        TestRow(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5))
-      } else {
-        val negation = if (i % 33 == 0) {
-          -1
-        } else {
-          1
-        }
-        val n = BigDecimal.valueOf(i*negation)
-        TestRow(
-          i,
-          n.movePointLeft(1),
-          expand(n).movePointLeft(5),
-          expand(expand(expand(n))).movePointLeft(5)
-        )
-      }
-    }
-
-    val readSchema = new StructType()
-      .add("id", IntegerType.INTEGER)
-      .add("col1", new DecimalType(5, 1)) // INT32: 1 <= precision <= 9
-      .add("col2", new DecimalType(10, 5)) // INT64: 10 <= precision <= 18
-      .add("col3", new DecimalType(20, 5)) // FIXED_LEN_BYTE_ARRAY
-
-    val batchReader = new ParquetBatchReader(new Configuration())
-    val batches = batchReader.read(LARGE_SCALE_DECIMAL_TYPES_FILE, readSchema)
-
-    val result = batches.toSeq.flatMap(_.getRows.toSeq)
-    checkAnswer(result, expectedResult)
-  }
-
-  //////////////////////////////////////////////////////////////////////////////////
-  // Timestamp type tests
-  //////////////////////////////////////////////////////////////////////////////////
-  // TODO move over from DeltaTableReadsSuite once there is better testing infra
-}
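The decimal expectations above carry over unchanged into the new ParquetFileReaderSuite below; the only difference is that the read schema is now derived from the file via tableSchema(...) rather than pinned explicitly. If a test ever needs to pin the physical decimal encodings again, an explicit schema can still be handed to the same helpers. A sketch under that assumption, reusing the column names and precisions from the deleted test (the val name is illustrative; decimalDictFileV1 and expResult refer to values defined in the first test below):

  val explicitDecimalSchema = new StructType()
    .add("id", IntegerType.INTEGER)
    .add("col1", new DecimalType(9, 0))   // stored as INT32
    .add("col2", new DecimalType(12, 0))  // stored as INT64
    .add("col3", new DecimalType(25, 0))  // stored as FIXED_LEN_BYTE_ARRAY
  checkAnswer(
    readParquetFilesUsingKernel(decimalDictFileV1, explicitDecimalSchema),
    expResult)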
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
new file mode 100644
index 00000000000..8b19a8422db
--- /dev/null
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetFileReaderSuite.scala
@@ -0,0 +1,144 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.internal.parquet
+
+import java.math.BigDecimal
+
+import io.delta.golden.GoldenTableUtils.goldenTableFile
+import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow, VectorTestUtils}
+import io.delta.kernel.types._
+import org.scalatest.funsuite.AnyFunSuite
+
+class ParquetFileReaderSuite extends AnyFunSuite
+  with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils {
+
+  test("decimals encoded using dictionary encoding ") {
+    // Below golden tables contain three decimal columns,
+    // each stored in a different physical format: int32, int64 and fixed binary
+    val decimalDictFileV1 = goldenTableFile("parquet-decimal-dictionaries-v1").getAbsolutePath
+    val decimalDictFileV2 = goldenTableFile("parquet-decimal-dictionaries-v2").getAbsolutePath
+
+    val expResult = (0 until 1000000).map { i =>
+      TestRow(i, BigDecimal.valueOf(i%5), BigDecimal.valueOf(i%6), BigDecimal.valueOf(i%2))
+    }
+
+    val readSchema = tableSchema(decimalDictFileV1)
+
+    for (file <- Seq(decimalDictFileV1, decimalDictFileV2)) {
+      val actResult = readParquetFilesUsingKernel(file, readSchema)
+
+      checkAnswer(actResult, expResult)
+    }
+  }
+
+  test("large scale decimal type file") {
+    val largeScaleDecimalTypesFile = goldenTableFile("parquet-decimal-type").getAbsolutePath
+
+    def expand(n: BigDecimal): BigDecimal = {
+      n.scaleByPowerOfTen(5).add(n)
+    }
+
+    val expResult = (0 until 99998).map { i =>
+      if (i % 85 == 0) {
+        val n = BigDecimal.valueOf(i)
+        TestRow(i, n.movePointLeft(1).setScale(1), n.setScale(5), n.setScale(5))
+      } else {
+        val negation = if (i % 33 == 0) -1 else 1
+        val n = BigDecimal.valueOf(i*negation)
+        TestRow(
+          i,
+          n.movePointLeft(1),
+          expand(n).movePointLeft(5),
+          expand(expand(expand(n))).movePointLeft(5)
+        )
+      }
+    }
+
+    val readSchema = tableSchema(largeScaleDecimalTypesFile)
+
+    val actResult = readParquetFilesUsingKernel(largeScaleDecimalTypesFile, readSchema)
+
+    checkAnswer(actResult, expResult)
+  }
+
+  private val ALL_TYPES_FILE = goldenTableFile("parquet-all-types").getAbsolutePath
+
+  test("read all types of data") {
+    val readSchema = tableSchema(ALL_TYPES_FILE)
+
+    val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)
+
+    val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
+
+    checkAnswer(actResult, expResult)
+  }
+
+  test("read subset of columns") {
+    val readSchema = new StructType()
+      .add("byteType", ByteType.BYTE)
+      .add("booleanType", BooleanType.BOOLEAN)
+      .add("stringType", StringType.STRING)
+      .add("dateType", DateType.DATE)
+      .add("nested_struct", new StructType()
+        .add("aa", StringType.STRING)
+        .add("ac", new StructType().add("aca", IntegerType.INTEGER)))
+      .add("array_of_prims", new ArrayType(IntegerType.INTEGER, true))
+
+    val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)
+
+    val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
+
+    checkAnswer(actResult, expResult)
+  }
+
+  test("read subset of columns with missing columns in file") {
+    val readSchema = new StructType()
+      .add("booleanType", BooleanType.BOOLEAN)
+      .add("integerType", IntegerType.INTEGER)
+      .add("missing_column_struct", new StructType().add("ab", IntegerType.INTEGER))
+      .add("longType", LongType.LONG)
+      .add("missing_column_primitive", DateType.DATE)
+      .add("nested_struct", new StructType()
+        .add("aa", StringType.STRING)
+        .add("ac", new StructType().add("aca", IntegerType.INTEGER)))
+
+    val actResult = readParquetFilesUsingKernel(ALL_TYPES_FILE, readSchema)
+
+    val expResult = readParquetFilesUsingSpark(ALL_TYPES_FILE, readSchema)
+
+    checkAnswer(actResult, expResult)
+  }
+
+  test("request row indices") {
+    val readSchema = new StructType()
+      .add("id", LongType.LONG)
+      .add(StructField.METADATA_ROW_INDEX_COLUMN)
+
+    val path = getTestResourceFilePath("parquet-basic-row-indexes")
+    val actResult1 = readParquetFilesUsingKernel(path, readSchema)
+    val expResult1 = (0L until 30L)
+      .map(i => TestRow(i, if (i < 10) i else if (i < 20) i - 10L else i - 20L))
+
+    checkAnswer(actResult1, expResult1)
+
+    // File with multiple row-groups [0, 20000) where rowIndex = id
+    val filePath = getTestResourceFilePath("parquet/")
+    val actResult2 = readParquetFilesUsingKernel(filePath, readSchema)
+    val expResult2 = (0L until 20000L).map(i => TestRow(i, i))
+
+    checkAnswer(actResult2, expResult2)
+  }
+}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
index 9cdc0972047..f221aeeb4d7 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala
@@ -250,7 +250,7 @@ trait ParquetSuiteBase extends TestUtils {
   }
   // Read the parquet files in actionFileDir using Spark Parquet reader
-  private def readParquetFilesUsingSpark(
+  def readParquetFilesUsingSpark(
       actualFileDir: String,
       readSchema: StructType): Seq[TestRow] = {
     spark.read
       .format("parquet")
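The only change to ParquetSuiteBase is dropping private from readParquetFilesUsingSpark, so suites mixing in the trait can use Spark directly as a reference reader. A sketch of how another suite might call it, assuming the trait is mixed into a ScalaTest AnyFunSuite as ParquetFileReaderSuite does (the suite name is illustrative):

  class ParquetCompatSketchSuite extends AnyFunSuite with ParquetSuiteBase {
    test("kernel matches Spark on a directory of Parquet files") {
      val dir = goldenTableFile("parquet-all-types").getAbsolutePath
      val readSchema = tableSchema(dir)
      checkAnswer(
        readParquetFilesUsingKernel(dir, readSchema),
        // Now callable from outside ParquetSuiteBase itself
        readParquetFilesUsingSpark(dir, readSchema))
    }
  }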