Skip to content

Commit

Permalink
Add Clear Cache API
Browse files Browse the repository at this point in the history
Signed-off-by: Naveen Tatikonda <navtat@amazon.com>
  • Loading branch information
naveentatikonda committed Aug 9, 2023
1 parent 0a7fc4e commit 24f2309
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/main/java/org/opensearch/knn/common/KNNConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,7 @@ public class KNNConstants {
private static final String JNI_LIBRARY_PREFIX = "opensearchknn_";
public static final String FAISS_JNI_LIBRARY_NAME = JNI_LIBRARY_PREFIX + FAISS_NAME;
public static final String NMSLIB_JNI_LIBRARY_NAME = JNI_LIBRARY_PREFIX + NMSLIB_NAME;

// API Constants
public static final String CLEAR_CACHE = "clear_cache";
}
28 changes: 28 additions & 0 deletions src/main/java/org/opensearch/knn/index/KNNIndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.knn.index.memory.NativeMemoryAllocation;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.memory.NativeMemoryEntryContext;
import org.opensearch.knn.index.memory.NativeMemoryLoadStrategy;
Expand All @@ -27,6 +28,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -100,6 +102,32 @@ public void warmup() throws IOException {
}
}

/**
* Removes all the k-NN segments for this shard from the cache.
* Adding write lock onto the NativeMemoryAllocation of the index that needs to be evicted from cache.
* Write lock will be unlocked after the index is evicted. This locking mechanism is used to avoid
* conflicts with queries fired on this index when the index is being evicted from cache.
*/
public void clearCache() {
String indexName = getIndexName();
Optional<NativeMemoryAllocation> indexAllocationOptional;
NativeMemoryAllocation indexAllocation;
indexAllocationOptional = nativeMemoryCacheManager.getIndexMemoryAllocation(indexName);
if (indexAllocationOptional.isPresent()) {
indexAllocation = indexAllocationOptional.get();
indexAllocation.writeLock();
logger.info("[KNN] Evicting index from cache: [{}]", indexName);
try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-clear-cache")) {
getAllEnginePaths(searcher.getIndexReader()).forEach((key, value) -> nativeMemoryCacheManager.invalidate(key));
} catch (IOException ex) {
logger.error("[KNN] Failed to evict index from cache: [{}]", indexName);
throw new RuntimeException(ex);
} finally {
indexAllocation.writeUnlock();
}
}
}

/**
* For the given shard, get all of its engine paths
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -303,6 +304,23 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
}

/**
* Returns the NativeMemoryAllocation associated with given index
* @param indexName name of OpenSearch index
* @return NativeMemoryAllocation associated with given index
*/
public Optional<NativeMemoryAllocation> getIndexMemoryAllocation(String indexName) {
Validate.notNull(indexName, "Index name cannot be null");
return cache.asMap()
.values()
.stream()
.filter(nativeMemoryAllocation -> nativeMemoryAllocation instanceof NativeMemoryAllocation.IndexAllocation)
.filter(
indexAllocation -> indexName.equals(((NativeMemoryAllocation.IndexAllocation) indexAllocation).getOpenSearchIndexName())
)
.findFirst();
}

/**
* Invalidate entry from the cache.
*
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.knn.plugin.rest.RestKNNWarmupHandler;
import org.opensearch.knn.plugin.rest.RestSearchModelHandler;
import org.opensearch.knn.plugin.rest.RestTrainModelHandler;
import org.opensearch.knn.plugin.rest.RestClearCacheHandler;
import org.opensearch.knn.plugin.script.KNNScoringScriptEngine;
import org.opensearch.knn.plugin.stats.KNNStats;
import org.opensearch.knn.plugin.transport.DeleteModelAction;
Expand All @@ -40,6 +41,8 @@
import org.opensearch.knn.plugin.transport.KNNStatsTransportAction;
import org.opensearch.knn.plugin.transport.KNNWarmupAction;
import org.opensearch.knn.plugin.transport.KNNWarmupTransportAction;
import org.opensearch.knn.plugin.transport.ClearCacheAction;
import org.opensearch.knn.plugin.transport.ClearCacheTransportAction;
import com.google.common.collect.ImmutableList;

import org.opensearch.action.ActionRequest;
Expand Down Expand Up @@ -231,14 +234,16 @@ public List<RestHandler> getRestHandlers(
RestDeleteModelHandler restDeleteModelHandler = new RestDeleteModelHandler();
RestTrainModelHandler restTrainModelHandler = new RestTrainModelHandler();
RestSearchModelHandler restSearchModelHandler = new RestSearchModelHandler();
RestClearCacheHandler restClearCacheHandler = new RestClearCacheHandler(clusterService, indexNameExpressionResolver);

return ImmutableList.of(
restKNNStatsHandler,
restKNNWarmupHandler,
restGetModelHandler,
restDeleteModelHandler,
restTrainModelHandler,
restSearchModelHandler
restSearchModelHandler,
restClearCacheHandler
);
}

Expand All @@ -258,7 +263,8 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(TrainingModelAction.INSTANCE, TrainingModelTransportAction.class),
new ActionHandler<>(RemoveModelFromCacheAction.INSTANCE, RemoveModelFromCacheTransportAction.class),
new ActionHandler<>(SearchModelAction.INSTANCE, SearchModelTransportAction.class),
new ActionHandler<>(UpdateModelGraveyardAction.INSTANCE, UpdateModelGraveyardTransportAction.class)
new ActionHandler<>(UpdateModelGraveyardAction.INSTANCE, UpdateModelGraveyardTransportAction.class),
new ActionHandler<>(ClearCacheAction.INSTANCE, ClearCacheTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.rest;

import com.google.common.collect.ImmutableList;
import lombok.AllArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.index.Index;
import org.opensearch.knn.common.exception.KNNInvalidIndicesException;
import org.opensearch.knn.plugin.KNNPlugin;
import org.opensearch.knn.plugin.transport.ClearCacheAction;
import org.opensearch.knn.plugin.transport.ClearCacheRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import static org.opensearch.action.support.IndicesOptions.strictExpandOpen;
import static org.opensearch.knn.common.KNNConstants.CLEAR_CACHE;
import static org.opensearch.knn.index.KNNSettings.KNN_INDEX;

/**
* RestHandler for k-NN Clear Cache API. API provides the ability for a user to evict those indices from Cache.
*/
@AllArgsConstructor
public class RestClearCacheHandler extends BaseRestHandler {
private static final Logger logger = LogManager.getLogger(RestClearCacheHandler.class);

private static final String INDEX = "index";
public static String NAME = "knn_clear_cache_action";
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;

/**
* @return name of Clear Cache API action
*/
@Override
public String getName() {
return NAME;
}

/**
* @return Immutable List of Clear Cache API endpoint
*/
@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s/%s/{%s}", KNNPlugin.KNN_BASE_URI, CLEAR_CACHE, INDEX))
);
}

