Skip to content

Commit

Permalink
Add async blob read and download support using multiple streams
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Aug 28, 2023
1 parent a08d588 commit 1250157
Show file tree
Hide file tree
Showing 15 changed files with 747 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))
- [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -211,6 +212,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
throw new UnsupportedOperationException();
}

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;

import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -881,6 +882,17 @@ public void onFailure(Exception e) {}
}
}

public void testAsyncBlobDownload() {
final S3BlobStore blobStore = mock(S3BlobStore.class);
final BlobPath blobPath = mock(BlobPath.class);
final String blobName = "test-blob";

final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> {
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.readBlobAsync(blobName, new PlainActionFuture<>());
});
}

public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException {
testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
Expand All @@ -24,6 +25,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -114,6 +117,27 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
new Thread(() -> {
try {
long contentLength = listBlobs().get(blobName).length();
long partSize = contentLength / 10;
int numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
List<InputStreamContainer> blobPartStreams = new ArrayList<>();
for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
long offset = partNumber * partSize;
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(blobPartStream);
}
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
}
}).start();
}

private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;

/**
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand All @@ -31,4 +35,27 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Creates an async callback of a {@link ReadContext} containing the multipart streams for a specified blob within the container.
* @param blobName The name of the blob for which the {@link ReadContext} needs to be fetched.
* @param listener Async listener for {@link ReadContext} object which serves the input streams and other metadata for the blob
*
* @opensearch.experimental
*/
void readBlobAsync(String blobName, ActionListener<ReadContext> listener);

