From bafbadebb04ceb50f084a9e1cafbd3c6109512bd Mon Sep 17 00:00:00 2001 From: Nate Prewitt Date: Mon, 27 Nov 2023 17:14:42 -0700 Subject: [PATCH] Add support for CRC32 default for s3express --- .../next-release/enhancement-s3-41099.json | 5 + s3transfer/manager.py | 5 + s3transfer/utils.py | 18 +++- tests/functional/test_upload.py | 101 ++++++++++++++++-- tests/unit/test_utils.py | 34 ++++++ 5 files changed, 149 insertions(+), 14 deletions(-) create mode 100644 .changes/next-release/enhancement-s3-41099.json diff --git a/.changes/next-release/enhancement-s3-41099.json b/.changes/next-release/enhancement-s3-41099.json new file mode 100644 index 00000000..add2dfdc --- /dev/null +++ b/.changes/next-release/enhancement-s3-41099.json @@ -0,0 +1,5 @@ +{ + "type": "enhancement", + "category": "``s3``", + "description": "Added support for defaulting checksums to CRC32 for s3express." +} diff --git a/s3transfer/manager.py b/s3transfer/manager.py index b11daeba..ab9a210f 100644 --- a/s3transfer/manager.py +++ b/s3transfer/manager.py @@ -35,6 +35,7 @@ OSUtils, SlidingWindowSemaphore, TaskSemaphore, + add_s3express_defaults, get_callbacks, signal_not_transferring, signal_transferring, @@ -320,6 +321,7 @@ def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): subscribers = [] self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS) self._validate_if_bucket_supported(bucket) + self._add_operation_defaults(bucket, extra_args) call_args = CallArgs( fileobj=fileobj, bucket=bucket, @@ -502,6 +504,9 @@ def _validate_all_known_args(self, actual, allowed): "must be one of: %s" % (kwarg, ', '.join(allowed)) ) + def _add_operation_defaults(self, bucket, extra_args): + add_s3express_defaults(bucket, extra_args) + def _submit_transfer( self, call_args, submission_task_cls, extra_main_kwargs=None ): diff --git a/s3transfer/utils.py b/s3transfer/utils.py index 61407eba..9954dc0a 100644 --- a/s3transfer/utils.py +++ b/s3transfer/utils.py @@ -22,6 +22,8 @@ from 
collections import defaultdict from botocore.exceptions import IncompleteReadError, ReadTimeoutError +from botocore.httpchecksum import AwsChunkedWrapper +from botocore.utils import is_s3express_bucket from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file @@ -54,10 +56,12 @@ def signal_not_transferring(request, operation_name, **kwargs): def signal_transferring(request, operation_name, **kwargs): - if operation_name in ['PutObject', 'UploadPart'] and hasattr( - request.body, 'signal_transferring' - ): - request.body.signal_transferring() + if operation_name in ['PutObject', 'UploadPart']: + body = request.body + if isinstance(body, AwsChunkedWrapper): + body = getattr(body, '_raw', None) + if hasattr(body, 'signal_transferring'): + body.signal_transferring() def calculate_num_parts(size, part_size): @@ -800,3 +804,9 @@ def _adjust_for_max_parts(self, current_chunksize, file_size): ) return chunksize + + +def add_s3express_defaults(bucket, extra_args): + if is_s3express_bucket(bucket) and "ChecksumAlgorithm" not in extra_args: + # Default Transfer Operations to S3Express to use CRC32 + extra_args["ChecksumAlgorithm"] = "crc32" diff --git a/tests/functional/test_upload.py b/tests/functional/test_upload.py index cf61ecb3..021b409b 100644 --- a/tests/functional/test_upload.py +++ b/tests/functional/test_upload.py @@ -142,9 +142,12 @@ class TestNonMultipartUpload(BaseUploadTest): __test__ = True def add_put_object_response_with_default_expected_params( - self, extra_expected_params=None + self, extra_expected_params=None, bucket=None ): - expected_params = {'Body': ANY, 'Bucket': self.bucket, 'Key': self.key} + if bucket is None: + bucket = self.bucket + + expected_params = {'Body': ANY, 'Bucket': bucket, 'Key': self.key} if extra_expected_params: expected_params.update(extra_expected_params) upload_response = self.create_stubbed_responses()[0] @@ -167,9 +170,9 @@ def test_upload(self): self.assert_put_object_body_was_correct() def 
test_upload_with_checksum(self): - self.extra_args['ChecksumAlgorithm'] = 'crc32' + self.extra_args['ChecksumAlgorithm'] = 'sha256' self.add_put_object_response_with_default_expected_params( - extra_expected_params={'ChecksumAlgorithm': 'crc32'} + extra_expected_params={'ChecksumAlgorithm': 'sha256'} ) future = self.manager.upload( self.filename, self.bucket, self.key, self.extra_args @@ -178,6 +181,21 @@ def test_upload_with_checksum(self): self.assert_expected_client_calls_were_correct() self.assert_put_object_body_was_correct() + def test_upload_with_s3express_default_checksum(self): + s3express_bucket = "mytestbucket--usw2-az6--x-s3" + self.assertFalse("ChecksumAlgorithm" in self.extra_args) + + self.add_put_object_response_with_default_expected_params( + extra_expected_params={'ChecksumAlgorithm': 'crc32'}, + bucket=s3express_bucket, + ) + future = self.manager.upload( + self.filename, s3express_bucket, self.key, self.extra_args + ) + future.result() + self.assert_expected_client_calls_were_correct() + self.assert_put_object_body_was_correct() + def test_upload_for_fileobj(self): self.add_put_object_response_with_default_expected_params() with open(self.filename, 'rb') as f: @@ -342,9 +360,14 @@ def assert_upload_part_bodies_were_correct(self): self.assertEqual(self.sent_bodies, expected_contents) def add_create_multipart_response_with_default_expected_params( - self, extra_expected_params=None + self, + extra_expected_params=None, + bucket=None, ): - expected_params = {'Bucket': self.bucket, 'Key': self.key} + if bucket is None: + bucket = self.bucket + + expected_params = {'Bucket': bucket, 'Key': self.key} if extra_expected_params: expected_params.update(extra_expected_params) response = self.create_stubbed_responses()[0] @@ -352,14 +375,19 @@ def add_create_multipart_response_with_default_expected_params( self.stubber.add_response(**response) def add_upload_part_responses_with_default_expected_params( - self, extra_expected_params=None + self, + 
extra_expected_params=None, + bucket=None, + ): + if bucket is None: + bucket = self.bucket + num_parts = 3 upload_part_responses = self.create_stubbed_responses()[1:-1] for i in range(num_parts): upload_part_response = upload_part_responses[i] expected_params = { - 'Bucket': self.bucket, + 'Bucket': bucket, 'Key': self.key, 'UploadId': self.multipart_id, 'Body': ANY, @@ -378,10 +406,15 @@ def add_upload_part_responses_with_default_expected_params( self.stubber.add_response(**upload_part_response) def add_complete_multipart_response_with_default_expected_params( - self, extra_expected_params=None + self, + extra_expected_params=None, + bucket=None, ): + if bucket is None: + bucket = self.bucket + expected_params = { - 'Bucket': self.bucket, + 'Bucket': bucket, 'Key': self.key, 'UploadId': self.multipart_id, 'MultipartUpload': { @@ -600,6 +633,54 @@ def test_multipart_upload_passes_checksums(self): future.result() self.assert_expected_client_calls_were_correct() + def test_multipart_upload_sets_s3express_default_checksum(self): + s3express_bucket = "mytestbucket--usw2-az6--x-s3" + self.assertFalse('ChecksumAlgorithm' in self.extra_args) + + # ChecksumAlgorithm should be passed on the create_multipart call + self.add_create_multipart_response_with_default_expected_params( + extra_expected_params={'ChecksumAlgorithm': 'crc32'}, + bucket=s3express_bucket, + ) + + # ChecksumAlgorithm should be forwarded and a CRC32 will come back + self.add_upload_part_responses_with_default_expected_params( + extra_expected_params={'ChecksumAlgorithm': 'crc32'}, + bucket=s3express_bucket, + ) + + # The checksums should be used in the complete call like etags + self.add_complete_multipart_response_with_default_expected_params( + extra_expected_params={ + 'MultipartUpload': { + 'Parts': [ + { + 'ETag': 'etag-1', + 'PartNumber': 1, + 'ChecksumCRC32': 'sum1==', + }, + { + 'ETag': 'etag-2', + 'PartNumber': 2, + 'ChecksumCRC32': 'sum2==', + }, + { + 'ETag': 'etag-3', + 'PartNumber': 3, +
'ChecksumCRC32': 'sum3==', + }, + ] + } + }, + bucket=s3express_bucket, + ) + + future = self.manager.upload( + self.filename, s3express_bucket, self.key, self.extra_args + ) + future.result() + self.assert_expected_client_calls_were_correct() + def test_multipart_upload_with_ssec_args(self): params = { 'RequestPayer': 'requester', diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 83ce1265..ea7edf8a 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -20,6 +20,8 @@ import time from io import BytesIO, StringIO +import pytest + from s3transfer.futures import TransferFuture, TransferMeta from s3transfer.utils import ( MAX_PARTS, @@ -36,6 +38,7 @@ SlidingWindowSemaphore, StreamReaderProgress, TaskSemaphore, + add_s3express_defaults, calculate_num_parts, calculate_range_parameter, get_callbacks, @@ -1187,3 +1190,34 @@ def test_unknown_file_size_above_maximum(self): chunksize = MAX_SINGLE_UPLOAD_SIZE + 1 new_size = self.adjuster.adjust_chunksize(chunksize) self.assertEqual(new_size, MAX_SINGLE_UPLOAD_SIZE) + + +class TestS3ExpressDefaults: + @pytest.mark.parametrize( + "bucket,extra_args,expected", + ( + ( + "mytestbucket--usw2-az2--x-s3", + {}, + {"ChecksumAlgorithm": "crc32"}, + ), + ( + "mytestbucket--usw2-az2--x-s3", + {"Some": "Setting"}, + {"ChecksumAlgorithm": "crc32", "Some": "Setting"}, + ), + ( + "mytestbucket", + {}, + {}, + ), + ( + "mytestbucket--usw2-az2--x-s3", + {"ChecksumAlgorithm": "sha256"}, + {"ChecksumAlgorithm": "sha256"}, + ), + ), + ) + def test_add_s3express_defaults(self, bucket, extra_args, expected): + add_s3express_defaults(bucket, extra_args) + assert extra_args == expected