Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Index Client] Implement remote client build awaiting functionality, validate encoder support #2576

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* [Remote Vector Index Build] Implement vector data upload and vector data size threshold setting [#2550](https://github.com/opensearch-project/k-NN/pull/2550)
* [Remote Vector Index Build] Implement data download and IndexOutput write functionality [#2554](https://github.com/opensearch-project/k-NN/pull/2554)
* [Remote Vector Index Build] Introduce Client Skeleton + basic Build Request implementation [#2560](https://github.com/opensearch-project/k-NN/pull/2560)
* [Remote Vector Index Build] Implement remote client build awaiting functionality, validate encoder support [#2576](https://github.com/opensearch-project/k-NN/pull/2576)
* Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345]
### Enhancements
* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509)
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public class KNNSettings {

// TODO: Tune these default values based on benchmarking
public static final Integer KNN_DEFAULT_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES = 60;
public static final Integer KNN_DEFAULT_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS = 30;
public static final Integer KNN_DEFAULT_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS = 5;

/**
* Settings Definition
Expand Down Expand Up @@ -783,6 +783,20 @@ public static String getRemoteBuildServiceEndpoint() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_SERVICE_ENDPOINT);
}

/**
 * Looks up how long the client waits on a remote build before abandoning it.
 *
 * @return the current value of the {@link KNNSettings#KNN_REMOTE_BUILD_CLIENT_TIMEOUT} setting
 */
public static TimeValue getRemoteBuildClientTimeout() {
    final TimeValue clientTimeout = KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_TIMEOUT);
    return clientTimeout;
}

/**
 * Looks up the interval at which a RemoteIndexPoller polls for remote build status.
 *
 * @return the current value of the {@link KNNSettings#KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL} setting
 */
public static TimeValue getRemoteBuildClientPollInterval() {
    final TimeValue pollInterval = KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL);
    return pollInterval;
}

public static boolean isFaissAVX2Disabled() {
try {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_FAISS_AVX2_DISABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public NativeIndexBuildStrategy getBuildStrategy(
if (KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()
&& repositoriesServiceSupplier != null
&& indexSettings != null
&& knnEngine.supportsRemoteIndexBuild()
&& knnEngine.supportsRemoteIndexBuild(fieldInfo.attributes())
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings, vectorBlobLength)) {
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy, indexSettings);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,36 @@

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang.StringUtils;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.store.IndexOutputWithBuffer;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.DOC_ID_FILE_EXTENSION;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.VECTORS_PATH;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.VECTOR_BLOB_FILE_EXTENSION;

@Log4j2
@AllArgsConstructor
public class DefaultVectorRepositoryAccessor implements VectorRepositoryAccessor {
private final BlobStoreRepository repository;
private final IndexSettings indexSettings;
private final BlobContainer blobContainer;

/**
* If the repository implements {@link AsyncMultiStreamBlobContainer}, then parallel uploads will be used. Parallel uploads are backed by a {@link WriteContext}, for which we have a custom
Expand All @@ -65,18 +59,14 @@ public void writeToRepository(
VectorDataType vectorDataType,
Supplier<KNNVectorValues<?>> knnVectorValuesSupplier
) throws IOException, InterruptedException {
assert repository != null;
// Get the blob container based on blobName and the repo base path. This is where the blobs will be written to.
BlobPath path = repository.basePath().add(indexSettings.getUUID() + VECTORS_PATH);
BlobContainer blobContainer = repository.blobStore().blobContainer(path);

assert blobContainer != null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserts do not work in prod. Can we do null checks instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the logic of getting the blob container comes from OpenSearch core so the assertion is just a sanity check on that

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jed326 if the assertion fails, will we fall back to CPU-based index builds? And will there be a useful exception that tells us what happened? If not, please add this information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is there is not any way for blobContainer to be null, and none of the methods in the calling path to retrieve blobContainer throw any checked exceptions. The only thing that can throw an exception is getRepository, and creating a blobContainer on top of a repository reference is more or less just setting the file path to be written/read to. This assertion just helps make sure in tests if we do try to test/mock this method we are properly configuring things.

KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();
initializeVectorValues(knnVectorValues);
long vectorBlobLength = (long) knnVectorValues.bytesPerVector() * totalLiveDocs;

if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
// First initiate vectors upload
log.debug("Repository {} Supports Parallel Blob Upload", repository);
log.debug("Container {} Supports Parallel Blob Upload", blobContainer);
// WriteContext is the main entry point into asyncBlobUpload. It stores all of our upload configurations, analogous to
// BuildIndexParams
WriteContext writeContext = new WriteContext.Builder().fileName(blobName + VECTOR_BLOB_FILE_EXTENSION)
Expand Down Expand Up @@ -128,7 +118,7 @@ public void onFailure(Exception e) {
throw new IOException(exception.get());
}
} else {
log.debug("Repository {} Does Not Support Parallel Blob Upload", repository);
log.debug("Container {} Does Not Support Parallel Blob Upload", blobContainer);
// Write Vectors
InputStream vectorStream = new BufferedInputStream(new VectorValuesInputStream(knnVectorValuesSupplier.get(), vectorDataType));
log.debug("Writing {} bytes for {} docs to {}", vectorBlobLength, totalLiveDocs, blobName + VECTOR_BLOB_FILE_EXTENSION);
Expand Down Expand Up @@ -216,25 +206,15 @@ private CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOExceptio
}

@Override
public void readFromRepository(String path, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException {
if (path == null || path.isEmpty()) {
public void readFromRepository(String fileName, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException {
if (StringUtils.isBlank(fileName)) {
throw new IllegalArgumentException("download path is null or empty");
}
Path downloadPath = Paths.get(path);
String fileName = downloadPath.getFileName().toString();
if (!fileName.endsWith(KNNEngine.FAISS.getExtension())) {
log.error("download path [{}] does not end with extension [{}}", downloadPath, KNNEngine.FAISS.getExtension());
log.error("file name [{}] does not end with extension [{}}", fileName, KNNEngine.FAISS.getExtension());
throw new IllegalArgumentException("download path has incorrect file extension");
}

BlobPath blobContainerPath = new BlobPath();
if (downloadPath.getParent() != null) {
for (Path p : downloadPath.getParent()) {
blobContainerPath = blobContainerPath.add(p.getFileName().toString());
}
}

BlobContainer blobContainer = repository.blobStore().blobContainer(blobContainerPath);
// TODO: We are using the sequential download API as multi-part parallel download is difficult for us to implement today and
// requires some changes in core. For more details, see: https://github.com/opensearch-project/k-NN/issues/2464
InputStream graphStream = blobContainer.readBlob(fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@
import org.opensearch.common.StopWatch;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
import org.opensearch.knn.index.remote.RemoteBuildRequest;
import org.opensearch.knn.index.remote.RemoteBuildResponse;
import org.opensearch.knn.index.remote.RemoteBuildStatusRequest;
import org.opensearch.knn.index.remote.RemoteBuildStatusResponse;
import org.opensearch.knn.index.remote.RemoteIndexClient;
import org.opensearch.knn.index.remote.RemoteIndexClientFactory;
import org.opensearch.knn.index.remote.RemoteStatusResponse;
import org.opensearch.knn.index.remote.RemoteIndexWaiter;
import org.opensearch.knn.index.remote.RemoteIndexWaiterFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
Expand All @@ -29,6 +33,7 @@
import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING;
import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_VECTOR_REPO_SETTING;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.VECTORS_PATH;

/**
* This class orchestrates building vector indices. It handles uploading data to a repository, submitting a remote
Expand Down Expand Up @@ -111,7 +116,11 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
StopWatch stopWatch;
long time_in_millis;
try {
VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor(getRepository(), indexSettings);
BlobStoreRepository repository = getRepository();
BlobPath blobPath = repository.basePath().add(indexSettings.getUUID() + VECTORS_PATH);
VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor(
repository.blobStore().blobContainer(blobPath)
);
stopWatch = new StopWatch().start();
// We create a new time based UUID per file in order to avoid conflicts across shards. It is also very difficult to get the
// shard id in this context.
Expand All @@ -126,19 +135,26 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
log.debug("Repository write took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

RemoteIndexClient client = RemoteIndexClientFactory.getRemoteIndexClient();
RemoteBuildRequest request = new RemoteBuildRequest(indexSettings, indexInfo, getRepository().getMetadata(), blobName);
RemoteBuildRequest buildRequest = new RemoteBuildRequest(
indexSettings,
indexInfo,
repository.getMetadata(),
blobPath.buildAsString()
);
stopWatch = new StopWatch().start();
RemoteBuildResponse remoteBuildResponse = client.submitVectorBuild(request);
RemoteBuildResponse remoteBuildResponse = client.submitVectorBuild(buildRequest);
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

RemoteBuildStatusRequest remoteBuildStatusRequest = RemoteBuildStatusRequest.build(remoteBuildResponse);
RemoteIndexWaiter waiter = RemoteIndexWaiterFactory.getRemoteIndexWaiter(client);
stopWatch = new StopWatch().start();
RemoteStatusResponse remoteStatusResponse = client.awaitVectorBuild(remoteBuildResponse);
RemoteBuildStatusResponse remoteBuildStatusResponse = waiter.awaitVectorBuild(remoteBuildStatusRequest);
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());

stopWatch = new StopWatch().start();
vectorRepositoryAccessor.readFromRepository(remoteStatusResponse.getIndexPath(), indexInfo.getIndexOutputWithBuffer());
vectorRepositoryAccessor.readFromRepository(remoteBuildStatusResponse.getFileName(), indexInfo.getIndexOutputWithBuffer());
time_in_millis = stopWatch.stop().totalTime().millis();
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
} catch (Exception e) {
Expand All @@ -149,10 +165,8 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
}

/**
* Gets the KNN repository container from the repository service.
*
* @return {@link RepositoriesService}
* @throws RepositoryMissingException if repository is not registered or if {@link KNN_REMOTE_VECTOR_REPO_SETTING} is not set
* @return {@link BlobStoreRepository} referencing the repository
* @throws RepositoryMissingException if repository is not registered or if {@link KNNSettings#KNN_REMOTE_VECTOR_REPO_SETTING} is not set
*/
private BlobStoreRepository getRepository() throws RepositoryMissingException {
RepositoriesService repositoriesService = repositoriesServiceSupplier.get();
Expand All @@ -165,4 +179,5 @@ private BlobStoreRepository getRepository() throws RepositoryMissingException {
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
return (BlobStoreRepository) repository;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ void writeToRepository(

/**
* Read constructed vector file from remote repository and write to IndexOutput
* @param path File path as String
* @param fileName File name as String
* @param indexOutputWithBuffer {@link IndexOutputWithBuffer} which will be used to write to the underlying {@link org.apache.lucene.store.IndexOutput}
* @throws IOException
*/
void readFromRepository(String path, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException;
void readFromRepository(String fileName, IndexOutputWithBuffer indexOutputWithBuffer) throws IOException;
}
10 changes: 8 additions & 2 deletions src/main/java/org/opensearch/knn/index/engine/KNNEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,17 @@ public ResolvedMethodContext resolveMethod(
return knnLibrary.resolveMethod(knnMethodContext, knnMethodConfigContext, shouldRequireTraining, spaceType);
}

/**
* {@inheritDoc}
*/
@Override
public boolean supportsRemoteIndexBuild() {
return knnLibrary.supportsRemoteIndexBuild();
public boolean supportsRemoteIndexBuild(Map<String, String> attributes) {
return knnLibrary.supportsRemoteIndexBuild(attributes);
}

/**
* {@inheritDoc}
*/
@Override
public RemoteIndexParameters createRemoteIndexingParameters(Map<String, Object> indexInfoParameters) {
return knnLibrary.createRemoteIndexingParameters(indexInfoParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ default List<String> mmapFileExtensions() {
}

/**
* Returns whether or not the engine implementation supports remote index build
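* Returns whether the engine implementation supports remote index build
* @param attributes field attributes (as returned by {@code FieldInfo#attributes()}) used to decide support
* @return true if remote index build is supported, false otherwise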
*/
default boolean supportsRemoteIndexBuild() {
default boolean supportsRemoteIndexBuild(Map<String, String> attributes) {
return false;
}

Expand All @@ -161,8 +161,10 @@ default boolean isRestricted(Version indexVersionCreated) {
return false; // By default, libraries are not deprecated
}

/**
 * Creates the set of index parameters needed to build the remote index.
 * <p>
 * This default implementation always rejects the call; libraries that support remote builds override it.
 *
 * @param indexInfoParameters index parameters to convert
 * @return never returns normally in this default implementation
 * @throws UnsupportedOperationException always
 */
default RemoteIndexParameters createRemoteIndexingParameters(Map<String, Object> indexInfoParameters) {
    final String unsupportedMessage = "Remote build service does not support this engine";
    throw new UnsupportedOperationException(unsupportedMessage);
}
}
56 changes: 54 additions & 2 deletions src/main/java/org/opensearch/knn/index/engine/faiss/Faiss.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
package org.opensearch.knn.index.engine.faiss;

import com.google.common.collect.ImmutableMap;
import org.apache.lucene.index.FieldInfo;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.engine.KNNMethod;
Expand All @@ -16,12 +21,14 @@
import org.opensearch.knn.index.engine.ResolvedMethodContext;
import org.opensearch.knn.index.remote.RemoteIndexParameters;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;

import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW;
import static org.opensearch.knn.common.KNNConstants.METHOD_IVF;
import static org.opensearch.knn.common.KNNConstants.NAME;
import static org.opensearch.knn.common.KNNConstants.PARAMETERS;

/**
* Implements NativeLibrary for the faiss native library
Expand Down Expand Up @@ -121,11 +128,56 @@ public ResolvedMethodContext resolveMethod(
return methodResolver.resolveMethod(knnMethodContext, knnMethodConfigContext, shouldRequireTraining, spaceType);
}

/**
* Use the method name to route the check to the specific method class
*/
@Override
public boolean supportsRemoteIndexBuild() {
return true;
public boolean supportsRemoteIndexBuild(Map<String, String> attributes) {
String parametersJson = attributes.get(PARAMETERS);
if (parametersJson != null) {
String methodName = getMethodName(parametersJson);
if (METHOD_HNSW.equals(methodName)) {
return FaissHNSWMethod.supportsRemoteIndexBuild(attributes);
}
}
return false;
}

/**
 * Get method name from a {@link FieldInfo} formatted attributes map.
 * <p>
 * Only the top-level {@code name} field is considered. Nested objects and arrays are skipped, so a
 * nested {@code name} key is never mistaken for the method name and the lookup does not depend on
 * the order of the fields.
 * <p>
 * Example:
 * <pre>{@code {
 *   "index_description": "HNSW12,Flat",
 *   "spaceType": "l2",
 *   "name": "hnsw",
 *   ...
 * }}</pre>
 *
 * @param parametersJson JSON document to search
 * @return the text of the top-level {@code name} field, or {@code null} if the document is not a JSON
 *         object, has no top-level {@code name} field, or cannot be parsed
 */
private String getMethodName(String parametersJson) {
    // try-with-resources: the parser is closed on every exit path.
    // The charset is explicit so the result does not depend on the platform default.
    try (
        XContentParser parser = XContentType.JSON.xContent()
            .createParser(
                NamedXContentRegistry.EMPTY,
                DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
                parametersJson.getBytes(java.nio.charset.StandardCharsets.UTF_8)
            )
    ) {
        // Anything other than a JSON object (including empty input, where nextToken() is null) has no name.
        if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
            return null;
        }

        XContentParser.Token token;
        // Stop on END_OBJECT of the top-level object, or on null (end of input) to avoid spinning forever.
        while ((token = parser.nextToken()) != null && token != XContentParser.Token.END_OBJECT) {
            if (token != XContentParser.Token.FIELD_NAME) {
                continue;
            }
            String fieldName = parser.currentName();
            // Advance from the key to its value.
            parser.nextToken();
            if (NAME.equals(fieldName)) {
                return parser.text();
            }
            // No-op for scalar values; for objects/arrays this jumps to the matching end token so
            // nested content is never inspected.
            parser.skipChildren();
        }
        return null;
    } catch (IOException e) {
        // Unparseable parameters yield "no method name", same as the original best-effort behaviour.
        return null;
    }
}

/**
* {@inheritDoc}
*/
@Override
public RemoteIndexParameters createRemoteIndexingParameters(Map<String, Object> indexInfoParameters) {
if (METHOD_HNSW.equals(indexInfoParameters.get(NAME))) {
Expand Down
Loading
Loading