Skip to content

Commit

Permalink
File System Caching for remote store
Browse files Browse the repository at this point in the history
Part of this commit was developed from code and concepts initially implemented
in Amazon OpenSearch Service as part of the UltraWarm feature. Thank you to the
following developers and the entire UltraWarm team.

Co-authored-by: Min Zhou <minzho@amazon.com>
Co-authored-by: Ankit Malpani <malpani@amazon.com>
Co-authored-by: Rohit Nair <rohinair@amazon.com>
Co-authored-by: Sorabh Hamirwasia <hsorabh@amazon.com>
Co-authored-by: Ankit Jain <akjain@amazon.com>
Co-authored-by: Tianru Zhou <tianruz@amazon.com>
Co-authored-by: Neetika Singhal <neetiks@amazon.com>
Co-authored-by: Amit Khandelwal <mkhnde@amazon.com>
Co-authored-by: Vigya Sharma <vigyas@amazon.com>
Co-authored-by: Prateek Sharma <shrprat@amazon.com>
Co-authored-by: Venkata Jyothsna Donapati <donapv@amazon.com>
Co-authored-by: Vlad Rozov <vrozov@amazon.com>
Co-authored-by: Mohit Agrawal <agramohi@amazon.com>
Co-authored-by: Shweta Thareja <tharejas@amazon.com>
Co-authored-by: Palash Hedau <phhedau@amazon.com>
Co-authored-by: Saurabh Singh <sisurab@amazon.com>
Co-authored-by: Piyush Daftary <pdaftary@amazon.com>
Co-authored-by: Payal Maheshwari <pmaheshw@amazon.com>
Co-authored-by: Kunal Khatua <kkhatua@amazon.com>
Co-authored-by: Gulshan Kumar <kumargu@amazon.com>
Co-authored-by: Rushi Agrawal <agrrushi@amazon.com>
Co-authored-by: Ketan Verma <vermketa@amazon.com>
Co-authored-by: Gaurav Chandani <chngau@amazon.com>
Co-authored-by: Dharmesh Singh <sdharms@amazon.com>

Signed-off-by: Ahmad AbuKhalil <abukhali@amazon.com>
  • Loading branch information
aabukhalil committed Feb 9, 2023
1 parent dfe5d2b commit f3f4f04
Show file tree
Hide file tree
Showing 34 changed files with 2,895 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testMultivaluedGeoPointsAggregation() throws Exception {
*
* @param geometry {@link Geometry}
* @param geoShapeDocValue {@link GeoShapeDocValue}
* @param intersectingWithBB
* @param intersectingWithBB enable intersectingWithBB
* @return A {@link Set} of {@link String} which represents the buckets.
*/
@Override
Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/opensearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.action.ShardOperationFailedException;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.Nullable;
import org.opensearch.common.compress.NotXContentException;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
Expand Down Expand Up @@ -338,6 +340,32 @@ public static void maybeDieOnAnotherThread(final Throwable throwable) {
});
}

/**
 * Runs the passed supplier, catching any checked exception and rethrowing it as an
 * unchecked exception via {@link ExceptionsHelper#convertToRuntime(Exception)}.
 * Useful for calling checked-exception APIs from lambda contexts.
 *
 * @param supplier the supplier to run
 * @param <R> the type of the value produced by the supplier
 * @param <E> the type of checked exception the supplier may throw
 * @return the value returned by {@code supplier.get()}
 */
public static <R, E extends Exception> R catchAsRuntimeException(CheckedSupplier<R, E> supplier) {
try {
return supplier.get();
} catch (Exception e) {
throw convertToRuntime(e);
}
}

/**
 * Runs the passed runnable, catching any checked exception and rethrowing it as an
 * unchecked exception via {@link ExceptionsHelper#convertToRuntime(Exception)}.
 * Useful for calling checked-exception APIs from lambda contexts.
 *
 * @param runnable the runnable to run
 */
public static void catchAsRuntimeException(CheckedRunnable<Exception> runnable) {
try {
runnable.run();
} catch (Exception e) {
throw convertToRuntime(e);
}
}

