diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 8690a5c91680a..d5cf201b171bb 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -99,6 +99,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -373,17 +374,31 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS } @Override - public DeleteResult delete() { + public DeleteResult delete() throws IOException { PlainActionFuture future = new PlainActionFuture<>(); deleteAsync(future); - return future.actionGet(); + return getFutureValue(future); } @Override - public void deleteBlobsIgnoringIfNotExists(List blobNames) { + public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { PlainActionFuture future = new PlainActionFuture<>(); deleteBlobsAsyncIgnoringIfNotExists(blobNames, future); - future.actionGet(); + getFutureValue(future); + } + + private T getFutureValue(PlainActionFuture future) throws IOException { + try { + return future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new RuntimeException(e.getCause()); + } } @Override diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 53371cd1529ce..d3725642760dc 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -1947,6 +1947,116 @@ public void onFailure(Exception e) { assertEquals(simulatedFailure, exceptionRef.get().getCause()); } + public void testDeleteWithInterruptedException() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null)); + + // Mock the list operation to block indefinitely + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + doAnswer(invocation -> { + Thread.currentThread().interrupt(); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + IllegalStateException e = expectThrows(IllegalStateException.class, blobContainer::delete); + assertEquals("Future got interrupted", e.getMessage()); + assertTrue(Thread.interrupted()); // Clear interrupted state + } + + public void testDeleteWithExecutionException() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null)); + + RuntimeException simulatedError = new RuntimeException("Simulated error"); + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + subscriber.onError(simulatedError); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + IOException e = expectThrows(IOException.class, blobContainer::delete); + assertEquals("Failed to list objects for deletion", e.getMessage()); + assertEquals(simulatedError, e.getCause()); + } + + public void testDeleteBlobsIgnoringIfNotExistsWithInterruptedException() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(5); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null)); + + // Mock deleteObjects to block indefinitely + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenAnswer(invocation -> { + Thread.currentThread().interrupt(); + return null; + }); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + List blobNames = Arrays.asList("test1", "test2"); + + IllegalStateException e = expectThrows(IllegalStateException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames)); + assertEquals("Future got interrupted", e.getMessage()); + assertTrue(Thread.interrupted()); // Clear interrupted state + } + + public void testDeleteBlobsIgnoringIfNotExistsWithExecutionException() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(5); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + when(asyncClientReference.get()).thenReturn(AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null)); + + RuntimeException simulatedError = new RuntimeException("Simulated delete error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(simulatedError); + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + List blobNames = Arrays.asList("test1", "test2"); + + IOException e = expectThrows(IOException.class, () -> blobContainer.deleteBlobsIgnoringIfNotExists(blobNames)); + assertEquals("Failed to delete blobs " + blobNames, e.getMessage()); + assertEquals(simulatedError, e.getCause().getCause()); + } + private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) { final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));