Skip to content

Commit

Permalink
Initial revisions, add check on encoder type
Browse files Browse the repository at this point in the history
Signed-off-by: owenhalpert <ohalpert@gmail.com>
  • Loading branch information
owenhalpert committed Mar 5, 2025
1 parent 53d63d4 commit 1a9577b
Show file tree
Hide file tree
Showing 16 changed files with 291 additions and 57 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* [Remote Vector Index Build] Implement vector data upload and vector data size threshold setting [#2550](https://github.com/opensearch-project/k-NN/pull/2550)
* [Remote Vector Index Build] Implement data download and IndexOutput write functionality [#2554](https://github.com/opensearch-project/k-NN/pull/2554)
* [Remote Vector Index Build] Introduce Client Skeleton + basic Build Request implementation [#2560](https://github.com/opensearch-project/k-NN/pull/2560)
* [Remote Vector Index Build] Client polling mechanism
* [Remote Vector Index Build] Client polling mechanism [#2576](https://github.com/opensearch-project/k-NN/pull/2576)
* Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345]
### Enhancements
* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509)
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ public class KNNSettings {
public static final String KNN_REMOTE_VECTOR_REPO = "knn.remote_index_build.vector_repo";
public static final String KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD = "index.knn.remote_index_build.size_threshold";
public static final String KNN_REMOTE_BUILD_SERVICE_ENDPOINT = "knn.remote_index_build.client.endpoint";
public static final String KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS = "knn.remote_index_build.client.poll_interval.seconds";
public static final String KNN_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES = "knn.remote_index_build.client.timeout.hours";
public static final String KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL = "knn.remote_index_build.client.poll_interval";
public static final String KNN_REMOTE_BUILD_CLIENT_TIMEOUT = "knn.remote_index_build.client.timeout";
public static final String KNN_REMOTE_BUILD_CLIENT_USERNAME = "knn.remote_index_build.client.username";
public static final String KNN_REMOTE_BUILD_CLIENT_PASSWORD = "knn.remote_index_build.client.password";

Expand Down Expand Up @@ -432,18 +432,18 @@ public class KNNSettings {
/**
* Time the remote build service client will wait before falling back to CPU index build.
*/
public static final Setting<TimeValue> KNN_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES_SETTING = Setting.timeSetting(
KNN_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES,
TimeValue.timeValueHours(KNN_DEFAULT_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES),
public static final Setting<TimeValue> KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING = Setting.timeSetting(
KNN_REMOTE_BUILD_CLIENT_TIMEOUT,
TimeValue.timeValueMinutes(KNN_DEFAULT_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES),
NodeScope,
Dynamic
);

/**
* Setting to control how often the remote build service client polls the build service for the status of the job.
*/
public static final Setting<TimeValue> KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS_SETTING = Setting.timeSetting(
KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS,
public static final Setting<TimeValue> KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING = Setting.timeSetting(
KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL,
TimeValue.timeValueSeconds(KNN_DEFAULT_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS),
NodeScope,
Dynamic
Expand Down Expand Up @@ -656,12 +656,12 @@ private Setting<?> getSetting(String key) {
return KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING;
}

if (KNN_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES.equals(key)) {
return KNN_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES_SETTING;
if (KNN_REMOTE_BUILD_CLIENT_TIMEOUT.equals(key)) {
return KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING;
}

if (KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS.equals(key)) {
return KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS_SETTING;
if (KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL.equals(key)) {
return KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING;
}

if (KNN_REMOTE_BUILD_CLIENT_USERNAME.equals(key)) {
Expand Down Expand Up @@ -702,8 +702,8 @@ public List<Setting<?>> getSettings() {
KNN_REMOTE_VECTOR_REPO_SETTING,
KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING,
KNN_REMOTE_BUILD_SERVICE_ENDPOINT_SETTING,
KNN_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES_SETTING,
KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS_SETTING,
KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING,
KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING,
KNN_REMOTE_BUILD_CLIENT_USERNAME_SETTING,
KNN_REMOTE_BUILD_CLIENT_PASSWORD_SETTING
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public NativeIndexBuildStrategy getBuildStrategy(
if (KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()
&& repositoriesServiceSupplier != null
&& indexSettings != null
&& knnEngine.supportsRemoteIndexBuild()
&& knnEngine.supportsRemoteIndexBuild(fieldInfo.attributes())
&& RemoteIndexBuildStrategy.shouldBuildIndexRemotely(indexSettings, vectorBlobLength)) {
return new RemoteIndexBuildStrategy(repositoriesServiceSupplier, strategy, indexSettings);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void writeToRepository(
VectorDataType vectorDataType,
Supplier<KNNVectorValues<?>> knnVectorValuesSupplier
) throws IOException, InterruptedException {
assert blobContainer != null;
KNNVectorValues<?> knnVectorValues = knnVectorValuesSupplier.get();
initializeVectorValues(knnVectorValues);
long vectorBlobLength = (long) knnVectorValues.bytesPerVector() * totalLiveDocs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.opensearch.common.StopWatch;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.index.KNNSettings;
Expand Down Expand Up @@ -116,7 +115,9 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
try {
BlobStoreRepository repository = getRepository();
BlobPath blobPath = repository.basePath().add(indexSettings.getUUID() + VECTORS_PATH);
VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor(getBlobContainer(repository, blobPath));
VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor(
repository.blobStore().blobContainer(blobPath)
);
stopWatch = new StopWatch().start();
// We create a new time based UUID per file in order to avoid conflicts across shards. It is also very difficult to get the
// shard id in this context.
Expand All @@ -135,7 +136,7 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
indexSettings,
indexInfo,
repository.getMetadata(),
blobPath.buildAsString() + blobName
blobPath.buildAsString()
);
stopWatch = new StopWatch().start();
RemoteBuildResponse remoteBuildResponse = client.submitVectorBuild(request);
Expand Down Expand Up @@ -174,12 +175,4 @@ private BlobStoreRepository getRepository() throws RepositoryMissingException {
return (BlobStoreRepository) repository;
}

/**
* @param blobStoreRepository {@link BlobStoreRepository} containing the blob container
* @return {@link BlobContainer} referencing the location for vector upload and graph download
* @throws RepositoryMissingException if repository is not registered or if {@link KNNSettings#KNN_REMOTE_VECTOR_REPO_SETTING} is not set
*/
private BlobContainer getBlobContainer(BlobStoreRepository blobStoreRepository, BlobPath path) throws RepositoryMissingException {
return blobStoreRepository.blobStore().blobContainer(path);
}
}
5 changes: 3 additions & 2 deletions src/main/java/org/opensearch/knn/index/engine/KNNEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.knn.index.engine.nmslib.Nmslib;
import org.opensearch.knn.index.remote.RemoteIndexParameters;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -214,8 +215,8 @@ public ResolvedMethodContext resolveMethod(
}

@Override
public boolean supportsRemoteIndexBuild() {
return knnLibrary.supportsRemoteIndexBuild();
public boolean supportsRemoteIndexBuild(Map<String, String> attributes) throws IOException {
return knnLibrary.supportsRemoteIndexBuild(attributes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.remote.RemoteIndexParameters;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -146,7 +147,7 @@ default List<String> mmapFileExtensions() {
* Returns whether or not the engine implementation supports remote index build
* @return true if remote index build is supported, false otherwise
*/
default boolean supportsRemoteIndexBuild() {
default boolean supportsRemoteIndexBuild(Map<String, String> attributes) throws IOException {
return false;
}

Expand Down
34 changes: 31 additions & 3 deletions src/main/java/org/opensearch/knn/index/engine/faiss/Faiss.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
package org.opensearch.knn.index.engine.faiss;

import com.google.common.collect.ImmutableMap;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.engine.KNNMethod;
Expand All @@ -16,12 +20,14 @@
import org.opensearch.knn.index.engine.ResolvedMethodContext;
import org.opensearch.knn.index.remote.RemoteIndexParameters;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;

import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW;
import static org.opensearch.knn.common.KNNConstants.METHOD_IVF;
import static org.opensearch.knn.common.KNNConstants.NAME;
import static org.opensearch.knn.common.KNNConstants.PARAMETERS;

/**
* Implements NativeLibrary for the faiss native library
Expand Down Expand Up @@ -121,9 +127,31 @@ public ResolvedMethodContext resolveMethod(
return methodResolver.resolveMethod(knnMethodContext, knnMethodConfigContext, shouldRequireTraining, spaceType);
}

@Override
public boolean supportsRemoteIndexBuild() {
return true;
public boolean supportsRemoteIndexBuild(Map<String, String> attributes) throws IOException {
String parametersJson = attributes.get(PARAMETERS);
if (parametersJson != null) {
String methodName = getMethodName(parametersJson);
if (METHOD_HNSW.equals(methodName)) {
return FaissHNSWMethod.supportsRemoteIndexBuild(attributes);
}
}
return false;
}

private String getMethodName(String parametersJson) throws IOException {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, parametersJson.getBytes());

while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
parser.nextToken();
if (NAME.equals(fieldName)) {
return parser.text();
}
}
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
package org.opensearch.knn.index.engine.faiss;

import com.google.common.collect.ImmutableSet;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;
Expand All @@ -22,6 +26,7 @@
import org.opensearch.knn.index.remote.RemoteFaissHNSWIndexParameters;
import org.opensearch.knn.index.remote.RemoteIndexParameters;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -37,6 +42,7 @@
import static org.opensearch.knn.common.KNNConstants.METHOD_PARAMETER_EF_CONSTRUCTION;
import static org.opensearch.knn.common.KNNConstants.METHOD_PARAMETER_EF_SEARCH;
import static org.opensearch.knn.common.KNNConstants.METHOD_PARAMETER_M;
import static org.opensearch.knn.common.KNNConstants.NAME;
import static org.opensearch.knn.common.KNNConstants.PARAMETERS;
import static org.opensearch.knn.common.KNNConstants.SPACE_TYPE;

Expand Down Expand Up @@ -187,4 +193,58 @@ private static int getMFromIndexDescription(String indexDescription) {
String hnswPart = indexDescription.substring(0, commaIndex);
return Integer.parseInt(hnswPart.substring(4));
}

/**
* Return whether this engine/method supports remote build.
* @param attributes
* @return true if remote build is supported, false otherwise
* @throws IOException
*/
static boolean supportsRemoteIndexBuild(Map<String, String> attributes) throws IOException {
String parametersJson = attributes.get("parameters");
String encoderName = getEncoderName(parametersJson);
return "flat".equals(encoderName);
}

/**
* Gets encoder name from a {@FieldInfo parameters} map
*
* @param parametersJson
* @return encoder name
* @throws IOException
*/
private static String getEncoderName(String parametersJson) throws IOException {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, parametersJson.getBytes());

while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
parser.nextToken();

if (PARAMETERS.equals(fieldName) && parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String paramName = parser.currentName();
parser.nextToken();

if (METHOD_ENCODER_PARAMETER.equals(paramName) && parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String encoderField = parser.currentName();
parser.nextToken();

if (NAME.equals(encoderField)) {
return parser.text();
}
}
}
}
}
}
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,4 @@ public class KNNRemoteConstants {
public static final String FAILED_INDEX_BUILD = "FAILED_INDEX_BUILD";
public static final String FILE_NAME = "file_name";
public static final String ERROR_MESSAGE = "error_message";
public static final String MOCK_FILE_NAME = "graph.faiss";
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

import java.io.IOException;

import static org.opensearch.knn.index.remote.KNNRemoteConstants.*;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.COMPLETED_INDEX_BUILD;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.ERROR_MESSAGE;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.FILE_NAME;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.TASK_STATUS;

/**
* Response from the remote index build service. This class is used to parse the response from the remote index build service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import static org.opensearch.knn.index.remote.KNNRemoteConstants.FAILED_INDEX_BUILD;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.RUNNING_INDEX_BUILD;

public class RemoteIndexPoller {
class RemoteIndexPoller {
private final RemoteIndexClient client;

public RemoteIndexPoller(RemoteIndexClient client) {
RemoteIndexPoller(RemoteIndexClient client) {
this.client = client;
}

Expand All @@ -30,10 +30,10 @@ public RemoteIndexPoller(RemoteIndexClient client) {
* @throws IOException if an I/O error occurs
*/
@SuppressWarnings("BusyWait")
public RemoteBuildStatusResponse pollRemoteEndpoint(RemoteBuildResponse remoteBuildResponse) throws InterruptedException, IOException {
RemoteBuildStatusResponse pollRemoteEndpoint(RemoteBuildResponse remoteBuildResponse) throws InterruptedException, IOException {
long startTime = System.currentTimeMillis();
long timeout = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_TIMEOUT_MINUTES)).getMillis();
long pollInterval = ((TimeValue) (KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SECONDS)))
long timeout = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_TIMEOUT)).getMillis();
long pollInterval = ((TimeValue) (KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL)))
.getMillis();

// Initial delay to allow build service to process the job and store the ID before getting its status.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.DOC_ID_FILE_EXTENSION;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.VECTOR_BLOB_FILE_EXTENSION;
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;

public class DefaultVectorRepositoryAccessorTests extends RemoteIndexBuildTests {

Expand Down
Loading

0 comments on commit 1a9577b

Please sign in to comment.