/**
* Deduplicate the failures by exception message and index.
*/
Expand Down
20 changes: 9 additions & 11 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -513,17 +513,15 @@ private void put(K key, V value, long now) {
promote(tuple.v1(), now);
}
if (replaced) {
removalListener.onRemoval(
new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalNotification.RemovalReason.REPLACED)
);
removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
}
}

private final Consumer<CompletableFuture<Entry<K, V>>> invalidationConsumer = f -> {
try {
Entry<K, V> entry = f.get();
try (ReleasableLock ignored = lruLock.acquire()) {
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
delete(entry, RemovalReason.INVALIDATED);
}
} catch (ExecutionException e) {
// ok
Expand All @@ -534,7 +532,7 @@ private void put(K key, V value, long now) {

/**
* Invalidate the association for the specified key. A removal notification will be issued for invalidated
* entries with {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
* entries with {@link RemovalReason} INVALIDATED.
*
* @param key the key whose mapping is to be invalidated from the cache
*/
Expand All @@ -546,7 +544,7 @@ public void invalidate(K key) {
/**
* Invalidate the entry for the specified key and value. If the value provided is not equal to the value in
* the cache, no removal will occur. A removal notification will be issued for invalidated
* entries with {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
* entries with {@link RemovalReason} INVALIDATED.
*
* @param key the key whose mapping is to be invalidated from the cache
* @param value the expected value that should be associated with the key
Expand All @@ -558,7 +556,7 @@ public void invalidate(K key, V value) {

/**
* Invalidate all cache entries. A removal notification will be issued for invalidated entries with
* {@link org.opensearch.common.cache.RemovalNotification.RemovalReason} INVALIDATED.
* {@link RemovalReason} INVALIDATED.
*/
public void invalidateAll() {
Entry<K, V> h;
Expand Down Expand Up @@ -589,7 +587,7 @@ public void invalidateAll() {
}
}
while (h != null) {
removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalNotification.RemovalReason.INVALIDATED));
removalListener.onRemoval(new RemovalNotification<>(h.key, h.value, RemovalReason.INVALIDATED));
h = h.after;
}
}
Expand Down Expand Up @@ -707,7 +705,7 @@ public void remove() {
segment.remove(entry.key, entry.value, f -> {});
try (ReleasableLock ignored = lruLock.acquire()) {
current = null;
delete(entry, RemovalNotification.RemovalReason.INVALIDATED);
delete(entry, RemovalReason.INVALIDATED);
}
}
}
Expand Down Expand Up @@ -796,10 +794,10 @@ private void evictEntry(Entry<K, V> entry) {
if (segment != null) {
segment.remove(entry.key, entry.value, f -> {});
}
delete(entry, RemovalNotification.RemovalReason.EVICTED);
delete(entry, RemovalReason.EVICTED);
}

