Skip to content

Commit

Permalink
Adding more description to MAX_SHARD_BLOB_DELETE_BATCH_SIZE
Browse files Browse the repository at this point in the history
Signed-off-by: Piyush Daftary <pdaftary@amazon.com>
  • Loading branch information
piyushdaftary committed Jul 19, 2022
1 parent c9d84ae commit fb5b074
Showing 1 changed file with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -236,11 +237,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
);

/**
* Setting to set batch size of stale blobs to be deleted.
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
* For optimal performance, the value of this setting should be equal to, or close to, the maximum number of keys the repository can delete in a single operation.
* Most cloud storage services support deleting up to 1000 keys in a single operation, so the default value is 1000.
*/
public static final Setting<Integer> MAX_SHARD_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
"max_shard_blob_delete_batch_size",
1000,
1000, // the default maximum batch size of stale snapshot shard blobs deletion
Setting.Property.NodeScope
);

Expand Down Expand Up @@ -915,20 +918,13 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
return;
}

final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>();
final List<String> partition = new ArrayList<>();

try {
for (String key : filesToDelete) {
partition.add(key);
if (maxShardBlobDeleteBatch == partition.size()) {
staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
}
AtomicInteger counter = new AtomicInteger();
Collection<List<String>> subList = filesToDelete.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
.values();
final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);

final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
staleFilesToDeleteInBatch.size()
Expand Down

0 comments on commit fb5b074

Please sign in to comment.