From c88a9ed71033c31ca292ba9362adb2b4a7171597 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 7 Aug 2024 17:41:14 -0400 Subject: [PATCH 01/15] support writing partitioned data --- .../beam/sdk/io/iceberg/RecordWriter.java | 4 + .../beam/sdk/io/iceberg/IcebergIOIT.java | 272 ++++++++++-------- 2 files changed, 152 insertions(+), 124 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index d7212783d1b..01816855bae 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -26,6 +26,7 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionKey; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.catalog.Catalog; @@ -52,6 +53,7 @@ class RecordWriter { this.table = table; this.absoluteFilename = table.location() + "/" + filename; OutputFile outputFile = table.io().newOutputFile(absoluteFilename); + PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); switch (fileFormat) { case AVRO: @@ -60,6 +62,7 @@ class RecordWriter { .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create) .schema(table.schema()) .withSpec(table.spec()) + .withPartition(partitionKey) .overwrite() .build(); break; @@ -69,6 +72,7 @@ class RecordWriter { .createWriterFunc(GenericParquetWriter::buildWriter) .schema(table.schema()) .withSpec(table.spec()) + .withPartition(partitionKey) .overwrite() .build(); break; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 3a169eeb40d..a68689af4e9 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -22,21 +22,18 @@ import java.io.IOException; import java.io.Serializable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; -import java.util.stream.IntStream; +import java.util.stream.LongStream; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.managed.Managed; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -45,12 +42,11 @@ import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Schema; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.data.GenericRecord; import 
org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.data.parquet.GenericParquetWriter; @@ -72,26 +68,74 @@ /** Integration tests for {@link IcebergIO} source and sink. */ @RunWith(JUnit4.class) public class IcebergIOIT implements Serializable { + private static final org.apache.beam.sdk.schemas.Schema DOUBLY_NESTED_ROW_SCHEMA = + org.apache.beam.sdk.schemas.Schema.builder() + .addStringField("doubly_nested_str") + .addInt64Field("doubly_nested_float") + .build(); - public interface IcebergIOTestPipelineOptions extends GcpOptions { - @Description("Number of records that will be written and/or read by the test") - @Default.Integer(1000) - Integer getNumRecords(); - - void setNumRecords(Integer numRecords); - - @Description("Number of shards in the test table") - @Default.Integer(10) - Integer getNumShards(); - - void setNumShards(Integer numShards); - } + private static final org.apache.beam.sdk.schemas.Schema NESTED_ROW_SCHEMA = + org.apache.beam.sdk.schemas.Schema.builder() + .addStringField("nested_str") + .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA) + .addInt32Field("nested_int") + .addFloatField("nested_float") + .build(); + private static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA = + org.apache.beam.sdk.schemas.Schema.builder() + .addStringField("str") + .addBooleanField("bool") + .addInt32Field("int") + .addRowField("row", NESTED_ROW_SCHEMA) + .addArrayField("arr_long", org.apache.beam.sdk.schemas.Schema.FieldType.INT64) + .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA) + .addNullableInt64Field("nullable_long") + .build(); - @Rule public TestPipeline writePipeline = TestPipeline.create(); + private static final SimpleFunction ROW_FUNC = + new SimpleFunction() { + @Override + public Row apply(Long num) { + String strNum = Long.toString(num); + Row nestedRow = + Row.withSchema(NESTED_ROW_SCHEMA) + .addValue("nested_str_value_" + strNum) + .addValue( + Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA) + .addValue("doubly_nested_str_value_" + strNum) + .addValue(num) + .build()) + .addValue(Integer.valueOf(strNum)) + .addValue(Float.valueOf(strNum + "." + strNum)) + .build(); + + return Row.withSchema(BEAM_SCHEMA) + .addValue("str_value_" + strNum) + .addValue(num % 2 == 0) + .addValue(Integer.valueOf(strNum)) + .addValue(nestedRow) + .addValue(LongStream.range(0, num % 10).boxed().collect(Collectors.toList())) + .addValue(num % 2 == 0 ? 
null : nestedRow) + .addValue(num) + .build(); + } + }; + + private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); + private static final SimpleFunction RECORD_FUNC = + new SimpleFunction() { + @Override + public Record apply(Row input) { + return IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, input); + } + }; + private static final Integer NUM_RECORDS = 1000; + private static final Integer NUM_SHARDS = 10; - @Rule public TestPipeline readPipeline = TestPipeline.create(); + @Rule public TestPipeline pipeline = TestPipeline.create(); - static IcebergIOTestPipelineOptions options; + static GcpOptions options; static Configuration catalogHadoopConf; @@ -100,11 +144,11 @@ public interface IcebergIOTestPipelineOptions extends GcpOptions { private String warehouseLocation; private TableIdentifier tableId; + private Catalog catalog; @BeforeClass public static void beforeClass() { - PipelineOptionsFactory.register(IcebergIOTestPipelineOptions.class); - options = TestPipeline.testingPipelineOptions().as(IcebergIOTestPipelineOptions.class); + options = TestPipeline.testingPipelineOptions().as(GcpOptions.class); catalogHadoopConf = new Configuration(); catalogHadoopConf.set("fs.gs.project.id", options.getProject()); @@ -121,46 +165,18 @@ public void setUp() { tableId = TableIdentifier.of( testName.getMethodName(), "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); } - static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA = - org.apache.beam.sdk.schemas.Schema.builder() - .addInt32Field("int") - .addFloatField("float") - .addDoubleField("double") - .addInt64Field("long") - .addStringField("str") - .addBooleanField("bool") - .addByteArrayField("bytes") - .build(); - - static final Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); - - Map getValues(int num) { - String strNum = Integer.toString(num); - return ImmutableMap.builder() - .put("int", num) - .put("float", Float.valueOf(strNum)) - .put("double", Double.valueOf(strNum)) - .put("long", Long.valueOf(strNum)) - .put("str", strNum) - .put("bool", num % 2 == 0) - .put("bytes", ByteBuffer.wrap(new byte[] {(byte) num})) - .build(); - } - - /** - * Populates the Iceberg table according to the configuration specified in {@link - * IcebergIOTestPipelineOptions}. Returns a {@link List} of expected elements. - */ - List populateTable(Table table) throws IOException { - double recordsPerShardFraction = options.getNumRecords().doubleValue() / options.getNumShards(); + /** Populates the Iceberg table and Returns a {@link List} of expected elements. 
*/ + private List populateTable(Table table) throws IOException { + double recordsPerShardFraction = NUM_RECORDS.doubleValue() / NUM_SHARDS; long maxRecordsPerShard = Math.round(Math.ceil(recordsPerShardFraction)); AppendFiles appendFiles = table.newAppend(); - List expectedRows = new ArrayList<>(options.getNumRecords()); + List expectedRows = new ArrayList<>(NUM_RECORDS); int totalRecords = 0; - for (int shardNum = 0; shardNum < options.getNumShards(); ++shardNum) { + for (int shardNum = 0; shardNum < NUM_SHARDS; ++shardNum) { String filepath = table.location() + "/" + UUID.randomUUID(); OutputFile file = table.io().newOutputFile(filepath); DataWriter writer = @@ -172,14 +188,14 @@ List populateTable(Table table) throws IOException { .build(); for (int recordNum = 0; - recordNum < maxRecordsPerShard && totalRecords < options.getNumRecords(); + recordNum < maxRecordsPerShard && totalRecords < NUM_RECORDS; ++recordNum, ++totalRecords) { - Map values = getValues(recordNum); - GenericRecord rec = GenericRecord.create(ICEBERG_SCHEMA).copy(values); - writer.write(rec); + Row expectedBeamRow = ROW_FUNC.apply((long) recordNum); + Record icebergRecord = RECORD_FUNC.apply(expectedBeamRow); - expectedRows.add(Row.withSchema(BEAM_SCHEMA).withFieldValues(values).build()); + writer.write(icebergRecord); + expectedRows.add(expectedBeamRow); } writer.close(); appendFiles.appendFile(writer.toDataFile()); @@ -189,6 +205,43 @@ List populateTable(Table table) throws IOException { return expectedRows; } + private List readRecords(Table table) { + TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA); + List writtenRecords = new ArrayList<>(); + for (CombinedScanTask task : tableScan.planTasks()) { + InputFilesDecryptor decryptor = new InputFilesDecryptor(task, table.io(), table.encryption()); + for (FileScanTask fileTask : task.files()) { + InputFile inputFile = decryptor.getInputFile(fileTask); + CloseableIterable iterable = + Parquet.read(inputFile) + .split(fileTask.start(), fileTask.length()) + .project(ICEBERG_SCHEMA) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema)) + .filter(fileTask.residual()) + .build(); + + for (Record rec : iterable) { + writtenRecords.add(rec); + } + } + } + return writtenRecords; + } + + private Map managedIcebergConfig() { + return ImmutableMap.builder() + .put("table", tableId.toString()) + .put("catalog_name", "test-name") + .put( + "catalog_properties", + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouseLocation) + .build()) + .build(); + } + /** * Test of a predetermined moderate number of records written directly to Iceberg then read via a * Beam pipeline. 
Table initialization is done on a single process using the Iceberg APIs so the @@ -196,90 +249,61 @@ List populateTable(Table table) throws IOException { */ @Test public void testRead() throws Exception { - Catalog catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); Table table = catalog.createTable(tableId, ICEBERG_SCHEMA); List expectedRows = populateTable(table); - Map config = - ImmutableMap.builder() - .put("table", tableId.toString()) - .put("catalog_name", "test-name") - .put( - "catalog_properties", - ImmutableMap.builder() - .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .put("warehouse", warehouseLocation) - .build()) - .build(); + Map config = managedIcebergConfig(); PCollection rows = - readPipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); + pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection(); PAssert.that(rows).containsInAnyOrder(expectedRows); - readPipeline.run().waitUntilFinish(); + pipeline.run().waitUntilFinish(); } + private static final List INPUT_ROWS = + LongStream.range(0, NUM_RECORDS).boxed().map(ROW_FUNC::apply).collect(Collectors.toList()); + /** * Test of a predetermined moderate number of records written to Iceberg using a Beam pipeline, * then read directly using Iceberg API. */ @Test public void testWrite() { - Catalog catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); Table table = catalog.createTable(tableId, ICEBERG_SCHEMA); - List inputRecords = - IntStream.range(0, options.getNumRecords()) - .boxed() - .map(i -> GenericRecord.create(ICEBERG_SCHEMA).copy(getValues(i))) - .collect(Collectors.toList()); + // Write with Beam + Map config = managedIcebergConfig(); + PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); + input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); + pipeline.run().waitUntilFinish(); - List inputRows = - inputRecords.stream() - .map(record -> IcebergUtils.icebergRecordToBeamRow(BEAM_SCHEMA, record)) - .collect(Collectors.toList()); + // Read back and check records are correct + List returnedRecords = readRecords(table); + assertThat( + returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); + } - // Write with Beam - Map config = - ImmutableMap.builder() - .put("table", tableId.toString()) - .put("catalog_name", "test-name") - .put( - "catalog_properties", - ImmutableMap.builder() - .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .put("warehouse", warehouseLocation) - .build()) + @Test + public void testWritePartitionedData() { + PartitionSpec partitionSpec = + PartitionSpec.builderFor(ICEBERG_SCHEMA) + .identity("str") + .identity("bool") + .identity("int") .build(); + Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec); - PCollection input = writePipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); + // Write with Beam + Map config = managedIcebergConfig(); + PCollection input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); - - writePipeline.run().waitUntilFinish(); + pipeline.run().waitUntilFinish(); // Read back and check records are correct - TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA); - List writtenRecords = new ArrayList<>(); - for (CombinedScanTask task : tableScan.planTasks()) { - InputFilesDecryptor decryptor = new InputFilesDecryptor(task, table.io(), table.encryption()); - for (FileScanTask 
fileTask : task.files()) { - InputFile inputFile = decryptor.getInputFile(fileTask); - CloseableIterable iterable = - Parquet.read(inputFile) - .split(fileTask.start(), fileTask.length()) - .project(ICEBERG_SCHEMA) - .createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader(ICEBERG_SCHEMA, fileSchema)) - .filter(fileTask.residual()) - .build(); - - for (Record rec : iterable) { - writtenRecords.add(rec); - } - } - } - - assertThat(inputRecords, containsInAnyOrder(writtenRecords.toArray())); + List returnedRecords = readRecords(table); + assertThat( + returnedRecords, containsInAnyOrder(INPUT_ROWS.stream().map(RECORD_FUNC::apply).toArray())); } } From 8a778b963db9b5b3a15b87171abc02339d3d1c66 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 7 Aug 2024 17:42:09 -0400 Subject: [PATCH 02/15] trigger integration tests --- .github/trigger_files/IO_Iceberg_Integration_Tests.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index bbdc3a3910e..62ae7886c57 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3 + "modification": 4 } From 3f38f700ce602c4ec4850386fc8a3f52f274233b Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 12 Aug 2024 00:22:15 -0400 Subject: [PATCH 03/15] partitioned record writer to manage writers for different partitions --- .../beam/sdk/io/iceberg/RecordWriter.java | 34 ++-- .../io/iceberg/WriteGroupedRowsToFiles.java | 40 +++-- .../io/iceberg/WriteUngroupedRowsToFiles.java | 159 ++++-------------- 3 files changed, 82 insertions(+), 151 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 01816855bae..8234adc56f6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.LocationProviders; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; @@ -33,27 +34,36 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; class RecordWriter { - private final DataWriter icebergDataWriter; - private final Table table; private final String absoluteFilename; - RecordWriter(Catalog catalog, IcebergDestination destination, String filename) + RecordWriter( + Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey) throws IOException { this( - catalog.loadTable(destination.getTableIdentifier()), destination.getFileFormat(), filename); + catalog.loadTable(destination.getTableIdentifier()), + destination.getFileFormat(), + filename, + partitionKey); } - RecordWriter(Table table, FileFormat fileFormat, String filename) throws IOException { + RecordWriter(Table table, FileFormat fileFormat, String 
filename, PartitionKey partitionKey) + throws IOException { this.table = table; - this.absoluteFilename = table.location() + "/" + filename; + LocationProvider locationProvider = + LocationProviders.locationsFor(table.location(), table.properties()); + if (table.spec().isUnpartitioned()) { + absoluteFilename = locationProvider.newDataLocation(filename); + } else { + absoluteFilename = locationProvider.newDataLocation(table.spec(), partitionKey, filename); + } OutputFile outputFile = table.io().newOutputFile(absoluteFilename); - PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema()); switch (fileFormat) { case AVRO: @@ -88,12 +98,12 @@ public void write(Row row) { icebergDataWriter.write(record); } - public void close() throws IOException { - icebergDataWriter.close(); + void write(Record record) { + icebergDataWriter.write(record); } - public Table getTable() { - return table; + public void close() throws IOException { + icebergDataWriter.close(); } public long bytesWritten() { @@ -104,7 +114,7 @@ public ManifestFile getManifestFile() throws IOException { String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest"); OutputFile outputFile = table.io().newOutputFile(manifestFilename); ManifestWriter manifestWriter; - try (ManifestWriter openWriter = ManifestFiles.write(getTable().spec(), outputFile)) { + try (ManifestWriter openWriter = ManifestFiles.write(table.spec(), outputFile)) { openWriter.add(icebergDataWriter.toDataFile()); manifestWriter = openWriter; } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index c1126351944..471824c9856 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -17,15 +17,19 @@ */ package org.apache.beam.sdk.io.iceberg; -import java.io.IOException; import java.util.UUID; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ShardedKey; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.catalog.Catalog; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -59,6 +63,8 @@ private static class WriteGroupedRowsToFilesDoFn private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; private transient @MonotonicNonNull Catalog catalog; + private final String fileSuffix; + private final long maxFileSize; WriteGroupedRowsToFilesDoFn( IcebergCatalogConfig catalogConfig, @@ -66,6 +72,8 @@ private static class WriteGroupedRowsToFilesDoFn long maxFileSize) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; + this.fileSuffix = UUID.randomUUID().toString(); + this.maxFileSize = maxFileSize; } private org.apache.iceberg.catalog.Catalog getCatalog() { @@ -75,28 +83,32 @@ private org.apache.iceberg.catalog.Catalog 
getCatalog() { return catalog; } - private RecordWriter createWriter(IcebergDestination destination) throws IOException { - return new RecordWriter(getCatalog(), destination, "-" + UUID.randomUUID()); - } - @ProcessElement public void processElement( - ProcessContext c, @Element KV, Iterable> element) throws Exception { + ProcessContext c, + @Element KV, Iterable> element, + BoundedWindow window, + PaneInfo pane) + throws Exception { Row destMetadata = element.getKey().getKey(); IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); - RecordWriter writer = createWriter(destination); + PartitionedRecordWriter writer = + new PartitionedRecordWriter(getCatalog(), fileSuffix, maxFileSize, Integer.MAX_VALUE); for (Row e : element.getValue()) { - writer.write(e); + writer.write(destination, e, window, pane); } - writer.close(); - c.output( - FileWriteResult.builder() - .setTableIdentifier(destination.getTableIdentifier()) - .setManifestFile(writer.getManifestFile()) - .build()); + + for (WindowedValue windowedManifestFiles : + Preconditions.checkNotNull(writer.getManifestFiles().get(destination))) { + c.output( + FileWriteResult.builder() + .setTableIdentifier(destination.getTableIdentifier()) + .setManifestFile(windowedManifestFiles.getValue()) + .build()); + } } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index a00f3de4bb4..df2a016387a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -18,9 +18,9 @@ package org.apache.beam.sdk.io.iceberg; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; -import java.io.IOException; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import java.util.List; import java.util.Map; import java.util.UUID; @@ -29,6 +29,8 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PInput; @@ -40,9 +42,9 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -174,10 +176,8 @@ private static class WriteUngroupedRowsToFilesDoFn extends DoFn writers; - private transient @MonotonicNonNull Map windows; private transient @MonotonicNonNull Catalog catalog; + private @Nullable PartitionedRecordWriter 
partitionedRecordWriter; public WriteUngroupedRowsToFilesDoFn( IcebergCatalogConfig catalogConfig, @@ -192,20 +192,6 @@ public WriteUngroupedRowsToFilesDoFn( this.maxFileSize = maxFileSize; } - private Map getWriters() { - if (writers == null) { - writers = Maps.newHashMap(); - } - return writers; - } - - private Map getWindows() { - if (windows == null) { - windows = Maps.newHashMap(); - } - return windows; - } - private org.apache.iceberg.catalog.Catalog getCatalog() { if (catalog == null) { this.catalog = catalogConfig.catalog(); @@ -213,40 +199,15 @@ private org.apache.iceberg.catalog.Catalog getCatalog() { return catalog; } - private RecordWriter createAndInsertWriter(IcebergDestination destination, BoundedWindow window) - throws IOException { - RecordWriter writer = - new RecordWriter(getCatalog(), destination, filename + "-" + UUID.randomUUID()); - getWindows().put(destination, window); - getWriters().put(destination, writer); - return writer; - } - - /** - * Returns active writer for this destination if possible. If this returns null then we have - * reached the maximum number of writers and should spill any records associated. - */ - @Nullable - RecordWriter getWriterIfPossible(IcebergDestination destination, BoundedWindow window) - throws IOException { - - RecordWriter existingWriter = getWriters().get(destination); - if (existingWriter != null) { - return existingWriter; - } - - if (getWriters().size() > maxWritersPerBundle) { - return null; - } - - return createAndInsertWriter(destination, window); - } - @StartBundle - public void startBundle() {} + public void startBundle() { + partitionedRecordWriter = + new PartitionedRecordWriter(getCatalog(), filename, maxFileSize, maxWritersPerBundle); + } @ProcessElement - public void processElement(@Element Row element, BoundedWindow window, MultiOutputReceiver out) + public void processElement( + @Element Row element, BoundedWindow window, PaneInfo pane, MultiOutputReceiver out) throws Exception { Row data = checkArgumentNotNull(element.getRow("data"), "Input row missing `data` field."); @@ -254,96 +215,44 @@ public void processElement(@Element Row element, BoundedWindow window, MultiOutp checkArgumentNotNull(element.getRow("dest"), "Input row missing `dest` field."); IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); - // Spill record if writer cannot be created - RecordWriter writer = getWriterIfPossible(destination, window); - if (writer == null) { - out.get(SPILLED_ROWS_TAG).output(element); - return; - } - - // Reset writer if max file size reached - if (writer.bytesWritten() > maxFileSize) { - writer.close(); - out.get(WRITTEN_FILES_TAG) - .output( - FileWriteResult.builder() - .setManifestFile(writer.getManifestFile()) - .setTableIdentifier(destination.getTableIdentifier()) - .build()); - writer = createAndInsertWriter(destination, window); - } - - // Actually write the data + // Attempt to write record. If the writer is saturated and cannot accept + // the record, spill it over to WriteGroupedRowsToFiles + boolean writeSuccess; try { - writer.write(data); - out.get(WRITTEN_ROWS_TAG).output(element); + writeSuccess = + Preconditions.checkNotNull(partitionedRecordWriter) + .write(destination, data, window, pane); } catch (Exception e) { try { - writer.close(); + Preconditions.checkNotNull(partitionedRecordWriter).close(); } catch (Exception closeException) { e.addSuppressed(closeException); } throw e; } + out.get(writeSuccess ? 
WRITTEN_ROWS_TAG : SPILLED_ROWS_TAG).output(element); } @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { - closeAllWriters(); - outputFinalWrittenFiles(c); - getWriters().clear(); - } - - private void outputFinalWrittenFiles(DoFn.FinishBundleContext c) - throws Exception { - List exceptionList = Lists.newArrayList(); - for (Map.Entry entry : getWriters().entrySet()) { - try { - IcebergDestination destination = entry.getKey(); - - RecordWriter writer = entry.getValue(); - BoundedWindow window = - checkStateNotNull( - getWindows().get(destination), "internal error: no windows for destination"); + if (partitionedRecordWriter == null) { + return; + } + partitionedRecordWriter.close(); + for (Map.Entry>> destinationAndFiles : + Preconditions.checkNotNull(partitionedRecordWriter).getManifestFiles().entrySet()) { + TableIdentifier identifier = destinationAndFiles.getKey().getTableIdentifier(); + for (WindowedValue windowedManifestFile : destinationAndFiles.getValue()) { c.output( FileWriteResult.builder() - .setManifestFile(writer.getManifestFile()) - .setTableIdentifier(destination.getTableIdentifier()) + .setManifestFile(windowedManifestFile.getValue()) + .setTableIdentifier(identifier) .build(), - window.maxTimestamp(), - window); - } catch (Exception e) { - exceptionList.add(e); - } - } - - if (!exceptionList.isEmpty()) { - Exception e = - new IOException("Exception emitting writer metadata. See suppressed exceptions"); - for (Exception thrown : exceptionList) { - e.addSuppressed(thrown); + windowedManifestFile.getTimestamp(), + Iterables.getFirst(windowedManifestFile.getWindows(), null)); } - throw e; - } - } - - private void closeAllWriters() throws Exception { - List exceptionList = Lists.newArrayList(); - for (RecordWriter writer : getWriters().values()) { - try { - writer.close(); - } catch (Exception e) { - exceptionList.add(e); - } - } - - if (!exceptionList.isEmpty()) { - Exception e = new IOException("Exception closing some writers. See suppressed exceptions."); - for (Exception thrown : exceptionList) { - e.addSuppressed(thrown); - } - throw e; } + partitionedRecordWriter = null; } } } From fb628400520b52f5df76ed50e1970b3d8e025214 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 12 Aug 2024 00:22:49 -0400 Subject: [PATCH 04/15] partitioned record writer --- .../io/iceberg/PartitionedRecordWriter.java | 246 ++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java new file mode 100644 index 00000000000..3103a39c9ae --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.commons.compress.utils.Lists; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.data.Record; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A writer that opens and closes {@link RecordWriter}s as necessary for multiple tables, and + * multiple partitions within each table. If the Iceberg {@link Table} is un-partitioned, the data + * is written normally. + * + *
<p>
Each table has a {@link DestinationState} that creates a new {@link RecordWriter} for each + * partition encountered. If a {@link RecordWriter} is inactive for 5 minutes, the {@link + * DestinationState} will automatically close it to free up resources. + * + *
<p>
At any moment, the number of open data writers is at most equal to the number of partitions + * across destinations. Closing this {@link PartitionedRecordWriter} will close all {@link + * RecordWriter}s contained within. + */ +public class PartitionedRecordWriter implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(PartitionedRecordWriter.class); + + class DestinationState { + private final IcebergDestination icebergDestination; + private final PartitionKey partitionKey; + private final org.apache.iceberg.Schema schema; + private final Map>> manifestFiles = + Maps.newHashMap(); + private final Cache> writers; + + DestinationState( + IcebergDestination icebergDestination, + PartitionSpec partitionSpec, + org.apache.iceberg.Schema schema) { + this.icebergDestination = icebergDestination; + this.schema = schema; + this.partitionKey = new PartitionKey(partitionSpec, schema); + + // build a cache of RecordWriters + // writers will expire after 5min of idle time + // when a writer expires, its manifest file is collected + this.writers = + CacheBuilder.newBuilder() + .expireAfterAccess(5, TimeUnit.MINUTES) + .removalListener( + (RemovalNotification> removal) -> { + final @Nullable PartitionKey partitionKey = removal.getKey(); + String message = + partitionKey == null + ? "" + : String.format(", partition '%s'.", partitionKey); + final @Nullable WindowedValue recordWriter = removal.getValue(); + if (recordWriter != null) { + try { + LOG.info( + "Closing record writer for table '{}'" + message, + icebergDestination.getTableIdentifier()); + recordWriter.getValue().close(); + manifestFiles + .computeIfAbsent(icebergDestination, d -> Lists.newArrayList()) + .add( + WindowedValue.of( + recordWriter.getValue().getManifestFile(), + recordWriter.getTimestamp(), + recordWriter.getWindows(), + recordWriter.getPane())); + } catch (IOException e) { + throw new RuntimeException( + "Encountered an error when closing data writer for table " + + icebergDestination.getTableIdentifier() + + message, + e); + } + } + }) + .build(); + } + + void write(Record record, BoundedWindow window, PaneInfo pane) + throws IOException, ExecutionException { + partitionKey.partition(record); + RecordWriter writer = fetchWriterForPartition(partitionKey, window, pane); + writer.write(record); + } + + private RecordWriter fetchWriterForPartition( + PartitionKey partitionKey, BoundedWindow window, PaneInfo paneInfo) + throws ExecutionException { + RecordWriter recordWriter = + writers + .get( + partitionKey, + () -> + WindowedValue.of( + createWriter(partitionKey), window.maxTimestamp(), window, paneInfo)) + .getValue(); + + if (recordWriter.bytesWritten() > maxFileSize) { + writers.invalidate(partitionKey); + recordWriter = createWriter(partitionKey); + writers.put( + partitionKey, + WindowedValue.of(createWriter(partitionKey), window.maxTimestamp(), window, paneInfo)); + } + return recordWriter; + } + + private RecordWriter createWriter(PartitionKey partitionKey) { + try { + return new RecordWriter( + catalog, icebergDestination, fileSuffix + "-" + UUID.randomUUID(), partitionKey); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Encountered an error when creating a RecordWriter for table '%s', partition %s.", + icebergDestination.getTableIdentifier(), partitionKey), + e); + } + } + + void closeAllWriters() throws Exception { + List exceptionList = Lists.newArrayList(); + for (PartitionKey pk : writers.asMap().keySet()) { + try { + writers.invalidate(pk); + } catch 
(Exception e) { + exceptionList.add(e); + } + } + if (!exceptionList.isEmpty()) { + Exception e = new IOException("Exception closing some writers. See suppressed exceptions."); + for (Exception thrown : exceptionList) { + e.addSuppressed(thrown); + } + throw e; + } + } + } + + private final Catalog catalog; + private final String fileSuffix; + private final long maxFileSize; + private final int maxNumWriters; + private Map destinations = Maps.newHashMap(); + private final Map>> totalManifestFiles = + Maps.newHashMap(); + private boolean isClosed = false; + + PartitionedRecordWriter(Catalog catalog, String fileSuffix, long maxFileSize, int maxNumWriters) { + this.catalog = catalog; + this.fileSuffix = fileSuffix; + this.maxFileSize = maxFileSize; + this.maxNumWriters = maxNumWriters; + } + + /** + * Fetches the {@link RecordWriter} for the appropriate partition in this destination and writes + * the record. + */ + public boolean write( + IcebergDestination icebergDestination, Row row, BoundedWindow window, PaneInfo pane) + throws IOException, ExecutionException { + if (destinations.size() > maxNumWriters) { + return false; + } + DestinationState destinationState = + destinations.computeIfAbsent( + icebergDestination, + destination -> { + Table table = catalog.loadTable(destination.getTableIdentifier()); + return new DestinationState(destination, table.spec(), table.schema()); + }); + + Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(destinationState.schema, row); + destinationState.write(icebergRecord, window, pane); + return true; + } + + /** Closes all remaining writers and collects all their {@link ManifestFile}s. */ + public void close() throws Exception { + for (DestinationState state : destinations.values()) { + state.closeAllWriters(); + for (Map.Entry>> entry : + state.manifestFiles.entrySet()) { + totalManifestFiles + .computeIfAbsent(entry.getKey(), icebergDestination -> Lists.newArrayList()) + .addAll(entry.getValue()); + } + state.manifestFiles.clear(); + } + destinations.clear(); + isClosed = true; + } + + /** + * Returns a mapping of {@link IcebergDestination}s to accumulated {@link ManifestFile}s outputted + * by all {@link RecordWriter}s. + */ + public Map>> getManifestFiles() { + Preconditions.checkArgument( + isClosed, + "Please close this %s before retrieving its manifest files.", + getClass().getSimpleName()); + return totalManifestFiles; + } +} From be60b36f37e55193e8aec7ef1d6906294791cdeb Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 12 Aug 2024 00:53:41 -0400 Subject: [PATCH 05/15] reject rows when we are saturated with record writers --- .../io/iceberg/PartitionedRecordWriter.java | 51 +++++++++++-------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java index 3103a39c9ae..441bc2f3410 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java @@ -45,19 +45,16 @@ import org.slf4j.LoggerFactory; /** - * A writer that opens and closes {@link RecordWriter}s as necessary for multiple tables, and - * multiple partitions within each table. If the Iceberg {@link Table} is un-partitioned, the data - * is written normally. 
+ * A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions. + * Assigns one writer per partition. If the Iceberg {@link Table} is un-partitioned, the data is + * written normally using one {@link RecordWriter}. At a given moment, the number of open data + * writers should be less than or equal to the number of total partitions (across all destinations). * - *
<p>
Each table has a {@link DestinationState} that creates a new {@link RecordWriter} for each - * partition encountered. If a {@link RecordWriter} is inactive for 5 minutes, the {@link - * DestinationState} will automatically close it to free up resources. - * - *
<p>
At any moment, the number of open data writers is at most equal to the number of partitions - * across destinations. Closing this {@link PartitionedRecordWriter} will close all {@link - * RecordWriter}s contained within. + *
<p>
Maintains writers in a cache. If a {@link RecordWriter} is inactive for 5 minutes, the {@link + * DestinationState} will automatically close it to free up resources. Closing this {@link + * PartitionedRecordWriter} will close all of its underlying {@link RecordWriter}s. */ -public class PartitionedRecordWriter implements Serializable { +class PartitionedRecordWriter implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(PartitionedRecordWriter.class); class DestinationState { @@ -96,8 +93,9 @@ class DestinationState { "Closing record writer for table '{}'" + message, icebergDestination.getTableIdentifier()); recordWriter.getValue().close(); + openWriters--; manifestFiles - .computeIfAbsent(icebergDestination, d -> Lists.newArrayList()) + .computeIfAbsent(icebergDestination, unused -> Lists.newArrayList()) .add( WindowedValue.of( recordWriter.getValue().getManifestFile(), @@ -116,11 +114,16 @@ class DestinationState { .build(); } - void write(Record record, BoundedWindow window, PaneInfo pane) + boolean write(Record record, BoundedWindow window, PaneInfo pane) throws IOException, ExecutionException { partitionKey.partition(record); + // if we're already saturated and a writer doesn't exist for this partition, return false. + if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) { + return false; + } RecordWriter writer = fetchWriterForPartition(partitionKey, window, pane); writer.write(record); + return true; } private RecordWriter fetchWriterForPartition( @@ -147,8 +150,11 @@ private RecordWriter fetchWriterForPartition( private RecordWriter createWriter(PartitionKey partitionKey) { try { - return new RecordWriter( - catalog, icebergDestination, fileSuffix + "-" + UUID.randomUUID(), partitionKey); + RecordWriter writer = + new RecordWriter( + catalog, icebergDestination, fileSuffix + "-" + UUID.randomUUID(), partitionKey); + openWriters++; + return writer; } catch (IOException e) { throw new RuntimeException( String.format( @@ -181,6 +187,7 @@ void closeAllWriters() throws Exception { private final String fileSuffix; private final long maxFileSize; private final int maxNumWriters; + private int openWriters = 0; private Map destinations = Maps.newHashMap(); private final Map>> totalManifestFiles = Maps.newHashMap(); @@ -196,13 +203,13 @@ void closeAllWriters() throws Exception { /** * Fetches the {@link RecordWriter} for the appropriate partition in this destination and writes * the record. + * + *
<p>
If the writer is saturated (i.e. has hit the specified maximum of open writers), the record + * is rejected and returns {@code false}. */ public boolean write( IcebergDestination icebergDestination, Row row, BoundedWindow window, PaneInfo pane) throws IOException, ExecutionException { - if (destinations.size() > maxNumWriters) { - return false; - } DestinationState destinationState = destinations.computeIfAbsent( icebergDestination, @@ -212,8 +219,7 @@ public boolean write( }); Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(destinationState.schema, row); - destinationState.write(icebergRecord, window, pane); - return true; + return destinationState.write(icebergRecord, window, pane); } /** Closes all remaining writers and collects all their {@link ManifestFile}s. */ @@ -229,6 +235,11 @@ public void close() throws Exception { state.manifestFiles.clear(); } destinations.clear(); + Preconditions.checkArgument( + openWriters == 0, + "Expected all data writers to be closed, but found %s data writer(s) still open", + getClass().getSimpleName(), + openWriters); isClosed = true; } From 3f7e1b8c31fb3c1050f7655952a054486b40c756 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 Aug 2024 15:46:12 -0400 Subject: [PATCH 06/15] refactor record writer manager --- .../io/iceberg/PartitionedRecordWriter.java | 257 ---------------- .../beam/sdk/io/iceberg/RecordWriter.java | 40 ++- .../sdk/io/iceberg/RecordWriterManager.java | 287 ++++++++++++++++++ .../io/iceberg/WriteGroupedRowsToFiles.java | 20 +- .../io/iceberg/WriteUngroupedRowsToFiles.java | 47 +-- .../io/iceberg/RecordWriterManagerTest.java | 129 ++++++++ .../sdk/io/iceberg/TestDataWarehouse.java | 8 +- 7 files changed, 492 insertions(+), 296 deletions(-) delete mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java deleted file mode 100644 index 441bc2f3410..00000000000 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionedRecordWriter.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.iceberg; - -import com.google.common.collect.Maps; -import java.io.IOException; -import java.io.Serializable; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; -import org.apache.commons.compress.utils.Lists; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.data.Record; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions. - * Assigns one writer per partition. If the Iceberg {@link Table} is un-partitioned, the data is - * written normally using one {@link RecordWriter}. At a given moment, the number of open data - * writers should be less than or equal to the number of total partitions (across all destinations). - * - *
<p>
Maintains writers in a cache. If a {@link RecordWriter} is inactive for 5 minutes, the {@link - * DestinationState} will automatically close it to free up resources. Closing this {@link - * PartitionedRecordWriter} will close all of its underlying {@link RecordWriter}s. - */ -class PartitionedRecordWriter implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(PartitionedRecordWriter.class); - - class DestinationState { - private final IcebergDestination icebergDestination; - private final PartitionKey partitionKey; - private final org.apache.iceberg.Schema schema; - private final Map>> manifestFiles = - Maps.newHashMap(); - private final Cache> writers; - - DestinationState( - IcebergDestination icebergDestination, - PartitionSpec partitionSpec, - org.apache.iceberg.Schema schema) { - this.icebergDestination = icebergDestination; - this.schema = schema; - this.partitionKey = new PartitionKey(partitionSpec, schema); - - // build a cache of RecordWriters - // writers will expire after 5min of idle time - // when a writer expires, its manifest file is collected - this.writers = - CacheBuilder.newBuilder() - .expireAfterAccess(5, TimeUnit.MINUTES) - .removalListener( - (RemovalNotification> removal) -> { - final @Nullable PartitionKey partitionKey = removal.getKey(); - String message = - partitionKey == null - ? "" - : String.format(", partition '%s'.", partitionKey); - final @Nullable WindowedValue recordWriter = removal.getValue(); - if (recordWriter != null) { - try { - LOG.info( - "Closing record writer for table '{}'" + message, - icebergDestination.getTableIdentifier()); - recordWriter.getValue().close(); - openWriters--; - manifestFiles - .computeIfAbsent(icebergDestination, unused -> Lists.newArrayList()) - .add( - WindowedValue.of( - recordWriter.getValue().getManifestFile(), - recordWriter.getTimestamp(), - recordWriter.getWindows(), - recordWriter.getPane())); - } catch (IOException e) { - throw new RuntimeException( - "Encountered an error when closing data writer for table " - + icebergDestination.getTableIdentifier() - + message, - e); - } - } - }) - .build(); - } - - boolean write(Record record, BoundedWindow window, PaneInfo pane) - throws IOException, ExecutionException { - partitionKey.partition(record); - // if we're already saturated and a writer doesn't exist for this partition, return false. 
- if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) { - return false; - } - RecordWriter writer = fetchWriterForPartition(partitionKey, window, pane); - writer.write(record); - return true; - } - - private RecordWriter fetchWriterForPartition( - PartitionKey partitionKey, BoundedWindow window, PaneInfo paneInfo) - throws ExecutionException { - RecordWriter recordWriter = - writers - .get( - partitionKey, - () -> - WindowedValue.of( - createWriter(partitionKey), window.maxTimestamp(), window, paneInfo)) - .getValue(); - - if (recordWriter.bytesWritten() > maxFileSize) { - writers.invalidate(partitionKey); - recordWriter = createWriter(partitionKey); - writers.put( - partitionKey, - WindowedValue.of(createWriter(partitionKey), window.maxTimestamp(), window, paneInfo)); - } - return recordWriter; - } - - private RecordWriter createWriter(PartitionKey partitionKey) { - try { - RecordWriter writer = - new RecordWriter( - catalog, icebergDestination, fileSuffix + "-" + UUID.randomUUID(), partitionKey); - openWriters++; - return writer; - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Encountered an error when creating a RecordWriter for table '%s', partition %s.", - icebergDestination.getTableIdentifier(), partitionKey), - e); - } - } - - void closeAllWriters() throws Exception { - List exceptionList = Lists.newArrayList(); - for (PartitionKey pk : writers.asMap().keySet()) { - try { - writers.invalidate(pk); - } catch (Exception e) { - exceptionList.add(e); - } - } - if (!exceptionList.isEmpty()) { - Exception e = new IOException("Exception closing some writers. See suppressed exceptions."); - for (Exception thrown : exceptionList) { - e.addSuppressed(thrown); - } - throw e; - } - } - } - - private final Catalog catalog; - private final String fileSuffix; - private final long maxFileSize; - private final int maxNumWriters; - private int openWriters = 0; - private Map destinations = Maps.newHashMap(); - private final Map>> totalManifestFiles = - Maps.newHashMap(); - private boolean isClosed = false; - - PartitionedRecordWriter(Catalog catalog, String fileSuffix, long maxFileSize, int maxNumWriters) { - this.catalog = catalog; - this.fileSuffix = fileSuffix; - this.maxFileSize = maxFileSize; - this.maxNumWriters = maxNumWriters; - } - - /** - * Fetches the {@link RecordWriter} for the appropriate partition in this destination and writes - * the record. - * - *
<p>
If the writer is saturated (i.e. has hit the specified maximum of open writers), the record - * is rejected and returns {@code false}. - */ - public boolean write( - IcebergDestination icebergDestination, Row row, BoundedWindow window, PaneInfo pane) - throws IOException, ExecutionException { - DestinationState destinationState = - destinations.computeIfAbsent( - icebergDestination, - destination -> { - Table table = catalog.loadTable(destination.getTableIdentifier()); - return new DestinationState(destination, table.spec(), table.schema()); - }); - - Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(destinationState.schema, row); - return destinationState.write(icebergRecord, window, pane); - } - - /** Closes all remaining writers and collects all their {@link ManifestFile}s. */ - public void close() throws Exception { - for (DestinationState state : destinations.values()) { - state.closeAllWriters(); - for (Map.Entry>> entry : - state.manifestFiles.entrySet()) { - totalManifestFiles - .computeIfAbsent(entry.getKey(), icebergDestination -> Lists.newArrayList()) - .addAll(entry.getValue()); - } - state.manifestFiles.clear(); - } - destinations.clear(); - Preconditions.checkArgument( - openWriters == 0, - "Expected all data writers to be closed, but found %s data writer(s) still open", - getClass().getSimpleName(), - openWriters); - isClosed = true; - } - - /** - * Returns a mapping of {@link IcebergDestination}s to accumulated {@link ManifestFile}s outputted - * by all {@link RecordWriter}s. - */ - public Map>> getManifestFiles() { - Preconditions.checkArgument( - isClosed, - "Please close this %s before retrieving its manifest files.", - getClass().getSimpleName()); - return totalManifestFiles; - } -} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 8234adc56f6..6c03631a82e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -20,10 +20,11 @@ import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord; import java.io.IOException; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.values.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.LocationProviders; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.ManifestWriter; @@ -34,14 +35,18 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class RecordWriter { + private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class); + private final Counter activeWriters = Metrics.counter(RecordWriterManager.class, "activeWriters"); private final DataWriter icebergDataWriter; private final Table table; private final String absoluteFilename; + private final FileFormat fileFormat; RecordWriter( Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey) @@ -56,12 +61,12 @@ class RecordWriter { RecordWriter(Table table, FileFormat fileFormat, String 
filename, PartitionKey partitionKey) throws IOException { this.table = table; - LocationProvider locationProvider = - LocationProviders.locationsFor(table.location(), table.properties()); + this.fileFormat = fileFormat; if (table.spec().isUnpartitioned()) { - absoluteFilename = locationProvider.newDataLocation(filename); + absoluteFilename = table.locationProvider().newDataLocation(filename); } else { - absoluteFilename = locationProvider.newDataLocation(table.spec(), partitionKey, filename); + absoluteFilename = + table.locationProvider().newDataLocation(table.spec(), partitionKey, filename); } OutputFile outputFile = table.io().newOutputFile(absoluteFilename); @@ -91,6 +96,13 @@ class RecordWriter { default: throw new RuntimeException("Unknown File Format: " + fileFormat); } + activeWriters.inc(); + LOG.info( + "Opened {} writer for table {}, partition {}. Writing to path: {}", + fileFormat, + table.name(), + partitionKey, + absoluteFilename); } public void write(Row row) { @@ -103,7 +115,17 @@ void write(Record record) { } public void close() throws IOException { - icebergDataWriter.close(); + try { + icebergDataWriter.close(); + } catch (IOException e) { + throw new IOException( + String.format( + "Failed to close %s writer for table %s, path: %s", + fileFormat, table.name(), absoluteFilename), + e); + } + activeWriters.dec(); + LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename); } public long bytesWritten() { @@ -121,4 +143,8 @@ public ManifestFile getManifestFile() throws IOException { return manifestWriter.toManifestFile(); } + + public DataFile getDataFile() { + return icebergDataWriter.toDataFile(); + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java new file mode 100644 index 00000000000..3da59cbe226 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.commons.compress.utils.Lists; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions. + * Assigns one {@link DestinationState} per windowed destination. A {@link DestinationState} assigns + * one writer per partition in table destination. If the Iceberg {@link Table} is un-partitioned, + * the data is written normally using one {@link RecordWriter} (i.e. the {@link DestinationState} + * has one writer). At any given moment, the number of open data writers should be less than or + * equal to the number of total partitions (across all windowed destinations). + * + *
+ * <p>
A {@link DestinationState} maintains its writers in a {@link Cache}. If a {@link RecordWriter} + * is inactive for 1 minute, the {@link DestinationState} will automatically close it to free up + * resources. Calling {@link #close()} on this {@link RecordWriterManager} will do the following for + * each {@link DestinationState}: + * + *
+ * <ol>
+ *   <li>Close all underlying {@link RecordWriter}s
+ *   <li>Collect all {@link DataFile}s
+ *   <li>Create a new {@link ManifestFile} referencing these {@link DataFile}s
+ * </ol>
+ *
+ * <p>
After closing, the resulting {@link ManifestFile}s can be retrieved using {@link + * #getManifestFiles()}. + */ +class RecordWriterManager implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class); + + /** + * Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per + * partition and manages them in a {@link Cache}. + * + *
+ * <p>
On closing, each writer's output {@link DataFile} is collected. + */ + class DestinationState { + private final IcebergDestination icebergDestination; + private final PartitionSpec spec; + private final org.apache.iceberg.Schema schema; + private final PartitionKey partitionKey; + private final String tableLocation; + private final FileIO fileIO; + private final String stateToken = UUID.randomUUID().toString(); + private final List dataFiles = Lists.newArrayList(); + private final Cache writers; + private final Map writerCounts = Maps.newHashMap(); + + DestinationState(IcebergDestination icebergDestination, Table table) { + this.icebergDestination = icebergDestination; + this.schema = table.schema(); + this.spec = table.spec(); + this.partitionKey = new PartitionKey(spec, schema); + this.tableLocation = table.location(); + this.fileIO = table.io(); + + // build a cache of RecordWriters. + // writers will expire after 1 min of idle time. + // when a writer expires, its data file is collected. + this.writers = + CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .removalListener( + (RemovalNotification removal) -> { + final RecordWriter recordWriter = + Preconditions.checkNotNull(removal.getValue()); + try { + recordWriter.close(); + openWriters--; + dataFiles.add(recordWriter.getDataFile()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .build(); + } + + /** + * Computes the partition key for this Iceberg {@link Record} and writes it using the + * appropriate {@link RecordWriter}, creating new writers as needed. + * + *
+ * <p>
However, if this {@link RecordWriterManager} is already saturated with writers, and we + * can't create a new writer, the {@link Record} is rejected and {@code false} is returned. + */ + boolean write(Record record) { + partitionKey.partition(record); + + if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) { + return false; + } + RecordWriter writer = fetchWriterForPartition(partitionKey); + writer.write(record); + return true; + } + + /** + * Checks if a viable {@link RecordWriter} already exists for this partition and returns it. If + * no {@link RecordWriter} exists or if it has reached the maximum limit of bytes written, a new + * one is created and returned. + */ + private RecordWriter fetchWriterForPartition(PartitionKey partitionKey) { + RecordWriter recordWriter = writers.getIfPresent(partitionKey); + + if (recordWriter == null || recordWriter.bytesWritten() > maxFileSize) { + // calling invalidate for a non-existent key is a safe operation + writers.invalidate(partitionKey); + recordWriter = createWriter(partitionKey); + writers.put(partitionKey, recordWriter); + } + return recordWriter; + } + + private RecordWriter createWriter(PartitionKey partitionKey) { + // keep track of how many writers we opened for each destination-partition path + // use this as a prefix to differentiate the new path. + // this avoids overwriting a data file written by a previous writer in this destination state. + int recordIndex = writerCounts.merge(partitionKey, 1, Integer::sum); + try { + RecordWriter writer = + new RecordWriter( + catalog, + icebergDestination, + filePrefix + "_" + stateToken + "_" + recordIndex, + partitionKey); + openWriters++; + return writer; + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Encountered an error when creating a RecordWriter for table '%s', partition %s.", + icebergDestination.getTableIdentifier(), partitionKey), + e); + } + } + + private String getManifestFileLocation(PaneInfo paneInfo) { + return FileFormat.AVRO.addExtension( + String.format( + "%s/metadata/%s-%s-%s.manifest", + tableLocation, filePrefix, stateToken, paneInfo.getIndex())); + } + } + + private final Catalog catalog; + private final String filePrefix; + private final long maxFileSize; + private final int maxNumWriters; + @VisibleForTesting int openWriters = 0; + + @VisibleForTesting + final Map, DestinationState> destinations = Maps.newHashMap(); + + @VisibleForTesting + private final Map, List> totalManifestFiles = + Maps.newHashMap(); + + @VisibleForTesting boolean isClosed = false; + + RecordWriterManager(Catalog catalog, String filePrefix, long maxFileSize, int maxNumWriters) { + this.catalog = catalog; + this.filePrefix = filePrefix; + this.maxFileSize = maxFileSize; + this.maxNumWriters = maxNumWriters; + } + + /** + * Fetches the appropriate {@link RecordWriter} for this destination and partition and writes the + * record. + * + *
+ * <p>
If the {@link RecordWriterManager} is saturated (i.e. has hit the maximum limit of open + * writers), the record is rejected and {@code false} is returned. + */ + public boolean write(WindowedValue icebergDestination, Row row) { + DestinationState destinationState = + destinations.computeIfAbsent( + icebergDestination, + destination -> { + Table table = catalog.loadTable(destination.getValue().getTableIdentifier()); + return new DestinationState(destination.getValue(), table); + }); + + Record icebergRecord = IcebergUtils.beamRowToIcebergRecord(destinationState.schema, row); + return destinationState.write(icebergRecord); + } + + /** + * Closes all remaining writers and collects all their {@link DataFile}s. Writes one {@link + * ManifestFile} per windowed table destination. + */ + public void close() throws IOException { + for (Map.Entry, DestinationState> + windowedDestinationAndState : destinations.entrySet()) { + WindowedValue windowedDestination = windowedDestinationAndState.getKey(); + DestinationState state = windowedDestinationAndState.getValue(); + + // removing writers from the state's cache will trigger the logic to collect each writer's + // data file. + state.writers.invalidateAll(); + OutputFile outputFile = + state.fileIO.newOutputFile(state.getManifestFileLocation(windowedDestination.getPane())); + + ManifestWriter manifestWriter; + try (ManifestWriter openWriter = ManifestFiles.write(state.spec, outputFile)) { + openWriter.addAll(state.dataFiles); + manifestWriter = openWriter; + } + ManifestFile manifestFile = manifestWriter.toManifestFile(); + + LOG.info( + "Successfully wrote manifest file, adding {} data files ({} rows) to table '{}': {}.", + manifestFile.addedFilesCount(), + manifestFile.addedRowsCount(), + windowedDestination.getValue().getTableIdentifier(), + outputFile.location()); + + totalManifestFiles + .computeIfAbsent(windowedDestination, dest -> Lists.newArrayList()) + .add(manifestFile); + + state.dataFiles.clear(); + } + destinations.clear(); + Preconditions.checkArgument( + openWriters == 0, + "Expected all data writers to be closed, but found %s data writer(s) still open", + openWriters); + isClosed = true; + } + + /** + * Returns a list of accumulated windowed {@link ManifestFile}s for each windowed {@link + * IcebergDestination}. The {@link RecordWriterManager} must first be closed before this is + * called. 
+ */ + public Map, List> getManifestFiles() { + Preconditions.checkArgument( + isClosed, + "Please close this %s before retrieving its manifest files.", + getClass().getSimpleName()); + return totalManifestFiles; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 471824c9856..3044768143b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import java.util.List; import java.util.UUID; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -63,7 +64,7 @@ private static class WriteGroupedRowsToFilesDoFn private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; private transient @MonotonicNonNull Catalog catalog; - private final String fileSuffix; + private final String filePrefix; private final long maxFileSize; WriteGroupedRowsToFilesDoFn( @@ -72,7 +73,7 @@ private static class WriteGroupedRowsToFilesDoFn long maxFileSize) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; - this.fileSuffix = UUID.randomUUID().toString(); + this.filePrefix = UUID.randomUUID().toString(); this.maxFileSize = maxFileSize; } @@ -93,20 +94,23 @@ public void processElement( Row destMetadata = element.getKey().getKey(); IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); - PartitionedRecordWriter writer = - new PartitionedRecordWriter(getCatalog(), fileSuffix, maxFileSize, Integer.MAX_VALUE); + WindowedValue windowedDestination = + WindowedValue.of(destination, window.maxTimestamp(), window, pane); + RecordWriterManager writer = + new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE); for (Row e : element.getValue()) { - writer.write(destination, e, window, pane); + writer.write(windowedDestination, e); } writer.close(); - for (WindowedValue windowedManifestFiles : - Preconditions.checkNotNull(writer.getManifestFiles().get(destination))) { + List manifestFiles = + Preconditions.checkNotNull(writer.getManifestFiles().get(windowedDestination)); + for (ManifestFile manifestFile : manifestFiles) { c.output( FileWriteResult.builder() .setTableIdentifier(destination.getTableIdentifier()) - .setManifestFile(windowedManifestFiles.getValue()) + .setManifestFile(manifestFile) .build()); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index df2a016387a..2720638f293 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -19,8 +19,6 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; import java.util.List; import java.util.Map; import java.util.UUID; @@ -40,11 +38,12 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -68,7 +67,7 @@ class WriteUngroupedRowsToFiles private static final TupleTag WRITTEN_ROWS_TAG = new TupleTag("writtenRows") {}; private static final TupleTag SPILLED_ROWS_TAG = new TupleTag("spilledRows") {}; - private final String fileSuffix; + private final String filePrefix; private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -76,7 +75,7 @@ class WriteUngroupedRowsToFiles IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; - this.fileSuffix = UUID.randomUUID().toString(); + this.filePrefix = UUID.randomUUID().toString(); } @Override @@ -88,7 +87,7 @@ public Result expand(PCollection input) { new WriteUngroupedRowsToFilesDoFn( catalogConfig, dynamicDestinations, - fileSuffix, + filePrefix, DEFAULT_MAX_WRITERS_PER_BUNDLE, DEFAULT_MAX_BYTES_PER_FILE)) .withOutputTags( @@ -177,7 +176,7 @@ private static class WriteUngroupedRowsToFilesDoFn extends DoFn windowedDestination = + WindowedValue.of(destination, window.maxTimestamp(), window, pane); // Attempt to write record. 
If the writer is saturated and cannot accept // the record, spill it over to WriteGroupedRowsToFiles boolean writeSuccess; try { writeSuccess = - Preconditions.checkNotNull(partitionedRecordWriter) - .write(destination, data, window, pane); + Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data); } catch (Exception e) { try { - Preconditions.checkNotNull(partitionedRecordWriter).close(); + Preconditions.checkNotNull(recordWriterManager).close(); } catch (Exception closeException) { e.addSuppressed(closeException); } @@ -235,24 +235,25 @@ public void processElement( @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { - if (partitionedRecordWriter == null) { + if (recordWriterManager == null) { return; } - partitionedRecordWriter.close(); - for (Map.Entry>> destinationAndFiles : - Preconditions.checkNotNull(partitionedRecordWriter).getManifestFiles().entrySet()) { - TableIdentifier identifier = destinationAndFiles.getKey().getTableIdentifier(); - for (WindowedValue windowedManifestFile : destinationAndFiles.getValue()) { + recordWriterManager.close(); + for (Map.Entry, List> destinationAndFiles : + Preconditions.checkNotNull(recordWriterManager).getManifestFiles().entrySet()) { + WindowedValue windowedDestination = destinationAndFiles.getKey(); + + for (ManifestFile manifestFile : destinationAndFiles.getValue()) { c.output( FileWriteResult.builder() - .setManifestFile(windowedManifestFile.getValue()) - .setTableIdentifier(identifier) + .setManifestFile(manifestFile) + .setTableIdentifier(windowedDestination.getValue().getTableIdentifier()) .build(), - windowedManifestFile.getTimestamp(), - Iterables.getFirst(windowedManifestFile.getWindows(), null)); + windowedDestination.getTimestamp(), + Iterables.getFirst(windowedDestination.getWindows(), null)); } } - partitionedRecordWriter = null; + recordWriterManager = null; } } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java new file mode 100644 index 00000000000..7d79070612b --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test class for {@link RecordWriterManager}. */ +@RunWith(JUnit4.class) +public class RecordWriterManagerTest { + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule + public transient TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + + @Rule public TestName testName = new TestName(); + private static final Schema BEAM_SCHEMA = + Schema.builder().addInt32Field("id").addStringField("name").addBooleanField("bool").build(); + private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = + IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA); + private static final PartitionSpec PARTITION_SPEC = + PartitionSpec.builderFor(ICEBERG_SCHEMA).truncate("name", 3).identity("bool").build(); + + private TableIdentifier tableIdentifier; + private WindowedValue windowedDestination; + private HadoopCatalog catalog; + + @Before + public void setUp() { + tableIdentifier = TableIdentifier.of("default", "table_" + testName.getMethodName()); + IcebergDestination icebergDestination = + IcebergDestination.builder() + .setFileFormat(FileFormat.PARQUET) + .setTableIdentifier(tableIdentifier) + .build(); + windowedDestination = + WindowedValue.of( + icebergDestination, + GlobalWindow.TIMESTAMP_MAX_VALUE, + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING); + warehouse.createTable(tableIdentifier, ICEBERG_SCHEMA, PARTITION_SPEC); + catalog = new HadoopCatalog(new Configuration(), warehouse.location); + } + + @Test + public void testCreateNewWriterForEachPartition() throws IOException { + // New writer manager with a maximum limit of 3 writers + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 3); + assertEquals(0, writerManager.openWriters); + + boolean writeSuccess; + + // The following row will have new partition: [aaa, true]. + // This is a new partition so a new writer will be created. + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(1, writerManager.openWriters); + + // The following row will have new partition: [bbb, false]. + // This is a new partition so a new writer will be created. + row = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(2, writerManager.openWriters); + + // The following row will have existing partition: [bbb, false]. 
+ // A writer already exists for this partition, so no new writers are created. + row = Row.withSchema(BEAM_SCHEMA).addValues(3, "bbbaaa", false).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(2, writerManager.openWriters); + + // The following row will have new partition: [bbb, true]. + // This is a new partition so a new writer will be created. + row = Row.withSchema(BEAM_SCHEMA).addValues(4, "bbb123", true).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // The following row will have new partition: [aaa, false]. + // The writerManager is already saturated with three writers. This record is rejected. + row = Row.withSchema(BEAM_SCHEMA).addValues(5, "aaa123", false).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertFalse(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // Closing PartitionRecordWriter will close all writers. + writerManager.close(); + assertEquals(0, writerManager.openWriters); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java index a8f63383801..ad4fc6b382d 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java @@ -44,6 +44,7 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Assert; import org.junit.rules.ExternalResource; import org.junit.rules.TemporaryFolder; @@ -140,7 +141,12 @@ public DataFile writeRecords(String filename, Schema schema, List record } public Table createTable(TableIdentifier tableId, Schema schema) { + return createTable(tableId, schema, null); + } + + public Table createTable( + TableIdentifier tableId, Schema schema, @Nullable PartitionSpec partitionSpec) { someTableHasBeenCreated = true; - return catalog.createTable(tableId, schema); + return catalog.createTable(tableId, schema, partitionSpec); } } From e8d5674d2c0b5cebd68c1523d516a0978ddd5a17 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 Aug 2024 16:29:23 -0400 Subject: [PATCH 07/15] add tests --- .../sdk/io/iceberg/RecordWriterManager.java | 6 +- .../io/iceberg/RecordWriterManagerTest.java | 64 ++++++++++++++++++- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 3da59cbe226..ba1844d96e2 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -31,8 +31,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; -import 
org.apache.commons.compress.utils.Lists; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFile; @@ -88,8 +88,8 @@ class DestinationState { private final FileIO fileIO; private final String stateToken = UUID.randomUUID().toString(); private final List dataFiles = Lists.newArrayList(); - private final Cache writers; - private final Map writerCounts = Maps.newHashMap(); + @VisibleForTesting final Cache writers; + @VisibleForTesting final Map writerCounts = Maps.newHashMap(); DestinationState(IcebergDestination icebergDestination, Table table) { this.icebergDestination = icebergDestination; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 7d79070612b..3f2aa2b4fec 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -22,13 +22,18 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Map; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; @@ -57,13 +62,13 @@ public class RecordWriterManagerTest { private static final PartitionSpec PARTITION_SPEC = PartitionSpec.builderFor(ICEBERG_SCHEMA).truncate("name", 3).identity("bool").build(); - private TableIdentifier tableIdentifier; private WindowedValue windowedDestination; private HadoopCatalog catalog; @Before public void setUp() { - tableIdentifier = TableIdentifier.of("default", "table_" + testName.getMethodName()); + TableIdentifier tableIdentifier = + TableIdentifier.of("default", "table_" + testName.getMethodName()); IcebergDestination icebergDestination = IcebergDestination.builder() .setFileFormat(FileFormat.PARQUET) @@ -81,7 +86,7 @@ public void setUp() { @Test public void testCreateNewWriterForEachPartition() throws IOException { - // New writer manager with a maximum limit of 3 writers + // Writer manager with a maximum limit of 3 writers RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 3); assertEquals(0, writerManager.openWriters); @@ -125,5 +130,58 @@ public void testCreateNewWriterForEachPartition() throws IOException { // Closing PartitionRecordWriter will close all writers. 
writerManager.close(); assertEquals(0, writerManager.openWriters); + + ManifestFile manifestFile = + Iterables.getOnlyElement(writerManager.getManifestFiles().get(windowedDestination)); + + assertEquals(3, manifestFile.addedFilesCount().intValue()); + assertEquals(4, manifestFile.addedRowsCount().intValue()); + } + + @Test + public void testRespectMaxFileSize() throws IOException { + // Writer manager with a maximum file size of 100 bytes + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 2); + assertEquals(0, writerManager.openWriters); + boolean writeSuccess; + + PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA); + // row partition:: [aaa, true]. + // This is a new partition so a new writer will be created. + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(1, writerManager.openWriters); + + partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row)); + Map writerCounts = + writerManager.destinations.get(windowedDestination).writerCounts; + // this is our first writer + assertEquals(1, writerCounts.get(partitionKey).intValue()); + + // row partition:: [aaa, true]. + // existing partition. use existing writer + row = + Row.withSchema(BEAM_SCHEMA) + .addValues(2, "aaa" + RandomStringUtils.randomAlphanumeric(1000), true) + .build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + assertEquals(1, writerManager.openWriters); + // check that we still use our first writer + assertEquals(1, writerCounts.get(partitionKey).intValue()); + + // row partition:: [aaa, true]. + // writer has reached max file size. 
create a new writer + row = Row.withSchema(BEAM_SCHEMA).addValues(2, "aaabb", true).build(); + writeSuccess = writerManager.write(windowedDestination, row); + assertTrue(writeSuccess); + // check that we have opened and are using a second writer + assertEquals(2, writerCounts.get(partitionKey).intValue()); + // check that only one writer is open (we have closed the first writer) + assertEquals(1, writerManager.openWriters); + + writerManager.close(); + assertEquals(0, writerManager.openWriters); } } From 86fa05d8e97771e0e36aa31cea3032938ae4c6d3 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 Aug 2024 17:44:17 -0400 Subject: [PATCH 08/15] add more tests --- .../sdk/io/iceberg/RecordWriterManager.java | 9 +- .../beam/sdk/io/iceberg/IcebergIOIT.java | 15 ++- .../io/iceberg/RecordWriterManagerTest.java | 111 +++++++++++++++--- 3 files changed, 111 insertions(+), 24 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index ba1844d96e2..bcb7b7ecef8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -195,11 +195,10 @@ private String getManifestFileLocation(PaneInfo paneInfo) { @VisibleForTesting final Map, DestinationState> destinations = Maps.newHashMap(); - @VisibleForTesting private final Map, List> totalManifestFiles = Maps.newHashMap(); - @VisibleForTesting boolean isClosed = false; + private boolean isClosed = false; RecordWriterManager(Catalog catalog, String filePrefix, long maxFileSize, int maxNumWriters) { this.catalog = catalog; @@ -241,6 +240,10 @@ public void close() throws IOException { // removing writers from the state's cache will trigger the logic to collect each writer's // data file. state.writers.invalidateAll(); + if (state.dataFiles.isEmpty()) { + continue; + } + OutputFile outputFile = state.fileIO.newOutputFile(state.getManifestFileLocation(windowedDestination.getPane())); @@ -278,7 +281,7 @@ public void close() throws IOException { * called. 
*/ public Map, List> getManifestFiles() { - Preconditions.checkArgument( + Preconditions.checkState( isClosed, "Please close this %s before retrieving its manifest files.", getClass().getSimpleName()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index a68689af4e9..5a524298398 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -84,6 +84,7 @@ public class IcebergIOIT implements Serializable { private static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA = org.apache.beam.sdk.schemas.Schema.builder() .addStringField("str") + .addInt64Field("modulo_5") .addBooleanField("bool") .addInt32Field("int") .addRowField("row", NESTED_ROW_SCHEMA) @@ -110,7 +111,8 @@ public Row apply(Long num) { .build(); return Row.withSchema(BEAM_SCHEMA) - .addValue("str_value_" + strNum) + .addValue("value_" + strNum) + .addValue(num % 5) .addValue(num % 2 == 0) .addValue(Integer.valueOf(strNum)) .addValue(nestedRow) @@ -209,9 +211,10 @@ private List readRecords(Table table) { TableScan tableScan = table.newScan().project(ICEBERG_SCHEMA); List writtenRecords = new ArrayList<>(); for (CombinedScanTask task : tableScan.planTasks()) { - InputFilesDecryptor decryptor = new InputFilesDecryptor(task, table.io(), table.encryption()); + InputFilesDecryptor descryptor = + new InputFilesDecryptor(task, table.io(), table.encryption()); for (FileScanTask fileTask : task.files()) { - InputFile inputFile = decryptor.getInputFile(fileTask); + InputFile inputFile = descryptor.getInputFile(fileTask); CloseableIterable iterable = Parquet.read(inputFile) .split(fileTask.start(), fileTask.length()) @@ -287,11 +290,13 @@ public void testWrite() { @Test public void testWritePartitionedData() { + // For an example row where bool=true, modulo_5=3, str=value_303, + // this partition spec will create a partition like: /bool=true/modulo_5=3/str_trunc=value_3/ PartitionSpec partitionSpec = PartitionSpec.builderFor(ICEBERG_SCHEMA) - .identity("str") .identity("bool") - .identity("int") + .identity("modulo_5") + .truncate("str", "value_x".length()) .build(); Table table = catalog.createTable(tableId, ICEBERG_SCHEMA, partitionSpec); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 3f2aa2b4fec..1c2e8bc2c45 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -17,8 +17,11 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -37,6 +40,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -67,60 +71,124 @@ public class 
RecordWriterManagerTest { @Before public void setUp() { - TableIdentifier tableIdentifier = - TableIdentifier.of("default", "table_" + testName.getMethodName()); + windowedDestination = + getWindowedDestination("table_" + testName.getMethodName(), PARTITION_SPEC); + catalog = new HadoopCatalog(new Configuration(), warehouse.location); + } + + private WindowedValue getWindowedDestination( + String tableName, @Nullable PartitionSpec partitionSpec) { + TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName); + + warehouse.createTable(tableIdentifier, ICEBERG_SCHEMA, partitionSpec); + IcebergDestination icebergDestination = IcebergDestination.builder() .setFileFormat(FileFormat.PARQUET) .setTableIdentifier(tableIdentifier) .build(); - windowedDestination = - WindowedValue.of( - icebergDestination, - GlobalWindow.TIMESTAMP_MAX_VALUE, - GlobalWindow.INSTANCE, - PaneInfo.NO_FIRING); - warehouse.createTable(tableIdentifier, ICEBERG_SCHEMA, PARTITION_SPEC); - catalog = new HadoopCatalog(new Configuration(), warehouse.location); + return WindowedValue.of( + icebergDestination, + GlobalWindow.TIMESTAMP_MAX_VALUE, + GlobalWindow.INSTANCE, + PaneInfo.NO_FIRING); + } + + @Test + public void testCreateNewWriterForEachDestination() throws IOException { + // Writer manager with a maximum limit of 3 writers + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3); + assertEquals(0, writerManager.openWriters); + + boolean writeSuccess; + + WindowedValue dest1 = getWindowedDestination("dest1", null); + WindowedValue dest2 = getWindowedDestination("dest2", null); + WindowedValue dest3 = getWindowedDestination("dest3", PARTITION_SPEC); + WindowedValue dest4 = getWindowedDestination("dest4", null); + + // dest1 + // This is a new destination so a new writer will be created. + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(dest1, row); + assertTrue(writeSuccess); + assertEquals(1, writerManager.openWriters); + + // dest2 + // This is a new destination so a new writer will be created. + row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(dest2, row); + assertTrue(writeSuccess); + assertEquals(2, writerManager.openWriters); + + // dest3, partition: [aaa, true] + // This is a new destination so a new writer will be created. + row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(dest3, row); + assertTrue(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // dest4 + // This is a new destination, but the writer manager is saturated with 3 writers. reject the + // record + row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writeSuccess = writerManager.write(dest4, row); + assertFalse(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // dest3, partition: [aaa, false] + // new partition, but the writer manager is saturated with 3 writers. reject the record + row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", false).build(); + writeSuccess = writerManager.write(dest3, row); + assertFalse(writeSuccess); + assertEquals(3, writerManager.openWriters); + + // Closing PartitionRecordWriter will close all writers. 
+ writerManager.close(); + assertEquals(0, writerManager.openWriters); + + // We should only have 3 manifest files (one for each destination we wrote to) + assertEquals(3, writerManager.getManifestFiles().keySet().size()); + assertThat(writerManager.getManifestFiles().keySet(), containsInAnyOrder(dest1, dest2, dest3)); } @Test public void testCreateNewWriterForEachPartition() throws IOException { // Writer manager with a maximum limit of 3 writers - RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 3); + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3); assertEquals(0, writerManager.openWriters); boolean writeSuccess; - // The following row will have new partition: [aaa, true]. + // partition: [aaa, true]. // This is a new partition so a new writer will be created. Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); writeSuccess = writerManager.write(windowedDestination, row); assertTrue(writeSuccess); assertEquals(1, writerManager.openWriters); - // The following row will have new partition: [bbb, false]. + // partition: [bbb, false]. // This is a new partition so a new writer will be created. row = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build(); writeSuccess = writerManager.write(windowedDestination, row); assertTrue(writeSuccess); assertEquals(2, writerManager.openWriters); - // The following row will have existing partition: [bbb, false]. + // partition: [bbb, false]. // A writer already exists for this partition, so no new writers are created. row = Row.withSchema(BEAM_SCHEMA).addValues(3, "bbbaaa", false).build(); writeSuccess = writerManager.write(windowedDestination, row); assertTrue(writeSuccess); assertEquals(2, writerManager.openWriters); - // The following row will have new partition: [bbb, true]. + // partition: [bbb, true]. // This is a new partition so a new writer will be created. row = Row.withSchema(BEAM_SCHEMA).addValues(4, "bbb123", true).build(); writeSuccess = writerManager.write(windowedDestination, row); assertTrue(writeSuccess); assertEquals(3, writerManager.openWriters); - // The following row will have new partition: [aaa, false]. + // partition: [aaa, false]. // The writerManager is already saturated with three writers. This record is rejected. 
row = Row.withSchema(BEAM_SCHEMA).addValues(5, "aaa123", false).build(); writeSuccess = writerManager.write(windowedDestination, row); @@ -131,6 +199,7 @@ public void testCreateNewWriterForEachPartition() throws IOException { writerManager.close(); assertEquals(0, writerManager.openWriters); + assertEquals(1, writerManager.getManifestFiles().size()); ManifestFile manifestFile = Iterables.getOnlyElement(writerManager.getManifestFiles().get(windowedDestination)); @@ -184,4 +253,14 @@ public void testRespectMaxFileSize() throws IOException { writerManager.close(); assertEquals(0, writerManager.openWriters); } + + @Test + public void testRequireClosingBeforeFetchingManifestFiles() { + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 100, 2); + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + writerManager.write(windowedDestination, row); + assertEquals(1, writerManager.openWriters); + + assertThrows(IllegalStateException.class, writerManager::getManifestFiles); + } } From 4c33e06628e26d8b1ee22a586ea614f39aebe8f5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 Aug 2024 18:05:04 -0400 Subject: [PATCH 09/15] make record writer manager transient --- .../org/apache/beam/sdk/io/iceberg/RecordWriterManager.java | 3 +-- .../apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index bcb7b7ecef8..fa1bb26c1d8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.iceberg; import java.io.IOException; -import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.UUID; @@ -70,7 +69,7 @@ *
 * <p>
After closing, the resulting {@link ManifestFile}s can be retrieved using {@link * #getManifestFiles()}. */ -class RecordWriterManager implements Serializable { +class RecordWriterManager { private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class); /** diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index 2720638f293..0ca06d79775 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -176,7 +176,7 @@ private static class WriteUngroupedRowsToFilesDoFn extends DoFn Date: Tue, 13 Aug 2024 18:19:48 -0400 Subject: [PATCH 10/15] clean up test path --- .../java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 5a524298398..2e748e9644e 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -160,13 +160,9 @@ public static void beforeClass() { @Before public void setUp() { warehouseLocation = - String.format( - "%s/IcebergIOIT/%s/%s", - options.getTempLocation(), testName.getMethodName(), UUID.randomUUID()); + String.format("%s/IcebergIOIT/%s", options.getTempLocation(), UUID.randomUUID()); - tableId = - TableIdentifier.of( - testName.getMethodName(), "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + tableId = TableIdentifier.of(testName.getMethodName(), "test_table"); catalog = new HadoopCatalog(catalogHadoopConf, warehouseLocation); } From f3567c90af2e1ca497020df4cf7a3d9e92a2e950 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 Aug 2024 18:30:05 -0400 Subject: [PATCH 11/15] cleanup --- .../org/apache/beam/sdk/io/iceberg/RecordWriter.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 6c03631a82e..e6b73c36495 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -17,12 +17,9 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord; - import java.io.IOException; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.values.Row; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.ManifestFile; @@ -105,12 +102,7 @@ class RecordWriter { absoluteFilename); } - public void write(Row row) { - Record record = beamRowToIcebergRecord(table.schema(), row); - icebergDataWriter.write(record); - } - - void write(Record record) { + public void write(Record record) { icebergDataWriter.write(record); } From f26e1f6e235995981ece5d5487ff55dc87ff949a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 Aug 2024 18:31:51 -0400 Subject: [PATCH 12/15] cleanup --- 
.../apache/beam/sdk/io/iceberg/RecordWriter.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index e6b73c36495..576d1e32c46 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -22,9 +22,6 @@ import org.apache.beam.sdk.metrics.Metrics; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.ManifestFile; -import org.apache.iceberg.ManifestFiles; -import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; @@ -124,18 +121,6 @@ public long bytesWritten() { return icebergDataWriter.length(); } - public ManifestFile getManifestFile() throws IOException { - String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename + ".manifest"); - OutputFile outputFile = table.io().newOutputFile(manifestFilename); - ManifestWriter manifestWriter; - try (ManifestWriter openWriter = ManifestFiles.write(table.spec(), outputFile)) { - openWriter.add(icebergDataWriter.toDataFile()); - manifestWriter = openWriter; - } - - return manifestWriter.toManifestFile(); - } - public DataFile getDataFile() { return icebergDataWriter.toDataFile(); } From 5c6c5eae395a675c702c71a729c8613e39f29305 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 Aug 2024 23:02:14 -0400 Subject: [PATCH 13/15] address comments --- .../sdk/io/iceberg/RecordWriterManager.java | 45 ++++++++++++------- .../io/iceberg/WriteGroupedRowsToFiles.java | 13 +++--- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index fa1bb26c1d8..0b90bb96bca 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -17,16 +17,19 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + import java.io.IOException; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; @@ -69,7 +72,7 @@ *
 * <p>
After closing, the resulting {@link ManifestFile}s can be retrieved using {@link * #getManifestFiles()}. */ -class RecordWriterManager { +class RecordWriterManager implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class); /** @@ -104,21 +107,30 @@ class DestinationState { this.writers = CacheBuilder.newBuilder() .expireAfterAccess(1, TimeUnit.MINUTES) - .removalListener( - (RemovalNotification removal) -> { - final RecordWriter recordWriter = - Preconditions.checkNotNull(removal.getValue()); - try { - recordWriter.close(); - openWriters--; - dataFiles.add(recordWriter.getDataFile()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) + .removalListener(this::closeWriterForPartition) .build(); } + /** + * Called when a writer is evicted from the cache. Closes the partition's {@link RecordWriter} + * and collects its {@link DataFile}. + */ + private void closeWriterForPartition(RemovalNotification removal) { + final PartitionKey pk = Preconditions.checkStateNotNull(removal.getKey()); + final RecordWriter recordWriter = Preconditions.checkStateNotNull(removal.getValue()); + try { + recordWriter.close(); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Encountered an error when closing data writer for table '%s', partition %s", + icebergDestination.getTableIdentifier(), pk), + e); + } + openWriters--; + dataFiles.add(recordWriter.getDataFile()); + } + /** * Computes the partition key for this Iceberg {@link Record} and writes it using the * appropriate {@link RecordWriter}, creating new writers as needed. @@ -230,6 +242,7 @@ public boolean write(WindowedValue icebergDestination, Row r * Closes all remaining writers and collects all their {@link DataFile}s. Writes one {@link * ManifestFile} per windowed table destination. */ + @Override public void close() throws IOException { for (Map.Entry, DestinationState> windowedDestinationAndState : destinations.entrySet()) { @@ -267,7 +280,7 @@ public void close() throws IOException { state.dataFiles.clear(); } destinations.clear(); - Preconditions.checkArgument( + checkArgument( openWriters == 0, "Expected all data writers to be closed, but found %s data writer(s) still open", openWriters); @@ -280,7 +293,7 @@ public void close() throws IOException { * called. 
*/ public Map, List> getManifestFiles() { - Preconditions.checkState( + checkState( isClosed, "Please close this %s before retrieving its manifest files.", getClass().getSimpleName()); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 3044768143b..65cc3f3c305 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -96,13 +96,14 @@ public void processElement( IcebergDestination destination = dynamicDestinations.instantiateDestination(destMetadata); WindowedValue windowedDestination = WindowedValue.of(destination, window.maxTimestamp(), window, pane); - RecordWriterManager writer = - new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE); - - for (Row e : element.getValue()) { - writer.write(windowedDestination, e); + RecordWriterManager writer; + try (RecordWriterManager openWriter = + new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE)) { + writer = openWriter; + for (Row e : element.getValue()) { + writer.write(windowedDestination, e); + } } - writer.close(); List manifestFiles = Preconditions.checkNotNull(writer.getManifestFiles().get(windowedDestination)); From 23286c4afee6dac3afb4ab6c1c128f681cf9f647 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 13 Aug 2024 23:25:49 -0400 Subject: [PATCH 14/15] revert readability change --- .../sdk/io/iceberg/RecordWriterManager.java | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 0b90bb96bca..b16f0caeb81 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -107,30 +107,26 @@ class DestinationState { this.writers = CacheBuilder.newBuilder() .expireAfterAccess(1, TimeUnit.MINUTES) - .removalListener(this::closeWriterForPartition) + .removalListener( + (RemovalNotification removal) -> { + final PartitionKey pk = Preconditions.checkStateNotNull(removal.getKey()); + final RecordWriter recordWriter = + Preconditions.checkStateNotNull(removal.getValue()); + try { + recordWriter.close(); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Encountered an error when closing data writer for table '%s', partition %s", + icebergDestination.getTableIdentifier(), pk), + e); + } + openWriters--; + dataFiles.add(recordWriter.getDataFile()); + }) .build(); } - /** - * Called when a writer is evicted from the cache. Closes the partition's {@link RecordWriter} - * and collects its {@link DataFile}. 
- */ - private void closeWriterForPartition(RemovalNotification removal) { - final PartitionKey pk = Preconditions.checkStateNotNull(removal.getKey()); - final RecordWriter recordWriter = Preconditions.checkStateNotNull(removal.getValue()); - try { - recordWriter.close(); - } catch (IOException e) { - throw new RuntimeException( - String.format( - "Encountered an error when closing data writer for table '%s', partition %s", - icebergDestination.getTableIdentifier(), pk), - e); - } - openWriters--; - dataFiles.add(recordWriter.getDataFile()); - } - /** * Computes the partition key for this Iceberg {@link Record} and writes it using the * appropriate {@link RecordWriter}, creating new writers as needed. From e1cfd3540aaf8919bbfe48a2eaedc859b0b778da Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 14 Aug 2024 10:54:06 -0400 Subject: [PATCH 15/15] add to changes md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index d082f03fd31..cca3265783b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Improvements to the performance of BigqueryIO when using withPropagateSuccessfulStorageApiWrites(true) method (Java) ([#31840](https://github.com/apache/beam/pull/31840)). +* [Managed Iceberg] Added support for writing to partitioned tables ([#32102](https://github.com/apache/beam/pull/32102)) ## New Features / Improvements
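Reviewer orientation (illustrative sketch, not part of the patch): the snippet below shows how a bundle-scoped RecordWriterManager is driven, mirroring what the WriteGroupedRowsToFiles and WriteUngroupedRowsToFiles DoFns in this series do. The names catalog, filePrefix, windowedDestination, rows, maxFileSize, and maxWriters are placeholders assumed to be supplied by the surrounding DoFn, and the enclosing method is assumed to declare throws IOException.

    // Minimal usage sketch under the assumptions stated above.
    RecordWriterManager writerManager =
        new RecordWriterManager(catalog, filePrefix, maxFileSize, maxWriters);
    for (Row row : rows) {
      // write() partitions the record and routes it to a per-partition RecordWriter,
      // creating writers as needed; it returns false once the manager is saturated,
      // in which case the ungrouped-write path spills the row to be grouped and retried.
      boolean accepted = writerManager.write(windowedDestination, row);
    }
    // close() flushes every open writer and produces one ManifestFile per windowed destination.
    writerManager.close();
    List<ManifestFile> manifestFiles =
        writerManager.getManifestFiles().get(windowedDestination);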