diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 0b26781b06a4..b52092072d1e 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -71,12 +71,13 @@ 'storageClass', ) _NUM_RETRIES_MESSAGE = ( - 'num_retries is no longer supported. When a transient error occurs, ' - 'such as a 429 Too Many Requests or 500 Internal Server Error, upload ' - 'requests will be automatically retried. Subsequent retries will be ' - 'done after waiting 1, 2, 4, 8, etc. seconds (exponential backoff) until ' - '10 minutes of wait time have elapsed. At that point, there will be no ' - 'more attempts to retry.') + '`num_retries` has been deprecated and will be removed in a future ' + 'release. The default behavior (when `num_retries` is not specified) when ' + 'a transient error (e.g. 429 Too Many Requests or 500 Internal Server ' + 'Error) occurs will be as follows: upload requests will be automatically ' + 'retried. Subsequent retries will be sent after waiting 1, 2, 4, 8, etc. ' + 'seconds (exponential backoff) until 10 minutes of wait time have ' + 'elapsed. At that point, there will be no more attempts to retry.') _READ_LESS_THAN_SIZE = ( 'Size {:d} was specified but the file-like object only had ' '{:d} bytes remaining.') @@ -583,7 +584,8 @@ def _get_upload_arguments(self, content_type): content_type = self._get_content_type(content_type) return headers, object_metadata, content_type - def _do_multipart_upload(self, client, stream, content_type, size): + def _do_multipart_upload(self, client, stream, content_type, + size, num_retries): """Perform a multipart upload. Assumes ``chunk_size`` is :data:`None` on the current blob. @@ -610,6 +612,10 @@ def _do_multipart_upload(self, client, stream, content_type, size): from ``stream``). If not provided, the upload will be concluded once ``stream`` is exhausted (or :data:`None`). + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the multipart upload request. @@ -631,13 +637,19 @@ def _do_multipart_upload(self, client, stream, content_type, size): upload_url = _MULTIPART_URL_TEMPLATE.format( bucket_path=self.bucket.path) upload = MultipartUpload(upload_url, headers=headers) + + if num_retries is not None: + upload._retry_strategy = resumable_media.RetryStrategy( + max_retries=num_retries) + response = upload.transmit( transport, data, object_metadata, content_type) return response def _initiate_resumable_upload(self, client, stream, content_type, - size, extra_headers=None, chunk_size=None): + size, num_retries, extra_headers=None, + chunk_size=None): """Initiate a resumable upload. The content type of the upload will be determined in order @@ -662,6 +674,10 @@ def _initiate_resumable_upload(self, client, stream, content_type, from ``stream``). If not provided, the upload will be concluded once ``stream`` is exhausted (or :data:`None`). + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + :type extra_headers: dict :param extra_headers: (Optional) Extra headers to add to standard headers. @@ -693,13 +709,19 @@ def _initiate_resumable_upload(self, client, stream, content_type, upload_url = _RESUMABLE_URL_TEMPLATE.format( bucket_path=self.bucket.path) upload = ResumableUpload(upload_url, chunk_size, headers=headers) + + if num_retries is not None: + upload._retry_strategy = resumable_media.RetryStrategy( + max_retries=num_retries) + upload.initiate( transport, stream, object_metadata, content_type, total_bytes=size, stream_final=False) return upload, transport - def _do_resumable_upload(self, client, stream, content_type, size): + def _do_resumable_upload(self, client, stream, content_type, + size, num_retries): """Perform a resumable upload. Assumes ``chunk_size`` is not :data:`None` on the current blob. @@ -726,19 +748,23 @@ def _do_resumable_upload(self, client, stream, content_type, size): from ``stream``). If not provided, the upload will be concluded once ``stream`` is exhausted (or :data:`None`). + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + :rtype: :class:`~requests.Response` :returns: The "200 OK" response object returned after the final chunk is uploaded. """ upload, transport = self._initiate_resumable_upload( - client, stream, content_type, size) + client, stream, content_type, size, num_retries) while not upload.finished: response = upload.transmit_next_chunk(transport) return response - def _do_upload(self, client, stream, content_type, size): + def _do_upload(self, client, stream, content_type, size, num_retries): """Determine an upload strategy and then perform the upload. If the current blob has a ``chunk_size`` set, then a resumable upload @@ -767,6 +793,10 @@ def _do_upload(self, client, stream, content_type, size): from ``stream``). If not provided, the upload will be concluded once ``stream`` is exhausted (or :data:`None`). + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + :rtype: dict :returns: The parsed JSON from the "200 OK" response. This will be the **only** response in the multipart case and it will be the @@ -774,10 +804,10 @@ def _do_upload(self, client, stream, content_type, size): """ if self.chunk_size is None: response = self._do_multipart_upload( - client, stream, content_type, size) + client, stream, content_type, size, num_retries) else: response = self._do_resumable_upload( - client, stream, content_type, size) + client, stream, content_type, size, num_retries) return response.json() @@ -831,7 +861,8 @@ def upload_from_file(self, file_obj, rewind=False, size=None, :param content_type: Optional type of content being uploaded. :type num_retries: int - :param num_retries: Number of upload retries. (Deprecated.) + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) :type client: :class:`~google.cloud.storage.client.Client` :param client: (Optional) The client to use. If not passed, falls back @@ -846,7 +877,7 @@ def upload_from_file(self, file_obj, rewind=False, size=None, _maybe_rewind(file_obj, rewind=rewind) try: created_json = self._do_upload( - client, file_obj, content_type, size) + client, file_obj, content_type, size, num_retries) self._set_properties(created_json) except resumable_media.InvalidResponse as exc: _raise_from_invalid_response(exc) @@ -1004,7 +1035,7 @@ def create_resumable_upload_session( # to the `ResumableUpload` constructor. The chunk size only # matters when **sending** bytes to an upload. upload, _ = self._initiate_resumable_upload( - client, dummy_stream, content_type, size, + client, dummy_stream, content_type, size, None, extra_headers=extra_headers, chunk_size=self._CHUNK_SIZE_MULTIPLE) diff --git a/storage/setup.py b/storage/setup.py index 88ebfcbe853e..e261f6402c02 100644 --- a/storage/setup.py +++ b/storage/setup.py @@ -53,7 +53,7 @@ REQUIREMENTS = [ 'google-cloud-core >= 0.24.1, < 0.25dev', 'google-auth >= 1.0.0', - 'google-resumable-media >= 0.1.0', + 'google-resumable-media >= 0.1.1', 'requests >= 2.0.0', ] diff --git a/storage/tests/unit/test_blob.py b/storage/tests/unit/test_blob.py index bbe67047fbff..21443480b32f 100644 --- a/storage/tests/unit/test_blob.py +++ b/storage/tests/unit/test_blob.py @@ -762,7 +762,8 @@ def _mock_transport(self, status_code, headers, content=b''): fake_transport.request.return_value = fake_response return fake_transport - def _do_multipart_success(self, mock_get_boundary, size=None): + def _do_multipart_success(self, mock_get_boundary, size=None, + num_retries=None): bucket = mock.Mock(path='/b/w00t', spec=[u'path']) blob = self._make_one(u'blob-name', bucket=bucket) self.assertIsNone(blob.chunk_size) @@ -777,7 +778,7 @@ def _do_multipart_success(self, mock_get_boundary, size=None): stream = io.BytesIO(data) content_type = u'application/xml' response = blob._do_multipart_upload( - client, stream, content_type, size) + client, stream, content_type, size, num_retries) # Check the mocks and the returned value. self.assertIs(response, fake_transport.request.return_value) @@ -817,6 +818,11 @@ def test__do_multipart_upload_no_size(self, mock_get_boundary): def test__do_multipart_upload_with_size(self, mock_get_boundary): self._do_multipart_success(mock_get_boundary, size=10) + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload_with_retry(self, mock_get_boundary): + self._do_multipart_success(mock_get_boundary, num_retries=8) + def test__do_multipart_upload_bad_size(self): blob = self._make_one(u'blob-name', bucket=None) @@ -826,7 +832,7 @@ def test__do_multipart_upload_bad_size(self): self.assertGreater(size, len(data)) with self.assertRaises(ValueError) as exc_info: - blob._do_multipart_upload(None, stream, None, size) + blob._do_multipart_upload(None, stream, None, size, None) exc_contents = str(exc_info.exception) self.assertIn( @@ -834,7 +840,7 @@ def test__do_multipart_upload_bad_size(self): self.assertEqual(stream.tell(), len(data)) def _initiate_resumable_helper(self, size=None, extra_headers=None, - chunk_size=None): + chunk_size=None, num_retries=None): from google.resumable_media.requests import ResumableUpload bucket = mock.Mock(path='/b/whammy', spec=[u'path']) @@ -862,7 +868,7 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None, stream = io.BytesIO(data) content_type = u'text/plain' upload, transport = blob._initiate_resumable_upload( - client, stream, content_type, size, + client, stream, content_type, size, num_retries, extra_headers=extra_headers, chunk_size=chunk_size) # Check the returned values. @@ -890,6 +896,14 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None, self.assertEqual(upload._total_bytes, size) self.assertEqual(upload._content_type, content_type) self.assertEqual(upload.resumable_url, resumable_url) + retry_strategy = upload._retry_strategy + self.assertEqual(retry_strategy.max_sleep, 64.0) + if num_retries is None: + self.assertEqual(retry_strategy.max_cumulative_retry, 600.0) + self.assertIsNone(retry_strategy.max_retries) + else: + self.assertIsNone(retry_strategy.max_cumulative_retry) + self.assertEqual(retry_strategy.max_retries, num_retries) self.assertIs(transport, fake_transport) # Make sure we never read from the stream. self.assertEqual(stream.tell(), 0) @@ -923,6 +937,9 @@ def test__initiate_resumable_upload_with_extra_headers(self): extra_headers = {'origin': 'http://not-in-kansas-anymore.invalid'} self._initiate_resumable_helper(extra_headers=extra_headers) + def test__initiate_resumable_upload_with_retry(self): + self._initiate_resumable_helper(num_retries=11) + def _make_resumable_transport(self, headers1, headers2, headers3, total_bytes): from google import resumable_media @@ -990,7 +1007,7 @@ def _do_resumable_upload_call2(blob, content_type, data, return mock.call( 'PUT', resumable_url, data=payload, headers=expected_headers) - def _do_resumable_helper(self, use_size=False): + def _do_resumable_helper(self, use_size=False, num_retries=None): bucket = mock.Mock(path='/b/yesterday', spec=[u'path']) blob = self._make_one(u'blob-name', bucket=bucket) blob.chunk_size = blob._CHUNK_SIZE_MULTIPLE @@ -1017,7 +1034,7 @@ def _do_resumable_helper(self, use_size=False): stream = io.BytesIO(data) content_type = u'text/html' response = blob._do_resumable_upload( - client, stream, content_type, size) + client, stream, content_type, size, num_retries) # Check the returned values. self.assertIs(response, responses[2]) @@ -1039,7 +1056,10 @@ def test__do_resumable_upload_no_size(self): def test__do_resumable_upload_with_size(self): self._do_resumable_helper(use_size=True) - def _do_upload_helper(self, chunk_size=None): + def test__do_resumable_upload_with_retry(self): + self._do_resumable_helper(num_retries=6) + + def _do_upload_helper(self, chunk_size=None, num_retries=None): blob = self._make_one(u'blob-name', bucket=None) # Create a fake response. @@ -1061,17 +1081,18 @@ def _do_upload_helper(self, chunk_size=None): size = 12345654321 # Make the request and check the mocks. - created_json = blob._do_upload(client, stream, content_type, size) + created_json = blob._do_upload( + client, stream, content_type, size, num_retries) self.assertIs(created_json, mock.sentinel.json) response.json.assert_called_once_with() if chunk_size is None: blob._do_multipart_upload.assert_called_once_with( - client, stream, content_type, size) + client, stream, content_type, size, num_retries) blob._do_resumable_upload.assert_not_called() else: blob._do_multipart_upload.assert_not_called() blob._do_resumable_upload.assert_called_once_with( - client, stream, content_type, size) + client, stream, content_type, size, num_retries) def test__do_upload_without_chunk_size(self): self._do_upload_helper() @@ -1080,6 +1101,9 @@ def test__do_upload_with_chunk_size(self): chunk_size = 1024 * 1024 * 1024 # 1GB self._do_upload_helper(chunk_size=chunk_size) + def test__do_upload_with_retry(self): + self._do_upload_helper(num_retries=20) + def _upload_from_file_helper(self, side_effect=None, **kwargs): from google.cloud._helpers import UTC @@ -1109,8 +1133,9 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): self.assertEqual(blob.updated, new_updated) # Check the mock. + num_retries = kwargs.get('num_retries') blob._do_upload.assert_called_once_with( - client, stream, content_type, len(data)) + client, stream, content_type, len(data), num_retries) return stream @@ -1151,10 +1176,11 @@ def _do_upload_mock_call_helper(self, blob, client, content_type, size): mock_call = blob._do_upload.mock_calls[0] call_name, pos_args, kwargs = mock_call self.assertEqual(call_name, '') - self.assertEqual(len(pos_args), 4) + self.assertEqual(len(pos_args), 5) self.assertEqual(pos_args[0], client) self.assertEqual(pos_args[2], content_type) self.assertEqual(pos_args[3], size) + self.assertIsNone(pos_args[4]) # num_retries self.assertEqual(kwargs, {}) return pos_args[1]