From ff7fea2bf72ae8dc3e4b663ec3b9ce8c8deecef8 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Tue, 25 Jul 2017 12:32:18 -0700 Subject: [PATCH] Moving bytes signing helpers from `core` to `storage`. --- core/google/cloud/credentials.py | 173 ------------------ core/tests/unit/test_credentials.py | 199 -------------------- storage/google/cloud/storage/_signing.py | 189 +++++++++++++++++++ storage/google/cloud/storage/blob.py | 2 +- storage/nox.py | 12 +- storage/tests/unit/test__signing.py | 222 +++++++++++++++++++++++ 6 files changed, 421 insertions(+), 376 deletions(-) create mode 100644 storage/google/cloud/storage/_signing.py create mode 100644 storage/tests/unit/test__signing.py diff --git a/core/google/cloud/credentials.py b/core/google/cloud/credentials.py index 29c4a5d310f4..b434cac2f1e7 100644 --- a/core/google/cloud/credentials.py +++ b/core/google/cloud/credentials.py @@ -14,17 +14,7 @@ """A simple wrapper around the OAuth2 credentials library.""" -import base64 -import datetime - -import six -from six.moves.urllib.parse import urlencode - import google.auth -import google.auth.credentials -from google.cloud._helpers import _microseconds_from_datetime -from google.cloud._helpers import _NOW -from google.cloud._helpers import UTC def get_credentials(): @@ -38,166 +28,3 @@ def get_credentials(): """ credentials, _ = google.auth.default() return credentials - - -def _get_signed_query_params(credentials, expiration, string_to_sign): - """Gets query parameters for creating a signed URL. - - :type credentials: :class:`google.auth.credentials.Signer` - :param credentials: The credentials used to create a private key - for signing text. - - :type expiration: int or long - :param expiration: When the signed URL should expire. - - :type string_to_sign: str - :param string_to_sign: The string to be signed by the credentials. - - :raises AttributeError: If :meth: sign_blob is unavailable. - - :rtype: dict - :returns: Query parameters matching the signing credentials with a - signed payload. - """ - if not isinstance(credentials, google.auth.credentials.Signing): - auth_uri = ('https://google-cloud-python.readthedocs.io/en/latest/' - 'core/auth.html?highlight=authentication#setting-up-' - 'a-service-account') - raise AttributeError('you need a private key to sign credentials.' - 'the credentials you are currently using %s ' - 'just contains a token. see %s for more ' - 'details.' % (type(credentials), auth_uri)) - - signature_bytes = credentials.sign_bytes(string_to_sign) - signature = base64.b64encode(signature_bytes) - service_account_name = credentials.signer_email - return { - 'GoogleAccessId': service_account_name, - 'Expires': str(expiration), - 'Signature': signature, - } - - -def _get_expiration_seconds(expiration): - """Convert 'expiration' to a number of seconds in the future. - - :type expiration: int, long, datetime.datetime, datetime.timedelta - :param expiration: When the signed URL should expire. - - :raises TypeError: When expiration is not an integer. - - :rtype: int - :returns: a timestamp as an absolute number of seconds. - """ - # If it's a timedelta, add it to `now` in UTC. - if isinstance(expiration, datetime.timedelta): - now = _NOW().replace(tzinfo=UTC) - expiration = now + expiration - - # If it's a datetime, convert to a timestamp. - if isinstance(expiration, datetime.datetime): - micros = _microseconds_from_datetime(expiration) - expiration = micros // 10**6 - - if not isinstance(expiration, six.integer_types): - raise TypeError('Expected an integer timestamp, datetime, or ' - 'timedelta. Got %s' % type(expiration)) - return expiration - - -def generate_signed_url(credentials, resource, expiration, - api_access_endpoint='', - method='GET', content_md5=None, - content_type=None, response_type=None, - response_disposition=None, generation=None): - """Generate signed URL to provide query-string auth'n to a resource. - - .. note:: - - Assumes ``credentials`` implements the - :class:`google.auth.credentials.Signing` interface. Also assumes - ``credentials`` has a ``service_account_email`` property which - identifies the credentials. - - .. note:: - - If you are on Google Compute Engine, you can't generate a signed URL. - Follow `Issue 922`_ for updates on this. If you'd like to be able to - generate a signed URL from GCE, you can use a standard service account - from a JSON file rather than a GCE service account. - - See headers `reference`_ for more details on optional arguments. - - .. _Issue 922: https://github.com/GoogleCloudPlatform/\ - google-cloud-python/issues/922 - .. _reference: https://cloud.google.com/storage/docs/reference-headers - - :type credentials: :class:`google.auth.credentials.Signing` - :param credentials: Credentials object with an associated private key to - sign text. - - :type resource: str - :param resource: A pointer to a specific resource - (typically, ``/bucket-name/path/to/blob.txt``). - - :type expiration: :class:`int`, :class:`long`, :class:`datetime.datetime`, - :class:`datetime.timedelta` - :param expiration: When the signed URL should expire. - - :type api_access_endpoint: str - :param api_access_endpoint: Optional URI base. Defaults to empty string. - - :type method: str - :param method: The HTTP verb that will be used when requesting the URL. - Defaults to ``'GET'``. - - :type content_md5: str - :param content_md5: (Optional) The MD5 hash of the object referenced by - ``resource``. - - :type content_type: str - :param content_type: (Optional) The content type of the object referenced - by ``resource``. - - :type response_type: str - :param response_type: (Optional) Content type of responses to requests for - the signed URL. Used to over-ride the content type of - the underlying resource. - - :type response_disposition: str - :param response_disposition: (Optional) Content disposition of responses to - requests for the signed URL. - - :type generation: str - :param generation: (Optional) A value that indicates which generation of - the resource to fetch. - - :rtype: str - :returns: A signed URL you can use to access the resource - until expiration. - """ - expiration = _get_expiration_seconds(expiration) - - # Generate the string to sign. - string_to_sign = '\n'.join([ - method, - content_md5 or '', - content_type or '', - str(expiration), - resource]) - - # Set the right query parameters. - query_params = _get_signed_query_params(credentials, - expiration, - string_to_sign) - if response_type is not None: - query_params['response-content-type'] = response_type - if response_disposition is not None: - query_params['response-content-disposition'] = response_disposition - if generation is not None: - query_params['generation'] = generation - - # Return the built URL. - return '{endpoint}{resource}?{querystring}'.format( - endpoint=api_access_endpoint, resource=resource, - querystring=urlencode(query_params)) diff --git a/core/tests/unit/test_credentials.py b/core/tests/unit/test_credentials.py index aaffa907dda1..3b313c1dc1d6 100644 --- a/core/tests/unit/test_credentials.py +++ b/core/tests/unit/test_credentials.py @@ -15,7 +15,6 @@ import unittest import mock -import six class Test_get_credentials(unittest.TestCase): @@ -33,201 +32,3 @@ def test_it(self): self.assertIs(found, mock.sentinel.credentials) default.assert_called_once_with() - - -class Test_generate_signed_url(unittest.TestCase): - - def _call_fut(self, *args, **kwargs): - from google.cloud.credentials import generate_signed_url - - return generate_signed_url(*args, **kwargs) - - def _generate_helper(self, response_type=None, response_disposition=None, - generation=None): - import base64 - from six.moves.urllib.parse import parse_qs - from six.moves.urllib.parse import urlsplit - import google.auth.credentials - from google.cloud._testing import _Monkey - from google.cloud import credentials as MUT - - ENDPOINT = 'http://api.example.com' - RESOURCE = '/name/path' - SIGNED = base64.b64encode(b'DEADBEEF') - CREDENTIALS = mock.Mock(spec=google.auth.credentials.Signing) - CREDENTIALS.signer_email = 'service@example.com' - - def _get_signed_query_params(*args): - credentials, expiration = args[:2] - return { - 'GoogleAccessId': credentials.signer_email, - 'Expires': str(expiration), - 'Signature': SIGNED, - } - - with _Monkey(MUT, _get_signed_query_params=_get_signed_query_params): - url = self._call_fut(CREDENTIALS, RESOURCE, 1000, - api_access_endpoint=ENDPOINT, - response_type=response_type, - response_disposition=response_disposition, - generation=generation) - - scheme, netloc, path, qs, frag = urlsplit(url) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'api.example.com') - self.assertEqual(path, RESOURCE) - params = parse_qs(qs) - # In Py3k, parse_qs gives us text values: - self.assertEqual(params.pop('Signature'), [SIGNED.decode('ascii')]) - self.assertEqual(params.pop('Expires'), ['1000']) - self.assertEqual(params.pop('GoogleAccessId'), - [CREDENTIALS.signer_email]) - if response_type is not None: - self.assertEqual(params.pop('response-content-type'), - [response_type]) - if response_disposition is not None: - self.assertEqual(params.pop('response-content-disposition'), - [response_disposition]) - if generation is not None: - self.assertEqual(params.pop('generation'), [generation]) - # Make sure we have checked them all. - self.assertEqual(len(params), 0) - self.assertEqual(frag, '') - - def test_w_expiration_int(self): - self._generate_helper() - - def test_w_custom_fields(self): - response_type = 'text/plain' - response_disposition = 'attachment; filename=blob.png' - generation = '123' - self._generate_helper(response_type=response_type, - response_disposition=response_disposition, - generation=generation) - - -class Test_generate_signed_url_exception(unittest.TestCase): - def test_with_google_credentials(self): - import time - import google.auth.credentials - from google.cloud.credentials import generate_signed_url - - RESOURCE = '/name/path' - - credentials = mock.Mock(spec=google.auth.credentials.Credentials) - expiration = int(time.time() + 5) - self.assertRaises(AttributeError, generate_signed_url, credentials, - resource=RESOURCE, expiration=expiration) - - -class Test__get_signed_query_params(unittest.TestCase): - - def _call_fut(self, credentials, expiration, string_to_sign): - from google.cloud.credentials import _get_signed_query_params - - return _get_signed_query_params(credentials, expiration, - string_to_sign) - - def test_it(self): - import base64 - import google.auth.credentials - - SIG_BYTES = b'DEADBEEF' - ACCOUNT_NAME = mock.sentinel.service_account_email - CREDENTIALS = mock.Mock(spec=google.auth.credentials.Signing) - CREDENTIALS.signer_email = ACCOUNT_NAME - CREDENTIALS.sign_bytes.return_value = SIG_BYTES - EXPIRATION = 100 - STRING_TO_SIGN = 'dummy_signature' - result = self._call_fut(CREDENTIALS, EXPIRATION, - STRING_TO_SIGN) - - self.assertEqual(result, { - 'GoogleAccessId': ACCOUNT_NAME, - 'Expires': str(EXPIRATION), - 'Signature': base64.b64encode(b'DEADBEEF'), - }) - CREDENTIALS.sign_bytes.assert_called_once_with(STRING_TO_SIGN) - - -class Test__get_expiration_seconds(unittest.TestCase): - - def _call_fut(self, expiration): - from google.cloud.credentials import _get_expiration_seconds - - return _get_expiration_seconds(expiration) - - def _utc_seconds(self, when): - import calendar - - return int(calendar.timegm(when.timetuple())) - - def test_w_invalid(self): - self.assertRaises(TypeError, self._call_fut, object()) - self.assertRaises(TypeError, self._call_fut, None) - - def test_w_int(self): - self.assertEqual(self._call_fut(123), 123) - - def test_w_long(self): - if six.PY3: - raise unittest.SkipTest('No long on Python 3') - - self.assertEqual(self._call_fut(long(123)), 123) # noqa: F821 - - def test_w_naive_datetime(self): - import datetime - - expiration_no_tz = datetime.datetime(2004, 8, 19, 0, 0, 0, 0) - utc_seconds = self._utc_seconds(expiration_no_tz) - self.assertEqual(self._call_fut(expiration_no_tz), utc_seconds) - - def test_w_utc_datetime(self): - import datetime - from google.cloud._helpers import UTC - - expiration_utc = datetime.datetime(2004, 8, 19, 0, 0, 0, 0, UTC) - utc_seconds = self._utc_seconds(expiration_utc) - self.assertEqual(self._call_fut(expiration_utc), utc_seconds) - - def test_w_other_zone_datetime(self): - import datetime - from google.cloud._helpers import _UTC - - class CET(_UTC): - _tzname = 'CET' - _utcoffset = datetime.timedelta(hours=1) - - zone = CET() - expiration_other = datetime.datetime(2004, 8, 19, 0, 0, 0, 0, zone) - utc_seconds = self._utc_seconds(expiration_other) - cet_seconds = utc_seconds - (60 * 60) # CET one hour earlier than UTC - self.assertEqual(self._call_fut(expiration_other), cet_seconds) - - def test_w_timedelta_seconds(self): - import datetime - from google.cloud._testing import _Monkey - from google.cloud import credentials as MUT - - dummy_utcnow = datetime.datetime(2004, 8, 19, 0, 0, 0, 0) - utc_seconds = self._utc_seconds(dummy_utcnow) - expiration_as_delta = datetime.timedelta(seconds=10) - - with _Monkey(MUT, _NOW=lambda: dummy_utcnow): - result = self._call_fut(expiration_as_delta) - - self.assertEqual(result, utc_seconds + 10) - - def test_w_timedelta_days(self): - import datetime - from google.cloud._testing import _Monkey - from google.cloud import credentials as MUT - - dummy_utcnow = datetime.datetime(2004, 8, 19, 0, 0, 0, 0) - utc_seconds = self._utc_seconds(dummy_utcnow) - expiration_as_delta = datetime.timedelta(days=1) - - with _Monkey(MUT, _NOW=lambda: dummy_utcnow): - result = self._call_fut(expiration_as_delta) - - self.assertEqual(result, utc_seconds + 86400) diff --git a/storage/google/cloud/storage/_signing.py b/storage/google/cloud/storage/_signing.py new file mode 100644 index 000000000000..58e62ac1502d --- /dev/null +++ b/storage/google/cloud/storage/_signing.py @@ -0,0 +1,189 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import base64 +import datetime + +import six + +import google.auth.credentials +from google.cloud import _helpers + + +NOW = datetime.datetime.utcnow # To be replaced by tests. + + +def get_signed_query_params(credentials, expiration, string_to_sign): + """Gets query parameters for creating a signed URL. + + :type credentials: :class:`google.auth.credentials.Signer` + :param credentials: The credentials used to create a private key + for signing text. + + :type expiration: int or long + :param expiration: When the signed URL should expire. + + :type string_to_sign: str + :param string_to_sign: The string to be signed by the credentials. + + :raises AttributeError: If :meth: sign_blob is unavailable. + + :rtype: dict + :returns: Query parameters matching the signing credentials with a + signed payload. + """ + if not isinstance(credentials, google.auth.credentials.Signing): + auth_uri = ('https://google-cloud-python.readthedocs.io/en/latest/' + 'core/auth.html?highlight=authentication#setting-up-' + 'a-service-account') + raise AttributeError('you need a private key to sign credentials.' + 'the credentials you are currently using %s ' + 'just contains a token. see %s for more ' + 'details.' % (type(credentials), auth_uri)) + + signature_bytes = credentials.sign_bytes(string_to_sign) + signature = base64.b64encode(signature_bytes) + service_account_name = credentials.signer_email + return { + 'GoogleAccessId': service_account_name, + 'Expires': str(expiration), + 'Signature': signature, + } + + +def get_expiration_seconds(expiration): + """Convert 'expiration' to a number of seconds in the future. + + :type expiration: int, long, datetime.datetime, datetime.timedelta + :param expiration: When the signed URL should expire. + + :raises TypeError: When expiration is not an integer. + + :rtype: int + :returns: a timestamp as an absolute number of seconds. + """ + # If it's a timedelta, add it to `now` in UTC. + if isinstance(expiration, datetime.timedelta): + now = NOW().replace(tzinfo=_helpers.UTC) + expiration = now + expiration + + # If it's a datetime, convert to a timestamp. + if isinstance(expiration, datetime.datetime): + micros = _helpers._microseconds_from_datetime(expiration) + expiration = micros // 10**6 + + if not isinstance(expiration, six.integer_types): + raise TypeError('Expected an integer timestamp, datetime, or ' + 'timedelta. Got %s' % type(expiration)) + return expiration + + +def generate_signed_url(credentials, resource, expiration, + api_access_endpoint='', + method='GET', content_md5=None, + content_type=None, response_type=None, + response_disposition=None, generation=None): + """Generate signed URL to provide query-string auth'n to a resource. + + .. note:: + + Assumes ``credentials`` implements the + :class:`google.auth.credentials.Signing` interface. Also assumes + ``credentials`` has a ``service_account_email`` property which + identifies the credentials. + + .. note:: + + If you are on Google Compute Engine, you can't generate a signed URL. + Follow `Issue 922`_ for updates on this. If you'd like to be able to + generate a signed URL from GCE, you can use a standard service account + from a JSON file rather than a GCE service account. + + See headers `reference`_ for more details on optional arguments. + + .. _Issue 922: https://github.com/GoogleCloudPlatform/\ + google-cloud-python/issues/922 + .. _reference: https://cloud.google.com/storage/docs/reference-headers + + :type credentials: :class:`google.auth.credentials.Signing` + :param credentials: Credentials object with an associated private key to + sign text. + + :type resource: str + :param resource: A pointer to a specific resource + (typically, ``/bucket-name/path/to/blob.txt``). + + :type expiration: :class:`int`, :class:`long`, :class:`datetime.datetime`, + :class:`datetime.timedelta` + :param expiration: When the signed URL should expire. + + :type api_access_endpoint: str + :param api_access_endpoint: Optional URI base. Defaults to empty string. + + :type method: str + :param method: The HTTP verb that will be used when requesting the URL. + Defaults to ``'GET'``. + + :type content_md5: str + :param content_md5: (Optional) The MD5 hash of the object referenced by + ``resource``. + + :type content_type: str + :param content_type: (Optional) The content type of the object referenced + by ``resource``. + + :type response_type: str + :param response_type: (Optional) Content type of responses to requests for + the signed URL. Used to over-ride the content type of + the underlying resource. + + :type response_disposition: str + :param response_disposition: (Optional) Content disposition of responses to + requests for the signed URL. + + :type generation: str + :param generation: (Optional) A value that indicates which generation of + the resource to fetch. + + :rtype: str + :returns: A signed URL you can use to access the resource + until expiration. + """ + expiration = get_expiration_seconds(expiration) + + # Generate the string to sign. + string_to_sign = '\n'.join([ + method, + content_md5 or '', + content_type or '', + str(expiration), + resource, + ]) + + # Set the right query parameters. + query_params = get_signed_query_params( + credentials, expiration, string_to_sign) + + if response_type is not None: + query_params['response-content-type'] = response_type + if response_disposition is not None: + query_params['response-content-disposition'] = response_disposition + if generation is not None: + query_params['generation'] = generation + + # Return the built URL. + return '{endpoint}{resource}?{querystring}'.format( + endpoint=api_access_endpoint, resource=resource, + querystring=six.moves.urllib.parse.urlencode(query_params)) diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 8d6ec2619ea1..dfefc3c1a4fa 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -47,12 +47,12 @@ from google.cloud._helpers import _rfc3339_to_datetime from google.cloud._helpers import _to_bytes from google.cloud._helpers import _bytes_to_unicode -from google.cloud.credentials import generate_signed_url from google.cloud.exceptions import NotFound from google.cloud.exceptions import make_exception from google.cloud.iam import Policy from google.cloud.storage._helpers import _PropertyMixin from google.cloud.storage._helpers import _scalar_property +from google.cloud.storage._signing import generate_signed_url from google.cloud.storage.acl import ObjectACL diff --git a/storage/nox.py b/storage/nox.py index 3de8efed3fd9..18ccf81aaff2 100644 --- a/storage/nox.py +++ b/storage/nox.py @@ -39,10 +39,16 @@ def unit_tests(session, python_version): # Run py.test against the unit tests. session.run( - 'py.test', '--quiet', - '--cov=google.cloud.storage', '--cov=tests.unit', '--cov-append', - '--cov-config=.coveragerc', '--cov-report=', '--cov-fail-under=97', + 'py.test', + '--quiet', + '--cov=google.cloud.storage', + '--cov=tests.unit', + '--cov-append', + '--cov-config=.coveragerc', + '--cov-report=', + '--cov-fail-under=97', 'tests/unit', + *session.posargs ) diff --git a/storage/tests/unit/test__signing.py b/storage/tests/unit/test__signing.py new file mode 100644 index 000000000000..1e2aabb9d25e --- /dev/null +++ b/storage/tests/unit/test__signing.py @@ -0,0 +1,222 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import calendar +import datetime +import time +import unittest + +import mock +import six +from six.moves import urllib_parse + + +class Test_get_expiration_seconds(unittest.TestCase): + + @staticmethod + def _call_fut(expiration): + from google.cloud.storage._signing import get_expiration_seconds + + return get_expiration_seconds(expiration) + + @staticmethod + def _utc_seconds(when): + return int(calendar.timegm(when.timetuple())) + + def test_w_invalid(self): + self.assertRaises(TypeError, self._call_fut, object()) + self.assertRaises(TypeError, self._call_fut, None) + + def test_w_int(self): + self.assertEqual(self._call_fut(123), 123) + + def test_w_long(self): + if six.PY3: + raise unittest.SkipTest('No long on Python 3') + + self.assertEqual(self._call_fut(long(123)), 123) # noqa: F821 + + def test_w_naive_datetime(self): + expiration_no_tz = datetime.datetime(2004, 8, 19, 0, 0, 0, 0) + utc_seconds = self._utc_seconds(expiration_no_tz) + self.assertEqual(self._call_fut(expiration_no_tz), utc_seconds) + + def test_w_utc_datetime(self): + from google.cloud._helpers import UTC + + expiration_utc = datetime.datetime(2004, 8, 19, 0, 0, 0, 0, UTC) + utc_seconds = self._utc_seconds(expiration_utc) + self.assertEqual(self._call_fut(expiration_utc), utc_seconds) + + def test_w_other_zone_datetime(self): + from google.cloud._helpers import _UTC + + class CET(_UTC): + _tzname = 'CET' + _utcoffset = datetime.timedelta(hours=1) + + zone = CET() + expiration_other = datetime.datetime(2004, 8, 19, 0, 0, 0, 0, zone) + utc_seconds = self._utc_seconds(expiration_other) + cet_seconds = utc_seconds - (60 * 60) # CET one hour earlier than UTC + self.assertEqual(self._call_fut(expiration_other), cet_seconds) + + def test_w_timedelta_seconds(self): + dummy_utcnow = datetime.datetime(2004, 8, 19, 0, 0, 0, 0) + utc_seconds = self._utc_seconds(dummy_utcnow) + expiration_as_delta = datetime.timedelta(seconds=10) + + patch = mock.patch( + 'google.cloud.storage._signing.NOW', + return_value=dummy_utcnow) + with patch as utcnow: + result = self._call_fut(expiration_as_delta) + + self.assertEqual(result, utc_seconds + 10) + utcnow.assert_called_once_with() + + def test_w_timedelta_days(self): + dummy_utcnow = datetime.datetime(2004, 8, 19, 0, 0, 0, 0) + utc_seconds = self._utc_seconds(dummy_utcnow) + expiration_as_delta = datetime.timedelta(days=1) + + patch = mock.patch( + 'google.cloud.storage._signing.NOW', + return_value=dummy_utcnow) + with patch as utcnow: + result = self._call_fut(expiration_as_delta) + + self.assertEqual(result, utc_seconds + 86400) + utcnow.assert_called_once_with() + + +class Test_get_signed_query_params(unittest.TestCase): + + @staticmethod + def _call_fut(credentials, expiration, string_to_sign): + from google.cloud.storage._signing import get_signed_query_params + + return get_signed_query_params( + credentials, expiration, string_to_sign) + + def test_it(self): + sig_bytes = b'DEADBEEF' + account_name = mock.sentinel.service_account_email + credentials = _make_credentials( + signing=True, signer_email=account_name) + credentials.sign_bytes.return_value = sig_bytes + expiration = 100 + string_to_sign = 'dummy_signature' + result = self._call_fut( + credentials, expiration, string_to_sign) + + expected = { + 'GoogleAccessId': account_name, + 'Expires': str(expiration), + 'Signature': base64.b64encode(sig_bytes), + } + self.assertEqual(result, expected) + credentials.sign_bytes.assert_called_once_with(string_to_sign) + + +class Test_generate_signed_url(unittest.TestCase): + + @staticmethod + def _call_fut(*args, **kwargs): + from google.cloud.storage._signing import generate_signed_url + + return generate_signed_url(*args, **kwargs) + + def _generate_helper(self, response_type=None, response_disposition=None, + generation=None): + endpoint = 'http://api.example.com' + resource = '/name/path' + credentials = _make_credentials( + signing=True, signer_email='service@example.com') + credentials.sign_bytes.return_value = b'DEADBEEF' + signed = base64.b64encode(credentials.sign_bytes.return_value) + signed = signed.decode('ascii') + + expiration = 1000 + url = self._call_fut( + credentials, + resource, + expiration, + api_access_endpoint=endpoint, + response_type=response_type, + response_disposition=response_disposition, + generation=generation, + ) + + # Check the mock was called. + string_to_sign = '\n'.join([ + 'GET', + '', + '', + str(expiration), + resource, + ]) + credentials.sign_bytes.assert_called_once_with(string_to_sign) + + scheme, netloc, path, qs, frag = urllib_parse.urlsplit(url) + self.assertEqual(scheme, 'http') + self.assertEqual(netloc, 'api.example.com') + self.assertEqual(path, resource) + self.assertEqual(frag, '') + + # Check the URL parameters. + params = urllib_parse.parse_qs(qs) + expected_params = { + 'GoogleAccessId': [credentials.signer_email], + 'Expires': [str(expiration)], + 'Signature': [signed], + } + if response_type is not None: + expected_params['response-content-type'] = [response_type] + if response_disposition is not None: + expected_params['response-content-disposition'] = [ + response_disposition] + if generation is not None: + expected_params['generation'] = [generation] + self.assertEqual(params, expected_params) + + def test_w_expiration_int(self): + self._generate_helper() + + def test_w_custom_fields(self): + response_type = 'text/plain' + response_disposition = 'attachment; filename=blob.png' + generation = '123' + self._generate_helper(response_type=response_type, + response_disposition=response_disposition, + generation=generation) + + def test_with_google_credentials(self): + resource = '/name/path' + credentials = _make_credentials() + expiration = int(time.time() + 5) + self.assertRaises(AttributeError, self._call_fut, credentials, + resource=resource, expiration=expiration) + + +def _make_credentials(signing=False, signer_email=None): + import google.auth.credentials + + if signing: + credentials = mock.Mock(spec=google.auth.credentials.Signing) + credentials.signer_email = signer_email + return credentials + else: + return mock.Mock(spec=google.auth.credentials.Credentials)