Partial LZ4 impl (wont work anyway)
Signed-off-by: Peter Alfonsi <petealft@amazon.com>
Peter Alfonsi committed Jan 15, 2025
1 parent 4b392e1 commit 1ebdc0b
Showing 4 changed files with 133 additions and 6 deletions.
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.query;

import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.compress.LZ4;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompressorPool {
private final ExecutorService executor;
private final ThreadLocal<LZ4Compressor> threadLocalCompressor;

public CompressorPool(int poolSize) {
this.executor = Executors.newFixedThreadPool(poolSize);
this.threadLocalCompressor = ThreadLocal.withInitial(LZ4Compressor::new);
}

public Future<byte[]> compress(byte[] data) {
return executor.submit(() -> {
LZ4Compressor compressor = threadLocalCompressor.get();
return compressor.compress(data);
});
}

public void shutdown() {
executor.shutdown();
}

public byte[] decompress(byte[] compressed) throws IOException {
// LZ4.decompress is static and stateless (no per-thread hash table), so it is safe to call from any thread without going through the pool
ByteArrayDataInput in = new ByteArrayDataInput(compressed);
int originalLength = in.readInt();
byte[] result = new byte[originalLength];
LZ4.decompress(in, originalLength, result, 0);
return result;
}

static class LZ4Compressor {
private static final int SCRATCH_OVERHEAD = 256;

private final LZ4.FastCompressionHashTable ht;

LZ4Compressor() {
ht = new LZ4.FastCompressionHashTable();
}

public byte[] compress(byte[] bytes) throws IOException {
// LZ4 can expand input that doesn't compress well; the worst case is roughly
// length + length/255 + a small constant, plus the 4-byte original-length prefix written below
byte[] result = new byte[bytes.length + bytes.length / 255 + SCRATCH_OVERHEAD + 4];
ByteArrayDataOutput out = new ByteArrayDataOutput(result);
// Write the original length into out; we need it at decompression time
out.writeInt(bytes.length);
LZ4.compress(bytes, 0, bytes.length, out, ht);
int finalLength = out.getPosition();
byte[] trimmed = new byte[finalLength];
System.arraycopy(result, 0, trimmed, 0, finalLength);
return trimmed;
}
}
}
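For reviewers, here is a minimal usage sketch of the class above (a sketch only; the pool size and input bytes are arbitrary). compress() hands work to the pool's executor and returns a Future, while decompress() runs synchronously on the calling thread:

import java.util.Arrays;
import java.util.concurrent.Future;

CompressorPool pool = new CompressorPool(4);        // arbitrary pool size for this sketch
byte[] original = new byte[] { 1, 2, 3, 4, 5, 6, 4, 5, 6, 7 };

Future<byte[]> pending = pool.compress(original);   // compression runs on one of the pool threads
byte[] compressed = pending.get();                  // block until it finishes
byte[] restored = pool.decompress(compressed);      // stateless, safe from the caller thread

assert Arrays.equals(original, restored);
pool.shutdown();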


@@ -10,6 +10,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.compressing.Compressor;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
@@ -30,12 +32,20 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.RoaringDocIdSet;
import org.apache.lucene.util.compress.LZ4;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.cache.CacheType;
@@ -65,6 +75,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
@@ -151,6 +165,7 @@ public class PluggableQueryCache implements QueryCache, OpenSearchQueryCache {
* The shard id dimension name.
*/
public static final String SHARD_ID_DIMENSION_NAME = "shards";
private final CompressorPool compressorPool;

// Is there any need for locks? The underlying TSC is threadsafe; the locks in the original implementation appear to have been needed for its LeafCache impl.

@@ -179,6 +194,7 @@ public PluggableQueryCache(
.ramBytesUsed();
this.removalListener = new TSCRemovalListener();
this.nextLeafCacheId = new AtomicInteger();
this.compressorPool = new CompressorPool(10);

this.innerCache = cacheService.createCache(
new CacheConfig.Builder<CompositeKey, CacheAndCount>().setSettings(settings)
@@ -190,7 +206,7 @@ public PluggableQueryCache(
// TODO: I don't know what to do with this. It shouldn't be hardcoded like that...
.setCachedResultParser((cacheAndCount) -> new CachedQueryResult.PolicyValues(1))
.setKeySerializer(keySerializer)
.setValueSerializer(new CacheAndCountSerializer())
.setValueSerializer(new CacheAndCountSerializer(compressorPool))
.setClusterSettings(clusterService.getClusterSettings())
.setStoragePath(nodeEnvironment.nodePaths()[0].path.toString() + "/query_cache")
.build(),
@@ -261,6 +277,7 @@ public void onClose(ShardId shardId) {
@Override
public void close() throws IOException {
innerCache.close();
compressorPool.shutdown();
}

@Override
@@ -838,6 +855,12 @@ static class CacheAndCountSerializer implements Serializer<CacheAndCount, byte[]

static final int BLOCK_SIZE = 1024;

private final CompressorPool compressorPool;

CacheAndCountSerializer(CompressorPool compressorPool) {
this.compressorPool = compressorPool;
}

@Override
public byte[] serialize(CacheAndCount object) {
if (object == null) return null;
@@ -846,8 +869,8 @@ public byte[] serialize(CacheAndCount object) {
os.writeVInt(object.count);
os.writeVInt(object.maxDoc);
serializeDocIdSet(object.cache, os);
return BytesReference.toBytes(os.bytes());
} catch (IOException e) {
return compressorPool.compress(BytesReference.toBytes(os.bytes())).get();
} catch (IOException | InterruptedException | ExecutionException e) {
logger.debug("Could not write CacheAndCount to byte[]");
throw new OpenSearchException(e);
}
@@ -857,7 +880,8 @@ public CacheAndCount deserialize(byte[] bytes) {
public CacheAndCount deserialize(byte[] bytes) {
if (bytes == null) return null;
try {
BytesStreamInput is = new BytesStreamInput(bytes, 0, bytes.length);
byte[] decompressed = compressorPool.decompress(bytes);
BytesStreamInput is = new BytesStreamInput(decompressed, 0, decompressed.length);
int count = is.readVInt();
int maxDoc = is.readVInt();
DocIdSet cache = deserializeDocIdSet(is, maxDoc);
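Putting the pieces together, each cached value is now stored as a 4-byte original-length prefix (written by LZ4Compressor.compress) followed by the LZ4-compressed stream of the vInt count, vInt maxDoc, and the serialized DocIdSet. A minimal round-trip sketch, assuming the classes in this commit compile as shown (the DocIdSet contents and pool size are arbitrary, and the unit tests below exercise the same path):

import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.FixedBitSet;

FixedBitSet bits = new FixedBitSet(100);
bits.set(3);
bits.set(57);
PluggableQueryCache.CacheAndCount original = new PluggableQueryCache.CacheAndCount(new BitDocIdSet(bits), 2, 100);

CompressorPool pool = new CompressorPool(10);
PluggableQueryCache.CacheAndCountSerializer ser = new PluggableQueryCache.CacheAndCountSerializer(pool);

byte[] stored = ser.serialize(original);                              // vInts + DocIdSet, compressed via the pool
int uncompressedLength = new ByteArrayDataInput(stored).readInt();    // peek at the 4-byte length prefix without decompressing
PluggableQueryCache.CacheAndCount restored = ser.deserialize(stored); // decompresses, then reads the vInts and DocIdSet back
pool.shutdown();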
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.query;

import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

public class CompressorPoolTests extends OpenSearchTestCase {
public void testCompression() throws Exception {
CompressorPool pool = new CompressorPool(10);
byte[] orig = new byte[]{1, 2, 3, 4, 5, 6, 4, 5, 6, 7, 4, 5, 6, 7, 4, 5, 6};
byte[] compressed = pool.compress(orig).get();
byte[] decompressed = pool.decompress(compressed);
assertArrayEquals(orig, decompressed);
pool.shutdown();
}

public void testMultithreadedCompression() throws Exception {
CompressorPool pool = new CompressorPool(4);
int numBuffers = 50;
List<byte[]> inputs = new ArrayList<>();
List<Future<byte[]>> futures = new ArrayList<>();
// Submit many buffers at once so compression runs concurrently on the pool's threads
for (int i = 0; i < numBuffers; i++) {
byte[] orig = randomByteArrayOfLength(randomIntBetween(16, 2048));
inputs.add(orig);
futures.add(pool.compress(orig));
}
// Each result should round-trip back to its original input
for (int i = 0; i < numBuffers; i++) {
byte[] decompressed = pool.decompress(futures.get(i).get());
assertArrayEquals(inputs.get(i), decompressed);
}
pool.shutdown();
}
}
@@ -31,12 +31,14 @@ public void testCacheAndCountSerializer() throws Exception {
// Check for both BitDocIdSet and RoaringDocIdSet
for (DocIdSet set : new DocIdSet[] { getBitDocIdSet(docs), getRoaringDocIdSet(docs, maxDoc) }) {
PluggableQueryCache.CacheAndCount original = new PluggableQueryCache.CacheAndCount(set, count, maxDoc);
PluggableQueryCache.CacheAndCountSerializer ser = new PluggableQueryCache.CacheAndCountSerializer();
CompressorPool pool = new CompressorPool(10);
PluggableQueryCache.CacheAndCountSerializer ser = new PluggableQueryCache.CacheAndCountSerializer(pool);
byte[] serialized = ser.serialize(original);
PluggableQueryCache.CacheAndCount deserialized = ser.deserialize(serialized);
assertTrue(ser.equals(original, serialized));
assertEquals(original, deserialized);
assertTrue(serialized.length > PluggableQueryCache.CacheAndCountSerializer.BLOCK_SIZE * Integer.BYTES);
pool.shutdown();
}
}

@@ -56,12 +58,14 @@ public void testCacheAndCountSerializerLongDocIdSet() throws Exception {

for (DocIdSet set : new DocIdSet[] { getBitDocIdSet(docs), getRoaringDocIdSet(docs, maxDoc) }) {
PluggableQueryCache.CacheAndCount original = new PluggableQueryCache.CacheAndCount(set, count, maxDoc);
PluggableQueryCache.CacheAndCountSerializer ser = new PluggableQueryCache.CacheAndCountSerializer();
CompressorPool pool = new CompressorPool(10);
PluggableQueryCache.CacheAndCountSerializer ser = new PluggableQueryCache.CacheAndCountSerializer(pool);
byte[] serialized = ser.serialize(original);
PluggableQueryCache.CacheAndCount deserialized = ser.deserialize(serialized);
assertTrue(ser.equals(original, serialized));
assertEquals(original, deserialized);
assertTrue(serialized.length > 2 * PluggableQueryCache.CacheAndCountSerializer.BLOCK_SIZE * Integer.BYTES);
pool.shutdown();
}
}
