[Remote Index Client] Implement remote client build awaiting functionality, validate encoder support #2576
Thanks @owenhalpert, the overall approach looks good to me; it just needs some cleanup in a few places.
At a high level, looking at the changes, I am not sure we should add all the client/HTTP constructs directly into the plugin. The job status constants and client configuration seem like a better fit for a client package.
BlobPath path = repository.basePath().add(indexSettings.getUUID() + VECTORS_PATH);
BlobContainer blobContainer = repository.blobStore().blobContainer(path);

assert blobContainer != null;
Asserts do not work in prod. Can we do null checks instead?
All the logic of getting the blob container comes from OpenSearch core so the assertion is just a sanity check on that
@jed326 if the assertion fails, will we fall back to CPU-based index builds? And will there be a useful exception that explains what happened? If not, please add this information.
My point is that there is no way for `blobContainer` to be null, and none of the methods in the calling path to retrieve `blobContainer` throw any checked exceptions. The only thing that can throw an exception is `getRepository`, and creating a blobContainer on top of a repository reference is more or less just setting the file path to be written/read to. This assertion just helps make sure that, if we do try to test/mock this method, we are configuring things properly.
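For reference on the trade-off in this thread: Java `assert` statements are skipped unless the JVM runs with `-ea`, so an explicit check is the usual way to get a descriptive failure in production. Below is a minimal sketch, assuming a hypothetical `requireContainer` helper (not part of this PR) and a generic type standing in for `BlobContainer`. Whether the caller then falls back to a CPU build is a separate design choice that this sketch does not settle.

```java
import java.util.Objects;

// Hypothetical helper, not part of the PR: fails with a descriptive message
// even when assertions are disabled (the JVM default).
final class BlobContainerChecks {
    private BlobContainerChecks() {}

    // "container" stands in for the value returned by blobStore().blobContainer(path).
    static <T> T requireContainer(T container, String indexUuid) {
        return Objects.requireNonNull(
            container,
            "Blob container for index [" + indexUuid + "] is null; cannot access the vector repository"
        );
    }
}
```

The helper returns its argument, so it can wrap the existing assignment without changing the surrounding code.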
@kotwanikunal, the remote index build service is not a hosted service like S3 that vends an HTTP client. Also, we don't want to maintain another repo, as the maintenance overhead would be high. Hence the client code is better suited to live in the k-NN repo. As it stands, the code is already abstracted into a separate Java package. If you think we should move that package to a separate module, that can easily be done in upcoming iterations. Let us know your thoughts.
private String getMethodName(String parametersJson) throws IOException {
    XContentParser parser = XContentType.JSON.xContent()
        .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, parametersJson.getBytes());

    while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
        if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
            String fieldName = parser.currentName();
            parser.nextToken();
            if (NAME.equals(fieldName)) {
                return parser.text();
            }
        }
    }
    return null;
I think it was mentioned before, but can we track in a GitHub issue somewhere to improve this parsing? I think that's a generic problem in k-NN and not specific to remote index build too.
+1
Will get this out tomorrow
if (COMPLETED_INDEX_BUILD.equals(builder.taskStatus) && StringUtils.isBlank(builder.fileName)) {
    throw new IOException("Invalid response format, missing " + FILE_NAME + " for completed status");
}
Should this validation be part of fromXContent()? I feel we should move it out of this function and put it where we validate the status and other attributes.
Moving the actual content checks to the poller (while the status response object will just be checking the validity of the JSON).
public RemoteBuildStatusResponse awaitVectorBuild(RemoteBuildResponse remoteBuildResponse) throws InterruptedException, IOException {
    RemoteIndexPoller remoteIndexPoller = new RemoteIndexPoller(this);
    return remoteIndexPoller.pollRemoteEndpoint(remoteBuildResponse);
}
When I mentioned that the polling function should not be part of the client, I meant that the Poller class should call the `awaitVectorBuild` API from outside. What is being done here is exactly the reverse. This is why I still think the API name should be `getIndexBuildStatus` rather than `awaitVectorBuild`, because polling is only one way to wait for the index build to complete.
I think it should be RemoteIndexPoller -> getIndexBuildStatus() rather than awaitVectorBuild() -> RemoteIndexPoller().
Got it. Thanks. The new flow will be that the build strategy gets the Waiter implementation from a factory method. In this case it's a Poller. The Poller uses client.getBuildStatus and does the polling all on its own.
The client is specifically responsible for getting and sending requests — the Waiter will use the client to do so when needed.
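To make the proposed flow concrete, here is a rough sketch of the separation described above: the client only sends requests, the waiter decides how to wait, and a factory hands the waiter to the build strategy. All type names (`StatusClient`, `BuildWaiter`, `PollingWaiter`, `WaiterFactory`), the `"RUNNING"` status value, and the interval and poll-count defaults are simplified assumptions for illustration, not the PR's actual classes or constants.

```java
// Simplified stand-ins for the PR's client and waiter types (names are hypothetical).
interface StatusClient {
    // Sends one status request for the job and returns the reported status.
    String getBuildStatus(String jobId);
}

interface BuildWaiter {
    // Blocks until the build leaves the running state, then returns the final status.
    String awaitBuild(String jobId) throws InterruptedException;
}

// Polling is one way to wait. The client is only used to send requests.
final class PollingWaiter implements BuildWaiter {
    static final String RUNNING = "RUNNING"; // assumed status value

    private final StatusClient client;
    private final long pollIntervalMs;
    private final int maxPolls;

    PollingWaiter(StatusClient client, long pollIntervalMs, int maxPolls) {
        this.client = client;
        this.pollIntervalMs = pollIntervalMs;
        this.maxPolls = maxPolls;
    }

    @Override
    public String awaitBuild(String jobId) throws InterruptedException {
        for (int poll = 0; poll < maxPolls; poll++) {
            String status = client.getBuildStatus(jobId);
            if (!RUNNING.equals(status)) {
                return status;
            }
            Thread.sleep(pollIntervalMs);
        }
        throw new IllegalStateException("Build " + jobId + " still running after " + maxPolls + " polls");
    }
}

// The build strategy asks a factory for a waiter and never sees the polling details.
final class WaiterFactory {
    private WaiterFactory() {}

    static BuildWaiter forClient(StatusClient client) {
        return new PollingWaiter(client, 10L, 100);
    }
}
```

With this shape, a different waiting mechanism (for example, a callback) could be added as another `BuildWaiter` implementation without touching the client.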
Signed-off-by: owenhalpert <ohalpert@gmail.com>
Refactor, use more string constants where possible, fix unit tests after refactor Signed-off-by: owenhalpert <ohalpert@gmail.com>
Signed-off-by: owenhalpert <ohalpert@gmail.com> # Conflicts: # src/test/java/org/opensearch/knn/index/engine/KNNEngineTests.java
Signed-off-by: owenhalpert <ohalpert@gmail.com>
Signed-off-by: owenhalpert <ohalpert@gmail.com>
LGTM, thanks @owenhalpert
public boolean supportsRemoteIndexBuild(Map<String, String> attributes) throws IOException {
    String parametersJson = attributes.get(PARAMETERS);
    if (parametersJson != null) {
        String methodName = getMethodName(parametersJson);
        if (METHOD_HNSW.equals(methodName)) {
            return FaissHNSWMethod.supportsRemoteIndexBuild(attributes);
        }
    }
    return false;
}

/**
 * Get method name from a {@link FieldInfo} formatted attributes map
 * Example:
 * {
 *   "index_description": "HNSW12,Flat",
 *   "spaceType": "l2",
 *   "name": "hnsw",
 *   ...
 * }
 */
private String getMethodName(String parametersJson) throws IOException {
Let's avoid IOException in the method signature; it just makes the interfaces ugly. Let's catch the exception inside the functions and then decide whether to throw a proper runtime exception or return a default.
Good point. I think we can catch and return null for the parsing methods on IOExceptions so the supportsRemoteIndexBuild just returns false.
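A minimal sketch of that idea, assuming a hypothetical `NameParser` interface in place of the real `XContentParser` logic, and the literal strings `"parameters"` and `"hnsw"` in place of the `PARAMETERS` and `METHOD_HNSW` constants (whose values are not shown in this thread). It omits the delegation to `FaissHNSWMethod` and only shows how the checked exception stays out of the public signature.

```java
import java.io.IOException;
import java.util.Map;

// Hypothetical stand-in for the JSON parsing step, which may fail on malformed input.
@FunctionalInterface
interface NameParser {
    String parse(String parametersJson) throws IOException;
}

final class RemoteBuildSupportSketch {
    private final NameParser parser;

    RemoteBuildSupportSketch(NameParser parser) {
        this.parser = parser;
    }

    // No checked exception in the signature: a parse failure simply means "not supported".
    boolean supportsRemoteIndexBuild(Map<String, String> attributes) {
        String parametersJson = attributes.get("parameters"); // assumed key name
        if (parametersJson == null) {
            return false;
        }
        return "hnsw".equals(getMethodName(parametersJson)); // assumed method name value
    }

    // Returns null when the JSON cannot be parsed, instead of propagating IOException.
    private String getMethodName(String parametersJson) {
        try {
            return parser.parse(parametersJson);
        } catch (IOException e) {
            return null;
        }
    }
}
```

One consequence of this choice is that malformed parameters silently disable the remote build, so logging the caught exception may be worth considering.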
 * @return encoder name or null if not found
 * @throws IOException if the json string is not valid
 */
private static String getEncoderName(String parametersJson) throws IOException {
@jmazanec15 and @naveentatikonda, can you please help validate this logic to ensure that it is foolproof across all the encoder combinations that are possible?
Hmm, thinking about this a bit more: do we only need the encoder on write? If so, I think it'd be safest to retrieve it from the KNNVectorFieldType via the mapper service (https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/index/mapper/KNNVectorFieldType.java). We should expose it as a method in https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/index/engine/KNNLibraryIndexingContext.java, which will be available via the mapped field type.
If we need it on read, then we have to parse it out of this. But, from my understanding, we shouldn't need it on read.
public class RemoteBuildStatusRequest {
    private final String jobId;

    public RemoteBuildStatusRequest(RemoteBuildResponse remoteBuildResponse) {
Suggested change:
- public RemoteBuildStatusRequest(RemoteBuildResponse remoteBuildResponse) {
+ public static RemoteBuildStatusRequest build(RemoteBuildResponse remoteBuildResponse) {
Why does the status request take a RemoteBuildResponse rather than just the job ID, which is all that is needed to get the status?
I think this keeps the design more extensible if the request/response were to contain something in addition to or instead of the job ID (e.g. tenantId).
My question is why the constructor takes a RemoteBuildResponse and then derives the job ID from it. This is not flexibility; it is tight coupling with an unrelated response from another API.
Why is RemoteBuildStatusRequest tied to RemoteBuildResponse? The two are unrelated. It should instead be tied to the job ID, tenant ID, and whatever else comes along.
That's a good point. I can make this change:
// in NativeIndexBuildStrategy
String jobId = remoteBuildResponse.getJobId();
RemoteBuildStatusRequest status = RemoteBuildStatusRequest.builder().jobId(jobId).build();
Now, instead of the two being related, the build strategy will process the remoteBuildResponse however it needs (in this case, just getting the job ID) and then separately build a status request with all necessary parameters (in this case, just the job ID).
In the future, we can draw more from the response, and easily add more parameters to the builder. Does that look better?
cc @navneet1v
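For illustration, the decoupled request could look roughly like the hand-written builder below. This is only a sketch under assumptions: the class name `StatusRequestSketch` is hypothetical, and the PR may well generate the builder with an annotation processor instead of writing it out.

```java
import java.util.Objects;

// Hypothetical sketch of a status request that depends only on the job ID,
// not on the response type of the build-submission API.
final class StatusRequestSketch {
    private final String jobId;

    private StatusRequestSketch(Builder builder) {
        this.jobId = Objects.requireNonNull(builder.jobId, "jobId is required");
    }

    String getJobId() {
        return jobId;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String jobId;

        Builder jobId(String jobId) {
            this.jobId = jobId;
            return this;
        }

        // Further parameters (for example a tenant ID) could be added here later.
        StatusRequestSketch build() {
            return new StatusRequestSketch(this);
        }
    }
}
```

Usage mirrors the snippet above: `StatusRequestSketch.builder().jobId(jobId).build()`.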
This is perfect !!
@owenhalpert please check why the CIs are failing. Also, I think the code is in pretty good shape and close to being shipped.
@Value
@Builder
public class RemoteBuildStatusResponse {
    private static final ParseField TASK_STATUS_FIELD = new ParseField(TASK_STATUS);
Where have we defined the service API contract, and how can the client assume this response format when parsing?
Clean up Javadocs, improve exception handling, tune defaults Signed-off-by: owenhalpert <ohalpert@gmail.com>
…us request from build response Signed-off-by: owenhalpert <ohalpert@gmail.com>
/**
 * Implementation of a {@link RemoteIndexWaiter} that awaits the vector build by polling.
 */
class RemoteIndexPoller implements RemoteIndexWaiter {
I rewrote the entire class to make polling more intelligent: it adds jitter and exponential backoff so that it is safe in a distributed setting, while still respecting the user's setting values. We can adjust MIN_POLL_INTERVAL_MS and the maximum. Let me know how it looks, @navneet1v.
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.Random;

class RemoteIndexPoller implements RemoteIndexWaiter {
    private static final int INITIAL_DELAY_FACTOR = 3;
    private static final int MIN_POLL_INTERVAL_MS = 500;
    private static final int MAX_POLL_INTERVAL_MS = 30_000;
    private static final int STATUS_CHANGE_RESET = 2; // Back off once the status has repeated this many times
    private static final double JITTER_LOWER = 0.8;
    private static final double JITTER_UPPER = 1.2;

    private final RemoteIndexClient client;
    private final Random random = new Random();

    RemoteIndexPoller(RemoteIndexClient client) {
        this.client = Objects.requireNonNull(client, "RemoteIndexClient cannot be null");
    }

    @SuppressWarnings("BusyWait")
    public RemoteBuildStatusResponse awaitVectorBuild(RemoteBuildStatusRequest request) throws InterruptedException, IOException {
        long startTime = System.nanoTime();
        long timeout = KNNSettings.getRemoteBuildClientTimeout().getNanos();
        long basePollInterval = KNNSettings.getRemoteBuildClientPollInterval().getMillis();
        long pollInterval = basePollInterval * INITIAL_DELAY_FACTOR;
        Thread.sleep(applyJitter(pollInterval)); // Initial delay

        int consecutiveSameStatus = 0;
        String lastStatus = "";
        for (int attempt = 1; System.nanoTime() - startTime < timeout; attempt++) {
            RemoteBuildStatusResponse response = client.getBuildStatus(request);
            String taskStatus = Objects.requireNonNullElse(response.getTaskStatus(), "").trim();
            logger.debug("Polling attempt {}: Task status = {}", attempt, taskStatus);
            switch (taskStatus) {
                case COMPLETED_INDEX_BUILD -> {
                    String fileName = Objects.requireNonNullElse(response.getFileName(), "").trim();
                    if (fileName.isEmpty()) {
                        throw new IOException("Invalid response: Missing file name for completed status.");
                    }
                    logger.info("Remote index build completed successfully in {} seconds.",
                        Duration.ofNanos(System.nanoTime() - startTime).toSeconds());
                    return response;
                }
                case FAILED_INDEX_BUILD -> throw new InterruptedException(
                    "Remote index build failed after " + Duration.ofNanos(System.nanoTime() - startTime).toMinutes() +
                    " minutes. Error: " + response.getErrorMessage());
                case RUNNING_INDEX_BUILD -> {
                    // Adjust polling intelligently
                    if (taskStatus.equals(lastStatus)) {
                        consecutiveSameStatus++;
                        if (consecutiveSameStatus >= STATUS_CHANGE_RESET) {
                            pollInterval = Math.min(pollInterval * 2, MAX_POLL_INTERVAL_MS); // Increase polling delay
                        }
                    } else {
                        consecutiveSameStatus = 0; // Reset when status changes
                        pollInterval = Math.max(pollInterval / 2, MIN_POLL_INTERVAL_MS); // Reduce polling delay
                    }
                    lastStatus = taskStatus;
                    // Prevent polling too close to timeout. The timeout is tracked in nanoseconds,
                    // so convert the remaining time to milliseconds before comparing with pollInterval.
                    long remainingMs = Duration.ofNanos(timeout - (System.nanoTime() - startTime)).toMillis();
                    if (remainingMs < pollInterval * 2) {
                        pollInterval = MIN_POLL_INTERVAL_MS;
                    }
                    long sleepTime = applyJitter(pollInterval);
                    logger.debug("Task still running. Waiting {} ms before next poll", sleepTime);
                    Thread.sleep(sleepTime);
                }
                default -> throw new IOException("Invalid task status: " + taskStatus);
            }
        }
        throw new InterruptedException("Remote index build timed out after " +
            Duration.ofNanos(System.nanoTime() - startTime).toMinutes() + " minutes.");
    }

    private long applyJitter(long baseInterval) {
        return (long) (baseInterval * (JITTER_LOWER + (random.nextDouble() * (JITTER_UPPER - JITTER_LOWER))));
    }
}
Thanks @Vikasht34. For the jitter aspect I can agree that would be generically good and I think would be fine to pull into this.
For exponential backoff I don't think we should use that logic for the poller for now and I think this is something we need to revisit based on benchmarking. We want to strike a balance between calling the remote index build service too often and waiting too long after the graph build has completed to get the status. For example, let's take a 1 minute poll interval. After polling 2 times (so we've waited 2 minutes), the build job is still just as likely to finish in 2min + 1ms as 2min + 2min, and I think it's better to over-index against the second case.
Based on benchmarking we should be able to establish p50/90/99 values on how long the remote index build takes for a given segment size and then we can set the polling interval as some function of that value. If the p50/p90/p99 is very narrow then exponential backoff probably wouldn't be necessary, but if it's a very wide range we can revisit this.
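As a rough illustration of the option favored in this reply (keep a fixed poll interval, add jitter, no backoff), the delay calculation could be isolated as below. The class name `JitteredInterval` is hypothetical, and the 0.8 to 1.2 bounds are simply carried over from the proposal above rather than being tuned values.

```java
import java.util.Random;

// Hypothetical helper: spreads polls around a fixed base interval without any backoff.
final class JitteredInterval {
    private static final double JITTER_LOWER = 0.8;
    private static final double JITTER_UPPER = 1.2;

    private final Random random;

    JitteredInterval(Random random) {
        this.random = random;
    }

    // Returns a delay of roughly 0.8x to 1.2x the base interval, in milliseconds.
    long nextDelayMs(long baseIntervalMs) {
        double factor = JITTER_LOWER + random.nextDouble() * (JITTER_UPPER - JITTER_LOWER);
        return (long) (baseIntervalMs * factor);
    }
}
```

Because the base interval never grows, the worst-case lag between build completion and detection stays near 1.2 times the configured interval, which matches the concern raised about waiting too long after the graph build finishes.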
Description
This PR is a follow-up to #2560. This PR adds a `RemoteIndexWaiter` interface to await the completion of the remote vector build. For the HTTP client, this is implemented as a simple polling mechanism in `RemoteIndexPoller`. While there is a fair amount of files changed due to refactoring, the crux of this PR is in the `RemoteIndexPoller`, `RemoteIndexHTTPClient`, and `RemoteIndexBuildStrategy` classes.
This PR also adds a check on the encoder type, enforcing that only `HNSWFlat` is supported by the remote build service.
This PR does not include:
which will go in the next PR.
Related Issues
#2560 PR 1
#2518 LLD
#2391 Meta issue
Check List
- [ ] New functionality has been documented.
- [ ] API changes companion pull request created.
--signoff.
- [ ] Public documentation issue/PR created.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.