Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gszadovszky committed Feb 23, 2024
1 parent cb5bd9c commit f5d9ae6
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,12 @@ public ByteBuffer toByteBuffer() throws IOException {

/**
* Copies the content of this {@link BytesInput} object to a newly created {@link ByteBuffer} and returns it wrapped
* in a {@link BytesInput} object. <strong>The data content shall be able to be fit in a {@link ByteBuffer}
* object!</strong>
* in a {@link BytesInput} object.
*
* <strong>The data content shall be able to be fit in a {@link ByteBuffer} object!</strong> (In case of the size of
* this {@link BytesInput} object cannot fit in an {@code int}, an {@link ArithmeticException} will be thrown. The
* {@code allocator} might throw an {@link OutOfMemoryError} if it is unable to allocate the required
* {@link ByteBuffer}.)
*
* @param allocator the allocator to be used for creating the new {@link ByteBuffer} object
* @param callback the callback called with the newly created {@link ByteBuffer} object; to be used for make it
Expand Down Expand Up @@ -279,7 +283,11 @@ public BytesInput copy(ByteBufferReleaser releaser) {
* {@link ByteBuffer} object if this {@link BytesInput} is not backed by a single {@link ByteBuffer}. In the latter
* case the specified {@link ByteBufferAllocator} object will be used. In case of allocation the specified callback
* will be invoked so the release of the newly allocated {@link ByteBuffer} object can be released at a proper time.
* <strong>The data content shall be able to be fit in a {@link ByteBuffer} object!</strong>
*
* <strong>The data content shall be able to be fit in a {@link ByteBuffer} object!</strong> (In case of the size of
* this {@link BytesInput} object cannot fit in an {@code int}, an {@link ArithmeticException} will be thrown. The
* {@code allocator} might throw an {@link OutOfMemoryError} if it is unable to allocate the required
* {@link ByteBuffer}.)
*
* @param allocator the {@link ByteBufferAllocator} to be used for potentially allocating a new {@link ByteBuffer}
* object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.nio.ByteBuffer;

/**
* A special {@link ByteBufferAllocator} implementation that keeps one {@link ByteBuffer} object and reuse it at the
* A special {@link ByteBufferAllocator} implementation that keeps one {@link ByteBuffer} object and reuses it at the
* next {@link #allocate(int)} call. The {@link #close()} shall be called when this allocator is not needed anymore to
* really release the one buffer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.parquet.format.Util.writeColumnMetaData;
import static org.apache.parquet.format.Util.writePageHeader;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -1501,7 +1502,7 @@ public FileMetaDataAndRowGroupOffsetInfo(
}

public ParquetMetadata readParquetMetadata(
final InputStream from,
final InputStream fromInputStream,
MetadataFilter filter,
final InternalFileDecryptor fileDecryptor,
final boolean encryptedFooter,
Expand All @@ -1513,7 +1514,18 @@ public ParquetMetadata readParquetMetadata(
(encryptedFooter ? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);

// Mark the beginning of the footer for verifyFooterIntegrity
from.mark(combinedFooterLength);
final InputStream from;
if (fileDecryptor.checkFooterIntegrity()) {
// fromInputStream should already support marking but let's be on the safe side
if (!fromInputStream.markSupported()) {
from = new BufferedInputStream(fromInputStream, combinedFooterLength);
} else {
from = fromInputStream;
}
from.mark(combinedFooterLength);
} else {
from = fromInputStream;
}

FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo =
filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ public ParquetFileWriter(
statisticsTruncateLength,
pageWriteChecksumEnabled,
null,
null,
null);
}

Expand All @@ -383,6 +384,7 @@ public ParquetFileWriter(
statisticsTruncateLength,
pageWriteChecksumEnabled,
encryptionProperties,
null,
null);
}

Expand All @@ -392,24 +394,21 @@ public ParquetFileWriter(
Mode mode,
long rowGroupSize,
int maxPaddingSize,
int columnIndexTruncateLength,
int statisticsTruncateLength,
boolean pageWriteChecksumEnabled,
FileEncryptionProperties encryptionProperties,
ByteBufferAllocator allocator)
ParquetProperties props)
throws IOException {
this(
file,
schema,
mode,
rowGroupSize,
maxPaddingSize,
columnIndexTruncateLength,
statisticsTruncateLength,
pageWriteChecksumEnabled,
props.getColumnIndexTruncateLength(),
props.getStatisticsTruncateLength(),
props.getPageWriteChecksumEnabled(),
encryptionProperties,
null,
allocator);
props.getAllocator());
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,11 +505,8 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
mode,
blockSize,
maxPaddingSize,
props.getColumnIndexTruncateLength(),
props.getStatisticsTruncateLength(),
props.getPageWriteChecksumEnabled(),
encryptionProperties,
props.getAllocator());
props);
w.start();

float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO, MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,8 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
mode,
rowGroupSize,
maxPaddingSize,
encodingProps.getColumnIndexTruncateLength(),
encodingProps.getStatisticsTruncateLength(),
encodingProps.getPageWriteChecksumEnabled(),
encryptionProperties,
encodingProps.getAllocator());
encodingProps);
fileWriter.start();

this.codecFactory = codecFactory;
Expand Down

0 comments on commit f5d9ae6

Please sign in to comment.