diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index e2b0140ad6fe3..81c086a839f7d 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -147,6 +147,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -236,11 +237,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     );
 
     /**
-     * Setting to set batch size of stale blobs to be deleted.
+     * Setting to set the batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
+     * For optimal performance, the value of this setting should be equal to or close to the repository's maximum number of keys that can be deleted in a single operation.
+     * Most cloud storage services support deleting up to 1000 keys in a single operation, hence the default value of 1000.
      */
     public static final Setting<Integer> MAX_SHARD_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
         "max_shard_blob_delete_batch_size",
-        1000,
+        1000, // the default maximum batch size for stale snapshot shard blob deletion
         Setting.Property.NodeScope
     );
 
@@ -915,20 +918,13 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
             return;
         }
 
-        final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>();
-        final List<String> partition = new ArrayList<>();
-        try {
-            for (String key : filesToDelete) {
-                partition.add(key);
-                if (maxShardBlobDeleteBatch == partition.size()) {
-                    staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
-                    partition.clear();
-                }
-            }
-            if (partition.isEmpty() == false) {
-                staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
-            }
+        AtomicInteger counter = new AtomicInteger();
+        Collection<List<String>> subList = filesToDelete.stream()
+            .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
+            .values();
+        final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);
+
         final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
             ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
             staleFilesToDeleteInBatch.size()
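
Note: below is a minimal, self-contained sketch (not part of this change; the class name BatchPartitionDemo and the sample keys are hypothetical) of the Collectors.groupingBy partitioning idiom the new code relies on: a shared AtomicInteger assigns each streamed element a running index, and integer division by the batch size buckets elements into fixed-size groups.

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchPartitionDemo {

    // Buckets keys into batches of at most batchSize using the same
    // groupingBy-on-a-counter idiom as the diff above. Element i lands
    // in bucket i / batchSize, so every bucket except possibly the last
    // holds exactly batchSize keys.
    static Collection<List<String>> partition(List<String> keys, int batchSize) {
        AtomicInteger counter = new AtomicInteger();
        return keys.stream()
            .collect(Collectors.groupingBy(key -> counter.getAndIncrement() / batchSize))
            .values();
    }

    public static void main(String[] args) {
        List<String> keys = IntStream.range(0, 2500)
            .mapToObj(i -> "blob-" + i)
            .collect(Collectors.toList());
        // With the setting's default batch size of 1000, 2500 keys
        // produce three batches of 1000, 1000, and 500 keys.
        partition(keys, 1000).forEach(batch -> System.out.println(batch.size()));
    }
}

Two properties of this idiom worth noting: groupingBy collects into a HashMap here, so the iteration order of values() is not guaranteed, which is acceptable when the batches are independent deletion requests; and the counter relies on the stream being consumed sequentially, as in the change above, for indices to be assigned in encounter order.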