From 657fa86b4928de23bf01ceda0f6f6112bc19403c Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 20 Nov 2024 09:35:34 +0100 Subject: [PATCH] Build: Bump Apache Parquet 1.14.4 (#11502) * Revert "Revert "Build: Bump parquet from 1.13.1 to 1.14.3 (#11264)" (#11462)" This reverts commit 7cc16fa94d7cd4e19397e9b4fba62185e0fa5eac. * Bump to Parquet 1.14.4 * Lookup sizes instead * Update build.gradle --- .../TestMetadataTableReadableMetrics.java | 97 ++++++++++++++++--- .../TestMetadataTableReadableMetrics.java | 95 +++++++++++++++--- .../TestMetadataTableReadableMetrics.java | 95 +++++++++++++++--- gradle/libs.versions.toml | 2 +- 4 files changed, 242 insertions(+), 47 deletions(-) diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java index 40dfda723749..488969bab045 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java @@ -28,6 +28,7 @@ import java.nio.file.Path; import java.util.Base64; import java.util.List; +import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.TableEnvironment; @@ -137,7 +138,7 @@ private Table createPrimitiveTable() throws IOException { return table; } - private void createNestedTable() throws IOException { + private Table createNestedTable() throws IOException { Table table = validationCatalog.createTable( TableIdentifier.of(DATABASE, TABLE_NAME), @@ -154,6 +155,8 @@ private void createNestedTable() throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records); table.newAppend().appendFile(dataFile).commit(); + + return table; } @BeforeEach @@ -168,7 +171,7 @@ public void before() { @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } @@ -212,32 +215,88 @@ protected Object[] row(Object... values) { @TestTemplate public void testPrimitiveColumns() throws Exception { - createPrimitiveTable(); + Table table = createPrimitiveTable(); List result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME); + // With new releases of Parquet, new features might be added which cause the + // size of the column to increase. For example, with Parquet 1.14.x the + // uncompressed size has been added to allow for better allocation of memory upfront. 
+ // Therefore, we look the sizes up, rather than hardcoding them + DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next(); + Map columnSizeStats = dataFile.columnSizes(); + Row binaryCol = Row.of( - 52L, + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("binaryCol").fieldId()), 4L, 2L, null, Base64.getDecoder().decode("1111"), Base64.getDecoder().decode("2222")); - Row booleanCol = Row.of(32L, 4L, 0L, null, false, true); - Row decimalCol = Row.of(85L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00")); - Row doubleCol = Row.of(85L, 4L, 0L, 1L, 1.0D, 2.0D); + Row booleanCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("booleanCol").fieldId()), + 4L, + 0L, + null, + false, + true); + Row decimalCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("decimalCol").fieldId()), + 4L, + 1L, + null, + new BigDecimal("1.00"), + new BigDecimal("2.00")); + Row doubleCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("doubleCol").fieldId()), + 4L, + 0L, + 1L, + 1.0D, + 2.0D); Row fixedCol = Row.of( - 44L, + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("fixedCol").fieldId()), 4L, 2L, null, Base64.getDecoder().decode("1111"), Base64.getDecoder().decode("2222")); - Row floatCol = Row.of(71L, 4L, 0L, 2L, 0f, 0f); - Row intCol = Row.of(71L, 4L, 0L, null, 1, 2); - Row longCol = Row.of(79L, 4L, 0L, null, 1L, 2L); - Row stringCol = Row.of(79L, 4L, 0L, null, "1", "2"); + Row floatCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("floatCol").fieldId()), + 4L, + 0L, + 2L, + 0f, + 0f); + Row intCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("intCol").fieldId()), + 4L, + 0L, + null, + 1, + 2); + Row longCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("longCol").fieldId()), + 4L, + 0L, + null, + 1L, + 2L); + Row stringCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("stringCol").fieldId()), + 4L, + 0L, + null, + "1", + "2"); List expected = Lists.newArrayList( @@ -288,12 +347,18 @@ public void testSelectNestedValues() throws Exception { @TestTemplate public void testNestedValues() throws Exception { createNestedTable(); + List result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME); + + // We have to take a slightly different approach, since we don't store + // the column sizes for nested fields. 
+ long leafDoubleColSize = + (long) ((Row) ((Row) result.get(0).getField(0)).getField(0)).getField(0); + long leafLongColSize = (long) ((Row) ((Row) result.get(0).getField(0)).getField(1)).getField(0); - Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D); - Row leafLongCol = Row.of(54L, 3L, 1L, null, 0L, 1L); + Row leafDoubleCol = Row.of(leafDoubleColSize, 3L, 1L, 1L, 0.0D, 0.0D); + Row leafLongCol = Row.of(leafLongColSize, 3L, 1L, null, 0L, 1L); Row metrics = Row.of(Row.of(leafDoubleCol, leafLongCol)); - TestHelpers.assertRows( - sql("SELECT readable_metrics FROM %s$files", TABLE_NAME), ImmutableList.of(metrics)); + TestHelpers.assertRows(result, ImmutableList.of(metrics)); } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java index 9cf953342a18..488969bab045 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java @@ -28,6 +28,7 @@ import java.nio.file.Path; import java.util.Base64; import java.util.List; +import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.TableEnvironment; @@ -137,7 +138,7 @@ private Table createPrimitiveTable() throws IOException { return table; } - private void createNestedTable() throws IOException { + private Table createNestedTable() throws IOException { Table table = validationCatalog.createTable( TableIdentifier.of(DATABASE, TABLE_NAME), @@ -154,6 +155,8 @@ private void createNestedTable() throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records); table.newAppend().appendFile(dataFile).commit(); + + return table; } @BeforeEach @@ -212,32 +215,88 @@ protected Object[] row(Object... values) { @TestTemplate public void testPrimitiveColumns() throws Exception { - createPrimitiveTable(); + Table table = createPrimitiveTable(); List result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME); + // With new releases of Parquet, new features might be added which cause the + // size of the column to increase. For example, with Parquet 1.14.x the + // uncompressed size has been added to allow for better allocation of memory upfront. 
+ // Therefore, we look the sizes up, rather than hardcoding them + DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next(); + Map columnSizeStats = dataFile.columnSizes(); + Row binaryCol = Row.of( - 52L, + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("binaryCol").fieldId()), 4L, 2L, null, Base64.getDecoder().decode("1111"), Base64.getDecoder().decode("2222")); - Row booleanCol = Row.of(32L, 4L, 0L, null, false, true); - Row decimalCol = Row.of(85L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00")); - Row doubleCol = Row.of(85L, 4L, 0L, 1L, 1.0D, 2.0D); + Row booleanCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("booleanCol").fieldId()), + 4L, + 0L, + null, + false, + true); + Row decimalCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("decimalCol").fieldId()), + 4L, + 1L, + null, + new BigDecimal("1.00"), + new BigDecimal("2.00")); + Row doubleCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("doubleCol").fieldId()), + 4L, + 0L, + 1L, + 1.0D, + 2.0D); Row fixedCol = Row.of( - 44L, + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("fixedCol").fieldId()), 4L, 2L, null, Base64.getDecoder().decode("1111"), Base64.getDecoder().decode("2222")); - Row floatCol = Row.of(71L, 4L, 0L, 2L, 0f, 0f); - Row intCol = Row.of(71L, 4L, 0L, null, 1, 2); - Row longCol = Row.of(79L, 4L, 0L, null, 1L, 2L); - Row stringCol = Row.of(79L, 4L, 0L, null, "1", "2"); + Row floatCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("floatCol").fieldId()), + 4L, + 0L, + 2L, + 0f, + 0f); + Row intCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("intCol").fieldId()), + 4L, + 0L, + null, + 1, + 2); + Row longCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("longCol").fieldId()), + 4L, + 0L, + null, + 1L, + 2L); + Row stringCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("stringCol").fieldId()), + 4L, + 0L, + null, + "1", + "2"); List expected = Lists.newArrayList( @@ -288,12 +347,18 @@ public void testSelectNestedValues() throws Exception { @TestTemplate public void testNestedValues() throws Exception { createNestedTable(); + List result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME); + + // We have to take a slightly different approach, since we don't store + // the column sizes for nested fields. 
+ long leafDoubleColSize = + (long) ((Row) ((Row) result.get(0).getField(0)).getField(0)).getField(0); + long leafLongColSize = (long) ((Row) ((Row) result.get(0).getField(0)).getField(1)).getField(0); - Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D); - Row leafLongCol = Row.of(54L, 3L, 1L, null, 0L, 1L); + Row leafDoubleCol = Row.of(leafDoubleColSize, 3L, 1L, 1L, 0.0D, 0.0D); + Row leafLongCol = Row.of(leafLongColSize, 3L, 1L, null, 0L, 1L); Row metrics = Row.of(Row.of(leafDoubleCol, leafLongCol)); - TestHelpers.assertRows( - sql("SELECT readable_metrics FROM %s$files", TABLE_NAME), ImmutableList.of(metrics)); + TestHelpers.assertRows(result, ImmutableList.of(metrics)); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java index 9cf953342a18..488969bab045 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java @@ -28,6 +28,7 @@ import java.nio.file.Path; import java.util.Base64; import java.util.List; +import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.TableEnvironment; @@ -137,7 +138,7 @@ private Table createPrimitiveTable() throws IOException { return table; } - private void createNestedTable() throws IOException { + private Table createNestedTable() throws IOException { Table table = validationCatalog.createTable( TableIdentifier.of(DATABASE, TABLE_NAME), @@ -154,6 +155,8 @@ private void createNestedTable() throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records); table.newAppend().appendFile(dataFile).commit(); + + return table; } @BeforeEach @@ -212,32 +215,88 @@ protected Object[] row(Object... values) { @TestTemplate public void testPrimitiveColumns() throws Exception { - createPrimitiveTable(); + Table table = createPrimitiveTable(); List result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME); + // With new releases of Parquet, new features might be added which cause the + // size of the column to increase. For example, with Parquet 1.14.x the + // uncompressed size has been added to allow for better allocation of memory upfront. 
+ // Therefore, we look the sizes up, rather than hardcoding them + DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next(); + Map columnSizeStats = dataFile.columnSizes(); + Row binaryCol = Row.of( - 52L, + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("binaryCol").fieldId()), 4L, 2L, null, Base64.getDecoder().decode("1111"), Base64.getDecoder().decode("2222")); - Row booleanCol = Row.of(32L, 4L, 0L, null, false, true); - Row decimalCol = Row.of(85L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00")); - Row doubleCol = Row.of(85L, 4L, 0L, 1L, 1.0D, 2.0D); + Row booleanCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("booleanCol").fieldId()), + 4L, + 0L, + null, + false, + true); + Row decimalCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("decimalCol").fieldId()), + 4L, + 1L, + null, + new BigDecimal("1.00"), + new BigDecimal("2.00")); + Row doubleCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("doubleCol").fieldId()), + 4L, + 0L, + 1L, + 1.0D, + 2.0D); Row fixedCol = Row.of( - 44L, + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("fixedCol").fieldId()), 4L, 2L, null, Base64.getDecoder().decode("1111"), Base64.getDecoder().decode("2222")); - Row floatCol = Row.of(71L, 4L, 0L, 2L, 0f, 0f); - Row intCol = Row.of(71L, 4L, 0L, null, 1, 2); - Row longCol = Row.of(79L, 4L, 0L, null, 1L, 2L); - Row stringCol = Row.of(79L, 4L, 0L, null, "1", "2"); + Row floatCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("floatCol").fieldId()), + 4L, + 0L, + 2L, + 0f, + 0f); + Row intCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("intCol").fieldId()), + 4L, + 0L, + null, + 1, + 2); + Row longCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("longCol").fieldId()), + 4L, + 0L, + null, + 1L, + 2L); + Row stringCol = + Row.of( + columnSizeStats.get(PRIMITIVE_SCHEMA.findField("stringCol").fieldId()), + 4L, + 0L, + null, + "1", + "2"); List expected = Lists.newArrayList( @@ -288,12 +347,18 @@ public void testSelectNestedValues() throws Exception { @TestTemplate public void testNestedValues() throws Exception { createNestedTable(); + List result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME); + + // We have to take a slightly different approach, since we don't store + // the column sizes for nested fields. + long leafDoubleColSize = + (long) ((Row) ((Row) result.get(0).getField(0)).getField(0)).getField(0); + long leafLongColSize = (long) ((Row) ((Row) result.get(0).getField(0)).getField(1)).getField(0); - Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D); - Row leafLongCol = Row.of(54L, 3L, 1L, null, 0L, 1L); + Row leafDoubleCol = Row.of(leafDoubleColSize, 3L, 1L, 1L, 0.0D, 0.0D); + Row leafLongCol = Row.of(leafLongColSize, 3L, 1L, null, 0L, 1L); Row metrics = Row.of(Row.of(leafDoubleCol, leafLongCol)); - TestHelpers.assertRows( - sql("SELECT readable_metrics FROM %s$files", TABLE_NAME), ImmutableList.of(metrics)); + TestHelpers.assertRows(result, ImmutableList.of(metrics)); } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 8af0d6ec6ab2..7358d0b8547a 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -75,7 +75,7 @@ netty-buffer = "4.1.115.Final" netty-buffer-compat = "4.1.115.Final" object-client-bundle = "3.3.2" orc = "1.9.5" -parquet = "1.13.1" +parquet = "1.14.4" roaringbitmap = "1.3.0" scala-collection-compat = "2.12.0" slf4j = "2.0.16"
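Reviewer note on the lookup pattern: the sketch below shows, under the same assumptions as the patch (an Iceberg Table whose last append added exactly one data file, and a schema constant like PRIMITIVE_SCHEMA), how an expected column size can be resolved from the written DataFile instead of being hardcoded. It only uses calls that already appear in the diff (currentSnapshot, addedDataFiles, columnSizes, findField, fieldId); the class and method names are illustrative, not part of the change.

import java.util.Map;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

// Minimal sketch: derive the expected column_sizes entry from file metadata so the
// test tracks whatever size the current Parquet writer actually produces.
class ColumnSizeLookupSketch {

  // Returns the recorded size of the given top-level column in the data file added
  // by the most recent append, or null if no size was recorded for it.
  static Long columnSize(Table table, Schema schema, String columnName) {
    DataFile dataFile =
        table.currentSnapshot().addedDataFiles(table.io()).iterator().next();

    // column_sizes is keyed by Iceberg field id, not by column name.
    Map<Integer, Long> columnSizes = dataFile.columnSizes();
    return columnSizes.get(schema.findField(columnName).fieldId());
  }
}

Used this way, an expectation such as Row.of(71L, 4L, 0L, null, 1, 2) becomes Row.of(columnSize(table, PRIMITIVE_SCHEMA, "intCol"), 4L, 0L, null, 1, 2), which mirrors the inline lookups the patch applies to each primitive column.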
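For the nested case the patch takes the size from the query result itself, since, as its comment notes, the column sizes are not stored for nested fields, and the chain of casts is easy to misread. A hypothetical helper, not part of the patch, spells out the same navigation of the readable_metrics structure:

import org.apache.flink.types.Row;

// Hypothetical helper (illustration only): each leaf column under readable_metrics
// is itself a Row whose first field is column_size.
final class ReadableMetricsRows {

  private ReadableMetricsRows() {}

  static long leafColumnSize(Row filesRow, int leafPosition) {
    // files row -> readable_metrics struct -> leaf struct -> column_size
    Row readableMetrics = (Row) filesRow.getField(0);
    Row leaf = (Row) readableMetrics.getField(leafPosition);
    return (long) leaf.getField(0);
  }
}

With such a helper, the two lookups in testNestedValues would read leafColumnSize(result.get(0), 0) and leafColumnSize(result.get(0), 1); the patch keeps the casts inline, which is equivalent.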