From 29a86b413761a7b05644a4fee239219339f55bae Mon Sep 17 00:00:00 2001
From: Gabor Szadovszky
Date: Wed, 28 Feb 2024 16:44:52 +0100
Subject: [PATCH] PARQUET-2437: Avoid flushing at Parquet writes after an exception

---
 .../column/impl/ColumnWriteStoreBase.java       |  17 +-
 .../parquet/column/impl/ColumnWriterBase.java   | 184 ++++++++++++------
 .../parquet/column/impl/StatusManager.java      |  42 ++++
 .../hadoop/InternalParquetRecordWriter.java     |  52 +++--
 .../filter2/recordlevel/PhoneBookWriter.java    |   6 +-
 .../hadoop/TestParquetWriterError.java          | 170 ++++++++++++++++
 6 files changed, 384 insertions(+), 87 deletions(-)
 create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
 create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index d127c1ac64..e2d26d8c28 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -57,6 +58,7 @@ private interface ColumnWriterProvider {
   private final long thresholdTolerance;
   private long rowCount;
   private long rowCountForNextSizeCheck;
+  private StatusManager statusManager = StatusManager.create();
 
   // To be used by the deprecated constructor of ColumnWriteStoreV1
   @Deprecated
@@ -73,7 +75,7 @@ private interface ColumnWriterProvider {
       public ColumnWriter getColumnWriter(ColumnDescriptor path) {
         ColumnWriterBase column = columns.get(path);
         if (column == null) {
-          column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
+          column = createColumnWriterBase(path, pageWriteStore.getPageWriter(path), null, props);
           columns.put(path, column);
         }
         return column;
@@ -87,7 +89,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
     for (ColumnDescriptor path : schema.getColumns()) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+      mcolumns.put(path, createColumnWriterBase(path, pageWriter, null, props));
     }
 
     this.columns = unmodifiableMap(mcolumns);
@@ -114,9 +116,9 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
       if (props.isBloomFilterEnabled(path)) {
         BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
-        mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
+        mcolumns.put(path, createColumnWriterBase(path, pageWriter, bloomFilterWriter, props));
       } else {
-        mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+        mcolumns.put(path, createColumnWriterBase(path, pageWriter, null, props));
       }
     }
     this.columns = unmodifiableMap(mcolumns);
@@ -131,6 +133,13 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     };
   }
 
+  private ColumnWriterBase createColumnWriterBase(
+      ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    ColumnWriterBase columnWriterBase = createColumnWriter(path, pageWriter, bloomFilterWriter, props);
+    columnWriterBase.initStatusManager(statusManager);
+    return columnWriterBase;
+  }
+
   abstract ColumnWriterBase createColumnWriter(
       ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter, ParquetProperties props);
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index e0d0e1a19a..e04724eb3b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.impl;
 
 import java.io.IOException;
+import java.util.Objects;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
@@ -52,6 +53,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
 
   private long rowsWrittenSoFar = 0;
   private int pageRowCount;
+  private StatusManager statusManager = StatusManager.create();
 
   private final ColumnValueCollector collector;
 
@@ -74,6 +76,10 @@ abstract class ColumnWriterBase implements ColumnWriter {
     this.collector = new ColumnValueCollector(path, bloomFilterWriter, props);
   }
 
+  void initStatusManager(StatusManager statusManager) {
+    this.statusManager = Objects.requireNonNull(statusManager);
+  }
+
   abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path);
 
   abstract ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path);
