Skip to content

Commit

Permalink
Adding basic building blocks for MemoryOptimizedSearch. At the moment…
Browse files Browse the repository at this point in the history
…, only FAISS is supporting this.

Signed-off-by: Dooyong Kim <kdooyong@amazon.com>
  • Loading branch information
Dooyong Kim committed Mar 6, 2025
1 parent c7ac05c commit 83cd071
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.knn.index.codec.KNN990Codec;

import lombok.extern.slf4j.Slf4j;
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
import org.apache.lucene.index.ByteVectorValues;
Expand All @@ -22,12 +23,16 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.IOUtils;
import org.opensearch.common.UUIDs;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.memoryoptsearch.MemoryOptimizedSearcher;
import org.opensearch.knn.memoryoptsearch.MemoryOptimizedSearcherFactory;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationState;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateReadConfig;
Expand All @@ -37,23 +42,89 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.knn.common.KNNConstants.KNN_ENGINE;
import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

/**
* Vectors reader class for reading the flat vectors for native engines. The class provides methods for iterating
* over the vectors and retrieving their values.
*/
@Slf4j
public class NativeEngines990KnnVectorsReader extends KnnVectorsReader {
private static final int RESERVE_TWICE_SPACE = 2;
private static final float SUFFICIENT_LOAD_FACTOR = 0.6f;

private final FlatVectorsReader flatVectorsReader;
private Map<String, String> quantizationStateCacheKeyPerField;
private SegmentReadState segmentReadState;
private final List<String> cacheKeys;
private Map<String, MemoryOptimizedSearcher> memoryOptimizedSearchers;

/**
 * Creates a reader for the given segment.
 * Stores the segment state and flat vectors reader, resolves the vector cache keys, prepares an empty
 * field-name-to-searcher map and then invokes {@code loadCacheKeyMap()}.
 * Memory optimized searchers are currently never loaded here because the enabling flag is hard-coded to false.
 *
 * @param state             read state of the segment this reader serves
 * @param flatVectorsReader reader for the flat vectors of the segment
 */
public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) {
    this.flatVectorsReader = flatVectorsReader;
    this.segmentReadState = state;
    this.cacheKeys = getVectorCacheKeysFromSegmentReaderState(state);

    // Size the map from the number of fields in the segment, using the class-level capacity multiplier and load factor.
    final int searcherMapCapacity = RESERVE_TWICE_SPACE * state.fieldInfos.size();
    this.memoryOptimizedSearchers = new HashMap<>(searcherMapCapacity, SUFFICIENT_LOAD_FACTOR);

    loadCacheKeyMap();

    // TMP(KDY) : Dynamic update will be covered in part-7. Please refer to
    // https://github.com/opensearch-project/k-NN/issues/2401#issuecomment-2699777824
    // Until then the feature stays switched off, so the load below is unreachable.
    final boolean isMemoryOptimizedSearchEnabled = false;
    if (isMemoryOptimizedSearchEnabled) {
        loadMemoryOptimizedSearcher();
    }
}

/**
 * Builds a lazy supplier that creates a {@link MemoryOptimizedSearcher} for the given field.
 * Nothing is opened by this method itself; the searcher is only created when the returned supplier is invoked.
 *
 * @param fieldInfo field to inspect
 * @return a supplier creating the searcher, or {@code null} when the field is not a KNN field, its engine
 *         provides no {@link MemoryOptimizedSearcherFactory}, or no native engine file name is resolved for it
 */
private IOSupplier<MemoryOptimizedSearcher> getIndexFileNameIfMemoryOptimizedSearchSupported(final FieldInfo fieldInfo) {
    // Only fields carrying the KNN marker attribute are candidates.
    final Map<String, String> fieldAttributes = fieldInfo.attributes();
    final boolean isKnnField = fieldAttributes != null && fieldAttributes.containsKey(KNN_FIELD);
    if (!isKnnField) {
        return null;
    }

    // Resolve the engine recorded on the field, falling back to the default engine name.
    final KNNEngine engine = KNNEngine.getEngine(fieldAttributes.getOrDefault(KNN_ENGINE, KNNEngine.DEFAULT.getName()));

    // A null factory means the engine does not support memory optimized search.
    final MemoryOptimizedSearcherFactory factory = engine.getMemoryOptimizedSearcherFactory();
    if (factory == null) {
        return null;
    }

    // Without an index file name there is nothing to open.
    final String indexFileName = KNNCodecUtil.getNativeEngineFileFromFieldInfo(fieldInfo, segmentReadState.segmentInfo);
    if (indexFileName == null) {
        return null;
    }

    return () -> factory.createMemoryOptimizedSearcher(segmentReadState.directory, indexFileName);
}

