From 39082230b384275b6a308b95578eaecd8d33332f Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 16 Oct 2024 08:47:19 -0400 Subject: [PATCH] Address code review comments Signed-off-by: Andriy Redko --- .../Lucene912QatStoredFieldsFormat.java | 11 +- .../customcodecs/QatCompressionMode.java | 20 +- .../Lucene99QatStoredFieldsFormat.java | 13 +- .../lucene99/QatCompressionMode.java | 206 ------------------ .../QatDeflateCompressorTests.java | 6 +- .../customcodecs/QatLz4CompressorTests.java | 6 +- .../Lucene99QatStoredFieldsFormatTests.java | 1 + 7 files changed, 41 insertions(+), 222 deletions(-) delete mode 100644 src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/QatCompressionMode.java diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/Lucene912QatStoredFieldsFormat.java b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene912QatStoredFieldsFormat.java index cf4ff21..819c8e6 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/Lucene912QatStoredFieldsFormat.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/Lucene912QatStoredFieldsFormat.java @@ -87,7 +87,7 @@ public Lucene912QatStoredFieldsFormat(Lucene912QatCodec.Mode mode, Supplier supplier) { this.mode = Objects.requireNonNull(mode); - qatCompressionMode = new QatCompressionMode(mode, compressionLevel, supplier); + qatCompressionMode = new QatCompressionMode(getAlgorithm(mode), compressionLevel, supplier); } /** @@ -176,4 +176,13 @@ public Lucene912QatCodec.Mode getMode() { public QatCompressionMode getCompressionMode() { return qatCompressionMode; } + + /** + * Returns {@link QatZipper.Algorithm} instance that corresponds codec's {@link Lucene912QatCodec.Mode mode} + * @param mode codec's {@link Lucene912QatCodec.Mode mode} + * @return the {@link QatZipper.Algorithm} instance that corresponds codec's {@link Lucene912QatCodec.Mode mode} + */ + private static QatZipper.Algorithm getAlgorithm(Lucene912QatCodec.Mode mode) { + return (mode == Lucene912QatCodec.Mode.QAT_LZ4) ? QatZipper.Algorithm.LZ4 : QatZipper.Algorithm.DEFLATE; + } } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/QatCompressionMode.java b/src/main/java/org/opensearch/index/codec/customcodecs/QatCompressionMode.java index 96800c5..86046c6 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/QatCompressionMode.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/QatCompressionMode.java @@ -36,37 +36,37 @@ public class QatCompressionMode extends CompressionMode { /** default constructor */ protected QatCompressionMode() { - this(Lucene912QatCodec.DEFAULT_COMPRESSION_MODE, DEFAULT_COMPRESSION_LEVEL, () -> { return DEFAULT_QAT_MODE; }); + this(QatZipper.Algorithm.LZ4, DEFAULT_COMPRESSION_LEVEL, () -> { return DEFAULT_QAT_MODE; }); } /** * Creates a new instance. * - * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) + * @param algorithm The compression algorithm (LZ4 or DEFLATE) */ - protected QatCompressionMode(Lucene912QatCodec.Mode mode) { - this(mode, DEFAULT_COMPRESSION_LEVEL, () -> { return DEFAULT_QAT_MODE; }); + protected QatCompressionMode(QatZipper.Algorithm algorithm) { + this(algorithm, DEFAULT_COMPRESSION_LEVEL, () -> { return DEFAULT_QAT_MODE; }); } /** * Creates a new instance. * - * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) + * @param algorithm The compression algorithm (LZ4 or DEFLATE) * @param compressionLevel The compression level to use. */ - protected QatCompressionMode(Lucene912QatCodec.Mode mode, int compressionLevel) { - this(mode, compressionLevel, () -> { return DEFAULT_QAT_MODE; }); + protected QatCompressionMode(QatZipper.Algorithm algorithm, int compressionLevel) { + this(algorithm, compressionLevel, () -> { return DEFAULT_QAT_MODE; }); } /** * Creates a new instance. * - * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) + * @param algorithm The compression algorithm (LZ4 or DEFLATE) * @param compressionLevel The compression level to use. * @param supplier a supplier for QAT acceleration mode. */ - protected QatCompressionMode(Lucene912QatCodec.Mode mode, int compressionLevel, Supplier supplier) { - this.algorithm = mode == Lucene912QatCodec.Mode.QAT_LZ4 ? QatZipper.Algorithm.LZ4 : QatZipper.Algorithm.DEFLATE; + protected QatCompressionMode(QatZipper.Algorithm algorithm, int compressionLevel, Supplier supplier) { + this.algorithm = algorithm; this.compressionLevel = compressionLevel; this.supplier = supplier; } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/Lucene99QatStoredFieldsFormat.java b/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/Lucene99QatStoredFieldsFormat.java index 3782bee..d2f4103 100644 --- a/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/Lucene99QatStoredFieldsFormat.java +++ b/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/Lucene99QatStoredFieldsFormat.java @@ -17,6 +17,7 @@ import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; +import org.opensearch.index.codec.customcodecs.QatCompressionMode; import java.io.IOException; import java.util.Objects; @@ -84,7 +85,8 @@ public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode, Supplier supplier) { this.mode = Objects.requireNonNull(mode); - qatCompressionMode = new QatCompressionMode(mode, compressionLevel, supplier); + qatCompressionMode = new QatCompressionMode(getAlgorithm(mode), compressionLevel, supplier) { + }; } /** @@ -173,4 +175,13 @@ public Lucene99QatCodec.Mode getMode() { public QatCompressionMode getCompressionMode() { return qatCompressionMode; } + + /** + * Returns {@link QatZipper.Algorithm} instance that corresponds codec's {@link Lucene99QatCodec.Mode mode} + * @param mode codec's {@link Lucene99QatCodec.Mode mode} + * @return the {@link QatZipper.Algorithm} instance that corresponds codec's {@link Lucene99QatCodec.Mode mode} + */ + private static QatZipper.Algorithm getAlgorithm(Lucene99QatCodec.Mode mode) { + return (mode == Lucene99QatCodec.Mode.QAT_LZ4) ? QatZipper.Algorithm.LZ4 : QatZipper.Algorithm.DEFLATE; + } } diff --git a/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/QatCompressionMode.java b/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/QatCompressionMode.java deleted file mode 100644 index 9bdd0e3..0000000 --- a/src/main/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/QatCompressionMode.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.codec.customcodecs.backward_codecs.lucene99; - -import org.apache.lucene.codecs.compressing.CompressionMode; -import org.apache.lucene.codecs.compressing.Compressor; -import org.apache.lucene.codecs.compressing.Decompressor; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.DataInput; -import org.apache.lucene.store.DataOutput; -import org.apache.lucene.util.ArrayUtil; -import org.apache.lucene.util.BytesRef; -import org.opensearch.index.codec.customcodecs.QatZipperFactory; - -import java.io.IOException; -import java.util.function.Supplier; - -import com.intel.qat.QatZipper; - -/** QatCompressionMode offers QAT_LZ4 and QAT_DEFLATE compressors. */ -public class QatCompressionMode extends CompressionMode { - - private static final int NUM_SUB_BLOCKS = 10; - - private final QatZipper.Algorithm algorithm; - private final int compressionLevel; - private final Supplier supplier; - - /** default constructor */ - protected QatCompressionMode() { - this(Lucene99QatCodec.DEFAULT_COMPRESSION_MODE, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, () -> { - return Lucene99QatCodec.DEFAULT_QAT_MODE; - }); - } - - /** - * Creates a new instance. - * - * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) - */ - protected QatCompressionMode(Lucene99QatCodec.Mode mode) { - this(mode, Lucene99QatCodec.DEFAULT_COMPRESSION_LEVEL, () -> { return Lucene99QatCodec.DEFAULT_QAT_MODE; }); - } - - /** - * Creates a new instance. - * - * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) - * @param compressionLevel The compression level to use. - */ - protected QatCompressionMode(Lucene99QatCodec.Mode mode, int compressionLevel) { - this(mode, compressionLevel, () -> { return Lucene99QatCodec.DEFAULT_QAT_MODE; }); - } - - /** - * Creates a new instance. - * - * @param mode The compression mode (QAT_LZ4 or QAT_DEFLATE) - * @param compressionLevel The compression level to use. - * @param supplier a supplier for QAT acceleration mode. - */ - protected QatCompressionMode(Lucene99QatCodec.Mode mode, int compressionLevel, Supplier supplier) { - this.algorithm = mode == Lucene99QatCodec.Mode.QAT_LZ4 ? QatZipper.Algorithm.LZ4 : QatZipper.Algorithm.DEFLATE; - this.compressionLevel = compressionLevel; - this.supplier = supplier; - } - - @Override - public Compressor newCompressor() { - return new QatCompressor(algorithm, compressionLevel, supplier.get()); - } - - @Override - public Decompressor newDecompressor() { - return new QatDecompressor(algorithm, supplier.get()); - } - - public int getCompressionLevel() { - return compressionLevel; - } - - /** The QatCompressor. */ - private static final class QatCompressor extends Compressor { - - private byte[] compressedBuffer; - private final QatZipper qatZipper; - - /** compressor with a given compresion level */ - public QatCompressor(QatZipper.Algorithm algorithm, int compressionLevel, QatZipper.Mode qatMode) { - compressedBuffer = BytesRef.EMPTY_BYTES; - qatZipper = QatZipperFactory.createInstance(algorithm, compressionLevel, qatMode, QatZipper.PollingMode.PERIODICAL); - } - - private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { - assert offset >= 0 : "Offset value must be greater than 0."; - - int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; - out.writeVInt(blockLength); - - final int end = offset + length; - assert end >= 0 : "Buffer read size must be greater than 0."; - - for (int start = offset; start < end; start += blockLength) { - int l = Math.min(blockLength, end - start); - - if (l == 0) { - out.writeVInt(0); - return; - } - - final int maxCompressedLength = qatZipper.maxCompressedLength(l); - compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength); - - int compressedSize = qatZipper.compress(bytes, start, l, compressedBuffer, 0, compressedBuffer.length); - out.writeVInt(compressedSize); - out.writeBytes(compressedBuffer, compressedSize); - } - } - - @Override - public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { - final int length = (int) buffersInput.size(); - byte[] bytes = new byte[length]; - buffersInput.readBytes(bytes, 0, length); - compress(bytes, 0, length, out); - } - - @Override - public void close() throws IOException {} - } - - /** QAT_DEFLATE decompressor */ - private static final class QatDecompressor extends Decompressor { - - private byte[] compressed; - private final QatZipper qatZipper; - private final QatZipper.Mode qatMode; - private final QatZipper.Algorithm algorithm; - - /** default decompressor */ - public QatDecompressor(QatZipper.Algorithm algorithm, QatZipper.Mode qatMode) { - this.algorithm = algorithm; - this.qatMode = qatMode; - compressed = BytesRef.EMPTY_BYTES; - qatZipper = QatZipperFactory.createInstance(algorithm, qatMode, QatZipper.PollingMode.PERIODICAL); - } - - /*resuable decompress function*/ - @Override - public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { - assert offset + length <= originalLength : "Buffer read size must be within limit."; - - if (length == 0) { - bytes.length = 0; - return; - } - - final int blockLength = in.readVInt(); - bytes.offset = bytes.length = 0; - int offsetInBlock = 0; - int offsetInBytesRef = offset; - - // Skip unneeded blocks - while (offsetInBlock + blockLength < offset) { - final int compressedLength = in.readVInt(); - in.skipBytes(compressedLength); - offsetInBlock += blockLength; - offsetInBytesRef -= blockLength; - } - - // Read blocks that intersect with the interval we need - while (offsetInBlock < offset + length) { - final int compressedLength = in.readVInt(); - if (compressedLength == 0) { - return; - } - compressed = ArrayUtil.grow(compressed, compressedLength); - in.readBytes(compressed, 0, compressedLength); - - int l = Math.min(blockLength, originalLength - offsetInBlock); - bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l); - - final int uncompressed = qatZipper.decompress(compressed, 0, compressedLength, bytes.bytes, bytes.length, l); - - bytes.length += uncompressed; - offsetInBlock += blockLength; - } - - bytes.offset = offsetInBytesRef; - bytes.length = length; - - assert bytes.isValid() : "Decompression output is corrupted."; - } - - @Override - public Decompressor clone() { - return new QatDecompressor(algorithm, qatMode); - } - } -} diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateCompressorTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateCompressorTests.java index edecb0a..1cf21ed 100644 --- a/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateCompressorTests.java +++ b/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateCompressorTests.java @@ -10,6 +10,8 @@ import org.apache.lucene.codecs.compressing.Compressor; import org.apache.lucene.codecs.compressing.Decompressor; +import com.intel.qat.QatZipper; + import java.io.IOException; import static org.hamcrest.Matchers.is; @@ -21,13 +23,13 @@ public class QatDeflateCompressorTests extends AbstractCompressorTests { @Override Compressor compressor() { assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); - return new QatCompressionMode(Lucene912QatCodec.Mode.QAT_DEFLATE).newCompressor(); + return new QatCompressionMode(QatZipper.Algorithm.DEFLATE).newCompressor(); } @Override Decompressor decompressor() { assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); - return new QatCompressionMode(Lucene912QatCodec.Mode.QAT_DEFLATE).newDecompressor(); + return new QatCompressionMode(QatZipper.Algorithm.DEFLATE).newDecompressor(); } @Override diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4CompressorTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4CompressorTests.java index b11db58..2fc4044 100644 --- a/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4CompressorTests.java +++ b/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4CompressorTests.java @@ -10,6 +10,8 @@ import org.apache.lucene.codecs.compressing.Compressor; import org.apache.lucene.codecs.compressing.Decompressor; +import com.intel.qat.QatZipper; + import java.io.IOException; import static org.hamcrest.Matchers.is; @@ -21,13 +23,13 @@ public class QatLz4CompressorTests extends AbstractCompressorTests { @Override Compressor compressor() { assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); - return new QatCompressionMode(Lucene912QatCodec.Mode.QAT_LZ4).newCompressor(); + return new QatCompressionMode(QatZipper.Algorithm.LZ4).newCompressor(); } @Override Decompressor decompressor() { assumeThat("Qat library is available", QatZipperFactory.isQatAvailable(), is(true)); - return new QatCompressionMode(Lucene912QatCodec.Mode.QAT_LZ4).newDecompressor(); + return new QatCompressionMode(QatZipper.Algorithm.LZ4).newDecompressor(); } @Override diff --git a/src/test/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/Lucene99QatStoredFieldsFormatTests.java b/src/test/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/Lucene99QatStoredFieldsFormatTests.java index a70dc81..0f0697a 100644 --- a/src/test/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/Lucene99QatStoredFieldsFormatTests.java +++ b/src/test/java/org/opensearch/index/codec/customcodecs/backward_codecs/lucene99/Lucene99QatStoredFieldsFormatTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.codec.customcodecs.backward_codecs.lucene99; +import org.opensearch.index.codec.customcodecs.QatCompressionMode; import org.opensearch.index.codec.customcodecs.QatZipperFactory; import org.opensearch.test.OpenSearchTestCase;