@@ -103,10 +109,15 @@ private void repetitionLevel(int repetitionLevel) {
   @Override
   public void writeNull(int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(null, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    collector.writeNull(repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      collector.writeNull(repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   @Override
@@ -135,11 +146,16 @@ public long getBufferedSizeInMemory() {
   @Override
   public void write(double value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeDouble(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeDouble(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -152,11 +168,16 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
   @Override
   public void write(float value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeFloat(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeFloat(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -169,11 +190,16 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
   @Override
   public void write(Binary value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBytes(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeBytes(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -186,11 +212,16 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
   @Override
   public void write(boolean value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBoolean(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeBoolean(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -203,11 +234,16 @@ public void write(boolean value, int repetitionLevel, int definitionLevel) {
   @Override
   public void write(int value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeInteger(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeInteger(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -220,11 +256,16 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
   @Override
   public void write(long value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeLong(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeLong(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -232,18 +273,27 @@ public void write(long value, int repetitionLevel, int definitionLevel) {
    * Is called right after writePage
    */
   void finalizeColumnChunk() {
-    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
-    if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
-      try {
-        pageWriter.writeDictionaryPage(dictionaryPage);
-      } catch (IOException e) {
-        throw new ParquetEncodingException("could not write dictionary page for " + path, e);
-      }
-      dataColumn.resetDictionary();
+    if (statusManager.isAborted()) {
+      // We are aborting -> nothing to be done
+      return;
     }
+    try {
+      final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
+      if (dictionaryPage != null) {
+        if (DEBUG) LOG.debug("write dictionary");
+        try {
+          pageWriter.writeDictionaryPage(dictionaryPage);
+        } catch (IOException e) {
+          throw new ParquetEncodingException("could not write dictionary page for " + path, e);
+        }
+        dataColumn.resetDictionary();
+      }
 
-    collector.finalizeColumnChunk();
+      collector.finalizeColumnChunk();
+    } catch (Throwable t) {
+      statusManager.abort();
+      throw t;
+    }
   }
 
   /**
@@ -317,26 +367,36 @@ void writePage() {
     if (valueCount == 0) {
       throw new ParquetEncodingException("writing empty page");
     }
-    this.rowsWrittenSoFar += pageRowCount;
-    if (DEBUG) LOG.debug("write page");
+    if (statusManager.isAborted()) {
+      // We are aborting -> nothing to be done
+      return;
+    }
     try {
-      writePage(
-          pageRowCount,
-          valueCount,
-          collector.getStatistics(),
-          collector.getSizeStatistics(),
-          repetitionLevelColumn,
-          definitionLevelColumn,
-          dataColumn);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, e);
+      this.rowsWrittenSoFar += pageRowCount;
+      if (DEBUG)
+        LOG.debug("write page");
+      try {
+        writePage(
+            pageRowCount,
+            valueCount,
+            collector.getStatistics(),
+            collector.getSizeStatistics(),
+            repetitionLevelColumn,
+            definitionLevelColumn,
+            dataColumn);
+      } catch (IOException e) {
+        throw new ParquetEncodingException("could not write page for " + path, e);
+      }
+      repetitionLevelColumn.reset();
+      definitionLevelColumn.reset();
+      dataColumn.reset();
+      valueCount = 0;
+      collector.resetPageStatistics();
+      pageRowCount = 0;
+    } catch (Throwable t) {
+      statusManager.abort();
+      throw t;
    }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    collector.resetPageStatistics();
-    pageRowCount = 0;
   }
 
   abstract void writePage(
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
new file mode 100644
index 0000000000..76b5c5627c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
@@ -0,0 +1,42 @@
+package org.apache.parquet.column.impl;
+
+/**
+ * Interface to manage the current error status. It is used to share the status across all the different (column, page,
+ * etc.) writer/reader instances.
+ */
+interface StatusManager {
+
+  /**
+   * Creates an instance of the default {@link StatusManager} implementation.
+   *
+   * @return the newly created {@link StatusManager} instance
+   */
+  static StatusManager create() {
+    return new StatusManager() {
+      private boolean aborted;
+
+      @Override
+      public void abort() {
+        aborted = true;
+      }
+
+      @Override
+      public boolean isAborted() {
+        return aborted;
+      }
+    };
+  }
+
+  /**
+   * To be invoked if the current process is to be aborted. For example, in case an exception occurred during
+   * writing a page.
+   */
+  void abort();
+
+  /**
+   * Returns whether the current process is aborted.
+   *
+   * @return {@code true} if the current process is aborted, {@code false} otherwise
+   */
+  boolean isAborted();
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 20809089a4..0cc05d6d75 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -68,6 +68,7 @@ class InternalParquetRecordWriter<T> {
 
   private InternalFileEncryptor fileEncryptor;
   private int rowGroupOrdinal;
+  private boolean aborted;
 
   /**
    * @param parquetFileWriter the file to write to
@@ -127,6 +128,9 @@ private void initStore() {
   public void close() throws IOException, InterruptedException {
     if (!closed) {
       try {
+        if (aborted) {
+          return;
+        }
         flushRowGroupToStore();
         FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
         Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData);
@@ -144,9 +148,14 @@ public void close() throws IOException, InterruptedException {
   }
 
   public void write(T value) throws IOException, InterruptedException {
-    writeSupport.write(value);
-    ++recordCount;
-    checkBlockSizeReached();
+    try {
+      writeSupport.write(value);
+      ++recordCount;
+      checkBlockSizeReached();
+    } catch (Throwable t) {
+      aborted = true;
+      throw t;
+    }
   }
 
   /**
@@ -187,25 +196,28 @@ private void checkBlockSizeReached() throws IOException {
   }
 
   private void flushRowGroupToStore() throws IOException {
-    recordConsumer.flush();
-    LOG.debug("Flushing mem columnStore to file. allocated memory: {}", columnStore.getAllocatedSize());
-    if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
-      LOG.warn("Too much memory used: {}", columnStore.memUsageString());
-    }
+    try {
+      recordConsumer.flush();
+      LOG.debug("Flushing mem columnStore to file. allocated memory: {}", columnStore.getAllocatedSize());
+      if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
+        LOG.warn("Too much memory used: {}", columnStore.memUsageString());
+      }
 
-    if (recordCount > 0) {
-      rowGroupOrdinal++;
-      parquetFileWriter.startBlock(recordCount);
-      columnStore.flush();
-      pageStore.flushToFileWriter(parquetFileWriter);
-      recordCount = 0;
-      parquetFileWriter.endBlock();
-      this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
+      if (recordCount > 0) {
+        rowGroupOrdinal++;
+        parquetFileWriter.startBlock(recordCount);
+        columnStore.flush();
+        pageStore.flushToFileWriter(parquetFileWriter);
+        recordCount = 0;
+        parquetFileWriter.endBlock();
+        this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
+      }
+    } finally {
+      AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore);
+      columnStore = null;
+      pageStore = null;
+      bloomFilterWriteStore = null;
     }
-    AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore);
-    columnStore = null;
-    pageStore = null;
-    bloomFilterWriteStore = null;
   }
 
   long getRowGroupSizeThreshold() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 97d836aec9..d4a77879e1 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -56,7 +56,11 @@ public class PhoneBookWriter {
       + "  }\n"
       + "}\n";
 
-  private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
+  private static final MessageType schema = getSchema();
+
+  public static MessageType getSchema() {
+    return MessageTypeParser.parseMessageType(schemaString);
+  }
 
   public static class Location {
     private final Double lon;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
new file mode 100644
index 0000000000..e37d8d04bb
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
@@ -0,0 +1,170 @@
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.TrackingByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.codec.CleanUtil;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.LocalOutputFile;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit test to check how Parquet writing behaves in case an error happens during the writes. We use an OOM because
+ * that is the trickiest to handle. In this case we must avoid flushing since it may cause writing to already
+ * released memory spaces.
+ * <p>
+ * To catch the potential issue of writing into released ByteBuffer objects, direct memory allocation is used and at the
+ * release() call we actually release the related direct memory and zero the address inside the ByteBuffer object. As a
+ * result, a subsequent read/write call on the related ByteBuffer object will crash the whole JVM. (Unfortunately, there
+ * is no better way to test this.) To avoid crashing the test executor JVM, the code of this test is executed in a
+ * separate process.
+ */
+public class TestParquetWriterError {
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testInSeparateProcess() throws IOException, InterruptedException {
+    String outputFile = tmpFolder.newFile("out.parquet").toString();
+
+    String classpath = System.getProperty("java.class.path");
+    String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java").toAbsolutePath().toString();
+    Process process = new ProcessBuilder()
+        .command(
+            javaPath,
+            "-cp",
+            classpath,
+            Main.class.getName(),
+            outputFile)
+        .redirectError(ProcessBuilder.Redirect.INHERIT)
+        .redirectOutput(ProcessBuilder.Redirect.INHERIT)
+        .start();
+    Assert.assertEquals("Test process exited with a non-zero return code. See previous logs for details.", 0,
+        process.waitFor());
+  }
+
+  /**
+   * The class to be used to execute this test in a separate process.
+   */
+  public static class Main {
+
+    private static final Random RANDOM = new Random(2024_02_27_14_20L);
+
+    // See the release() implementation in createAllocator()
+    private static final Field BUFFER_ADDRESS;
+
+    static {
+      try {
+        Class<?> bufferClass = Class.forName("java.nio.Buffer");
+        BUFFER_ADDRESS = bufferClass.getDeclaredField("address");
+        BUFFER_ADDRESS.setAccessible(true);
+      } catch (ClassNotFoundException | NoSuchFieldException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private static Group generateNext() {
+      PhoneBookWriter.Location location;
+      double chance = RANDOM.nextDouble();
+      if (chance < .45) {
+        location = new PhoneBookWriter.Location(RANDOM.nextDouble(), RANDOM.nextDouble());
+      } else if (chance < .9) {
+        location = new PhoneBookWriter.Location(RANDOM.nextDouble(), null);
+      } else {
+        location = null;
+      }
+      List<PhoneBookWriter.PhoneNumber> phoneNumbers;
+      if (RANDOM.nextDouble() < .1) {
+        phoneNumbers = null;
+      } else {
+        int n = RANDOM.nextInt(4);
+        phoneNumbers = new ArrayList<>(n);
+        for (int i = 0; i < n; ++i) {
+          String kind = RANDOM.nextDouble() < .1 ? null : "kind" + RANDOM.nextInt(5);
+          phoneNumbers.add(new PhoneBookWriter.PhoneNumber(RANDOM.nextInt(), kind));
+        }
+      }
+      String name = RANDOM.nextDouble() < .1 ? null : "name" + RANDOM.nextLong();
+      PhoneBookWriter.User user = new PhoneBookWriter.User(RANDOM.nextLong(), name, phoneNumbers, location);
+      return PhoneBookWriter.groupFromUser(user);
+    }
+
+    private static TrackingByteBufferAllocator createAllocator(final int oomAt) {
+      return TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator() {
+        private int counter = 0;
+
+        @Override
+        public ByteBuffer allocate(int size) {
+          if (++counter >= oomAt) {
+            Assert.assertEquals("There should not be any additional allocations after an OOM", oomAt, counter);
+            throw new OutOfMemoryError("Artificial OOM to fail write");
+          }
+          return super.allocate(size);
+        }
+
+        @Override
+        public void release(ByteBuffer b) {
+          CleanUtil.cleanDirectBuffer(b);
+
+          // It seems that if the buffers are small, the related memory space is not given back to the OS, so
+          // writing to them after release does not cause any identifiable issue. Therefore, we explicitly zero the
+          // address, so the JVM crashes on a subsequent access.
+          try {
+            BUFFER_ADDRESS.setLong(b, 0L);
+          } catch (IllegalAccessException e) {
+            throw new RuntimeException("Unable to zero direct ByteBuffer address", e);
+          }
+        }
+      });
+    }
+
+    public static void main(String[] args) throws Throwable {
+      CompressionCodecName[] codecs = {
+        CompressionCodecName.UNCOMPRESSED,
+        CompressionCodecName.GZIP,
+        CompressionCodecName.SNAPPY,
+        CompressionCodecName.ZSTD,
+        CompressionCodecName.LZ4_RAW};
+      for (int cycle = 0; cycle < 50; ++cycle) {
+        try (TrackingByteBufferAllocator allocator = createAllocator(RANDOM.nextInt(100) + 1);
+            ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(Paths.get(args[0])))
+                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                .withType(PhoneBookWriter.getSchema())
+                .withAllocator(allocator)
+                .withCodecFactory(CodecFactory.createDirectCodecFactory(
+                    new Configuration(),
+                    allocator,
+                    ParquetProperties.DEFAULT_PAGE_SIZE))
+                .withCompressionCodec(codecs[RANDOM.nextInt(codecs.length)])
+                .build()) {
+          for (int i = 0; i < 100_000; ++i) {
+            writer.write(generateNext());
+          }
+          Assert.fail("An OOM should have been thrown");
+        } catch (OutOfMemoryError oom) {
+          Throwable[] suppressed = oom.getSuppressed();
+          // No exception should be suppressed after the expected OOM:
+          // It would mean that a close() call fails with an exception
+          if (suppressed != null && suppressed.length > 0) {
+            throw suppressed[0];
+          }
+        }
+      }
+    }
+  }
+}