/**
 * Creates a {@link MemoryOptimizedSearcher} for every field of the segment that supports it and registers it
 * in {@code memoryOptimizedSearchers} under the field name.
 *
 * <p>If creating any searcher fails, every searcher registered so far is closed and removed from the map, so
 * that no closed searcher can later be picked up by a search, and the failure is rethrown.
 *
 * @throws RuntimeException wrapping the original exception when any searcher cannot be created
 */
public void loadMemoryOptimizedSearcher() {
    try {
        for (FieldInfo fieldInfo : segmentReadState.fieldInfos) {
            final IOSupplier<MemoryOptimizedSearcher> searcherSupplier = getIndexFileNameIfMemoryOptimizedSearchSupported(fieldInfo);
            if (searcherSupplier != null) {
                // The factory contract requires a non-null searcher; fail fast if it is violated.
                final MemoryOptimizedSearcher searcher = Objects.requireNonNull(searcherSupplier.get());
                memoryOptimizedSearchers.put(fieldInfo.getName(), searcher);
            }
        }
    } catch (Exception e) {
        // closeWhileHandlingException already suppresses exceptions thrown by close(), so no extra try/catch is
        // needed around it.
        IOUtils.closeWhileHandlingException(memoryOptimizedSearchers.values());
        // Drop the now-closed searchers; leaving them registered would let search() dispatch to a closed searcher.
        memoryOptimizedSearchers.clear();
        throw new RuntimeException(e);
    }
}

/**
Expand Down Expand Up @@ -135,6 +206,14 @@ public void search(String field, float[] target, KnnCollector knnCollector, Bits
((QuantizationConfigKNNCollector) knnCollector).setQuantizationState(quantizationState);
return;
}

// Try with memory optimized searcher
final MemoryOptimizedSearcher memoryOptimizedSearcher = memoryOptimizedSearchers.get(field);
if (memoryOptimizedSearcher != null) {
memoryOptimizedSearcher.search(target, knnCollector, acceptDocs);
return;
}

throw new UnsupportedOperationException("Search functionality using codec is not supported with Native Engine Reader");
}

Expand Down Expand Up @@ -197,6 +276,9 @@ public void close() throws IOException {
quantizationStateCacheManager.evict(cacheKey);
}
}

// TODO(KDY)
// Close all memory optimized searchers.
}

private void loadCacheKeyMap() {
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/opensearch/knn/index/engine/KNNEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableSet;
import org.opensearch.common.ValidationException;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.memoryoptsearch.MemoryOptimizedSearcherFactory;
import org.opensearch.knn.index.engine.faiss.Faiss;
import org.opensearch.knn.index.engine.lucene.Lucene;
import org.opensearch.knn.index.engine.nmslib.Nmslib;
Expand Down Expand Up @@ -216,4 +217,9 @@ public ResolvedMethodContext resolveMethod(
public boolean supportsRemoteIndexBuild() {
return knnLibrary.supportsRemoteIndexBuild();
}

/**
 * Returns the memory optimized searcher factory obtained from the wrapped {@code knnLibrary}.
 * This method only delegates; it adds no logic of its own.
 *
 * @return the value returned by {@code knnLibrary.getMemoryOptimizedSearcherFactory()}
 */
@Override
public MemoryOptimizedSearcherFactory getMemoryOptimizedSearcherFactory() {
return knnLibrary.getMemoryOptimizedSearcherFactory();
}
}
12 changes: 11 additions & 1 deletion src/main/java/org/opensearch/knn/index/engine/KNNLibrary.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.opensearch.common.ValidationException;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.memoryoptsearch.MemoryOptimizedSearcherFactory;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -140,11 +141,20 @@ default List<String> mmapFileExtensions() {
return Collections.emptyList();
}

