Build: Bump Apache Parquet 1.14.4 (#11502)
* Revert "Revert "Build: Bump parquet from 1.13.1 to 1.14.3 (#11264)" (#11462)"

This reverts commit 7cc16fa.

* Bump to Parquet 1.14.4

* Look up sizes instead

* Update build.gradle
Fokko authored Nov 20, 2024
1 parent f6d02de commit 657fa86
Showing 4 changed files with 242 additions and 47 deletions.
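The build.gradle update mentioned in the commit message is not part of the extracted diff below. As a hedged illustration only: a Parquet version bump in a Gradle build usually reduces to updating the declared dependency version. The artifact coordinates below are assumptions made for the sketch; the actual Iceberg build manages its dependency versions through its own definitions.

// build.gradle -- hypothetical sketch of a Parquet 1.14.4 bump, not the actual Iceberg change
dependencies {
  implementation "org.apache.parquet:parquet-column:1.14.4"
  implementation "org.apache.parquet:parquet-hadoop:1.14.4"
  implementation "org.apache.parquet:parquet-avro:1.14.4"
}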
@@ -28,6 +28,7 @@
import java.nio.file.Path;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.TableEnvironment;
@@ -137,7 +138,7 @@ private Table createPrimitiveTable() throws IOException {
return table;
}

private void createNestedTable() throws IOException {
private Table createNestedTable() throws IOException {
Table table =
validationCatalog.createTable(
TableIdentifier.of(DATABASE, TABLE_NAME),
@@ -154,6 +155,8 @@ private void createNestedTable() throws IOException {
File testFile = File.createTempFile("junit", null, temp.toFile());
DataFile dataFile = FileHelpers.writeDataFile(table, Files.localOutput(testFile), records);
table.newAppend().appendFile(dataFile).commit();

return table;
}

@BeforeEach
@@ -168,7 +171,7 @@ public void before() {
@AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

@@ -212,32 +215,88 @@ protected Object[] row(Object... values) {

@TestTemplate
public void testPrimitiveColumns() throws Exception {
createPrimitiveTable();
Table table = createPrimitiveTable();
List<Row> result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME);

// With new releases of Parquet, new features might be added that cause the
// size of a column to increase. For example, Parquet 1.14.x added the
// uncompressed size to allow for better upfront memory allocation.
// Therefore, we look the sizes up rather than hardcoding them.
DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
Map<Integer, Long> columnSizeStats = dataFile.columnSizes();

Row binaryCol =
Row.of(
52L,
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("binaryCol").fieldId()),
4L,
2L,
null,
Base64.getDecoder().decode("1111"),
Base64.getDecoder().decode("2222"));
Row booleanCol = Row.of(32L, 4L, 0L, null, false, true);
Row decimalCol = Row.of(85L, 4L, 1L, null, new BigDecimal("1.00"), new BigDecimal("2.00"));
Row doubleCol = Row.of(85L, 4L, 0L, 1L, 1.0D, 2.0D);
Row booleanCol =
Row.of(
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("booleanCol").fieldId()),
4L,
0L,
null,
false,
true);
Row decimalCol =
Row.of(
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("decimalCol").fieldId()),
4L,
1L,
null,
new BigDecimal("1.00"),
new BigDecimal("2.00"));
Row doubleCol =
Row.of(
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("doubleCol").fieldId()),
4L,
0L,
1L,
1.0D,
2.0D);
Row fixedCol =
Row.of(
44L,
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("fixedCol").fieldId()),
4L,
2L,
null,
Base64.getDecoder().decode("1111"),
Base64.getDecoder().decode("2222"));
Row floatCol = Row.of(71L, 4L, 0L, 2L, 0f, 0f);
Row intCol = Row.of(71L, 4L, 0L, null, 1, 2);
Row longCol = Row.of(79L, 4L, 0L, null, 1L, 2L);
Row stringCol = Row.of(79L, 4L, 0L, null, "1", "2");
Row floatCol =
Row.of(
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("floatCol").fieldId()),
4L,
0L,
2L,
0f,
0f);
Row intCol =
Row.of(
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("intCol").fieldId()),
4L,
0L,
null,
1,
2);
Row longCol =
Row.of(
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("longCol").fieldId()),
4L,
0L,
null,
1L,
2L);
Row stringCol =
Row.of(
columnSizeStats.get(PRIMITIVE_SCHEMA.findField("stringCol").fieldId()),
4L,
0L,
null,
"1",
"2");

List<Row> expected =
Lists.newArrayList(
@@ -288,12 +347,18 @@ public void testSelectNestedValues() throws Exception {
@TestTemplate
public void testNestedValues() throws Exception {
createNestedTable();
List<Row> result = sql("SELECT readable_metrics FROM %s$files", TABLE_NAME);

// We have to take a slightly different approach, since we don't store
// the column sizes for nested fields.
long leafDoubleColSize =
(long) ((Row) ((Row) result.get(0).getField(0)).getField(0)).getField(0);
long leafLongColSize = (long) ((Row) ((Row) result.get(0).getField(0)).getField(1)).getField(0);

Row leafDoubleCol = Row.of(46L, 3L, 1L, 1L, 0.0D, 0.0D);
Row leafLongCol = Row.of(54L, 3L, 1L, null, 0L, 1L);
Row leafDoubleCol = Row.of(leafDoubleColSize, 3L, 1L, 1L, 0.0D, 0.0D);
Row leafLongCol = Row.of(leafLongColSize, 3L, 1L, null, 0L, 1L);
Row metrics = Row.of(Row.of(leafDoubleCol, leafLongCol));

TestHelpers.assertRows(
sql("SELECT readable_metrics FROM %s$files", TABLE_NAME), ImmutableList.of(metrics));
TestHelpers.assertRows(result, ImmutableList.of(metrics));
}
}