From 72707c14f98408f7c78ae4fbf11b70c8b6ca5df0 Mon Sep 17 00:00:00 2001
From: Ashish Singh
Date: Mon, 12 Aug 2024 21:09:46 +0530
Subject: [PATCH] Async deletion with S3

Signed-off-by: Ashish Singh
---
 .../repositories/s3/S3BlobContainer.java      | 173 ++++++++++++++++++
 .../AsyncMultiStreamBlobContainer.java        |  16 ++
 ...syncMultiStreamEncryptedBlobContainer.java |  12 +
 3 files changed, 201 insertions(+)

diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
index b489a3cc85037..a2f6fadefbeca 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
@@ -32,6 +32,8 @@
 
 package org.opensearch.repositories.s3;
 
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
 import software.amazon.awssdk.core.exception.SdkException;
@@ -62,6 +64,7 @@
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
+import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
 import software.amazon.awssdk.utils.CollectionUtils;
 
 import org.apache.logging.log4j.LogManager;
@@ -875,4 +878,174 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A
 
         return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
     }
+
+    /**
+     * Asynchronously deletes every object stored under this container's key prefix.
+     * <p>
+     * The prefix is listed one page at a time and the listed keys are removed with bulk delete requests of at most
+     * {@code blobStore.getBulkDeletesSize()} keys each. The reported blob and byte counts are taken from the listing.
+     *
+     * @param completionListener notified with the {@link DeleteResult} once the listing and every delete request have
+     *                           completed, or with the failure
+     */
+    @Override
+    public void deleteAsync(ActionListener<DeleteResult> completionListener) {
+        final AtomicLong deletedBlobs = new AtomicLong();
+        final AtomicLong deletedBytes = new AtomicLong();
+
+        // NOTE(review): the reference is released when this try block exits, while the listing and the deletes are
+        // still in flight; confirm the client is guaranteed to outlive the reference.
+        try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
+            S3AsyncClient s3AsyncClient = asyncClientReference.get().client();
+
+            ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();
+            ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);
+
+            CompletableFuture<Void> listingFuture = new CompletableFuture<>();
+
+            listPublisher.subscribe(new Subscriber<>() {
+                private Subscription subscription;
+                private final List<ObjectIdentifier> objectsToDelete = new ArrayList<>();
+                private final List<CompletableFuture<Void>> deleteFutures = new ArrayList<>();
+
+                @Override
+                public void onSubscribe(Subscription s) {
+                    this.subscription = s;
+                    // Pull one listing page at a time.
+                    subscription.request(1);
+                }
+
+                @Override
+                public void onNext(ListObjectsV2Response response) {
+                    response.contents().forEach(s3Object -> {
+                        deletedBlobs.incrementAndGet();
+                        deletedBytes.addAndGet(s3Object.size());
+                        objectsToDelete.add(ObjectIdentifier.builder().key(s3Object.key()).build());
+                    });
+
+                    // Flush every complete batch; a partial remainder waits for the next page or for onComplete().
+                    final int bulkDeleteSize = blobStore.getBulkDeletesSize();
+                    while (objectsToDelete.size() >= bulkDeleteSize) {
+                        List<ObjectIdentifier> batch = objectsToDelete.subList(0, bulkDeleteSize);
+                        deleteFutures.add(deleteObjectsAsync(s3AsyncClient, new ArrayList<>(batch)));
+                        batch.clear();
+                    }
+
+                    subscription.request(1);
+                }
+
+                @Override
+                public void onError(Throwable t) {
+                    listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t));
+                }
+
+                @Override
+                public void onComplete() {
+                    if (objectsToDelete.isEmpty() == false) {
+                        deleteFutures.add(deleteObjectsAsync(s3AsyncClient, new ArrayList<>(objectsToDelete)));
+                    }
+
+                    CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])).whenComplete((v, throwable) -> {
+                        if (throwable != null) {
+                            listingFuture.completeExceptionally(new IOException("Failed to delete some objects", throwable));
+                        } else {
+                            listingFuture.complete(null);
+                        }
+                    });
+                }
+            });
+
+            listingFuture.whenComplete((v, throwable) -> {
+                if (throwable != null) {
+                    completionListener.onFailure(
+                        throwable instanceof Exception
+                            ? (Exception) throwable
+                            : new IOException("Unexpected error during async deletion", throwable)
+                    );
+                } else {
+                    completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get()));
+                }
+            });
+        } catch (Exception e) {
+            completionListener.onFailure(new IOException("Failed to initiate async deletion", e));
+        }
+    }
+
+    /**
+     * Asynchronously deletes the named blobs of this container in bulk requests of at most
+     * {@code blobStore.getBulkDeletesSize()} keys each. S3 reports keys that do not exist as deleted, so missing
+     * blobs do not fail the operation.
+     *
+     * @param blobNames          names of the blobs to delete; each is mapped to an object key with {@code buildKey}
+     * @param completionListener notified with {@code null} once every request has succeeded, or with the failure
+     */
+    @Override
+    public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
+        if (blobNames.isEmpty()) {
+            completionListener.onResponse(null);
+            return;
+        }
+
+        try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
+            S3AsyncClient s3AsyncClient = asyncClientReference.get().client();
+
+            List<ObjectIdentifier> objectsToDelete = blobNames.stream()
+                .map(name -> ObjectIdentifier.builder().key(buildKey(name)).build())
+                .collect(Collectors.toList());
+
+            // Split the keys into batches that respect the configured bulk delete size.
+            final int bulkDeleteSize = blobStore.getBulkDeletesSize();
+            List<CompletableFuture<Void>> deleteFutures = new ArrayList<>();
+            for (int from = 0; from < objectsToDelete.size(); from += bulkDeleteSize) {
+                int to = Math.min(objectsToDelete.size(), from + bulkDeleteSize);
+                deleteFutures.add(deleteObjectsAsync(s3AsyncClient, objectsToDelete.subList(from, to)));
+            }
+
+            CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])).whenComplete((v, throwable) -> {
+                if (throwable != null) {
+                    completionListener.onFailure(
+                        throwable instanceof Exception
+                            ? (Exception) throwable
+                            : new IOException("Failed to delete some blobs", throwable)
+                    );
+                } else {
+                    completionListener.onResponse(null);
+                }
+            });
+        } catch (Exception e) {
+            completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
+        }
+    }
+
+    /**
+     * Issues a single bulk delete request for the given keys.
+     * <p>
+     * The request runs in quiet mode, so the response lists only the keys that could not be deleted; any such
+     * entry fails the returned future instead of being silently dropped.
+     *
+     * @param s3AsyncClient client used to send the request
+     * @param objects       keys to delete
+     * @return a future that completes once S3 has deleted every key, or exceptionally otherwise
+     */
+    private CompletableFuture<Void> deleteObjectsAsync(S3AsyncClient s3AsyncClient, List<ObjectIdentifier> objects) {
+        final CompletableFuture<DeleteObjectsResponse> responseFuture;
+        try {
+            final DeleteObjectsRequest deleteRequest = DeleteObjectsRequest.builder()
+                .bucket(blobStore.bucket())
+                .delete(Delete.builder().objects(objects).quiet(true).build())
+                .build();
+            // Same privileged wrapper as the other S3 client calls in this class.
+            responseFuture = SocketAccess.doPrivileged(() -> s3AsyncClient.deleteObjects(deleteRequest));
+        } catch (Exception e) {
+            // Report synchronous failures through the future so that callers always observe a completion.
+            return CompletableFuture.failedFuture(e);
+        }
+
+        return responseFuture.thenCompose(response -> {
+            if (response.errors().isEmpty()) {
+                return CompletableFuture.<Void>completedFuture(null);
+            }
+            return CompletableFuture.<Void>failedFuture(new IOException("Failed to delete " + response.errors().size() + " objects"));
+        });
+    }
 }
diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java
index 97f304d776f5c..b769cdc2fe7ab 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java
@@ -14,6 +14,7 @@
 import org.opensearch.core.action.ActionListener;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * An extension of {@link BlobContainer} that adds {@link AsyncMultiStreamBlobContainer#asyncBlobUpload} to allow
@@ -48,4 +49,19 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer {
      * by underlying blobContainer. In this case, caller doesn't need to ensure integrity of data.
      */
     boolean remoteIntegrityCheckSupported();
+
+    /**
+     * Asynchronously deletes the contents of this container.
+     *
+     * @param completionListener notified with a {@link DeleteResult} on success, or with the failure
+     */
+    void deleteAsync(ActionListener<DeleteResult> completionListener);
+
+    /**
+     * Asynchronously deletes the blobs with the given names. Blobs that do not exist are ignored.
+     *
+     * @param blobNames          names of the blobs to delete
+     * @param completionListener notified with {@code null} on success, or with the failure
+     */
+    void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener);
 }
diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java
index 82bc7a0baed50..286c01f9dca44 100644
--- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java
+++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java
@@ -171,4 +171,16 @@ private InputStreamContainer decryptInputStreamContainer(InputStreamContainer in
             return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos);
         }
     }
+
+    /** Delegates to {@code blobContainer} unchanged. */
+    @Override
+    public void deleteAsync(ActionListener<DeleteResult> completionListener) {
+        blobContainer.deleteAsync(completionListener);
+    }
+
+    /** Delegates to {@code blobContainer} unchanged. */
+    @Override
+    public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
+        blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, completionListener);
+    }
 }