Skip to content

Commit

Permalink
Refactoring - separating CRT logic from S3 Transfer Manager code (#3985)
Browse files Browse the repository at this point in the history
* Refactoring - separating CRT logic from S3 Transfer Manager code

* Rename S3TransferManagerFactory so that it doesn't pop up in IDE suggestions
  • Loading branch information
zoewangg authored May 18, 2023
1 parent ee31237 commit cffd8ea
Show file tree
Hide file tree
Showing 18 changed files with 918 additions and 571 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-f21648a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fixed the issue where S3 Transfer Manager attempted to load AWS CRT classes when Java based S3 client was used. See [#3936](https://github.com/aws/aws-sdk-java-v2/issues/3936)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.AsyncWaiter;
import software.amazon.awssdk.core.waiters.Waiter;
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
Expand All @@ -48,19 +52,22 @@ public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3Integra
private static final long OBJ_SIZE = 24 * MB;
private static File largeFile;
private static File smallFile;
private static ScheduledExecutorService executorService;

@BeforeAll
public static void setup() throws Exception {
createBucket(BUCKET);
largeFile = new RandomTempFile(OBJ_SIZE);
smallFile = new RandomTempFile(2 * MB);
executorService = Executors.newScheduledThreadPool(3);
}

@AfterAll
public static void cleanup() {
deleteBucketAndAllContents(BUCKET);
largeFile.delete();
smallFile.delete();
executorService.shutdown();
}

@Test
Expand Down Expand Up @@ -151,8 +158,13 @@ private void verifyMultipartUploadIdExists(ResumableFileUpload resumableFileUplo

private void verifyMultipartUploadIdNotExist(ResumableFileUpload resumableFileUpload) {
String multipartUploadId = resumableFileUpload.multipartUploadId().get();
assertThatThrownBy(() -> s3Async.listParts(r -> r.uploadId(multipartUploadId).bucket(BUCKET).key(KEY)).join())
.hasCauseInstanceOf(NoSuchUploadException.class);
AsyncWaiter<ListPartsResponse> waiter = AsyncWaiter.builder(ListPartsResponse.class)
.addAcceptor(WaiterAcceptor.successOnExceptionAcceptor(e -> e instanceof NoSuchUploadException))
.addAcceptor(WaiterAcceptor.retryOnResponseAcceptor(r -> true))
.overrideConfiguration(o -> o.waitTimeout(Duration.ofMinutes(1)))
.scheduledExecutorService(executorService)
.build();
waiter.runAsync(() -> s3Async.listParts(r -> r.uploadId(multipartUploadId).bucket(BUCKET).key(KEY)));
}

private static void waitUntilMultipartUploadExists() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.transfer.s3.internal.DefaultS3TransferManager;
import software.amazon.awssdk.transfer.s3.internal.TransferManagerFactory;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryDownload;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.Copy;
Expand Down Expand Up @@ -678,7 +678,7 @@ static S3TransferManager create() {
* Creates a default builder for {@link S3TransferManager}.
*/
static S3TransferManager.Builder builder() {
return DefaultS3TransferManager.builder();
return new TransferManagerFactory.DefaultBuilder();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.DEFAULT_FILE_UPLOAD_CHUNK_SIZE;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.internal.model.CrtFileUpload;
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

/**
* An implementation of {@link S3TransferManager} that uses CRT-based S3 client under the hood.
*/
@SdkInternalApi
class CrtS3TransferManager extends DelegatingS3TransferManager {
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
private final S3AsyncClient s3AsyncClient;

CrtS3TransferManager(TransferManagerConfiguration transferConfiguration, S3AsyncClient s3AsyncClient,
boolean isDefaultS3AsyncClient) {
super(new GenericS3TransferManager(transferConfiguration, s3AsyncClient, isDefaultS3AsyncClient));
this.s3AsyncClient = s3AsyncClient;
}

@Override
public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
Validate.paramNotNull(uploadFileRequest, "uploadFileRequest");
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();

AsyncRequestBody requestBody =
FileAsyncRequestBody.builder()
.path(uploadFileRequest.source())
.chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE)
.build();

Consumer<SdkHttpExecutionAttributes.Builder> attachObservable =
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable);

PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable);

CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody);
progressUpdater.transferInitiated();
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);

try {
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");

CompletableFuture<PutObjectResponse> crtFuture =
s3AsyncClient.putObject(putObjectRequest, requestBody);

// Forward upload cancellation to CRT future
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);

CompletableFutureUtils.forwardTransformedResultTo(crtFuture, returnFuture,
r -> CompletedFileUpload.builder()
.response(r)
.build());
} catch (Throwable throwable) {
returnFuture.completeExceptionally(throwable);
}


return new CrtFileUpload(returnFuture, progressUpdater.progress(), observable, uploadFileRequest);
}