private void delete(Entry<K, V> entry, RemovalNotification.RemovalReason removalReason) {
private void delete(Entry<K, V> entry, RemovalReason removalReason) {
assert lruLock.isHeldByCurrentThread();

if (unlink(entry)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,6 @@
* @opensearch.internal
*/
public class RemovalNotification<K, V> {
/**
* Reason for notification removal
*
* @opensearch.internal
*/
public enum RemovalReason {
REPLACED,
INVALIDATED,
EVICTED
}

private final K key;
private final V value;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
 * Reason a cache entry was removed, reported to removal listeners via a
 * removal notification.
 *
 * @opensearch.internal
 */
public enum RemovalReason {
// the entry's value was replaced by a subsequent put for the same key
REPLACED,
// the entry was explicitly invalidated through an invalidate call
INVALIDATED,
// the entry was evicted by the cache's eviction policy
EVICTED,
// presumably an explicit single-entry removal by the caller — TODO confirm against cache implementations using this constant
EXPLICIT,
// presumably removal due to a capacity/weight limit being exceeded — TODO confirm against cache implementations using this constant
CAPACITY
}
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/common/cache/Weigher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache;

/**
 * A class that can determine the weight of a value. The total weight threshold
 * is used to determine when an eviction is required.
 *
 * @opensearch.internal
 */
@FunctionalInterface // single abstract method: lets callers supply the weigher as a lambda or method reference
public interface Weigher<V> {

    /**
     * Measures an object's weight to determine how many units of capacity
     * the value consumes. A value must consume a minimum of one unit.
     *
     * @param value the object to weigh
     * @return the object's weight
     */
    long weightOf(V value);
}
9 changes: 7 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -661,7 +662,8 @@ private void ensureNotFrozen() {

public static Map<String, IndexStorePlugin.DirectoryFactory> createBuiltInDirectoryFactories(
Supplier<RepositoriesService> repositoriesService,
ThreadPool threadPool
ThreadPool threadPool,
FileCache remoteStoreFileCache
) {
final Map<String, IndexStorePlugin.DirectoryFactory> factories = new HashMap<>();
for (Type type : Type.values()) {
Expand All @@ -674,7 +676,10 @@ public static Map<String, IndexStorePlugin.DirectoryFactory> createBuiltInDirect
factories.put(type.getSettingsKey(), DEFAULT_DIRECTORY_FACTORY);
break;
case REMOTE_SNAPSHOT:
factories.put(type.getSettingsKey(), new RemoteSnapshotDirectoryFactory(repositoriesService, threadPool));
factories.put(
type.getSettingsKey(),
new RemoteSnapshotDirectoryFactory(repositoriesService, threadPool, remoteStoreFileCache)
);
break;
default:
throw new IllegalStateException("No directory factory mapping for built-in type " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.TransferManager;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -42,9 +43,16 @@ public final class RemoteSnapshotDirectoryFactory implements IndexStorePlugin.Di
private final Supplier<RepositoriesService> repositoriesService;
private final ThreadPool threadPool;

public RemoteSnapshotDirectoryFactory(Supplier<RepositoriesService> repositoriesService, ThreadPool threadPool) {
private final FileCache remoteStoreFileCache;

public RemoteSnapshotDirectoryFactory(
Supplier<RepositoriesService> repositoriesService,
ThreadPool threadPool,
FileCache remoteStoreFileCache
) {
this.repositoriesService = repositoriesService;
this.threadPool = threadPool;
this.remoteStoreFileCache = remoteStoreFileCache;
}

@Override
Expand Down Expand Up @@ -81,7 +89,11 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
return threadPool.executor(ThreadPool.Names.SNAPSHOT).submit(() -> {
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(blobPath);
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);
TransferManager transferManager = new TransferManager(blobContainer, threadPool.executor(ThreadPool.Names.SEARCH));
TransferManager transferManager = new TransferManager(
blobContainer,
threadPool.executor(ThreadPool.Names.SEARCH),
remoteStoreFileCache
);
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAcces

// Variables for actual held open block
/**
* Current block for read, it should be a cloned block always
* Current block for read, it should be a cloned block always. In current implementation this will be a FileCachedIndexInput
*/
protected IndexInput currentBlock;

Expand Down Expand Up @@ -366,9 +366,10 @@ public static Builder builder() {
return new Builder();
}

static class Builder {
public static class Builder {
// Block size shift (default value is 13 = 8KB)
public static final int DEFAULT_BLOCK_SIZE_SHIFT = 13;
public static final int DEFAULT_BLOCK_SIZE = 1 << DEFAULT_BLOCK_SIZE_SHIFT;;

private String resourceDescription;
private boolean isClone;
Expand Down Expand Up @@ -400,7 +401,7 @@ public Builder length(long length) {
return this;
}

public Builder blockSizeShift(int blockSizeShift) {
Builder blockSizeShift(int blockSizeShift) {
assert blockSizeShift < 31 : "blockSizeShift must be < 31";
this.blockSizeShift = blockSizeShift;
this.blockSize = 1 << blockSizeShift;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.IndexInput;

/**
 * Base {@link IndexInput} whose instances will be maintained in a cache.
 * Implementations must expose their closed state via {@link #isClosed()}.
 *
 * @opensearch.internal
 */
public abstract class CachedIndexInput extends IndexInput {

/**
 * Creates a new cached index input.
 *
 * @param resourceDescription a non-null, opaque string describing this
 *        resource; it is returned from {@link #toString}
 */
protected CachedIndexInput(String resourceDescription) {
super(resourceDescription);
}

/**
 * Reports whether this index input has been closed.
 *
 * @return {@code true} if this index input is closed, {@code false} otherwise
 */
public abstract boolean isClosed();
}
Loading

0 comments on commit f3f4f04

Please sign in to comment.