diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java index 07a0b49df47ff..f9218de243827 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java @@ -12,12 +12,14 @@ import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.crypto.CryptoHandler; +import org.opensearch.common.crypto.DecryptedRangedStreamProvider; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.core.action.ActionListener; -import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.nio.file.Path; +import java.io.InputStream; +import java.util.List; +import java.util.stream.Collectors; /** * EncryptedBlobContainer is an encrypted BlobContainer that is backed by a @@ -44,12 +46,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp @Override public void readBlobAsync(String blobName, ActionListener listener) { - throw new UnsupportedOperationException(); - } - - @Override - public void asyncBlobDownload(String blobName, Path fileLocation, ThreadPool threadPool, ActionListener completionListener) { - throw new UnsupportedOperationException(); + DecryptingActionListener decryptingActionListener = new DecryptingActionListener(listener, cryptoHandler); + blobContainer.readBlobAsync(blobName, decryptingActionListener); } @Override @@ -108,4 +106,69 @@ public InputStreamContainer provideStream(int partNumber) throws IOException { } } + + static class DecryptedReadContext extends ReadContext { + + private final U cryptoContext; + private final CryptoHandler cryptoHandler; + private final long fileSize; + + public DecryptedReadContext(ReadContext readContext, CryptoHandler cryptoHandler) { + super(readContext); + this.cryptoHandler = cryptoHandler; + try { + this.cryptoContext = this.cryptoHandler.loadEncryptionMetadata(null); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.fileSize = this.cryptoHandler.estimateDecryptedLength(cryptoContext, readContext.getBlobSize()); + } + + @Override + public long getBlobSize() { + return fileSize; + } + + @Override + public List getPartStreams() { + return super.getPartStreams().stream().map(this::decrpytInputStreamContainer).collect(Collectors.toList()); + } + + private InputStreamContainer decrpytInputStreamContainer(InputStreamContainer inputStreamContainer) { + long startOfStream = inputStreamContainer.getOffset(); + long endOfStream = startOfStream + inputStreamContainer.getContentLength(); + DecryptedRangedStreamProvider rangedStreamProvider = cryptoHandler.createDecryptingStreamOfRange( + cryptoContext, + startOfStream, + endOfStream + ); + + InputStream decryptedStream = cryptoHandler.createDecryptingStream(inputStreamContainer.getInputStream()); + long offset = rangedStreamProvider.getAdjustedRange()[0]; + long contentLength = rangedStreamProvider.getAdjustedRange()[1]; + return new InputStreamContainer(decryptedStream, contentLength, offset); + } + } + + static class DecryptingActionListener implements ActionListener { + + private final ActionListener completionListener; + private final CryptoHandler cryptoHandler; + + public DecryptingActionListener(ActionListener completionListener, CryptoHandler cryptoHandler) { + this.completionListener = completionListener; + this.cryptoHandler = cryptoHandler; + } + + @Override + public void onResponse(ReadContext readContext) { + DecryptedReadContext decryptedReadContext = new DecryptedReadContext(readContext, cryptoHandler); + completionListener.onResponse(decryptedReadContext); + } + + @Override + public void onFailure(Exception e) { + completionListener.onFailure(e); + } + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java index 4ba17959f8040..2c305fb03c475 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -28,6 +28,12 @@ public ReadContext(long blobSize, List partStreams, String this.blobChecksum = blobChecksum; } + public ReadContext(ReadContext readContext) { + this.blobSize = readContext.blobSize; + this.partStreams = readContext.partStreams; + this.blobChecksum = readContext.blobChecksum; + } + public String getBlobChecksum() { return blobChecksum; }