Skip to content

Commit

Permalink
[S3 Repository] Make bulk delete size configurable (#10445)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna authored Oct 11, 2023
1 parent ed1b624 commit 9bcd7ea
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,6 @@ class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamB

private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);

/**
* Maximum number of deletes in a {@link DeleteObjectsRequest}.
*
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
*/
private static final int MAX_BULK_DELETES = 1000;

private final S3BlobStore blobStore;
private final String keyPath;

Expand Down Expand Up @@ -339,12 +332,12 @@ private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOEx
outstanding = new HashSet<>(blobNames);
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
// S3 API allows 1k blobs per delete so we split up the given blobs into requests of bulk size deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String key : outstanding) {
partition.add(key);
if (partition.size() == MAX_BULK_DELETES) {
if (partition.size() == blobStore.getBulkDeletesSize()) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import static org.opensearch.repositories.s3.S3Repository.BUCKET_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
Expand All @@ -74,6 +75,8 @@ class S3BlobStore implements BlobStore {

private volatile StorageClass storageClass;

private volatile int bulkDeletesSize;

private volatile RepositoryMetadata repositoryMetadata;

private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher();
Expand All @@ -92,6 +95,7 @@ class S3BlobStore implements BlobStore {
ByteSizeValue bufferSize,
String cannedACL,
String storageClass,
int bulkDeletesSize,
RepositoryMetadata repositoryMetadata,
AsyncTransferManager asyncTransferManager,
AsyncExecutorContainer priorityExecutorBuilder,
Expand All @@ -105,6 +109,7 @@ class S3BlobStore implements BlobStore {
this.bufferSize = bufferSize;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
this.bulkDeletesSize = bulkDeletesSize;
this.repositoryMetadata = repositoryMetadata;
this.asyncTransferManager = asyncTransferManager;
this.normalExecutorBuilder = normalExecutorBuilder;
Expand All @@ -119,6 +124,7 @@ public void reload(RepositoryMetadata repositoryMetadata) {
this.bufferSize = BUFFER_SIZE_SETTING.get(repositoryMetadata.settings());
this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
}

@Override
Expand Down Expand Up @@ -150,6 +156,10 @@ public long bufferSizeInBytes() {
return bufferSize.getBytes();
}

public int getBulkDeletesSize() {
return bulkDeletesSize;
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ class S3Repository extends MeteredBlobStoreRepository {
new ByteSizeValue(5, ByteSizeUnit.TB)
);

/**
* Maximum number of deletes in a DeleteObjectsRequest.
*
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
*/
static final Setting<Integer> BULK_DELETE_SIZE = Setting.intSetting("bulk_delete_size", 1000, 1, 1000);

/**
* Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy,
* standard_ia, onezone_ia and intelligent_tiering. Defaults to standard.
Expand Down Expand Up @@ -231,6 +238,8 @@ class S3Repository extends MeteredBlobStoreRepository {
private final AsyncExecutorContainer normalExecutorBuilder;
private final Path pluginConfigPath;

private volatile int bulkDeletesSize;

// Used by test classes
S3Repository(
final RepositoryMetadata metadata,
Expand Down Expand Up @@ -340,6 +349,7 @@ protected S3BlobStore createBlobStore() {
bufferSize,
cannedACL,
storageClass,
bulkDeletesSize,
metadata,
asyncUploadUtils,
priorityExecutorBuilder,
Expand Down Expand Up @@ -399,6 +409,7 @@ private void readRepositoryMetadata() {
this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings());
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());
this.bulkDeletesSize = BULK_DELETE_SIZE.get(metadata.settings());
if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
// provided repository settings
deprecationLogger.deprecate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import org.mockito.invocation.InvocationOnMock;

import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -387,6 +388,7 @@ private S3BlobStore createBlobStore() {
S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY),
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
BULK_DELETE_SIZE.get(Settings.EMPTY),
repositoryMetadata,
new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import static org.opensearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING;
import static org.opensearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING;
import static org.opensearch.repositories.s3.S3ClientSettings.REGION;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -215,6 +216,7 @@ protected AsyncMultiStreamBlobContainer createBlobContainer(
bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
BULK_DELETE_SIZE.get(Settings.EMPTY),
repositoryMetadata,
new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,12 @@ public void testDelete() throws IOException {
final String bucketName = randomAlphaOfLengthBetween(1, 10);

final BlobPath blobPath = new BlobPath();
int bulkDeleteSize = 5;

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize);

final S3Client client = mock(S3Client.class);
doAnswer(invocation -> new AmazonS3Reference(client)).when(blobStore).clientReference();
Expand All @@ -297,8 +299,11 @@ public void testDelete() throws IOException {
when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable);

final List<String> keysDeleted = new ArrayList<>();
AtomicInteger deleteCount = new AtomicInteger();
doAnswer(invocation -> {
DeleteObjectsRequest deleteObjectsRequest = invocation.getArgument(0);
deleteCount.getAndIncrement();
logger.info("Object sizes are{}", deleteObjectsRequest.delete().objects().size());
keysDeleted.addAll(deleteObjectsRequest.delete().objects().stream().map(ObjectIdentifier::key).collect(Collectors.toList()));
return DeleteObjectsResponse.builder().build();
}).when(client).deleteObjects(any(DeleteObjectsRequest.class));
Expand All @@ -311,6 +316,8 @@ public void testDelete() throws IOException {
// keysDeleted will have blobPath also
assertEquals(listObjectsV2ResponseIterator.getKeysListed().size(), keysDeleted.size() - 1);
assertTrue(keysDeleted.contains(blobPath.buildAsString()));
// keysDeleted will have blobPath also
assertEquals((int) Math.ceil(((double) keysDeleted.size() + 1) / bulkDeleteSize), deleteCount.get());
keysDeleted.remove(blobPath.buildAsString());
assertEquals(new HashSet<>(listObjectsV2ResponseIterator.getKeysListed()), new HashSet<>(keysDeleted));
}
Expand Down

0 comments on commit 9bcd7ea

Please sign in to comment.