/**
* @param request RestRequest
* @param client NodeClient
* @return RestChannelConsumer
*/
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
ClearCacheRequest clearCacheRequest = createClearCacheRequest(request);
logger.info("[KNN] ClearCache started for the following indices: [{}]", String.join(",", clearCacheRequest.indices()));
return channel -> client.execute(ClearCacheAction.INSTANCE, clearCacheRequest, new RestToXContentListener<>(channel));
}

// Create a clear cache request by processing the rest request and validating the indices
private ClearCacheRequest createClearCacheRequest(RestRequest request) {
String[] indexNames = Strings.splitStringByCommaToArray(request.param("index"));
Index[] indices = indexNameExpressionResolver.concreteIndices(clusterService.state(), strictExpandOpen(), indexNames);
validateIndices(indices);

return new ClearCacheRequest(indexNames);
}

// Validate if the given indices are k-NN indices or not. If there are any invalid indices,
// the request is rejected and an exception is thrown.
private void validateIndices(Index[] indices) {
List<String> invalidIndexNames = Arrays.stream(indices)
.filter(index -> !"true".equals(clusterService.state().metadata().getIndexSafe(index).getSettings().get(KNN_INDEX)))
.map(Index::getName)
.collect(Collectors.toList());

if (!invalidIndexNames.isEmpty()) {
throw new KNNInvalidIndicesException(
invalidIndexNames,
"ClearCache request rejected. One or more indices have 'index.knn' set to false."
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.transport;

import org.opensearch.action.ActionType;
import org.opensearch.common.io.stream.Writeable;

/**
* Action associated with ClearCache
*/
public class ClearCacheAction extends ActionType<ClearCacheResponse> {

public static final ClearCacheAction INSTANCE = new ClearCacheAction();
public static final String NAME = "cluster:admin/clear_cache_action";

private ClearCacheAction() {
super(NAME, ClearCacheResponse::new);
}

@Override
public Writeable.Reader<ClearCacheResponse> getResponseReader() {
return ClearCacheResponse::new;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.transport;

import org.opensearch.action.support.broadcast.BroadcastRequest;
import org.opensearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Clear Cache Request. This request contains a list of indices which needs to be evicted from Cache.
*/
public class ClearCacheRequest extends BroadcastRequest<ClearCacheRequest> {

/**
* Constructor
*
* @param in input stream
* @throws IOException if read from stream fails
*/
public ClearCacheRequest(StreamInput in) throws IOException {
super(in);
}

/**
* Constructor
*
* @param indices list of indices which needs to be evicted from cache
*/
public ClearCacheRequest(String... indices) {
super(indices);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.transport;

import org.opensearch.action.support.DefaultShardOperationFailedException;
import org.opensearch.action.support.broadcast.BroadcastResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.core.xcontent.ToXContentObject;

import java.io.IOException;
import java.util.List;

/**
* {@link ClearCacheResponse} represents Response returned by {@link ClearCacheRequest}.
* Returns total number of shards on which ClearCache was performed on, as well as
* the number of shards that succeeded and the number of shards that failed.
*/
public class ClearCacheResponse extends BroadcastResponse implements ToXContentObject {

/**
* Constructor
*
* @param in input stream
* @throws IOException if read from stream fails
*/
public ClearCacheResponse(StreamInput in) throws IOException {
super(in);
}

/**
* Constructor
*
* @param totalShards total number of shards on which ClearCache was performed
* @param successfulShards number of shards that succeeded
* @param failedShards number of shards that failed
* @param shardFailures list of shard failure exceptions
*/
public ClearCacheResponse(
int totalShards,
int successfulShards,
int failedShards,
List<DefaultShardOperationFailedException> shardFailures
) {
super(totalShards, successfulShards, failedShards, shardFailures);
}
}
Loading

0 comments on commit 24f2309

Please sign in to comment.