Skip to content

Commit

Permalink
Improve exception handling in S3BlobContainer synchronous operations (#…
Browse files Browse the repository at this point in the history
…17049) (#17056)

(cherry picked from commit 1b4a817)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 8776e47 commit 051d298
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -373,17 +374,31 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
}

@Override
public DeleteResult delete() {
public DeleteResult delete() throws IOException {
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
deleteAsync(future);
return future.actionGet();
return getFutureValue(future);
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
PlainActionFuture<Void> future = new PlainActionFuture<>();
deleteBlobsAsyncIgnoringIfNotExists(blobNames, future);
future.actionGet();
getFutureValue(future);
}

private <T> T getFutureValue(PlainActionFuture<T> future) throws IOException {
try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new RuntimeException(e.getCause());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1947,6 +1947,116 @@ public void onFailure(Exception e) {
assertEquals(simulatedFailure, exceptionRef.get().getCause());
}

public void testDeleteWithInterruptedException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

// Mock the list operation to block indefinitely
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
doAnswer(invocation -> {
Thread.currentThread().interrupt();
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());

when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

IllegalStateException e = expectThrows(IllegalStateException.class, blobContainer::delete);
assertEquals("Future got interrupted", e.getMessage());
assertTrue(Thread.interrupted()); // Clear interrupted state
}

public void testDeleteWithExecutionException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

RuntimeException simulatedError = new RuntimeException("Simulated error");
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
doAnswer(invocation -> {
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
subscriber.onError(simulatedError);
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());

when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

IOException e = expectThrows(IOException.class, blobContainer::delete);
assertEquals("Failed to list objects for deletion", e.getMessage());
assertEquals(simulatedError, e.getCause());
}

public void testDeleteBlobsIgnoringIfNotExistsWithInterruptedException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(5);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

// Mock deleteObjects to block indefinitely
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenAnswer(invocation -> {
Thread.currentThread().interrupt();
return null;
});

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
List<String> blobNames = Arrays.asList("test1", "test2");

IllegalStateException e = expectThrows(IllegalStateException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames));
assertEquals("Future got interrupted", e.getMessage());
assertTrue(Thread.interrupted()); // Clear interrupted state
}

public void testDeleteBlobsIgnoringIfNotExistsWithExecutionException() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(5);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null));

RuntimeException simulatedError = new RuntimeException("Simulated delete error");
CompletableFuture<DeleteObjectsResponse> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(simulatedError);
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
List<String> blobNames = Arrays.asList("test1", "test2");

IOException e = expectThrows(IOException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames));
assertEquals("Failed to delete blobs " + blobNames, e.getMessage());
assertEquals(simulatedError, e.getCause().getCause());
}

private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {

final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));
Expand Down

0 comments on commit 051d298

Please sign in to comment.