From 4d27ba2388d78032bb792f7b8bf03c6b38e68c0c Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Thu, 17 Dec 2020 10:55:24 +0800 Subject: [PATCH 01/10] ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4 --- .../apache/arrow/memory/util/MemoryUtil.java | 2 +- java/vector/pom.xml | 5 + .../vector/compression/CompressionUtil.java | 2 + .../compression/Lz4CompressionCodec.java | 110 +++++++++++ .../compression/TestCompressionCodec.java | 177 ++++++++++++++++++ 5 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java create mode 100644 java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java index 63d2c091eb5c0..904a8bdaced8e 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java @@ -132,7 +132,7 @@ public Object run() { } /** - * Given a {@link ByteBuf}, gets the address the underlying memory space. + * Given a {@link ByteBuffer}, gets the address of the underlying memory space. * * @param buf the byte buffer. * @return address of the underlying memory. 
diff --git a/java/vector/pom.xml b/java/vector/pom.xml index ed22e9b94cdc1..d6e05af7c0d25 100644 --- a/java/vector/pom.xml +++ b/java/vector/pom.xml @@ -74,6 +74,11 @@ org.slf4j slf4j-api + + org.lz4 + lz4-java + 1.7.1 + diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 464f3aa8e9c9e..1612642739dd6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -53,6 +53,8 @@ public static CompressionCodec createCodec(byte compressionType) { switch (compressionType) { case NoCompressionCodec.COMPRESSION_TYPE: return NoCompressionCodec.INSTANCE; + case CompressionType.LZ4_FRAME: + return new Lz4CompressionCodec(); default: throw new IllegalArgumentException("Compression type not supported: " + compressionType); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java new file mode 100644 index 0000000000000..0c82b86c87bde --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import java.nio.ByteBuffer; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; +import org.apache.arrow.util.Preconditions; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +/** + * Compression codec for the LZ4 algorithm. + */ +public class Lz4CompressionCodec implements CompressionCodec { + + private static final long SIZE_OF_MESSAGE_LENGTH = 8L; + + private final LZ4Factory factory; + + private LZ4Compressor compressor; + + private LZ4FastDecompressor decompressor; + + public Lz4CompressionCodec() { + factory = LZ4Factory.fastestInstance(); + } + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { + Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The uncompressed buffer size exceeds the integer limit"); + + // create compressor lazily + if (compressor == null) { + compressor = factory.fastCompressor(); + } + + int maxCompressedLength = compressor.maxCompressedLength((int) unCompressedBuffer.writerIndex()); + + // first 8 bytes reserved for uncompressed length, to be consistent with the + // C++ implementation. 
+ ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH); + compressedBuffer.setLong(0, unCompressedBuffer.writerIndex()); + + ByteBuffer uncompressed = + MemoryUtil.directBuffer(unCompressedBuffer.memoryAddress(), (int) unCompressedBuffer.writerIndex()); + ByteBuffer compressed = + MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength); + + int compressedLength = compressor.compress( + uncompressed, 0, (int) unCompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength); + compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH); + + unCompressedBuffer.close(); + return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The compressed buffer size exceeds the integer limit"); + + Preconditions.checkArgument(compressedBuffer.writerIndex() > SIZE_OF_MESSAGE_LENGTH, + "Not enough data to decompress."); + + // create decompressor lazily + if (decompressor == null) { + decompressor = factory.fastDecompressor(); + } + + long decompressedLength = compressedBuffer.getLong(0); + ByteBuffer compressed = MemoryUtil.directBuffer( + compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex()); + + ArrowBuf decompressedBuffer = allocator.buffer(decompressedLength); + ByteBuffer decompressed = MemoryUtil.directBuffer(decompressedBuffer.memoryAddress(), (int) decompressedLength); + + decompressor.decompress(compressed, decompressed); + decompressedBuffer.writerIndex(decompressedLength); + + compressedBuffer.close(); + return decompressedBuffer; + } + + @Override + public String getCodecName() { + return CompressionType.name(CompressionType.LZ4_FRAME); + } +} diff --git a/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java 
b/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java new file mode 100644 index 0000000000000..741f5f6b47e45 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Test cases for {@link CompressionCodec}s. 
+ */ +@RunWith(Parameterized.class) +public class TestCompressionCodec { + + private final CompressionCodec codec; + + private BufferAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void terminate() { + allocator.close(); + } + + public TestCompressionCodec(String name, CompressionCodec codec) { + this.codec = codec; + } + + @Parameterized.Parameters(name = "codec = {0}") + public static Collection getCodecs() { + List params = new ArrayList<>(); + + CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE; + params.add(new Object[] {dumbCodec.getCodecName(), dumbCodec}); + + CompressionCodec lz4Codec = new Lz4CompressionCodec(); + params.add(new Object[] {lz4Codec.getCodecName(), lz4Codec}); + + return params; + } + + private List compressBuffers(List inputBuffers) { + List outputBuffers = new ArrayList<>(inputBuffers.size()); + for (ArrowBuf buf : inputBuffers) { + outputBuffers.add(codec.compress(allocator, buf)); + } + return outputBuffers; + } + + private List deCompressBuffers(List inputBuffers) { + List outputBuffers = new ArrayList<>(inputBuffers.size()); + for (ArrowBuf buf : inputBuffers) { + outputBuffers.add(codec.decompress(allocator, buf)); + } + return outputBuffers; + } + + @Test + public void testCompressFixedWidthBuffers() throws Exception { + final int vecLen = 1000; + + // prepare vector to compress + IntVector origVec = new IntVector("vec", allocator); + origVec.allocateNew(vecLen); + for (int i = 0; i < vecLen; i++) { + if (i % 10 == 0) { + origVec.setNull(i); + } else { + origVec.set(i, i); + } + } + origVec.setValueCount(vecLen); + int nullCount = origVec.getNullCount(); + + // compress & decompress + List origBuffers = origVec.getFieldBuffers(); + List compressedBuffers = compressBuffers(origBuffers); + List decompressedBuffers = deCompressBuffers(compressedBuffers); + + assertEquals(2, decompressedBuffers.size()); + + // orchestrate new vector + IntVector 
newVec = new IntVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vecLen, nullCount), decompressedBuffers); + + // verify new vector + assertEquals(vecLen, newVec.getValueCount()); + for (int i = 0; i < vecLen; i++) { + if (i % 10 == 0) { + assertTrue(newVec.isNull(i)); + } else { + assertEquals(i, newVec.get(i)); + } + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } + + @Test + public void testCompressVariableWidthBuffers() throws Exception { + final int vecLen = 1000; + + // prepare vector to compress + VarCharVector origVec = new VarCharVector("vec", allocator); + origVec.allocateNew(); + for (int i = 0; i < vecLen; i++) { + if (i % 10 == 0) { + origVec.setNull(i); + } else { + origVec.setSafe(i, String.valueOf(i).getBytes()); + } + } + origVec.setValueCount(vecLen); + int nullCount = origVec.getNullCount(); + + // compress & decompress + List origBuffers = origVec.getFieldBuffers(); + List compressedBuffers = compressBuffers(origBuffers); + List decompressedBuffers = deCompressBuffers(compressedBuffers); + + assertEquals(3, decompressedBuffers.size()); + + // orchestrate new vector + VarCharVector newVec = new VarCharVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vecLen, nullCount), decompressedBuffers); + + // verify new vector + assertEquals(vecLen, newVec.getValueCount()); + for (int i = 0; i < vecLen; i++) { + if (i % 10 == 0) { + assertTrue(newVec.isNull(i)); + } else { + assertArrayEquals(String.valueOf(i).getBytes(), newVec.get(i)); + } + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } +} From 47eb6dd8b8d1e9abd4204b6a0dafcece4aec68c4 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Mon, 21 Dec 2020 11:02:01 +0800 Subject: [PATCH 02/10] ARROW-10880: [Java] Support reading/writing big-endian message size --- .../java/org/apache/arrow/memory/util/MemoryUtil.java | 6 ++++++ .../arrow/vector/compression/Lz4CompressionCodec.java | 11 ++++++++++- 2 files changed, 16 
insertions(+), 1 deletion(-) diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java index 904a8bdaced8e..16ef39702ca3e 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java @@ -21,6 +21,7 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.security.AccessController; import java.security.PrivilegedAction; @@ -48,6 +49,11 @@ public class MemoryUtil { */ static final long BYTE_BUFFER_ADDRESS_OFFSET; + /** + * If the native byte order is little-endian. + */ + public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + static { try { // try to get the unsafe object diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java index 0c82b86c87bde..f1036347e183d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java @@ -17,6 +17,8 @@ package org.apache.arrow.vector.compression; +import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN; + import java.nio.ByteBuffer; import org.apache.arrow.flatbuf.CompressionType; @@ -61,7 +63,11 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) // first 8 bytes reserved for uncompressed length, to be consistent with the // C++ implementation. 
ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH); - compressedBuffer.setLong(0, unCompressedBuffer.writerIndex()); + long uncompressedLength = unCompressedBuffer.writerIndex(); + if (!LITTLE_ENDIAN) { + uncompressedLength = Long.reverseBytes(uncompressedLength); + } + compressedBuffer.setLong(0, uncompressedLength); ByteBuffer uncompressed = MemoryUtil.directBuffer(unCompressedBuffer.memoryAddress(), (int) unCompressedBuffer.writerIndex()); @@ -90,6 +96,9 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) } long decompressedLength = compressedBuffer.getLong(0); + if (!LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } ByteBuffer compressed = MemoryUtil.directBuffer( compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex()); From 4b260cd61ce75fe3e0b457bc93daab68e7edf95c Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Wed, 6 Jan 2021 11:03:22 +0800 Subject: [PATCH 03/10] ARROW-10880: [Java] Adjust variable names --- .../arrow/vector/compression/CompressionCodec.java | 4 ++-- .../vector/compression/Lz4CompressionCodec.java | 14 +++++++------- .../vector/compression/NoCompressionCodec.java | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java index ce2dd73aab5f5..8a4dd5e6509c6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -28,11 +28,11 @@ public interface CompressionCodec { /** * Compress a buffer. * @param allocator the allocator for allocating memory for compressed buffer. - * @param unCompressedBuffer the buffer to compress. + * @param uncompressedBuffer the buffer to compress. 
* Implementation of this method should take care of releasing this buffer. * @return the compressed buffer. */ - ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer); + ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer); /** * Decompress a buffer. diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java index f1036347e183d..81b91d39709f2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java @@ -49,8 +49,8 @@ public Lz4CompressionCodec() { } @Override - public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { - Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, "The uncompressed buffer size exceeds the integer limit"); // create compressor lazily @@ -58,27 +58,27 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) compressor = factory.fastCompressor(); } - int maxCompressedLength = compressor.maxCompressedLength((int) unCompressedBuffer.writerIndex()); + int maxCompressedLength = compressor.maxCompressedLength((int) uncompressedBuffer.writerIndex()); // first 8 bytes reserved for uncompressed length, to be consistent with the // C++ implementation. 
ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH); - long uncompressedLength = unCompressedBuffer.writerIndex(); + long uncompressedLength = uncompressedBuffer.writerIndex(); if (!LITTLE_ENDIAN) { uncompressedLength = Long.reverseBytes(uncompressedLength); } compressedBuffer.setLong(0, uncompressedLength); ByteBuffer uncompressed = - MemoryUtil.directBuffer(unCompressedBuffer.memoryAddress(), (int) unCompressedBuffer.writerIndex()); + MemoryUtil.directBuffer(uncompressedBuffer.memoryAddress(), (int) uncompressedBuffer.writerIndex()); ByteBuffer compressed = MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength); int compressedLength = compressor.compress( - uncompressed, 0, (int) unCompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength); + uncompressed, 0, (int) uncompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength); compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH); - unCompressedBuffer.close(); + uncompressedBuffer.close(); return compressedBuffer; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java index 72273de7630f2..2206844e1244e 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java @@ -38,8 +38,8 @@ private NoCompressionCodec() { } @Override - public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) { - return unCompressedBuffer; + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + return uncompressedBuffer; } @Override From ed66aa4515b7e301f90ef2e597389085abcc3198 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Thu, 7 Jan 2021 17:38:33 +0800 Subject: [PATCH 04/10] ARROW-10880: [Java] Support 
empty buffers --- .../compression/Lz4CompressionCodec.java | 26 +++++++++++++---- .../compression/TestCompressionCodec.java | 29 +++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java index 81b91d39709f2..2aecf9013fabb 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java @@ -53,6 +53,15 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, "The uncompressed buffer size exceeds the integer limit"); + if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_MESSAGE_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(SIZE_OF_MESSAGE_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; + } + // create compressor lazily if (compressor == null) { compressor = factory.fastCompressor(); @@ -87,18 +96,25 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, "The compressed buffer size exceeds the integer limit"); - Preconditions.checkArgument(compressedBuffer.writerIndex() > SIZE_OF_MESSAGE_LENGTH, + Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_MESSAGE_LENGTH, "Not enough data to decompress."); + long decompressedLength = compressedBuffer.getLong(0); + if (!LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } + + if (decompressedLength == 0L) { + // shortcut for empty buffer + compressedBuffer.close(); + return allocator.getEmpty(); + } + // create 
decompressor lazily if (decompressor == null) { decompressor = factory.fastDecompressor(); } - long decompressedLength = compressedBuffer.getLong(0); - if (!LITTLE_ENDIAN) { - decompressedLength = Long.reverseBytes(decompressedLength); - } ByteBuffer compressed = MemoryUtil.directBuffer( compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex()); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java b/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java index 741f5f6b47e45..a0777175ab8c8 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java @@ -30,6 +30,7 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.junit.After; @@ -174,4 +175,32 @@ public void testCompressVariableWidthBuffers() throws Exception { newVec.close(); AutoCloseables.close(decompressedBuffers); } + + @Test + public void testEmptyBuffer() throws Exception { + final int vecLength = 10; + final VarBinaryVector origVec = new VarBinaryVector("vec", allocator); + + origVec.allocateNew(vecLength); + + // Do not set any values (all missing) + origVec.setValueCount(vecLength); + + final List origBuffers = origVec.getFieldBuffers(); + final List compressedBuffers = compressBuffers(origBuffers); + final List decompressedBuffers = deCompressBuffers(compressedBuffers); + + // orchestrate new vector + VarBinaryVector newVec = new VarBinaryVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vecLength, vecLength), decompressedBuffers); + + // verify new vector + assertEquals(vecLength, 
newVec.getValueCount()); + for (int i = 0; i < vecLength; i++) { + assertTrue(newVec.isNull(i)); + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } } From 7e274a6ab869429ba824b42766b6146ce371856c Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Fri, 15 Jan 2021 17:01:38 +0800 Subject: [PATCH 05/10] ARROW-10880: [Java] Support passing raw data --- .../vector/compression/CompressionUtil.java | 25 +++++++++ .../compression/Lz4CompressionCodec.java | 31 ++++++---- .../compression/TestCompressionCodec.java | 56 +++++++++---------- 3 files changed, 74 insertions(+), 38 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 1612642739dd6..5daefb12e1be3 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -19,6 +19,7 @@ import org.apache.arrow.flatbuf.BodyCompressionMethod; import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; /** @@ -26,6 +27,14 @@ */ public class CompressionUtil { + static final long SIZE_OF_UNCOMPRESSED_LENGTH = 8L; + + /** + * Special flag to indicate no compression. + * (e.g. when the compressed buffer has a larger size.) + */ + static final long NO_COMPRESSION_LENGTH = -1L; + private CompressionUtil() { } @@ -59,4 +68,20 @@ public static CompressionCodec createCodec(byte compressionType) { throw new IllegalArgumentException("Compression type not supported: " + compressionType); } } + + /** + * Process compression by compressing the buffer as is. 
+ */ + public static void compressRawBuffer(ArrowBuf inputBuffer, ArrowBuf compressedBuffer) { + compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH); + compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex()); + } + + /** + * Process decompression by decompressing the buffer as is. + */ + public static ArrowBuf decompressRawBuffer(ArrowBuf inputBuffer) { + return inputBuffer.slice(SIZE_OF_UNCOMPRESSED_LENGTH, + inputBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH); + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java index 2aecf9013fabb..b3ea2d3b87e94 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java @@ -18,6 +18,8 @@ package org.apache.arrow.vector.compression; import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN; +import static org.apache.arrow.vector.compression.CompressionUtil.NO_COMPRESSION_LENGTH; +import static org.apache.arrow.vector.compression.CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; import java.nio.ByteBuffer; @@ -36,8 +38,6 @@ */ public class Lz4CompressionCodec implements CompressionCodec { - private static final long SIZE_OF_MESSAGE_LENGTH = 8L; - private final LZ4Factory factory; private LZ4Compressor compressor; @@ -55,9 +55,9 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) if (uncompressedBuffer.writerIndex() == 0L) { // shortcut for empty buffer - ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_MESSAGE_LENGTH); + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH); compressedBuffer.setLong(0, 0); - compressedBuffer.writerIndex(SIZE_OF_MESSAGE_LENGTH); + compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH); uncompressedBuffer.close(); return 
compressedBuffer; } @@ -71,7 +71,7 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) // first 8 bytes reserved for uncompressed length, to be consistent with the // C++ implementation. - ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH); + ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_UNCOMPRESSED_LENGTH); long uncompressedLength = uncompressedBuffer.writerIndex(); if (!LITTLE_ENDIAN) { uncompressedLength = Long.reverseBytes(uncompressedLength); @@ -81,11 +81,16 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) ByteBuffer uncompressed = MemoryUtil.directBuffer(uncompressedBuffer.memoryAddress(), (int) uncompressedBuffer.writerIndex()); ByteBuffer compressed = - MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength); + MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, maxCompressedLength); - int compressedLength = compressor.compress( + long compressedLength = compressor.compress( uncompressed, 0, (int) uncompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength); - compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH); + if (compressedLength > uncompressedBuffer.writerIndex()) { + // compressed buffer is larger, send the raw buffer + CompressionUtil.compressRawBuffer(uncompressedBuffer, compressedBuffer); + compressedLength = uncompressedBuffer.writerIndex(); + } + compressedBuffer.writerIndex(compressedLength + SIZE_OF_UNCOMPRESSED_LENGTH); uncompressedBuffer.close(); return compressedBuffer; @@ -96,7 +101,7 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, "The compressed buffer size exceeds the integer limit"); - Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_MESSAGE_LENGTH, + 
Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_UNCOMPRESSED_LENGTH, "Not enough data to decompress."); long decompressedLength = compressedBuffer.getLong(0); @@ -110,13 +115,19 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) return allocator.getEmpty(); } + if (decompressedLength == NO_COMPRESSION_LENGTH) { + // no compression + return CompressionUtil.decompressRawBuffer(compressedBuffer); + } + // create decompressor lazily if (decompressor == null) { decompressor = factory.fastDecompressor(); } ByteBuffer compressed = MemoryUtil.directBuffer( - compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex()); + compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, + (int) (compressedBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH)); ArrowBuf decompressedBuffer = allocator.buffer(decompressedLength); ByteBuffer decompressed = MemoryUtil.directBuffer(decompressedBuffer.memoryAddress(), (int) decompressedLength); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java b/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java index a0777175ab8c8..b94845b7a6f25 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java @@ -49,6 +49,8 @@ public class TestCompressionCodec { private BufferAllocator allocator; + private final int vectorLength; + @Before public void init() { allocator = new RootAllocator(Integer.MAX_VALUE); @@ -59,20 +61,23 @@ public void terminate() { allocator.close(); } - public TestCompressionCodec(String name, CompressionCodec codec) { + public TestCompressionCodec(String name, int vectorLength, CompressionCodec codec) { this.codec = codec; + this.vectorLength = vectorLength; } - @Parameterized.Parameters(name = "codec = {0}") + 
@Parameterized.Parameters(name = "codec = {0}, length = {1}") public static Collection getCodecs() { List params = new ArrayList<>(); - CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE; - params.add(new Object[] {dumbCodec.getCodecName(), dumbCodec}); - - CompressionCodec lz4Codec = new Lz4CompressionCodec(); - params.add(new Object[] {lz4Codec.getCodecName(), lz4Codec}); + int[] lengths = new int[] {10, 100, 1000}; + for (int len : lengths) { + CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE; + params.add(new Object[]{dumbCodec.getCodecName(), len, dumbCodec}); + CompressionCodec lz4Codec = new Lz4CompressionCodec(); + params.add(new Object[]{lz4Codec.getCodecName(), len, lz4Codec}); + } return params; } @@ -94,19 +99,17 @@ private List deCompressBuffers(List inputBuffers) { @Test public void testCompressFixedWidthBuffers() throws Exception { - final int vecLen = 1000; - // prepare vector to compress IntVector origVec = new IntVector("vec", allocator); - origVec.allocateNew(vecLen); - for (int i = 0; i < vecLen; i++) { + origVec.allocateNew(vectorLength); + for (int i = 0; i < vectorLength; i++) { if (i % 10 == 0) { origVec.setNull(i); } else { origVec.set(i, i); } } - origVec.setValueCount(vecLen); + origVec.setValueCount(vectorLength); int nullCount = origVec.getNullCount(); // compress & decompress @@ -118,11 +121,11 @@ public void testCompressFixedWidthBuffers() throws Exception { // orchestrate new vector IntVector newVec = new IntVector("new vec", allocator); - newVec.loadFieldBuffers(new ArrowFieldNode(vecLen, nullCount), decompressedBuffers); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, nullCount), decompressedBuffers); // verify new vector - assertEquals(vecLen, newVec.getValueCount()); - for (int i = 0; i < vecLen; i++) { + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { if (i % 10 == 0) { assertTrue(newVec.isNull(i)); } else { @@ -136,19 +139,17 @@ public void 
testCompressFixedWidthBuffers() throws Exception { @Test public void testCompressVariableWidthBuffers() throws Exception { - final int vecLen = 1000; - // prepare vector to compress VarCharVector origVec = new VarCharVector("vec", allocator); origVec.allocateNew(); - for (int i = 0; i < vecLen; i++) { + for (int i = 0; i < vectorLength; i++) { if (i % 10 == 0) { origVec.setNull(i); } else { origVec.setSafe(i, String.valueOf(i).getBytes()); } } - origVec.setValueCount(vecLen); + origVec.setValueCount(vectorLength); int nullCount = origVec.getNullCount(); // compress & decompress @@ -160,11 +161,11 @@ public void testCompressVariableWidthBuffers() throws Exception { // orchestrate new vector VarCharVector newVec = new VarCharVector("new vec", allocator); - newVec.loadFieldBuffers(new ArrowFieldNode(vecLen, nullCount), decompressedBuffers); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, nullCount), decompressedBuffers); // verify new vector - assertEquals(vecLen, newVec.getValueCount()); - for (int i = 0; i < vecLen; i++) { + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { if (i % 10 == 0) { assertTrue(newVec.isNull(i)); } else { @@ -178,13 +179,12 @@ public void testCompressVariableWidthBuffers() throws Exception { @Test public void testEmptyBuffer() throws Exception { - final int vecLength = 10; final VarBinaryVector origVec = new VarBinaryVector("vec", allocator); - origVec.allocateNew(vecLength); + origVec.allocateNew(vectorLength); // Do not set any values (all missing) - origVec.setValueCount(vecLength); + origVec.setValueCount(vectorLength); final List origBuffers = origVec.getFieldBuffers(); final List compressedBuffers = compressBuffers(origBuffers); @@ -192,11 +192,11 @@ public void testEmptyBuffer() throws Exception { // orchestrate new vector VarBinaryVector newVec = new VarBinaryVector("new vec", allocator); - newVec.loadFieldBuffers(new ArrowFieldNode(vecLength, vecLength), decompressedBuffers); 
+ newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, vectorLength), decompressedBuffers); // verify new vector - assertEquals(vecLength, newVec.getValueCount()); - for (int i = 0; i < vecLength; i++) { + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { assertTrue(newVec.isNull(i)); } From b5143a1dc01264b0fc61abc46faf37072ff06217 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Thu, 4 Feb 2021 18:00:31 +0800 Subject: [PATCH 06/10] ARROW-10880: [Java] Switch to commons-compress library --- java/vector/pom.xml | 6 +- .../vector/compression/CompressionUtil.java | 6 +- .../compression/Lz4CompressionCodec.java | 110 ++++++++++-------- 3 files changed, 70 insertions(+), 52 deletions(-) diff --git a/java/vector/pom.xml b/java/vector/pom.xml index d6e05af7c0d25..63099b0f11ed9 100644 --- a/java/vector/pom.xml +++ b/java/vector/pom.xml @@ -75,9 +75,9 @@ slf4j-api - org.lz4 - lz4-java - 1.7.1 + org.apache.commons + commons-compress + 1.20 diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 5daefb12e1be3..9248056ceebf4 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -20,6 +20,7 @@ import org.apache.arrow.flatbuf.BodyCompressionMethod; import org.apache.arrow.flatbuf.CompressionType; import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.ipc.message.ArrowBodyCompression; /** @@ -72,9 +73,12 @@ public static CompressionCodec createCodec(byte compressionType) { /** * Process compression by compressing the buffer as is. 
*/ - public static void compressRawBuffer(ArrowBuf inputBuffer, ArrowBuf compressedBuffer) { + public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) { + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH); compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex()); + compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); + return compressedBuffer; } /** diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java index b3ea2d3b87e94..843b761760eb8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java @@ -21,33 +21,27 @@ import static org.apache.arrow.vector.compression.CompressionUtil.NO_COMPRESSION_LENGTH; import static org.apache.arrow.vector.compression.CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; -import java.nio.ByteBuffer; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import org.apache.arrow.flatbuf.CompressionType; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.util.MemoryUtil; import org.apache.arrow.util.Preconditions; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; -import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.lz4.LZ4FastDecompressor; +import 
io.netty.util.internal.PlatformDependent; /** * Compression codec for the LZ4 algorithm. */ public class Lz4CompressionCodec implements CompressionCodec { - private final LZ4Factory factory; - - private LZ4Compressor compressor; - - private LZ4FastDecompressor decompressor; - - public Lz4CompressionCodec() { - factory = LZ4Factory.fastestInstance(); - } - @Override public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, @@ -62,37 +56,46 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) return compressedBuffer; } - // create compressor lazily - if (compressor == null) { - compressor = factory.fastCompressor(); + try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { + // compressed buffer is larger, send the raw buffer + compressedBuffer.close(); + compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { + byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; + PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); } - int maxCompressedLength = compressor.maxCompressedLength((int) uncompressedBuffer.writerIndex()); + byte[] outBytes = baos.toByteArray(); + + ArrowBuf compressedBuffer = 
allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); - // first 8 bytes reserved for uncompressed length, to be consistent with the - // C++ implementation. - ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_UNCOMPRESSED_LENGTH); long uncompressedLength = uncompressedBuffer.writerIndex(); if (!LITTLE_ENDIAN) { uncompressedLength = Long.reverseBytes(uncompressedLength); } + // first 8 bytes reserved for uncompressed length, to be consistent with the + // C++ implementation. compressedBuffer.setLong(0, uncompressedLength); - ByteBuffer uncompressed = - MemoryUtil.directBuffer(uncompressedBuffer.memoryAddress(), (int) uncompressedBuffer.writerIndex()); - ByteBuffer compressed = - MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, maxCompressedLength); - - long compressedLength = compressor.compress( - uncompressed, 0, (int) uncompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength); - if (compressedLength > uncompressedBuffer.writerIndex()) { - // compressed buffer is larger, send the raw buffer - CompressionUtil.compressRawBuffer(uncompressedBuffer, compressedBuffer); - compressedLength = uncompressedBuffer.writerIndex(); - } - compressedBuffer.writerIndex(compressedLength + SIZE_OF_UNCOMPRESSED_LENGTH); - - uncompressedBuffer.close(); + PlatformDependent.copyMemory( + outBytes, 0, compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); + compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); return compressedBuffer; } @@ -120,22 +123,33 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) return CompressionUtil.decompressRawBuffer(compressedBuffer); } - // create decompressor lazily - if (decompressor == null) { - decompressor = factory.fastDecompressor(); + try { + ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer); + compressedBuffer.close(); + return decompressedBuffer; + } catch 
(IOException e) { + throw new RuntimeException(e); } + } - ByteBuffer compressed = MemoryUtil.directBuffer( - compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, - (int) (compressedBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH)); + private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException { + long decompressedLength = compressedBuffer.getLong(0); + if (!LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } - ArrowBuf decompressedBuffer = allocator.buffer(decompressedLength); - ByteBuffer decompressed = MemoryUtil.directBuffer(decompressedBuffer.memoryAddress(), (int) decompressedLength); + byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH)]; + PlatformDependent.copyMemory( + compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length); + ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); + try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) { + IOUtils.copy(in, out); + } - decompressor.decompress(compressed, decompressed); + byte[] outBytes = out.toByteArray(); + ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length); + PlatformDependent.copyMemory(outBytes, 0, decompressedBuffer.memoryAddress(), outBytes.length); decompressedBuffer.writerIndex(decompressedLength); - - compressedBuffer.close(); return decompressedBuffer; } From 0ddfa169e52aabe0fe4f51acd4901632d4224382 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Wed, 24 Feb 2021 14:55:51 +0800 Subject: [PATCH 07/10] ARROW-10880: [Java] Extract the codec implementations to a separate module --- java/compression/pom.xml | 56 +++++++++++++++++++ .../CommonsCompressionFactory.java | 41 ++++++++++++++ .../compression/Lz4CompressionCodec.java | 35 ++++++------ .../compression/TestCompressionCodec.java | 4 +- java/pom.xml | 1 + java/vector/pom.xml | 5 -- 
.../org/apache/arrow/vector/VectorLoader.java | 17 +++++- .../vector/compression/CompressionCodec.java | 11 ++++ .../vector/compression/CompressionUtil.java | 18 +----- .../compression/NoCompressionCodec.java | 13 +++++ 10 files changed, 159 insertions(+), 42 deletions(-) create mode 100644 java/compression/pom.xml create mode 100644 java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java rename java/{vector/src/main/java/org/apache/arrow/vector => compression/src/main/java/org/apache/arrow}/compression/Lz4CompressionCodec.java (82%) rename java/{vector/src/test/java/org/apache/arrow/vector => compression/src/test/java/org/apache/arrow}/compression/TestCompressionCodec.java (97%) diff --git a/java/compression/pom.xml b/java/compression/pom.xml new file mode 100644 index 0000000000000..009d60665adc8 --- /dev/null +++ b/java/compression/pom.xml @@ -0,0 +1,56 @@ + + + + 4.0.0 + + org.apache.arrow + arrow-java-root + 4.0.0-SNAPSHOT + + arrow-compression + Arrow Compression + (Experimental/Contrib) A library for working with the compression/decompression of Arrow data. + + + + org.apache.arrow + arrow-format + ${project.version} + + + org.apache.arrow + arrow-vector + ${project.version} + ${arrow.vector.classifier} + + + org.apache.arrow + arrow-memory-core + ${project.version} + + + org.apache.arrow + arrow-memory-unsafe + ${project.version} + test + + + org.apache.commons + commons-compress + 1.20 + + + io.netty + netty-common + + + diff --git a/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java new file mode 100644 index 0000000000000..800036210e60d --- /dev/null +++ b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.compression; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; + +/** + * A factory implementation based on Apache Commons library. + */ +public class CommonsCompressionFactory implements CompressionCodec.Factory { + + @Override + public CompressionCodec createCodec(byte codecType) { + switch (codecType) { + case NoCompressionCodec.COMPRESSION_TYPE: + return NoCompressionCodec.INSTANCE; + case CompressionType.LZ4_FRAME: + return new Lz4CompressionCodec(); + default: + throw new IllegalArgumentException("Compression type not supported: " + codecType); + } + } + +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java similarity index 82% rename from java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java rename to java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java index 843b761760eb8..a32dc175214d5 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java +++ 
b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java @@ -15,11 +15,7 @@ * limitations under the License. */ -package org.apache.arrow.vector.compression; - -import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN; -import static org.apache.arrow.vector.compression.CompressionUtil.NO_COMPRESSION_LENGTH; -import static org.apache.arrow.vector.compression.CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; +package org.apache.arrow.compression; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -30,7 +26,10 @@ import org.apache.arrow.flatbuf.CompressionType; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.util.MemoryUtil; import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; import org.apache.commons.compress.utils.IOUtils; @@ -49,16 +48,16 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) if (uncompressedBuffer.writerIndex() == 0L) { // shortcut for empty buffer - ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH); + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); compressedBuffer.setLong(0, 0); - compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH); uncompressedBuffer.close(); return compressedBuffer; } try { ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); - long compressedLength = compressedBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH; + long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; if 
(compressedLength > uncompressedBuffer.writerIndex()) { // compressed buffer is larger, send the raw buffer compressedBuffer.close(); @@ -83,10 +82,10 @@ private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuff byte[] outBytes = baos.toByteArray(); - ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); long uncompressedLength = uncompressedBuffer.writerIndex(); - if (!LITTLE_ENDIAN) { + if (!MemoryUtil.LITTLE_ENDIAN) { uncompressedLength = Long.reverseBytes(uncompressedLength); } // first 8 bytes reserved for uncompressed length, to be consistent with the @@ -94,8 +93,8 @@ private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuff compressedBuffer.setLong(0, uncompressedLength); PlatformDependent.copyMemory( - outBytes, 0, compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); - compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); + compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); return compressedBuffer; } @@ -104,11 +103,11 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, "The compressed buffer size exceeds the integer limit"); - Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_UNCOMPRESSED_LENGTH, + Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, "Not enough data to decompress."); long decompressedLength = compressedBuffer.getLong(0); - if (!LITTLE_ENDIAN) { + if (!MemoryUtil.LITTLE_ENDIAN) { decompressedLength = Long.reverseBytes(decompressedLength); } @@ -118,7 +117,7 @@ 
public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) return allocator.getEmpty(); } - if (decompressedLength == NO_COMPRESSION_LENGTH) { + if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) { // no compression return CompressionUtil.decompressRawBuffer(compressedBuffer); } @@ -134,13 +133,13 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException { long decompressedLength = compressedBuffer.getLong(0); - if (!LITTLE_ENDIAN) { + if (!MemoryUtil.LITTLE_ENDIAN) { decompressedLength = Long.reverseBytes(decompressedLength); } - byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH)]; + byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)]; PlatformDependent.copyMemory( - compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length); + compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length); ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) { IOUtils.copy(in, out); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java similarity index 97% rename from java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java rename to java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java index b94845b7a6f25..e741066d43094 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java +++ b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java @@ -15,7 +15,7 
@@ * limitations under the License. */ -package org.apache.arrow.vector.compression; +package org.apache.arrow.compression; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -32,6 +32,8 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.junit.After; import org.junit.Before; diff --git a/java/pom.xml b/java/pom.xml index e4f0c78323369..c776b833a1765 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -676,6 +676,7 @@ performance algorithm adapter/avro + compression diff --git a/java/vector/pom.xml b/java/vector/pom.xml index 63099b0f11ed9..ed22e9b94cdc1 100644 --- a/java/vector/pom.xml +++ b/java/vector/pom.xml @@ -74,11 +74,6 @@ org.slf4j slf4j-api - - org.apache.commons - commons-compress - 1.20 - diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 3a4b00de5efb9..5a3e712cb88e3 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -26,7 +26,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.util.Collections2; import org.apache.arrow.vector.compression.CompressionCodec; -import org.apache.arrow.vector.compression.CompressionUtil; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Field; @@ -38,13 +38,26 @@ public class VectorLoader { private final VectorSchemaRoot root; + private final CompressionCodec.Factory 
factory; + /** * Construct with a root to load and will create children in root based on schema. * * @param root the root to add vectors to based on schema */ public VectorLoader(VectorSchemaRoot root) { + this(root, NoCompressionCodec.Factory.INSTANCE); + } + + /** + * Construct with a root to load and will create children in root based on schema. + * + * @param root the root to add vectors to based on schema. + * @param factory the factory to create codec. + */ + public VectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) { this.root = root; + this.factory = factory; } /** @@ -56,7 +69,7 @@ public VectorLoader(VectorSchemaRoot root) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); - CompressionCodec codec = CompressionUtil.createCodec(recordBatch.getBodyCompression().getCodec()); + CompressionCodec codec = factory.createCodec(recordBatch.getBodyCompression().getCodec()); for (FieldVector fieldVector : root.getFieldVectors()) { loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java index 8a4dd5e6509c6..d5ef902741e54 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -48,4 +48,15 @@ public interface CompressionCodec { * @return the name of the codec. */ String getCodecName(); + + /** + * Factory to create compression codec. + */ + interface Factory { + + /** + * Creates the codec based on the codec type. 
+ */ + CompressionCodec createCodec(byte codecType); + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 9248056ceebf4..aa0c8c716256a 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -28,13 +28,13 @@ */ public class CompressionUtil { - static final long SIZE_OF_UNCOMPRESSED_LENGTH = 8L; + public static final long SIZE_OF_UNCOMPRESSED_LENGTH = 8L; /** * Special flag to indicate no compression. * (e.g. when the compressed buffer has a larger size.) */ - static final long NO_COMPRESSION_LENGTH = -1L; + public static final long NO_COMPRESSION_LENGTH = -1L; private CompressionUtil() { } @@ -56,20 +56,6 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) } } - /** - * Creates the {@link CompressionCodec} given the compression type. - */ - public static CompressionCodec createCodec(byte compressionType) { - switch (compressionType) { - case NoCompressionCodec.COMPRESSION_TYPE: - return NoCompressionCodec.INSTANCE; - case CompressionType.LZ4_FRAME: - return new Lz4CompressionCodec(); - default: - throw new IllegalArgumentException("Compression type not supported: " + compressionType); - } - } - /** * Process compression by compressing the buffer as is. 
*/ diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java index 2206844e1244e..1e9d5c2445ce8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java @@ -51,4 +51,17 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) public String getCodecName() { return "default"; } + + /** + * The default factory that creates a {@link NoCompressionCodec}. + */ + public static class Factory implements CompressionCodec.Factory { + + public static final NoCompressionCodec.Factory INSTANCE = new NoCompressionCodec.Factory(); + + @Override + public CompressionCodec createCodec(byte codecType) { + return NoCompressionCodec.INSTANCE; + } + } } From 9394c0f64a60579c59e4eb618c9ddfa16ba6be5a Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Mon, 1 Mar 2021 12:53:22 +0800 Subject: [PATCH 08/10] ARROW-10880: [Java] Enable integration tests --- dev/archery/archery/integration/runner.py | 3 +- .../CommonsCompressionFactory.java | 2 + java/tools/pom.xml | 5 +++ .../org/apache/arrow/tools/Integration.java | 3 +- .../org/apache/arrow/tools/StreamToFile.java | 3 +- .../org/apache/arrow/vector/VectorLoader.java | 36 +++++++++++++++- .../arrow/vector/ipc/ArrowFileReader.java | 16 ++++++- .../apache/arrow/vector/ipc/ArrowReader.java | 19 ++++++++- .../arrow/vector/ipc/ArrowStreamReader.java | 42 ++++++++++++++++++- 9 files changed, 119 insertions(+), 10 deletions(-) diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 95394cdd37d02..819f978a862c1 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -135,9 +135,10 @@ def _gold_tests(self, gold_dir): skip.add("Rust") if prefix == '2.0.0-compression': 
skip.add("Go") - skip.add("Java") skip.add("JS") skip.add("Rust") + if name == 'zstd': + skip.add("Java") yield datagen.File(name, None, None, skip=skip, path=out_path) def _run_test_cases(self, producer, consumer, case_runner, diff --git a/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java index 800036210e60d..f93cca83836f8 100644 --- a/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java +++ b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java @@ -26,6 +26,8 @@ */ public class CommonsCompressionFactory implements CompressionCodec.Factory { + public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory(); + @Override public CompressionCodec createCodec(byte codecType) { switch (codecType) { diff --git a/java/tools/pom.xml b/java/tools/pom.xml index 07fd5ac55524d..375d243515471 100644 --- a/java/tools/pom.xml +++ b/java/tools/pom.xml @@ -32,6 +32,11 @@ ${project.version} ${arrow.vector.classifier} + + org.apache.arrow + arrow-compression + ${project.version} + com.google.guava guava diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java index cd374a97d7103..1db3eeb6449ee 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; +import org.apache.arrow.compression.CommonsCompressionFactory; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -186,7 +187,7 @@ public void execute(File arrowFile, File jsonFile) throws IOException { JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator); 
FileInputStream fileInputStream = new FileInputStream(arrowFile); ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), - allocator)) { + allocator, CommonsCompressionFactory.INSTANCE)) { Schema jsonSchema = jsonReader.start(); VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot(); Schema arrowSchema = arrowRoot.getSchema(); diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java index bcf4d1de4d245..6bd3c2fba2ff5 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java @@ -25,6 +25,7 @@ import java.io.OutputStream; import java.nio.channels.Channels; +import org.apache.arrow.compression.CommonsCompressionFactory; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; @@ -40,7 +41,7 @@ public class StreamToFile { */ public static void convert(InputStream in, OutputStream out) throws IOException { BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); - try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) { + try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator, CommonsCompressionFactory.INSTANCE)) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); // load the first batch before instantiating the writer so that we have any dictionaries. // Only writeBatches if we load the first one. 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 5a3e712cb88e3..4f3d25710a3a4 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -40,6 +40,18 @@ public class VectorLoader { private final CompressionCodec.Factory factory; + /** + * A flag indicating if decompression actually performed. + * This will affect the behavior of releasing buffers. + */ + private boolean decompressionPerformed; + + /** + * Decompressed buffers. Such buffers are generated during decompression, + * so they need to be released explicitly. + */ + private List decompressedBuffers = new ArrayList<>(); + /** * Construct with a root to load and will create children in root based on schema. * @@ -70,6 +82,7 @@ public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); CompressionCodec codec = factory.createCodec(recordBatch.getBodyCompression().getCodec()); + decompressionPerformed = !(codec instanceof NoCompressionCodec); for (FieldVector fieldVector : root.getFieldVectors()) { loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } @@ -92,7 +105,12 @@ private void loadBuffers( List ownBuffers = new ArrayList<>(bufferLayoutCount); for (int j = 0; j < bufferLayoutCount; j++) { ArrowBuf nextBuf = buffers.next(); - ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf)); + // for vectors without nulls, the buffer is empty, so there is no need to decompress it. + ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? 
codec.decompress(vector.getAllocator(), nextBuf) : nextBuf; + ownBuffers.add(bufferToAdd); + if (decompressionPerformed) { + decompressedBuffers.add(bufferToAdd); + } } try { vector.loadFieldBuffers(fieldNode, ownBuffers); @@ -114,4 +132,20 @@ private void loadBuffers( } } + /** + * Checks if decompression actually performed. + */ + public boolean isDecompressionPerformed() { + return decompressionPerformed; + } + + /** + * Release decompressed buffers. + */ + public void releaseDecompressedBuffers() { + for (ArrowBuf buf : decompressedBuffers) { + buf.close(); + } + decompressedBuffers.clear(); + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java index 9267b0a35642f..f4e9e0db1e5f7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java @@ -28,6 +28,8 @@ import org.apache.arrow.flatbuf.Footer; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.VisibleForTesting; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowBlock; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowFooter; @@ -51,11 +53,21 @@ public class ArrowFileReader extends ArrowReader { private int currentDictionaryBatch = 0; private int currentRecordBatch = 0; - public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) { - super(allocator); + public ArrowFileReader( + SeekableReadChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + super(allocator, compressionFactory); this.in = in; } + public ArrowFileReader( + SeekableByteChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + this(new 
SeekableReadChannel(in), allocator, compressionFactory); + } + + public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) { + this(in, allocator, NoCompressionCodec.Factory.INSTANCE); + } + public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) { this(new SeekableReadChannel(in), allocator); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java index 3408c4541d2a0..06a4103d54aec 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java @@ -28,6 +28,8 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; @@ -49,8 +51,15 @@ public abstract class ArrowReader implements DictionaryProvider, AutoCloseable { protected Map dictionaries; private boolean initialized = false; + private final CompressionCodec.Factory compressionFactory; + protected ArrowReader(BufferAllocator allocator) { + this(allocator, NoCompressionCodec.Factory.INSTANCE); + } + + protected ArrowReader(BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { this.allocator = allocator; + this.compressionFactory = compressionFactory; } /** @@ -181,7 +190,7 @@ protected void initialize() throws IOException { Schema schema = new Schema(fields, originalSchema.getCustomMetadata()); this.root = new VectorSchemaRoot(schema, vectors, 0); - this.loader = new VectorLoader(root); + this.loader = new VectorLoader(root, compressionFactory); this.dictionaries = 
Collections.unmodifiableMap(dictionaries); } @@ -204,7 +213,13 @@ protected void loadRecordBatch(ArrowRecordBatch batch) { try { loader.load(batch); } finally { - batch.close(); + // if decompression performed, the buffer has been released during + // decompression, so there is no need to release again + if (!loader.isDecompressionPerformed()) { + batch.close(); + } else { + loader.releaseDecompressedBuffers(); + } } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java index ad9f40e3c788d..a0096aaf3ee56 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java @@ -26,6 +26,8 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.ipc.message.MessageChannelReader; @@ -50,12 +52,36 @@ public class ArrowStreamReader extends ArrowReader { * * @param messageReader reader used to get messages from a ReadChannel * @param allocator to allocate new buffers + * @param compressionFactory the factory to create compression codec. */ - public ArrowStreamReader(MessageChannelReader messageReader, BufferAllocator allocator) { - super(allocator); + public ArrowStreamReader( + MessageChannelReader messageReader, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + super(allocator, compressionFactory); this.messageReader = messageReader; } + /** + * Constructs a streaming reader using a MessageChannelReader. Non-blocking. 
+ * + * @param messageReader reader used to get messages from a ReadChannel + * @param allocator to allocate new buffers + */ + public ArrowStreamReader(MessageChannelReader messageReader, BufferAllocator allocator) { + this(messageReader, allocator, NoCompressionCodec.Factory.INSTANCE); + } + + /** + * Constructs a streaming reader from a ReadableByteChannel input. Non-blocking. + * + * @param in ReadableByteChannel to read messages from + * @param allocator to allocate new buffers + * @param compressionFactory the factory to create compression codec. + */ + public ArrowStreamReader( + ReadableByteChannel in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + this(new MessageChannelReader(new ReadChannel(in), allocator), allocator, compressionFactory); + } + /** * Constructs a streaming reader from a ReadableByteChannel input. Non-blocking. * @@ -66,6 +92,18 @@ public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) { this(new MessageChannelReader(new ReadChannel(in), allocator), allocator); } + /** + * Constructs a streaming reader from an InputStream. Non-blocking. + * + * @param in InputStream to read messages from + * @param allocator to allocate new buffers + * @param compressionFactory the factory to create compression codec. + */ + public ArrowStreamReader( + InputStream in, BufferAllocator allocator, CompressionCodec.Factory compressionFactory) { + this(Channels.newChannel(in), allocator, compressionFactory); + } + /** * Constructs a streaming reader from an InputStream. Non-blocking.
* From 04d60a7bb96b02c50e91395cd68ce3ed26d22d80 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Mon, 8 Mar 2021 12:40:00 +0800 Subject: [PATCH 09/10] ARROW-10880: [Java] Resolve comments --- .../CommonsCompressionFactory.java | 10 ++-- .../compression/Lz4CompressionCodec.java | 7 ++- .../org/apache/arrow/vector/VectorLoader.java | 44 ++++++--------- .../vector/compression/CompressionCodec.java | 4 +- .../vector/compression/CompressionUtil.java | 53 ++++++++++++++----- .../compression/NoCompressionCodec.java | 4 +- .../apache/arrow/vector/ipc/ArrowReader.java | 8 +-- .../vector/ipc/message/ArrowRecordBatch.java | 4 +- 8 files changed, 68 insertions(+), 66 deletions(-) diff --git a/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java index f93cca83836f8..4becbbe78c964 100644 --- a/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java +++ b/java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java @@ -17,9 +17,8 @@ package org.apache.arrow.compression; -import org.apache.arrow.flatbuf.CompressionType; import org.apache.arrow.vector.compression.CompressionCodec; -import org.apache.arrow.vector.compression.NoCompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; /** * A factory implementation based on Apache Commons library. 
@@ -29,15 +28,12 @@ public class CommonsCompressionFactory implements CompressionCodec.Factory { public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory(); @Override - public CompressionCodec createCodec(byte codecType) { + public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { switch (codecType) { - case NoCompressionCodec.COMPRESSION_TYPE: - return NoCompressionCodec.INSTANCE; - case CompressionType.LZ4_FRAME: + case LZ4_FRAME: return new Lz4CompressionCodec(); default: throw new IllegalArgumentException("Compression type not supported: " + codecType); } } - } diff --git a/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java index a32dc175214d5..b97d259cbb3a1 100644 --- a/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java +++ b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java @@ -61,7 +61,7 @@ public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) if (compressedLength > uncompressedBuffer.writerIndex()) { // compressed buffer is larger, send the raw buffer compressedBuffer.close(); - compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + compressedBuffer = CompressionUtil.packageRawBuffer(allocator, uncompressedBuffer); } uncompressedBuffer.close(); @@ -88,8 +88,7 @@ private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuff if (!MemoryUtil.LITTLE_ENDIAN) { uncompressedLength = Long.reverseBytes(uncompressedLength); } - // first 8 bytes reserved for uncompressed length, to be consistent with the - // C++ implementation. 
+ // first 8 bytes reserved for uncompressed length, according to the specification compressedBuffer.setLong(0, uncompressedLength); PlatformDependent.copyMemory( @@ -119,7 +118,7 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) { // no compression - return CompressionUtil.decompressRawBuffer(compressedBuffer); + return CompressionUtil.extractUncompressedBuffer(compressedBuffer); } try { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 4f3d25710a3a4..24be349c2820d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -26,6 +26,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.util.Collections2; import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; @@ -41,16 +42,10 @@ public class VectorLoader { private final CompressionCodec.Factory factory; /** - * A flag indicating if decompression actually performed. + * A flag indicating if decompression is needed. * This will affect the behavior of releasing buffers. */ - private boolean decompressionPerformed; - - /** - * Decompressed buffers. Such buffers are generated during decompression, - * so they need to be released explicitly. - */ - private List decompressedBuffers = new ArrayList<>(); + private boolean decompressionNeeded; /** * Construct with a root to load and will create children in root based on schema. 
@@ -81,8 +76,10 @@ public VectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); - CompressionCodec codec = factory.createCodec(recordBatch.getBodyCompression().getCodec()); - decompressionPerformed = !(codec instanceof NoCompressionCodec); + CompressionUtil.CodecType codecType = + CompressionUtil.CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec()); + decompressionNeeded = codecType != CompressionUtil.CodecType.NO_COMPRESSION; + CompressionCodec codec = decompressionNeeded ? factory.createCodec(codecType) : NoCompressionCodec.INSTANCE; for (FieldVector fieldVector : root.getFieldVectors()) { loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); } @@ -108,12 +105,18 @@ private void loadBuffers( // for vectors without nulls, the buffer is empty, so there is no need to decompress it. ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf; ownBuffers.add(bufferToAdd); - if (decompressionPerformed) { - decompressedBuffers.add(bufferToAdd); + if (decompressionNeeded) { + // decompression performed + nextBuf.getReferenceManager().retain(); } } try { vector.loadFieldBuffers(fieldNode, ownBuffers); + if (decompressionNeeded) { + for (ArrowBuf buf : ownBuffers) { + buf.close(); + } + } } catch (RuntimeException e) { throw new IllegalArgumentException("Could not load buffers for field " + field + ". error message: " + e.getMessage(), e); @@ -131,21 +134,4 @@ private void loadBuffers( } } } - - /** - * Checks if decompression actually performed. - */ - public boolean isDecompressionPerformed() { - return decompressionPerformed; - } - - /** - * Release decompressed buffers. 
- */ - public void releaseDecompressedBuffers() { - for (ArrowBuf buf : decompressedBuffers) { - buf.close(); - } - decompressedBuffers.clear(); - } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java index d5ef902741e54..ac796235d7782 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -30,7 +30,7 @@ public interface CompressionCodec { * @param allocator the allocator for allocating memory for compressed buffer. * @param uncompressedBuffer the buffer to compress. * Implementation of this method should take care of releasing this buffer. - * @return the compressed buffer. + * @return the compressed buffer */ ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer); @@ -57,6 +57,6 @@ interface Factory { /** * Creates the codec based on the codec type. */ - CompressionCodec createCodec(byte codecType); + CompressionCodec createCodec(CompressionUtil.CodecType codecType); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index aa0c8c716256a..ccc70be551086 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -28,6 +28,40 @@ */ public class CompressionUtil { + /** + * Compression codec types corresponding to flat buffer implementation in {@link CompressionType}. 
+ */ + public enum CodecType { + + NO_COMPRESSION(NoCompressionCodec.COMPRESSION_TYPE), + + LZ4_FRAME(org.apache.arrow.flatbuf.CompressionType.LZ4_FRAME), + + ZSTD(org.apache.arrow.flatbuf.CompressionType.ZSTD); + + private final byte type; + + CodecType(byte type) { + this.type = type; + } + + public byte getType() { + return type; + } + + /** + * Gets the codec type from the compression type defined in {@link CompressionType}. + */ + public static CodecType fromCompressionType(byte type) { + for (CodecType codecType : values()) { + if (codecType.type == type) { + return codecType; + } + } + return NO_COMPRESSION; + } + } + public static final long SIZE_OF_UNCOMPRESSED_LENGTH = 8L; /** @@ -41,25 +75,18 @@ private CompressionUtil() { /** * Creates the {@link ArrowBodyCompression} object, given the {@link CompressionCodec}. - * The implementation of this method should depend on the values of {@link CompressionType#names}. + * The implementation of this method should depend on the values of + * {@link org.apache.arrow.flatbuf.CompressionType#names}. */ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { - switch (codec.getCodecName()) { - case "default": - return NoCompressionCodec.DEFAULT_BODY_COMPRESSION; - case "LZ4_FRAME": - return new ArrowBodyCompression(CompressionType.LZ4_FRAME, BodyCompressionMethod.BUFFER); - case "ZSTD": - return new ArrowBodyCompression(CompressionType.ZSTD, BodyCompressionMethod.BUFFER); - default: - throw new IllegalArgumentException("Unknown codec: " + codec.getCodecName()); - } + CodecType type = CodecType.valueOf(codec.getCodecName()); + return new ArrowBodyCompression(type.getType(), BodyCompressionMethod.BUFFER); } /** * Process compression by compressing the buffer as is. 
*/ - public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) { + public static ArrowBuf packageRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) { ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH); compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex()); @@ -70,7 +97,7 @@ public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inp /** * Process decompression by decompressing the buffer as is. */ - public static ArrowBuf decompressRawBuffer(ArrowBuf inputBuffer) { + public static ArrowBuf extractUncompressedBuffer(ArrowBuf inputBuffer) { return inputBuffer.slice(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java index 1e9d5c2445ce8..71bea691c3528 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java @@ -49,7 +49,7 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) @Override public String getCodecName() { - return "default"; + return "NO_COMPRESSION"; } /** @@ -60,7 +60,7 @@ public static class Factory implements CompressionCodec.Factory { public static final NoCompressionCodec.Factory INSTANCE = new NoCompressionCodec.Factory(); @Override - public CompressionCodec createCodec(byte codecType) { + public CompressionCodec createCodec(CompressionUtil.CodecType codecType) { return NoCompressionCodec.INSTANCE; } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java index 
06a4103d54aec..9d940deecfe20 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java @@ -213,13 +213,7 @@ protected void loadRecordBatch(ArrowRecordBatch batch) { try { loader.load(batch); } finally { - // if decompression performed, the buffer has been released during - // decompression, so there is no need to release again - if (!loader.isDecompressionPerformed()) { - batch.close(); - } else { - loader.releaseDecompressedBuffers(); - } + batch.close(); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index 6fa2fb1a72de9..dbf2774fba839 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -194,14 +194,14 @@ public int writeTo(FlatBufferBuilder builder) { RecordBatch.startBuffersVector(builder, buffers.size()); int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout); int compressOffset = 0; - if (bodyCompression != null && bodyCompression != NoCompressionCodec.DEFAULT_BODY_COMPRESSION) { + if (bodyCompression.getCodec() != NoCompressionCodec.COMPRESSION_TYPE) { compressOffset = bodyCompression.writeTo(builder); } RecordBatch.startRecordBatch(builder); RecordBatch.addLength(builder, length); RecordBatch.addNodes(builder, nodesOffset); RecordBatch.addBuffers(builder, buffersOffset); - if (bodyCompression != null && bodyCompression != NoCompressionCodec.DEFAULT_BODY_COMPRESSION) { + if (bodyCompression.getCodec() != NoCompressionCodec.COMPRESSION_TYPE) { RecordBatch.addCompression(builder, compressOffset); } return RecordBatch.endRecordBatch(builder); From d4a6807ad71a39deff10c09ed7525fe35559fb73 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Wed, 17 Mar 2021 16:26:34 
+0800 Subject: [PATCH 10/10] ARROW-10880: [Java] Resolve more comments --- java/compression/pom.xml | 5 ----- .../org/apache/arrow/compression/Lz4CompressionCodec.java | 5 ++--- .../org/apache/arrow/compression/TestCompressionCodec.java | 7 ++++--- .../apache/arrow/vector/compression/CompressionCodec.java | 6 +++--- .../apache/arrow/vector/compression/CompressionUtil.java | 5 ++--- .../arrow/vector/compression/NoCompressionCodec.java | 4 ++-- 6 files changed, 13 insertions(+), 19 deletions(-) diff --git a/java/compression/pom.xml b/java/compression/pom.xml index 009d60665adc8..9a6ab3508ed40 100644 --- a/java/compression/pom.xml +++ b/java/compression/pom.xml @@ -21,11 +21,6 @@ (Experimental/Contrib) A library for working with the compression/decompression of Arrow data. - - org.apache.arrow - arrow-format - ${project.version} - org.apache.arrow arrow-vector diff --git a/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java index b97d259cbb3a1..af34a8fdd706f 100644 --- a/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java +++ b/java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java @@ -23,7 +23,6 @@ import java.io.InputStream; import java.io.OutputStream; -import org.apache.arrow.flatbuf.CompressionType; import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.util.MemoryUtil; @@ -152,7 +151,7 @@ private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuff } @Override - public String getCodecName() { - return CompressionType.name(CompressionType.LZ4_FRAME); + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.LZ4_FRAME; } } diff --git a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java 
b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java index e741066d43094..52f24e20533ea 100644 --- a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java +++ b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java @@ -33,6 +33,7 @@ import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.compression.CompressionCodec; +import org.apache.arrow.vector.compression.CompressionUtil; import org.apache.arrow.vector.compression.NoCompressionCodec; import org.apache.arrow.vector.ipc.message.ArrowFieldNode; import org.junit.After; @@ -63,7 +64,7 @@ public void terminate() { allocator.close(); } - public TestCompressionCodec(String name, int vectorLength, CompressionCodec codec) { + public TestCompressionCodec(CompressionUtil.CodecType type, int vectorLength, CompressionCodec codec) { this.codec = codec; this.vectorLength = vectorLength; } @@ -75,10 +76,10 @@ public static Collection getCodecs() { int[] lengths = new int[] {10, 100, 1000}; for (int len : lengths) { CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE; - params.add(new Object[]{dumbCodec.getCodecName(), len, dumbCodec}); + params.add(new Object[]{dumbCodec.getCodecType(), len, dumbCodec}); CompressionCodec lz4Codec = new Lz4CompressionCodec(); - params.add(new Object[]{lz4Codec.getCodecName(), len, lz4Codec}); + params.add(new Object[]{lz4Codec.getCodecType(), len, lz4Codec}); } return params; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java index ac796235d7782..a6dd8b51fe554 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java @@ -44,10 +44,10 @@ public interface 
CompressionCodec { ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer); /** - * Gets the name of the codec. - * @return the name of the codec. + * Gets the type of the codec. + * @return the type of the codec. */ - String getCodecName(); + CompressionUtil.CodecType getCodecType(); /** * Factory to create compression codec. diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index ccc70be551086..1deb38c84da05 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -79,8 +79,7 @@ private CompressionUtil() { * {@link org.apache.arrow.flatbuf.CompressionType#names}. */ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec) { - CodecType type = CodecType.valueOf(codec.getCodecName()); - return new ArrowBodyCompression(type.getType(), BodyCompressionMethod.BUFFER); + return new ArrowBodyCompression(codec.getCodecType().getType(), BodyCompressionMethod.BUFFER); } /** @@ -95,7 +94,7 @@ public static ArrowBuf packageRawBuffer(BufferAllocator allocator, ArrowBuf inpu } /** - * Process decompression by decompressing the buffer as is. + * Process decompression by slicing the buffer that contains the uncompressed bytes. 
*/ public static ArrowBuf extractUncompressedBuffer(ArrowBuf inputBuffer) { return inputBuffer.slice(SIZE_OF_UNCOMPRESSED_LENGTH, diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java index 71bea691c3528..e5e8e9d463bd0 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/NoCompressionCodec.java @@ -48,8 +48,8 @@ public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) } @Override - public String getCodecName() { - return "NO_COMPRESSION"; + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.NO_COMPRESSION; } /**