private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified,
boolean noResumeToken) {
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
if (fileModified) {
log.debug(() -> String.format("The file (%s) has been modified since "
+ "the last pause. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the "
+ "beginning.",
uploadFileRequest.source(),
putObjectRequest.bucket(),
putObjectRequest.key()));
resumableFileUpload.multipartUploadId()
.ifPresent(id -> {
log.debug(() -> "Aborting previous upload with multipartUploadId: " + id);
s3AsyncClient.abortMultipartUpload(
AbortMultipartUploadRequest.builder()
.bucket(putObjectRequest.bucket())
.key(putObjectRequest.key())
.uploadId(id)
.build())
.exceptionally(t -> {
log.warn(() -> String.format("Failed to abort previous multipart upload "
+ "(id: %s)"
+ ". You may need to call "
+ "S3AsyncClient#abortMultiPartUpload to "
+ "free all storage consumed by"
+ " all parts. ",
id), t);
return null;
});
});
}

if (noResumeToken) {
log.debug(() -> String.format("No resume token is found. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the beginning.",
putObjectRequest.bucket(),
putObjectRequest.key()));
}


return uploadFile(uploadFileRequest);
}

@Override
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");

boolean fileModified = !fileNotModified(resumableFileUpload.fileLength(),
resumableFileUpload.fileLastModified(),
resumableFileUpload.uploadFileRequest().source());

boolean noResumeToken = !hasResumeToken(resumableFileUpload);

if (fileModified || noResumeToken) {
return uploadFromBeginning(resumableFileUpload, fileModified, noResumeToken);
}

return doResumeUpload(resumableFileUpload);
}

private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
ResumeToken resumeToken = crtResumeToken(resumableFileUpload);

Consumer<SdkHttpExecutionAttributes.Builder> attachResumeToken =
b -> b.put(CRT_PAUSE_RESUME_TOKEN, resumeToken);

PutObjectRequest modifiedPutObjectRequest = attachSdkAttribute(putObjectRequest, attachResumeToken);

return uploadFile(uploadFileRequest.toBuilder()
.putObjectRequest(modifiedPutObjectRequest)
.build());
}

private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUpload) {
return new ResumeToken(new ResumeToken.PutResumeTokenBuilder()
.withNumPartsCompleted(resumableFileUpload.transferredParts().orElse(0L))
.withTotalNumParts(resumableFileUpload.totalParts().orElse(0L))
.withPartSize(resumableFileUpload.partSizeInBytes().getAsLong())
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
}

private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
}

private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
Consumer<SdkHttpExecutionAttributes.Builder> builderMutation) {
SdkHttpExecutionAttributes modifiedAttributes =
putObjectRequest.overrideConfiguration().map(o -> o.executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES))
.map(b -> b.toBuilder().applyMutation(builderMutation).build())
.orElseGet(() -> SdkHttpExecutionAttributes.builder().applyMutation(builderMutation).build());

Consumer<AwsRequestOverrideConfiguration.Builder> attachSdkHttpAttributes =
b -> b.putExecutionAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES, modifiedAttributes);

AwsRequestOverrideConfiguration modifiedRequestOverrideConfig =
putObjectRequest.overrideConfiguration()
.map(o -> o.toBuilder().applyMutation(attachSdkHttpAttributes).build())
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
.applyMutation(attachSdkHttpAttributes)
.build());

return putObjectRequest.toBuilder()
.overrideConfiguration(modifiedRequestOverrideConfig)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.transfer.s3.internal;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
import software.amazon.awssdk.transfer.s3.model.DirectoryDownload;
import software.amazon.awssdk.transfer.s3.model.DirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.Download;
import software.amazon.awssdk.transfer.s3.model.DownloadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
import software.amazon.awssdk.transfer.s3.model.FileDownload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;


/**
* An {@link S3TransferManager} that just delegates to another {@link S3TransferManager}.
*/
@SdkInternalApi
abstract class DelegatingS3TransferManager implements S3TransferManager {
private final S3TransferManager delegate;

protected DelegatingS3TransferManager(S3TransferManager delegate) {
this.delegate = delegate;
}

@Override
public Upload upload(UploadRequest uploadRequest) {
return delegate.upload(uploadRequest);
}

@Override
public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
return delegate.uploadFile(uploadFileRequest);
}

@Override
public DirectoryUpload uploadDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
return delegate.uploadDirectory(uploadDirectoryRequest);
}

@Override
public <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downloadRequest) {
return delegate.download(downloadRequest);
}

@Override
public FileDownload downloadFile(DownloadFileRequest downloadRequest) {
return delegate.downloadFile(downloadRequest);
}

@Override
public FileDownload resumeDownloadFile(ResumableFileDownload resumableFileDownload) {
return delegate.resumeDownloadFile(resumableFileDownload);
}

@Override
public DirectoryDownload downloadDirectory(DownloadDirectoryRequest downloadDirectoryRequest) {
return delegate.downloadDirectory(downloadDirectoryRequest);
}

@Override
public Copy copy(CopyRequest copyRequest) {
return delegate.copy(copyRequest);
}

@Override
public void close() {
delegate.close();
}
}
Loading

0 comments on commit cffd8ea

Please sign in to comment.