/**
* Asynchronously downloads the blob to the specified location using an executor from the thread pool.
* @param blobName The name of the blob for which needs to be downloaded.
* @param fileLocation The path on local disk where the blob needs to be downloaded.
* @param threadPool The threadpool instance which will provide the executor for performing a multipart download.
* @param completionListener Listener which will be notified when the download is complete.
*
* @opensearch.experimental
*/
default void asyncBlobDownload(String blobName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) {
ReadContextListener readContextListener = new ReadContextListener(blobName, fileLocation, threadPool, completionListener);
readBlobAsync(blobName, readContextListener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read;

import org.opensearch.common.io.InputStreamContainer;

import java.util.List;

/**
* ReadContext is used to encapsulate all data needed by <code>BlobContainer#readBlobAsync</code>
*
* @opensearch.internal
*/
public class ReadContext {
private final long blobSize;
private final List<InputStreamContainer> partStreams;
private final String blobChecksum;

public ReadContext(long blobSize, List<InputStreamContainer> partStreams, String blobChecksum) {
this.blobSize = blobSize;
this.partStreams = partStreams;
this.blobChecksum = blobChecksum;
}

public String getBlobChecksum() {
return blobChecksum;
}

public int getNumberOfParts() {
return partStreams.size();
}

public long getBlobSize() {
return blobSize;
}

public List<InputStreamContainer> getPartStreams() {
return partStreams;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read.listener;

import org.opensearch.core.action.ActionListener;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* FileCompletionListener listens for completion of fetch on all the streams for a file, where
* individual streams are handled using {@link StreamCompletionListener}. The {@link StreamCompletionListener}(s)
* hold a reference to the file completion listener to be notified.
*
* @opensearch.internal
*/
public class FileCompletionListener implements ActionListener<Integer> {
private final int numberOfParts;
private final String fileName;
private final Set<Integer> completedParts;
private final ActionListener<String> completionListener;

public FileCompletionListener(int numberOfParts, String fileName, ActionListener<String> completionListener) {
this.completedParts = Collections.synchronizedSet(new HashSet<>());
this.numberOfParts = numberOfParts;
this.fileName = fileName;
this.completionListener = completionListener;
}

@Override
public void onResponse(Integer partNumber) {
completedParts.add(partNumber);
if (completedParts.size() == numberOfParts) {
completionListener.onResponse(fileName);
}
}

@Override
public void onFailure(Exception e) {
completionListener.onFailure(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read.listener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* ReadContextListener orchestrates the async file fetch from the {@link org.opensearch.common.blobstore.BlobContainer}
* using a {@link ReadContext} callback. On response, it spawns off the download using multiple streams which are
* spread across a {@link ThreadPool} executor.
*
* @opensearch.internal
*/
public class ReadContextListener implements ActionListener<ReadContext> {

private final String fileName;
private final Path fileLocation;
private final ThreadPool threadPool;
private final ActionListener<String> completionListener;
private static final Logger logger = LogManager.getLogger(ReadContextListener.class);

public ReadContextListener(String fileName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) {
this.fileName = fileName;
this.fileLocation = fileLocation;
this.threadPool = threadPool;
this.completionListener = completionListener;
}

@Override
public void onResponse(ReadContext readContext) {
final int numParts = readContext.getNumberOfParts();
final AtomicBoolean anyPartStreamFailed = new AtomicBoolean();
FileCompletionListener fileCompletionListener = new FileCompletionListener(numParts, fileName, completionListener);

for (int partNumber = 0; partNumber < numParts; partNumber++) {
StreamCompletionListener streamCompletionListener = new StreamCompletionListener(
partNumber,
readContext.getPartStreams().get(partNumber),
fileLocation,
anyPartStreamFailed,
fileCompletionListener
);
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, streamCompletionListener, false).onResponse(null);
}
}

@Override
public void onFailure(Exception e) {
completionListener.onFailure(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read.listener;

import org.opensearch.common.io.Channels;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* StreamCompletionListener transfers the provided stream into the specified file path using a {@link FileChannel}
* instance. It performs offset based writes to the file and notifies the {@link FileCompletionListener} on completion.
*
* @opensearch.internal
*/
public class StreamCompletionListener implements ActionListener<Void> {
private final int partNumber;
private final InputStreamContainer blobPartStreamContainer;
private final Path fileLocation;
private final AtomicBoolean anyPartStreamFailed;
private final FileCompletionListener fileCompletionListener;

// 8 MB buffer for transfer
private static final int BUFFER_SIZE = 8 * 1024 * 2024;

public StreamCompletionListener(
int partNumber,
InputStreamContainer blobPartStreamContainer,
Path fileLocation,
AtomicBoolean anyPartStreamFailed,
FileCompletionListener fileCompletionListener
) {
this.partNumber = partNumber;
this.blobPartStreamContainer = blobPartStreamContainer;
this.fileLocation = fileLocation;
this.anyPartStreamFailed = anyPartStreamFailed;
this.fileCompletionListener = fileCompletionListener;
}

@Override
public void onResponse(Void unused) {
// Ensures no writes to the file if any stream fails.
if (!anyPartStreamFailed.get()) {
try (FileChannel outputFileChannel = FileChannel.open(fileLocation, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
try (InputStream inputStream = blobPartStreamContainer.getInputStream()) {
outputFileChannel.position(blobPartStreamContainer.getOffset());

final byte[] buffer = new byte[BUFFER_SIZE];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
int bytesRead;

while ((bytesRead = inputStream.read(buffer)) != -1) {
byteBuffer.limit(bytesRead);
Channels.writeToChannel(byteBuffer, outputFileChannel);
byteBuffer.clear();
}
}
} catch (IOException e) {
onFailure(e);
return;
}
fileCompletionListener.onResponse(partNumber);
}
}

@Override
public void onFailure(Exception e) {
try {
if (Files.exists(fileLocation)) {
Files.delete(fileLocation);
}
} catch (IOException ex) {
// Die silently
}
if (!anyPartStreamFailed.get()) {
anyPartStreamFailed.compareAndSet(false, true);
fileCompletionListener.onFailure(e);
}
}
}
Loading

0 comments on commit 1250157

Please sign in to comment.