From f5d9ae6f2a18839dc7acf5b2f52594e86f6a1ca4 Mon Sep 17 00:00:00 2001
From: Gabor Szadovszky
Date: Fri, 23 Feb 2024 10:05:36 +0100
Subject: [PATCH] Addressing comments

---
 .../org/apache/parquet/bytes/BytesInput.java     | 14 +++++++++++---
 .../bytes/ReusingByteBufferAllocator.java        |  2 +-
 .../converter/ParquetMetadataConverter.java      | 16 ++++++++++++++--
 .../apache/parquet/hadoop/ParquetFileWriter.java | 15 +++++++--------
 .../parquet/hadoop/ParquetOutputFormat.java      |  5 +----
 .../org/apache/parquet/hadoop/ParquetWriter.java |  5 +----
 6 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 2ff5ee85ae..88bb1da7cf 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -250,8 +250,12 @@ public ByteBuffer toByteBuffer() throws IOException {
   /**
    * Copies the content of this {@link BytesInput} object to a newly created {@link ByteBuffer} and returns it wrapped
-   * in a {@link BytesInput} object. The data content shall be able to be fit in a {@link ByteBuffer}
-   * object!
+   * in a {@link BytesInput} object.
+   *
+   * The data content shall fit in a single {@link ByteBuffer} object! (If the size of this
+   * {@link BytesInput} object does not fit in an {@code int}, an {@link ArithmeticException} is thrown. The
+   * {@code allocator} might throw an {@link OutOfMemoryError} if it is unable to allocate the required
+   * {@link ByteBuffer}.)
    *
    * @param allocator the allocator to be used for creating the new {@link ByteBuffer} object
    * @param callback the callback called with the newly created {@link ByteBuffer} object; to be used for make it
@@ -279,7 +283,11 @@ public BytesInput copy(ByteBufferReleaser releaser) {
    * {@link ByteBuffer} object if this {@link BytesInput} is not backed by a single {@link ByteBuffer}. In the latter
    * case the specified {@link ByteBufferAllocator} object will be used. In case of allocation the specified callback
    * will be invoked so the release of the newly allocated {@link ByteBuffer} object can be released at a proper time.
-   * The data content shall be able to be fit in a {@link ByteBuffer} object!
+   *
+   * The data content shall fit in a single {@link ByteBuffer} object! (If the size of this
+   * {@link BytesInput} object does not fit in an {@code int}, an {@link ArithmeticException} is thrown. The
+   * {@code allocator} might throw an {@link OutOfMemoryError} if it is unable to allocate the required
+   * {@link ByteBuffer}.)
    *
    * @param allocator the {@link ByteBufferAllocator} to be used for potentially allocating a new {@link ByteBuffer}
    *     object
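A minimal usage sketch of the copy semantics documented above (not part of the
patch; it assumes parquet-common's HeapByteBufferAllocator and the
ByteBufferReleaser(ByteBufferAllocator) constructor):

    import org.apache.parquet.bytes.ByteBufferReleaser;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.bytes.HeapByteBufferAllocator;

    public class BytesInputCopySketch {
      public static void main(String[] args) throws Exception {
        // Build a BytesInput backed by more than one buffer so copy() really allocates.
        BytesInput input = BytesInput.concat(
            BytesInput.from(new byte[] {1, 2}), BytesInput.from(new byte[] {3, 4}));
        // size() is a long: copy() throws ArithmeticException when the size does not
        // fit in an int, and the allocator may throw OutOfMemoryError (see Javadoc above).
        try (ByteBufferReleaser releaser = new ByteBufferReleaser(new HeapByteBufferAllocator())) {
          BytesInput copy = input.copy(releaser);
          System.out.println(copy.size()); // prints 4
        } // closing the releaser frees the ByteBuffer allocated for the copy
      }
    }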
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ReusingByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ReusingByteBufferAllocator.java
index 2a36c6eb59..83b77b0ca9 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/ReusingByteBufferAllocator.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ReusingByteBufferAllocator.java
@@ -21,7 +21,7 @@
 import java.nio.ByteBuffer;
 
 /**
- * A special {@link ByteBufferAllocator} implementation that keeps one {@link ByteBuffer} object and reuse it at the
+ * A special {@link ByteBufferAllocator} implementation that keeps one {@link ByteBuffer} object and reuses it at the
  * next {@link #allocate(int)} call. The {@link #close()} shall be called when this allocator is not needed anymore to
  * really release the one buffer.
  */
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 33b2a40a95..178a6786f3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -25,6 +25,7 @@
 import static org.apache.parquet.format.Util.writeColumnMetaData;
 import static org.apache.parquet.format.Util.writePageHeader;
 
+import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -1501,7 +1502,7 @@ public FileMetaDataAndRowGroupOffsetInfo(
   }
 
   public ParquetMetadata readParquetMetadata(
-      final InputStream from,
+      final InputStream fromInputStream,
       MetadataFilter filter,
      final InternalFileDecryptor fileDecryptor,
       final boolean encryptedFooter,
@@ -1513,7 +1514,18 @@ public ParquetMetadata readParquetMetadata(
         (encryptedFooter ? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
 
     // Mark the beginning of the footer for verifyFooterIntegrity
-    from.mark(combinedFooterLength);
+    final InputStream from;
+    if (fileDecryptor.checkFooterIntegrity()) {
+      // fromInputStream should already support marking but let's be on the safe side
+      if (!fromInputStream.markSupported()) {
+        from = new BufferedInputStream(fromInputStream, combinedFooterLength);
+      } else {
+        from = fromInputStream;
+      }
+      from.mark(combinedFooterLength);
+    } else {
+      from = fromInputStream;
+    }
 
     FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo =
         filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 6a13521f95..9867964def 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -359,6 +359,7 @@ public ParquetFileWriter(
         statisticsTruncateLength,
         pageWriteChecksumEnabled,
         null,
+        null,
         null);
   }
 
@@ -383,6 +384,7 @@ public ParquetFileWriter(
         statisticsTruncateLength,
         pageWriteChecksumEnabled,
         encryptionProperties,
+        null,
         null);
   }
 
@@ -392,11 +394,8 @@ public ParquetFileWriter(
       Mode mode,
       long rowGroupSize,
       int maxPaddingSize,
-      int columnIndexTruncateLength,
-      int statisticsTruncateLength,
-      boolean pageWriteChecksumEnabled,
       FileEncryptionProperties encryptionProperties,
-      ByteBufferAllocator allocator)
+      ParquetProperties props)
       throws IOException {
     this(
         file,
@@ -404,12 +403,12 @@ public ParquetFileWriter(
         mode,
         rowGroupSize,
         maxPaddingSize,
-        columnIndexTruncateLength,
-        statisticsTruncateLength,
-        pageWriteChecksumEnabled,
+        props.getColumnIndexTruncateLength(),
+        props.getStatisticsTruncateLength(),
+        props.getPageWriteChecksumEnabled(),
         encryptionProperties,
         null,
-        allocator);
+        props.getAllocator());
   }
 
   @Deprecated
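The guard above only pays for buffering when the incoming stream cannot replay
the footer bytes itself. A self-contained sketch of the same pattern
(hypothetical class and method names, plain JDK only):

    import java.io.BufferedInputStream;
    import java.io.InputStream;

    public class MarkGuardSketch {
      // Wrap only when the caller's stream does not support mark/reset.
      // BufferedInputStream always does; its buffer is sized so the whole
      // footer stays replayable.
      static InputStream markFooter(InputStream in, int combinedFooterLength) {
        InputStream markable =
            in.markSupported() ? in : new BufferedInputStream(in, combinedFooterLength);
        markable.mark(combinedFooterLength);
        return markable;
      }
      // After the footer has been read, calling reset() on the returned stream
      // replays the same bytes for integrity verification.
    }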
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -505,11 +505,8 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
         mode,
         blockSize,
         maxPaddingSize,
-        props.getColumnIndexTruncateLength(),
-        props.getStatisticsTruncateLength(),
-        props.getPageWriteChecksumEnabled(),
         encryptionProperties,
-        props.getAllocator());
+        props);
     w.start();
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO, MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 9eb68286be..5922517cf6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -398,11 +398,8 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
         mode,
         rowGroupSize,
         maxPaddingSize,
-        encodingProps.getColumnIndexTruncateLength(),
-        encodingProps.getStatisticsTruncateLength(),
-        encodingProps.getPageWriteChecksumEnabled(),
         encryptionProperties,
-        encodingProps.getAllocator());
+        encodingProps);
     fileWriter.start();
 
     this.codecFactory = codecFactory;
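Both call sites now hand over the whole properties object instead of four
separate arguments. A sketch of how the removed settings are configured on the
caller side (assuming the standard ParquetProperties.builder() setters; the
commented constructor call mirrors the refactored signature above):

    import org.apache.parquet.column.ParquetProperties;

    public class WriterPropsSketch {
      public static void main(String[] args) {
        // The writer now derives the truncation lengths, the page checksum flag
        // and the allocator from ParquetProperties (see the props.get*() calls
        // in the ParquetFileWriter hunk above).
        ParquetProperties props = ParquetProperties.builder()
            .withColumnIndexTruncateLength(64)
            .withStatisticsTruncateLength(2048)
            .withPageWriteChecksumEnabled(true)
            .build();
        System.out.println(props.getColumnIndexTruncateLength()); // prints 64
        // e.g. new ParquetFileWriter(
        //     file, schema, mode, rowGroupSize, maxPaddingSize, encryptionProperties, props);
      }
    }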