Skip to content

Commit

Permalink
Make cacheEntry.getIndexInput() privileged when fetching blobs from r…
Browse files Browse the repository at this point in the history
…emote snapshot (#16544)

* Make cacheEntry.getIndexInput() privileged when fetching blobs from remote store

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Rebase

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Spotless apply

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Clean up doPrivileged calls

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Comment

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Move fetchBlob to PrivilegedExceptionAction. Catch and unwrap IOException.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Unused import

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Update server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Co-authored-by: Andriy Redko <drreta@gmail.com>
Signed-off-by: Finn <finnegancarroll94@gmail.com>

* Typo 'thrown'. Catch and throw unknown exception as IOException.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

---------

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: Finn <finnegancarroll94@gmail.com>
Co-authored-by: Andriy Redko <drreta@gmail.com>
  • Loading branch information
finnegancarroll and reta authored Nov 5, 2024
1 parent e07499a commit 4213cc2
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove resource usages object from search response headers ([#16532](https://github.com/opensearch-project/OpenSearch/pull/16532))
- Support retrieving doc values of unsigned long field ([#16543](https://github.com/opensearch-project/OpenSearch/pull/16543))
- Fix rollover alias supports restored searchable snapshot index([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Fix permissions error on scripted query against remote snapshot ([#16544](https://github.com/opensearch-project/OpenSearch/pull/16544))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -56,39 +57,52 @@ public TransferManager(final StreamReader streamReader, final FileCache fileCach

/**
* Given a blobFetchRequestList, return it's corresponding IndexInput.
*
* Note: Scripted queries/aggs may trigger a blob fetch within a new security context.
* As such the following operations require elevated permissions.
*
* cacheEntry.getIndexInput() downloads new blobs from the remote store to local fileCache.
* fileCache.compute() as inserting into the local fileCache may trigger an eviction.
*
* @param blobFetchRequest to fetch
* @return future of IndexInput augmented with internal caching maintenance tasks
*/
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {

final Path key = blobFetchRequest.getFilePath();
logger.trace("fetchBlob called for {}", key.toString());

// We need to do a privileged action here in order to fetch from remote
// and write/evict from local file cache in case this is invoked as a side
// effect of a plugin (such as a scripted search) that doesn't have the
// necessary permissions.
final CachedIndexInput cacheEntry = AccessController.doPrivileged((PrivilegedAction<CachedIndexInput>) () -> {
return fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<IndexInput>) () -> {
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
});

// Cache entry was either retrieved from the cache or newly added, either
// way the reference count has been incremented by one. We can only
// decrement this reference _after_ creating the clone to be returned.
try {
return cacheEntry.getIndexInput().clone();
} finally {
fileCache.decRef(key);
}
});
});

// Cache entry was either retrieved from the cache or newly added, either
// way the reference count has been incremented by one. We can only
// decrement this reference _after_ creating the clone to be returned.
try {
return cacheEntry.getIndexInput().clone();
} finally {
fileCache.decRef(key);
} catch (PrivilegedActionException e) {
final Exception cause = e.getException();
if (cause instanceof IOException) {
throw (IOException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new IOException(cause);
}
}
}

Expand Down

0 comments on commit 4213cc2

Please sign in to comment.