/**
 * Returns whether or not the engine implementation supports remote index build.
 *
 * @return true if remote index build is supported, false otherwise; this default implementation returns false
 */
default boolean supportsRemoteIndexBuild() {
return false;
}

/**
 * Creates a new memory optimized searcher factory.
 *
 * @return a new searcher factory that performs KNN search with optimized memory management, or {@code null}
 *         to indicate that this library does not support memory optimized search. This default implementation
 *         returns {@code null}.
 */
default MemoryOptimizedSearcherFactory getMemoryOptimizedSearcherFactory() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import com.google.common.collect.ImmutableMap;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.memoryoptsearch.MemoryOptimizedSearcherFactory;
import org.opensearch.knn.index.engine.KNNMethod;
import org.opensearch.knn.index.engine.KNNMethodConfigContext;
import org.opensearch.knn.index.engine.KNNMethodContext;
import org.opensearch.knn.index.engine.MethodResolver;
import org.opensearch.knn.index.engine.NativeLibrary;
import org.opensearch.knn.index.engine.ResolvedMethodContext;
import org.opensearch.knn.memoryoptsearch.faiss.FaissMemoryOptimizedSearcherFactory;

import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -123,4 +125,9 @@ public ResolvedMethodContext resolveMethod(
public boolean supportsRemoteIndexBuild() {
return true;
}

/**
 * Returns the FAISS implementation of the memory optimized searcher factory.
 *
 * @return a newly constructed {@link FaissMemoryOptimizedSearcherFactory}; a fresh instance is created on every call
 */
@Override
public MemoryOptimizedSearcherFactory getMemoryOptimizedSearcherFactory() {
return new FaissMemoryOptimizedSearcherFactory();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.memoryoptsearch;

import org.apache.lucene.search.KnnCollector;
import org.apache.lucene.util.Bits;

import java.io.Closeable;
import java.io.IOException;

/**
 * Memory optimized searcher that performs vector search while making a best effort to minimize memory pressure.
 * Implementations should focus on optimizing memory allocations and must not increase memory pressure in the JVM.
 * The concern is limited to JVM memory; implementations are encouraged to leverage the OS cache internally to
 * improve performance.
 * Although the main focus is on memory pressure, that does not mean performance is sacrificed for lower memory
 * consumption. The goal of this searcher is to balance the two factors: seek the best performance while
 * minimizing memory pressure.
 * The searcher is {@link Closeable}; what {@code close()} releases is up to the implementation.
 */
public interface MemoryOptimizedSearcher extends Closeable {

/**
 * Return the k nearest neighbor documents as determined by comparison of their vector values for
 * this field, to the given vector, by the field's similarity function. The score of each document
 * is derived from the vector similarity in a way that ensures scores are positive and that a
 * larger score corresponds to a higher ranking.
 *
 * <p>The search is allowed to be approximate, meaning the results are not guaranteed to be the
 * true k closest neighbors. For large values of k (for example when k is close to the total
 * number of documents), the search may also retrieve fewer than k documents.
 *
 * @param target the vector-valued float vector query
 * @param knnCollector a KnnResults collector and relevant settings for gathering vector results
 * @param acceptDocs {@link Bits} that represents the allowed documents to match, or {@code null}
 * if they are all allowed to match.
 * @throws IOException may be thrown by implementations when an I/O failure occurs during the search
 */
void search(float[] target, KnnCollector knnCollector, Bits acceptDocs) throws IOException;

/**
 * Return the k nearest neighbor documents as determined by comparison of their vector values for
 * this field, to the given vector, by the field's similarity function. The score of each document
 * is derived from the vector similarity in a way that ensures scores are positive and that a
 * larger score corresponds to a higher ranking.
 *
 * <p>The search is allowed to be approximate, meaning the results are not guaranteed to be the
 * true k closest neighbors. For large values of k (for example when k is close to the total
 * number of documents), the search may also retrieve fewer than k documents.
 *
 * @param target the vector-valued byte vector query
 * @param knnCollector a KnnResults collector and relevant settings for gathering vector results
 * @param acceptDocs {@link Bits} that represents the allowed documents to match, or {@code null}
 * if they are all allowed to match.
 * @throws IOException may be thrown by implementations when an I/O failure occurs during the search
 */
void search(byte[] target, KnnCollector knnCollector, Bits acceptDocs) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.memoryoptsearch;

import org.apache.lucene.store.Directory;

import java.io.IOException;

/**
 * Factory to create {@link MemoryOptimizedSearcher}.
 * The provided parameters are a {@link Directory} and a file name, which an implementation can use to open an
 * input stream.
 */
public interface MemoryOptimizedSearcherFactory {
/**
 * Create a non-null {@link MemoryOptimizedSearcher} with given Lucene's {@link Directory}.
 *
 * @param directory Lucene's Directory.
 * @param fileName Logical file name to load.
 * @return It must return a non-null {@link MemoryOptimizedSearcher}
 * @throws IOException may be thrown by implementations on I/O failure, presumably while opening or reading
 *         the given file
 */
MemoryOptimizedSearcher createMemoryOptimizedSearcher(Directory directory, String fileName) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.memoryoptsearch.faiss;

import org.apache.lucene.search.KnnCollector;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.Bits;
import org.opensearch.knn.memoryoptsearch.MemoryOptimizedSearcher;

import java.io.IOException;

/**
 * FAISS implementation of {@link MemoryOptimizedSearcher} that wraps a Lucene {@link IndexInput}.
 * Both search methods are placeholders for now and always throw {@link UnsupportedOperationException}.
 * Closing the searcher closes the wrapped input.
 */
public class FaissMemoryOptimizedSearcher implements MemoryOptimizedSearcher {
// Input wrapped by this searcher; it is closed in close().
private final IndexInput indexInput;

/**
 * Creates a searcher around the given input.
 *
 * @param indexInput input to wrap; it is stored as-is (no null check) and closed by {@link #close()}
 */
public FaissMemoryOptimizedSearcher(IndexInput indexInput) {
this.indexInput = indexInput;
}

/**
 * Not implemented yet.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void search(float[] target, KnnCollector knnCollector, Bits acceptDocs) throws IOException {
// TODO(KDY) : This will be covered in subsequent parts.
throw new UnsupportedOperationException("Not implemented yet");
}

/**
 * Not implemented yet.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void search(byte[] target, KnnCollector knnCollector, Bits acceptDocs) throws IOException {
// TODO(KDY) : This will be covered in subsequent parts.
throw new UnsupportedOperationException("Not implemented yet");
}

/**
 * Closes the wrapped {@link IndexInput}.
 *
 * @throws IOException if closing the input fails
 */
@Override
public void close() throws IOException {
indexInput.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.memoryoptsearch.faiss;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.ReadAdvice;
import org.opensearch.knn.memoryoptsearch.MemoryOptimizedSearcher;
import org.opensearch.knn.memoryoptsearch.MemoryOptimizedSearcherFactory;

import java.io.IOException;

/**
 * {@link MemoryOptimizedSearcherFactory} that opens the given file from a Lucene {@link Directory} and wraps the
 * resulting {@link IndexInput} in a {@link FaissMemoryOptimizedSearcher}.
 */
public class FaissMemoryOptimizedSearcherFactory implements MemoryOptimizedSearcherFactory {
    /**
     * Opens {@code fileName} with a random-access read advice and returns a searcher wrapping the opened input.
     *
     * @param directory Lucene directory that contains the file
     * @param fileName  name of the file to open
     * @return a new {@link FaissMemoryOptimizedSearcher} around the opened input
     * @throws IOException if the directory fails to open the file
     */
    @Override
    public MemoryOptimizedSearcher createMemoryOptimizedSearcher(final Directory directory, final String fileName) throws IOException {
        // Why ReadAdvice.RANDOM?
        // Vector search jumps between random vector locations, so read-ahead by the underlying storage brings no
        // benefit. Passing `RANDOM` as the advice explicitly states that access will be random and prevents
        // read-ahead.
        final IOContext randomAccessContext = new IOContext(IOContext.Context.DEFAULT, null, null, ReadAdvice.RANDOM);
        final IndexInput openedInput = directory.openInput(fileName, randomAccessContext);
        return new FaissMemoryOptimizedSearcher(openedInput);
    }
}

0 comments on commit 83cd071

Please sign in to comment.