Skip to content

Commit

Permalink
Avoid early task cancellation during azure parallel blob deletions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez authored Jan 5, 2021
1 parent 3312c2e commit f1ebe11
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.azure.AzureHttpHandler;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -39,14 +40,18 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
Expand Down Expand Up @@ -231,4 +236,26 @@ public void testLargeBlobCountDeletion() throws Exception {
assertThat(container.listBlobs().size(), equalTo(0));
}
}

public void testDeleteBlobsIgnoringIfNotExists() throws Exception {
try (BlobStore store = newBlobStore()) {
final BlobContainer container = store.blobContainer(new BlobPath());
List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] bytes = randomBytes(randomInt(100));
String blobName = randomAlphaOfLength(10);
container.writeBlob(blobName, new BytesArray(bytes), false);
blobsToDelete.add(blobName);
}

// Try to delete non existent blobs
for (int i = 0; i < 10; i++) {
blobsToDelete.add(randomName());
}

Randomness.shuffle(blobsToDelete);
container.deleteBlobsIgnoringIfNotExists(blobsToDelete);
assertThat(container.listBlobs(), is(anEmptyMap()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,4 @@ private void ensureSasTokenPermissions() {
}));
future.actionGet();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66633")
@Override
// This override is only here so we can mute the test without muting the whole suite, remove it when the test is fixed
public void testCleanup() throws Exception {
super.testCleanup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,24 +275,25 @@ void deleteBlobList(List<String> blobs) throws IOException {
List<Mono<Void>> deleteTasks = new ArrayList<>(blobs.size());
final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
for (String blob : blobs) {
deleteTasks.add(blobContainerClient.getBlobAsyncClient(blob).delete());
final Mono<Void> deleteTask = blobContainerClient.getBlobAsyncClient(blob)
.delete()
// Ignore not found blobs
.onErrorResume(e -> (e instanceof BlobStorageException) && ((BlobStorageException) e).getStatusCode() == 404,
throwable -> Mono.empty());
deleteTasks.add(deleteTask);
}

executeDeleteTasks(deleteTasks);
});
} catch (BlobStorageException e) {
if (e.getStatusCode() != 404) {
throw new IOException("Unable to delete blobs " + blobs, e);
}
} catch (Exception e) {
throw new IOException("Unable to delete blobs " + blobs, e);
}
}

private void executeDeleteTasks(List<Mono<Void>> deleteTasks) {
Flux.merge(deleteTasks)
.collectList()
.block();
// zipDelayError executes all tasks in parallel and delays
// error propagation until all tasks have finished.
Mono.zipDelayError(deleteTasks, results -> null).block();
}

public InputStream getInputStream(String blob, long position, final @Nullable Long length) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,12 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (Regex.simpleMatch("DELETE /" + account + "/" + container + "/*", request)) {
// Delete Blob (https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob)
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
final boolean deleted = blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
if (deleted) {
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
} else {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
}

} else if (Regex.simpleMatch("GET /" + account + "/" + container + "?*restype=container*comp=list*", request)) {
// List Blobs (https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs)
Expand Down

0 comments on commit f1ebe11

Please sign in to comment.