diff --git a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index b00936d3efa81..b62b83d7a971e 100644 --- a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -24,6 +24,7 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.azure.AzureHttpHandler; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; @@ -39,14 +40,18 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Base64; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.function.Predicate; import java.util.regex.Pattern; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @@ -231,4 +236,26 @@ public void testLargeBlobCountDeletion() throws Exception { assertThat(container.listBlobs().size(), equalTo(0)); } } + + public void testDeleteBlobsIgnoringIfNotExists() throws Exception { + try (BlobStore store = newBlobStore()) { + final BlobContainer container = store.blobContainer(new BlobPath()); + List blobsToDelete = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + byte[] bytes = randomBytes(randomInt(100)); + String blobName = randomAlphaOfLength(10); + container.writeBlob(blobName, new BytesArray(bytes), false); + blobsToDelete.add(blobName); + } + + // Try to delete non existent blobs + for (int i = 0; i < 10; i++) { + blobsToDelete.add(randomName()); + } + + Randomness.shuffle(blobsToDelete); + container.deleteBlobsIgnoringIfNotExists(blobsToDelete); + assertThat(container.listBlobs(), is(anEmptyMap())); + } + } } diff --git a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java index a96522d451f3f..3997cf37b1f34 100644 --- a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java +++ b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java @@ -122,11 +122,4 @@ private void ensureSasTokenPermissions() { })); future.actionGet(); } - - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66633") - @Override - // This override is only here so we can mute the test without muting the whole suite, remove it when the test is fixed - public void testCleanup() throws Exception { - super.testCleanup(); - } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index e411552d6d10f..790d4563e99a2 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -275,24 +275,25 @@ void deleteBlobList(List blobs) throws IOException { List> deleteTasks = new ArrayList<>(blobs.size()); final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container); for (String blob : blobs) { - deleteTasks.add(blobContainerClient.getBlobAsyncClient(blob).delete()); + final Mono deleteTask = blobContainerClient.getBlobAsyncClient(blob) + .delete() + // Ignore not found blobs + .onErrorResume(e -> (e instanceof BlobStorageException) && ((BlobStorageException) e).getStatusCode() == 404, + throwable -> Mono.empty()); + deleteTasks.add(deleteTask); } executeDeleteTasks(deleteTasks); }); - } catch (BlobStorageException e) { - if (e.getStatusCode() != 404) { - throw new IOException("Unable to delete blobs " + blobs, e); - } } catch (Exception e) { throw new IOException("Unable to delete blobs " + blobs, e); } } private void executeDeleteTasks(List> deleteTasks) { - Flux.merge(deleteTasks) - .collectList() - .block(); + // zipDelayError executes all tasks in parallel and delays + // error propagation until all tasks have finished. + Mono.zipDelayError(deleteTasks, results -> null).block(); } public InputStream getInputStream(String blob, long position, final @Nullable Long length) throws IOException { diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java index 9e53a9bc96e9c..7f0c259a4ed9d 100644 --- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java @@ -152,8 +152,12 @@ public void handle(final HttpExchange exchange) throws IOException { } else if (Regex.simpleMatch("DELETE /" + account + "/" + container + "/*", request)) { // Delete Blob (https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob) - blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath())); - exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1); + final boolean deleted = blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath())); + if (deleted) { + exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1); + } else { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } } else if (Regex.simpleMatch("GET /" + account + "/" + container + "?*restype=container*comp=list*", request)) { // List Blobs (https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs)