diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 95394cdd37d02..819f978a862c1 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -135,9 +135,10 @@ def _gold_tests(self, gold_dir): skip.add("Rust") if prefix == '2.0.0-compression': skip.add("Go") - skip.add("Java") skip.add("JS") skip.add("Rust") + if name == 'zstd': + skip.add("Java") yield datagen.File(name, None, None, skip=skip, path=out_path) def _run_test_cases(self, producer, consumer, case_runner, diff --git a/java/compression/pom.xml b/java/compression/pom.xml new file mode 100644 index 0000000000000..9a6ab3508ed40 --- /dev/null +++ b/java/compression/pom.xml @@ -0,0 +1,51 @@ + + + + 4.0.0 + + org.apache.arrow + arrow-java-root + 4.0.0-SNAPSHOT + + arrow-compression + Arrow Compression + (Experimental/Contrib) A library for working with the compression/decompression of Arrow data. + + + + org.apache.arrow + arrow-vector + ${project.version} + ${arrow.vector.classifier} + + + org.apache.arrow + arrow-memory-core + ${project.version} + + + org.apache.arrow + arrow-memory-unsafe + ${project.version} + test + + + org.apache.commons + commons-compress + 1.20 + + + io.netty + netty-common + + + diff --git a/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java new file mode 100644 index 0000000000000..4becbbe78c964 --- /dev/null +++ b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; + +/** + * A factory implementation based on Apache Commons library. + */ +public class CommonsCompressionFactory implements CompressionCodec.Factory { + + public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory(); + + @Override + public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { + switch (codecType) { + case LZ4_FRAME: + return new Lz4CompressionCodec(); + default: + throw new IllegalArgumentException("Compression type not supported: " + codecType); + } + } +} diff --git a/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java new file mode 100644 index 0000000000000..af34a8fdd706f --- /dev/null +++ b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. 
+ */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The uncompressed buffer size exceeds the integer limit"); + + if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; + } + + try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { + // compressed buffer is larger, send the raw buffer + compressedBuffer.close(); + compressedBuffer = CompressionUtil.packageRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { + byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; + PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); + } + + byte[] outBytes = baos.toByteArray(); + + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + + long uncompressedLength = uncompressedBuffer.writerIndex(); + if (!MemoryUtil.LITTLE_ENDIAN) { + uncompressedLength = 
Long.reverseBytes(uncompressedLength); + } + // first 8 bytes reserved for uncompressed length, according to the specification + compressedBuffer.setLong(0, uncompressedLength); + + PlatformDependent.copyMemory( + outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The compressed buffer size exceeds the integer limit"); + + Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, + "Not enough data to decompress."); + + long decompressedLength = compressedBuffer.getLong(0); + if (!MemoryUtil.LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } + + if (decompressedLength == 0L) { + // shortcut for empty buffer + compressedBuffer.close(); + return allocator.getEmpty(); + } + + if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) { + // no compression + return CompressionUtil.extractUncompressedBuffer(compressedBuffer); + } + + try { + ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer); + compressedBuffer.close(); + return decompressedBuffer; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException { + long decompressedLength = compressedBuffer.getLong(0); + if (!MemoryUtil.LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } + + byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)]; + PlatformDependent.copyMemory( + compressedBuffer.memoryAddress() + 
CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length); + ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); + try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) { + IOUtils.copy(in, out); + } + + byte[] outBytes = out.toByteArray(); + ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length); + PlatformDependent.copyMemory(outBytes, 0, decompressedBuffer.memoryAddress(), outBytes.length); + decompressedBuffer.writerIndex(decompressedLength); + return decompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.LZ4_FRAME; + } +} diff --git a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java new file mode 100644 index 0000000000000..52f24e20533ea --- /dev/null +++ b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.compression; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Test cases for {@link CompressionCodec}s. 
+ */ +@RunWith(Parameterized.class) +public class TestCompressionCodec { + + private final CompressionCodec codec; + + private BufferAllocator allocator; + + private final int vectorLength; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void terminate() { + allocator.close(); + } + + public TestCompressionCodec(CompressionUtil.CodecType type, int vectorLength, CompressionCodec codec) { + this.codec = codec; + this.vectorLength = vectorLength; + } + + @Parameterized.Parameters(name = "codec = {0}, length = {1}") + public static Collection getCodecs() { + List params = new ArrayList<>(); + + int[] lengths = new int[] {10, 100, 1000}; + for (int len : lengths) { + CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE; + params.add(new Object[]{dumbCodec.getCodecType(), len, dumbCodec}); + + CompressionCodec lz4Codec = new Lz4CompressionCodec(); + params.add(new Object[]{lz4Codec.getCodecType(), len, lz4Codec}); + } + return params; + } + + private List compressBuffers(List inputBuffers) { + List outputBuffers = new ArrayList<>(inputBuffers.size()); + for (ArrowBuf buf : inputBuffers) { + outputBuffers.add(codec.compress(allocator, buf)); + } + return outputBuffers; + } + + private List deCompressBuffers(List inputBuffers) { + List outputBuffers = new ArrayList<>(inputBuffers.size()); + for (ArrowBuf buf : inputBuffers) { + outputBuffers.add(codec.decompress(allocator, buf)); + } + return outputBuffers; + } + + @Test + public void testCompressFixedWidthBuffers() throws Exception { + // prepare vector to compress + IntVector origVec = new IntVector("vec", allocator); + origVec.allocateNew(vectorLength); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + origVec.setNull(i); + } else { + origVec.set(i, i); + } + } + origVec.setValueCount(vectorLength); + int nullCount = origVec.getNullCount(); + + // compress & decompress + List origBuffers = origVec.getFieldBuffers(); + List 
compressedBuffers = compressBuffers(origBuffers); + List decompressedBuffers = deCompressBuffers(compressedBuffers); + + assertEquals(2, decompressedBuffers.size()); + + // orchestrate new vector + IntVector newVec = new IntVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, nullCount), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + assertTrue(newVec.isNull(i)); + } else { + assertEquals(i, newVec.get(i)); + } + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } + + @Test + public void testCompressVariableWidthBuffers() throws Exception { + // prepare vector to compress + VarCharVector origVec = new VarCharVector("vec", allocator); + origVec.allocateNew(); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + origVec.setNull(i); + } else { + origVec.setSafe(i, String.valueOf(i).getBytes()); + } + } + origVec.setValueCount(vectorLength); + int nullCount = origVec.getNullCount(); + + // compress & decompress + List origBuffers = origVec.getFieldBuffers(); + List compressedBuffers = compressBuffers(origBuffers); + List decompressedBuffers = deCompressBuffers(compressedBuffers); + + assertEquals(3, decompressedBuffers.size()); + + // orchestrate new vector + VarCharVector newVec = new VarCharVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, nullCount), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + assertTrue(newVec.isNull(i)); + } else { + assertArrayEquals(String.valueOf(i).getBytes(), newVec.get(i)); + } + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } + + @Test + public void testEmptyBuffer() throws Exception { + final VarBinaryVector origVec = new VarBinaryVector("vec", allocator); + + 
origVec.allocateNew(vectorLength); + + // Do not set any values (all missing) + origVec.setValueCount(vectorLength); + + final List origBuffers = origVec.getFieldBuffers(); + final List compressedBuffers = compressBuffers(origBuffers); + final List decompressedBuffers = deCompressBuffers(compressedBuffers); + + // orchestrate new vector + VarBinaryVector newVec = new VarBinaryVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, vectorLength), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + assertTrue(newVec.isNull(i)); + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } +} diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java index 63d2c091eb5c0..16ef39702ca3e 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java @@ -21,6 +21,7 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.security.AccessController; import java.security.PrivilegedAction; @@ -48,6 +49,11 @@ public class MemoryUtil { */ static final long BYTE_BUFFER_ADDRESS_OFFSET; + /** + * If the native byte order is little-endian. + */ + public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + static { try { // try to get the unsafe object @@ -132,7 +138,7 @@ public Object run() { } /** - * Given a {@link ByteBuf}, gets the address the underlying memory space. + * Given a {@link ByteBuffer}, gets the address the underlying memory space. * * @param buf the byte buffer. * @return address of the underlying memory. 
diff --git a/java/pom.xml b/java/pom.xml index e4f0c78323369..c776b833a1765 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -676,6 +676,7 @@ performance algorithm adapter/avro + compression diff --git a/java/tools/pom.xml b/java/tools/pom.xml index 07fd5ac55524d..375d243515471 100644 --- a/java/tools/pom.xml +++ b/java/tools/pom.xml @@ -32,6 +32,11 @@ ${project.version} ${arrow.vector.classifier} + + org.apache.arrow + arrow-compression + ${project.version} + com.google.guava guava diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java index cd374a97d7103..1db3eeb6449ee 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; +import org.apache.arrow.compression.CommonsCompressionFactory; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -186,7 +187,7 @@ public void execute(File arrowFile, File jsonFile) throws IOException { JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator); FileInputStream fileInputStream = new FileInputStream(arrowFile); ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), - allocator)) { + allocator, CommonsCompressionFactory.INSTANCE)) { Schema jsonSchema = jsonReader.start(); VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot(); Schema arrowSchema = arrowRoot.getSchema(); diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java index bcf4d1de4d245..6bd3c2fba2ff5 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java @@ -25,6 +25,7 @@ import java.io.OutputStream; 
import java.nio.channels.Channels; +import org.apache.arrow.compression.CommonsCompressionFactory; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -40,7 +41,7 @@ public class StreamToFile { */ public static void convert(InputStream in, OutputStream out) throws IOException { BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); - try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) { + try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator, CommonsCompressionFactory.INSTANCE)) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); // load the first batch before instantiating the writer so that we have any dictionaries. // Only writeBatches if we load the first one. diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 3a4b00de5efb9..24be349c2820d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -27,6 +27,7 @@ import org.apache.arrow.util.Collections2; import org.apache.arrow.vector.compression.CompressionCodec; import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Field; @@ -38,13 +39,32 @@ public class VectorLoader { private final VectorSchemaRoot root; + private final CompressionCodec.Factory factory; + + /** + * A flag indicating if decompression is needed. + * This will affect the behavior of releasing buffers. + */ + private boolean decompressionNeeded; + /** * Construct with a root to load and will create children in root based on schema. 
* * @param root the root to add vectors to based on schema */ public VectorLoader(VectorSchemaRoot root) { + this(root, NoCompressionCodec.Factory.INSTANCE); + } + + /** + * Construct with a root to load and will create children in root based on schema. + * + * @param root the root to add vectors to based on schema. + * @param factory the factory to create codec. + */ + public VectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) { this.root = root; + this.factory = factory; } /** @@ -56,7 +76,10 @@ public VectorLoader(VectorSchemaRoot root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); - CompressionCodec codec = CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec()); + CompressionUtil.CodecType codecType = + CompressionUtil.CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec()); + decompressionNeeded = codecType != CompressionUtil.CodecType.NO_COMPRESSION; + CompressionCodec codec = decompressionNeeded ? factory.createCodec(codecType) : NoCompressionCodec.INSTANCE; for (FieldVector fieldVector : root.getFieldVectors()) { loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } @@ -79,10 +102,21 @@ private void loadBuffers( List ownBuffers = new ArrayList<>(bufferLayoutCount); for (int j = 0; j < bufferLayoutCount; j++) { ArrowBuf nextBuf = buffers.next(); - ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf)); + // for vectors without nulls, the buffer is empty, so there is no need to decompress it. + ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? 
codec.decompress(vector.getAllocator(), nextBuf) : nextBuf; + ownBuffers.add(bufferToAdd); + if (decompressionNeeded) { + // decompression performed + nextBuf.getReferenceManager().retain(); + } } try { vector.loadFieldBuffers(fieldNode, ownBuffers); + if (decompressionNeeded) { + for (ArrowBuf buf : ownBuffers) { + buf.close(); + } + } } catch (RuntimeException e) { throw new IllegalArgumentException("Could not load buffers for field " + field + ". error message: " + e.getMessage(), e); @@ -100,5 +134,4 @@ private void loadBuffers( } } } - } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java index ce2dd73aab5f5..a6dd8b51fe554 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -28,11 +28,11 @@ public interface CompressionCodec { /** * Compress a buffer. * @param allocator the allocator for allocating memory for compressed buffer. - * @param unCompressedBuffer the buffer to compress. + * @param uncompressedBuffer the buffer to compress. * Implementation of this method should take care of releasing this buffer. - * @return the compressed buffer. + * @return the compressed buffer */ - ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer); + ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer); /** * Decompress a buffer. @@ -44,8 +44,19 @@ public interface CompressionCodec { ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer); /** - * Gets the name of the codec. - * @return the name of the codec. + * Gets the type of the codec. + * @return the type of the codec. */ - String getCodecName(); + CompressionUtil.CodecType getCodecType(); + + /** + * Factory to create compression codec. 
+ */ + interface Factory { + + /** + * Creates the codec based on the codec type. + */ + CompressionCodec createCodec(CompressionUtil.CodecType codecType); + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 464f3aa8e9c9e..1deb38c84da05 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -19,6 +19,8 @@ import org.apache.arrow.flatbuf.BodyCompressionMethod; import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; /** @@ -26,35 +28,76 @@ */ public class CompressionUtil { + /** + * Compression codec types corresponding to flat buffer implementation in {@link CompressionType}. + */ + public enum CodecType { + + NO_COMPRESSION(NoCompressionCodec.COMPRESSION_TYPE), + + LZ4_FRAME(org.apache.arrow.flatbuf.CompressionType.LZ4_FRAME), + + ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD); + + private final byte type; + + CodecType(byte type) { + this.type = type; + } + + public byte getType() { + return type; + } + + /** + * Gets the codec type from the compression type defined in {@link CompressionType}. + */ + public static CodecType fromCompressionType(byte type) { + for (CodecType codecType : values()) { + if (codecType.type == type) { + return codecType; + } + } + return NO_COMPRESSION; + } + } + + public static final long SIZE_OF_UNCOMPRESSED_LENGTH = 8L; + + /** + * Special flag to indicate no compression. + * (e.g. when the compressed buffer has a larger size.) + */ + public static final long NO_COMPRESSION_LENGTH = -1L; + private CompressionUtil() { } /** * Creates the {@link ArrowBodyCompression} object, given the {@link CompressionCodec}. 
- * The implementation of this method should depend on the values of {@link CompressionType#names}. + * The implementation of this method should depend on the values of + * {@link org.apache.arrow.flatbuf.CompressionType#names}. */ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { - switch (codec.getCodecName()) { - case "default": - return NoCompressionCodec.DEFAULT_BODY_COMPRESSION; - case "LZ4_FRAME": - return new ArrowBodyCompression(CompressionType.LZ4_FRAME, BodyCompressionMethod.BUFFER); - case "ZSTD": - return new ArrowBodyCompression(CompressionType.ZSTD, BodyCompressionMethod.BUFFER); - default: - throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName()); - } + return new ArrowBodyCompression(codec.getCodecType().getType(), BodyCompressionMethod.BUFFER); } /** - * Creates the {@link CompressionCodec} given the compression type. + * Process compression by compressing the buffer as is. */ - public static CompressionCodec createCodec(byte compressionType) { - switch (compressionType) { - case NoCompressionCodec.COMPRESSION_TYPE: - return NoCompressionCodec.INSTANCE; - default: - throw new IllegalArgumentException("Compression type not supported: " + compressionType); - } + public static ArrowBuf packageRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) { + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); + compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH); + compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex()); + compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); + return compressedBuffer; + } + + /** + * Process decompression by slicing the buffer that contains the uncompressed bytes. 
+ */ + public static ArrowBuf extractUncompressedBuffer(ArrowBuf inputBuffer) { + return inputBuffer.slice(SIZE_OF_UNCOMPRESSED_LENGTH, + inputBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java index 72273de7630f2..e5e8e9d463bd0 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java @@ -38,8 +38,8 @@ private NoCompressionCodec() { } @Override - public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { - return unCompressedBuffer; + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + return uncompressedBuffer; } @Override @@ -48,7 +48,20 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) } @Override - public String getCodecName() { - return "default"; + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.NO_COMPRESSION; + } + + /** + * The default factory that creates a {@link NoCompressionCodec}. 
+ */ + public static class Factory implements CompressionCodec.Factory { + + public static final NoCompressionCodec.Factory INSTANCE = new NoCompressionCodec.Factory(); + + @Override + public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { + return NoCompressionCodec.INSTANCE; + } } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java index 9267b0a35642f..f4e9e0db1e5f7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java @@ -28,6 +28,8 @@ import org.apache.arrow.flatbuf.Footer; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.VisibleForTesting; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowBlock; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowFooter; @@ -51,11 +53,21 @@ public class ArrowFileReader extends ArrowReader { private int currentDictionaryBatch = 0; private int currentRecordBatch = 0; - public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) { - super(allocator); + public ArrowFileReader( + SeekableReadChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + super(allocator, compressionFactory); this.in = in; } + public ArrowFileReader( + SeekableByteChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + this(new SeekableReadChannel(in), allocator, compressionFactory); + } + + public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) { + this(in, allocator, NoCompressionCodec.Factory.INSTANCE); + } + public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) { this(new 
SeekableReadChannel(in), allocator); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java index 3408c4541d2a0..9d940deecfe20 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java @@ -28,6 +28,8 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; @@ -49,8 +51,15 @@ public abstract class ArrowReader implements DictionaryProvider, AutoCloseable { protected Map dictionaries; private boolean initialized = false; + private final CompressionCodec.Factory compressionFactory; + protected ArrowReader(BufferAllocator allocator) { + this(allocator, NoCompressionCodec.Factory.INSTANCE); + } + + protected ArrowReader(BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { this.allocator = allocator; + this.compressionFactory = compressionFactory; } /** @@ -181,7 +190,7 @@ protected void initialize() throws IOException { Schema schema = new Schema(fields, originalSchema.getCustomMetadata()); this.root = new VectorSchemaRoot(schema, vectors, 0); - this.loader = new VectorLoader(root); + this.loader = new VectorLoader(root, compressionFactory); this.dictionaries = Collections.unmodifiableMap(dictionaries); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java index ad9f40e3c788d..a0096aaf3ee56 100644 --- 
a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java @@ -26,6 +26,8 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.ipc.message.MessageChannelReader; @@ -50,12 +52,36 @@ public class ArrowStreamReader extends ArrowReader { * * @param messageReader reader used to get messages from a ReadChannel * @param allocator to allocate new buffers + * @param compressionFactory the factory to create compression codec. */ - public ArrowStreamReader(MessageChannelReader messageReader, BufferAllocator allocator) { - super(allocator); + public ArrowStreamReader( + MessageChannelReader messageReader, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + super(allocator, compressionFactory); this.messageReader = messageReader; } + /** + * Constructs a streaming reader using a MessageChannelReader. Non-blocking. + * + * @param messageReader reader used to get messages from a ReadChannel + * @param allocator to allocate new buffers + */ + public ArrowStreamReader(MessageChannelReader messageReader, BufferAllocator allocator) { + this(messageReader, allocator, NoCompressionCodec.Factory.INSTANCE); + } + + /** + * Constructs a streaming reader from a ReadableByteChannel input. Non-blocking. + * + * @param in ReadableByteChannel to read messages from + * @param allocator to allocate new buffers + * @param compressionFactory the factory to create compression codec. 
+ */ + public ArrowStreamReader( + ReadableByteChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + this(new MessageChannelReader(new ReadChannel(in), allocator), allocator, compressionFactory); + } + /** * Constructs a streaming reader from a ReadableByteChannel input. Non-blocking. * @@ -66,6 +92,18 @@ public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) { this(new MessageChannelReader(new ReadChannel(in), allocator), allocator); } + /** + * Constructs a streaming reader from an InputStream. Non-blocking. + * + * @param in InputStream to read messages from + * @param allocator to allocate new buffers + * @param compressionFactory the factory to create compression codec. + */ + public ArrowStreamReader( + InputStream in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + this(Channels.newChannel(in), allocator, compressionFactory); + } + /** * Constructs a streaming reader from an InputStream. Non-blocking.
* diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 6fa2fb1a72de9..dbf2774fba839 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -194,14 +194,14 @@ public int writeTo(FlatBufferBuilder builder) { RecordBatch.startBuffersVector(builder, buffers.size()); int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout); int compressOffset = 0; - if (bodyCompression != null && bodyCompression != NoCompressionCodec.DEFAULT_BODY_COMPRESSION) { + if (bodyCompression.getCodec() != NoCompressionCodec.COMPRESSION_TYPE) { compressOffset = bodyCompression.writeTo(builder); } RecordBatch.startRecordBatch(builder); RecordBatch.addLength(builder, length); RecordBatch.addNodes(builder, nodesOffset); RecordBatch.addBuffers(builder, buffersOffset); - if (bodyCompression != null && bodyCompression != NoCompressionCodec.DEFAULT_BODY_COMPRESSION) { + if (bodyCompression.getCodec() != NoCompressionCodec.COMPRESSION_TYPE) { RecordBatch.addCompression(builder, compressOffset); } return RecordBatch.endRecordBatch(builder);