Skip to content

Commit

Permalink
Moving bytes signing helpers from core to storage. (googleapis#3668)
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes authored and landrito committed Aug 21, 2017
1 parent 43bae9a commit 5db3a82
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 376 deletions.
173 changes: 0 additions & 173 deletions core/google/cloud/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,7 @@

"""A simple wrapper around the OAuth2 credentials library."""

import base64
import datetime

import six
from six.moves.urllib.parse import urlencode

import google.auth
import google.auth.credentials
from google.cloud._helpers import _microseconds_from_datetime
from google.cloud._helpers import _NOW
from google.cloud._helpers import UTC


def get_credentials():
Expand All @@ -38,166 +28,3 @@ def get_credentials():
"""
credentials, _ = google.auth.default()
return credentials


def _get_signed_query_params(credentials, expiration, string_to_sign):
"""Gets query parameters for creating a signed URL.
:type credentials: :class:`google.auth.credentials.Signer`
:param credentials: The credentials used to create a private key
for signing text.
:type expiration: int or long
:param expiration: When the signed URL should expire.
:type string_to_sign: str
:param string_to_sign: The string to be signed by the credentials.
:raises AttributeError: If :meth: sign_blob is unavailable.
:rtype: dict
:returns: Query parameters matching the signing credentials with a
signed payload.
"""
if not isinstance(credentials, google.auth.credentials.Signing):
auth_uri = ('https://google-cloud-python.readthedocs.io/en/latest/'
'core/auth.html?highlight=authentication#setting-up-'
'a-service-account')
raise AttributeError('you need a private key to sign credentials.'
'the credentials you are currently using %s '
'just contains a token. see %s for more '
'details.' % (type(credentials), auth_uri))

signature_bytes = credentials.sign_bytes(string_to_sign)
signature = base64.b64encode(signature_bytes)
service_account_name = credentials.signer_email
return {
'GoogleAccessId': service_account_name,
'Expires': str(expiration),
'Signature': signature,
}


def _get_expiration_seconds(expiration):
"""Convert 'expiration' to a number of seconds in the future.
:type expiration: int, long, datetime.datetime, datetime.timedelta
:param expiration: When the signed URL should expire.
:raises TypeError: When expiration is not an integer.
:rtype: int
:returns: a timestamp as an absolute number of seconds.
"""
# If it's a timedelta, add it to `now` in UTC.
if isinstance(expiration, datetime.timedelta):
now = _NOW().replace(tzinfo=UTC)
expiration = now + expiration

# If it's a datetime, convert to a timestamp.
if isinstance(expiration, datetime.datetime):
micros = _microseconds_from_datetime(expiration)
expiration = micros // 10**6

if not isinstance(expiration, six.integer_types):
raise TypeError('Expected an integer timestamp, datetime, or '
'timedelta. Got %s' % type(expiration))
return expiration


def generate_signed_url(credentials, resource, expiration,
api_access_endpoint='',
method='GET', content_md5=None,
content_type=None, response_type=None,
response_disposition=None, generation=None):
"""Generate signed URL to provide query-string auth'n to a resource.
.. note::
Assumes ``credentials`` implements the
:class:`google.auth.credentials.Signing` interface. Also assumes
``credentials`` has a ``service_account_email`` property which
identifies the credentials.
.. note::
If you are on Google Compute Engine, you can't generate a signed URL.
Follow `Issue 922`_ for updates on this. If you'd like to be able to
generate a signed URL from GCE, you can use a standard service account
from a JSON file rather than a GCE service account.
See headers `reference`_ for more details on optional arguments.
.. _Issue 922: https://github.com/GoogleCloudPlatform/\
google-cloud-python/issues/922
.. _reference: https://cloud.google.com/storage/docs/reference-headers
:type credentials: :class:`google.auth.credentials.Signing`
:param credentials: Credentials object with an associated private key to
sign text.
:type resource: str
:param resource: A pointer to a specific resource
(typically, ``/bucket-name/path/to/blob.txt``).
:type expiration: :class:`int`, :class:`long`, :class:`datetime.datetime`,
:class:`datetime.timedelta`
:param expiration: When the signed URL should expire.
:type api_access_endpoint: str
:param api_access_endpoint: Optional URI base. Defaults to empty string.
:type method: str
:param method: The HTTP verb that will be used when requesting the URL.
Defaults to ``'GET'``.
:type content_md5: str
:param content_md5: (Optional) The MD5 hash of the object referenced by
``resource``.
:type content_type: str
:param content_type: (Optional) The content type of the object referenced
by ``resource``.
:type response_type: str
:param response_type: (Optional) Content type of responses to requests for
the signed URL. Used to over-ride the content type of
the underlying resource.
:type response_disposition: str
:param response_disposition: (Optional) Content disposition of responses to
requests for the signed URL.
:type generation: str
:param generation: (Optional) A value that indicates which generation of
the resource to fetch.
:rtype: str
:returns: A signed URL you can use to access the resource
until expiration.
"""
expiration = _get_expiration_seconds(expiration)

# Generate the string to sign.
string_to_sign = '\n'.join([
method,
content_md5 or '',
content_type or '',
str(expiration),
resource])

# Set the right query parameters.
query_params = _get_signed_query_params(credentials,
expiration,
string_to_sign)
if response_type is not None:
query_params['response-content-type'] = response_type
if response_disposition is not None:
query_params['response-content-disposition'] = response_disposition
if generation is not None:
query_params['generation'] = generation

# Return the built URL.
return '{endpoint}{resource}?{querystring}'.format(
endpoint=api_access_endpoint, resource=resource,
querystring=urlencode(query_params))
199 changes: 0 additions & 199 deletions core/tests/unit/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import unittest

import mock
import six


class Test_get_credentials(unittest.TestCase):
Expand All @@ -33,201 +32,3 @@ def test_it(self):

self.assertIs(found, mock.sentinel.credentials)
default.assert_called_once_with()


class Test_generate_signed_url(unittest.TestCase):

def _call_fut(self, *args, **kwargs):
from google.cloud.credentials import generate_signed_url

return generate_signed_url(*args, **kwargs)

def _generate_helper(self, response_type=None, response_disposition=None,
generation=None):
import base64
from six.moves.urllib.parse import parse_qs
from six.moves.urllib.parse import urlsplit
import google.auth.credentials
from google.cloud._testing import _Monkey
from google.cloud import credentials as MUT

ENDPOINT = 'http://api.example.com'
RESOURCE = '/name/path'
SIGNED = base64.b64encode(b'DEADBEEF')
CREDENTIALS = mock.Mock(spec=google.auth.credentials.Signing)
CREDENTIALS.signer_email = 'service@example.com'

def _get_signed_query_params(*args):
credentials, expiration = args[:2]
return {
'GoogleAccessId': credentials.signer_email,
'Expires': str(expiration),
'Signature': SIGNED,
}

with _Monkey(MUT, _get_signed_query_params=_get_signed_query_params):
url = self._call_fut(CREDENTIALS, RESOURCE, 1000,
api_access_endpoint=ENDPOINT,
response_type=response_type,
response_disposition=response_disposition,
generation=generation)

scheme, netloc, path, qs, frag = urlsplit(url)
self.assertEqual(scheme, 'http')
self.assertEqual(netloc, 'api.example.com')
self.assertEqual(path, RESOURCE)
params = parse_qs(qs)
# In Py3k, parse_qs gives us text values:
self.assertEqual(params.pop('Signature'), [SIGNED.decode('ascii')])
self.assertEqual(params.pop('Expires'), ['1000'])
self.assertEqual(params.pop('GoogleAccessId'),
[CREDENTIALS.signer_email])
if response_type is not None:
self.assertEqual(params.pop('response-content-type'),
[response_type])
if response_disposition is not None:
self.assertEqual(params.pop('response-content-disposition'),
[response_disposition])
if generation is not None:
self.assertEqual(params.pop('generation'), [generation])
# Make sure we have checked them all.
self.assertEqual(len(params), 0)
self.assertEqual(frag, '')

def test_w_expiration_int(self):
self._generate_helper()

def test_w_custom_fields(self):
response_type = 'text/plain'
response_disposition = 'attachment; filename=blob.png'
generation = '123'
self._generate_helper(response_type=response_type,
response_disposition=response_disposition,
generation=generation)


class Test_generate_signed_url_exception(unittest.TestCase):
def test_with_google_credentials(self):
import time
import google.auth.credentials
from google.cloud.credentials import generate_signed_url

RESOURCE = '/name/path'

credentials = mock.Mock(spec=google.auth.credentials.Credentials)
expiration = int(time.time() + 5)
self.assertRaises(AttributeError, generate_signed_url, credentials,
resource=RESOURCE, expiration=expiration)


class Test__get_signed_query_params(unittest.TestCase):

def _call_fut(self, credentials, expiration, string_to_sign):
from google.cloud.credentials import _get_signed_query_params

return _get_signed_query_params(credentials, expiration,
string_to_sign)

def test_it(self):
import base64
import google.auth.credentials

SIG_BYTES = b'DEADBEEF'
ACCOUNT_NAME = mock.sentinel.service_account_email
CREDENTIALS = mock.Mock(spec=google.auth.credentials.Signing)
CREDENTIALS.signer_email = ACCOUNT_NAME
CREDENTIALS.sign_bytes.return_value = SIG_BYTES
EXPIRATION = 100
STRING_TO_SIGN = 'dummy_signature'
result = self._call_fut(CREDENTIALS, EXPIRATION,
STRING_TO_SIGN)

self.assertEqual(result, {
'GoogleAccessId': ACCOUNT_NAME,
'Expires': str(EXPIRATION),
'Signature': base64.b64encode(b'DEADBEEF'),
})
CREDENTIALS.sign_bytes.assert_called_once_with(STRING_TO_SIGN)


class Test__get_expiration_seconds(unittest.TestCase):

def _call_fut(self, expiration):
from google.cloud.credentials import _get_expiration_seconds

return _get_expiration_seconds(expiration)

def _utc_seconds(self, when):
import calendar

return int(calendar.timegm(when.timetuple()))

def test_w_invalid(self):
self.assertRaises(TypeError, self._call_fut, object())
self.assertRaises(TypeError, self._call_fut, None)

def test_w_int(self):
self.assertEqual(self._call_fut(123), 123)

def test_w_long(self):
if six.PY3:
raise unittest.SkipTest('No long on Python 3')

self.assertEqual(self._call_fut(long(123)), 123) # noqa: F821

def test_w_naive_datetime(self):
import datetime

expiration_no_tz = datetime.datetime(2004, 8, 19, 0, 0, 0, 0)
utc_seconds = self._utc_seconds(expiration_no_tz)
self.assertEqual(self._call_fut(expiration_no_tz), utc_seconds)

def test_w_utc_datetime(self):
import datetime
from google.cloud._helpers import UTC

expiration_utc = datetime.datetime(2004, 8, 19, 0, 0, 0, 0, UTC)
utc_seconds = self._utc_seconds(expiration_utc)
self.assertEqual(self._call_fut(expiration_utc), utc_seconds)

def test_w_other_zone_datetime(self):
import datetime
from google.cloud._helpers import _UTC

class CET(_UTC):
_tzname = 'CET'
_utcoffset = datetime.timedelta(hours=1)

zone = CET()
expiration_other = datetime.datetime(2004, 8, 19, 0, 0, 0, 0, zone)
utc_seconds = self._utc_seconds(expiration_other)
cet_seconds = utc_seconds - (60 * 60) # CET one hour earlier than UTC
self.assertEqual(self._call_fut(expiration_other), cet_seconds)

def test_w_timedelta_seconds(self):
import datetime
from google.cloud._testing import _Monkey
from google.cloud import credentials as MUT

dummy_utcnow = datetime.datetime(2004, 8, 19, 0, 0, 0, 0)
utc_seconds = self._utc_seconds(dummy_utcnow)
expiration_as_delta = datetime.timedelta(seconds=10)

with _Monkey(MUT, _NOW=lambda: dummy_utcnow):
result = self._call_fut(expiration_as_delta)

self.assertEqual(result, utc_seconds + 10)

def test_w_timedelta_days(self):
import datetime
from google.cloud._testing import _Monkey
from google.cloud import credentials as MUT

dummy_utcnow = datetime.datetime(2004, 8, 19, 0, 0, 0, 0)
utc_seconds = self._utc_seconds(dummy_utcnow)
expiration_as_delta = datetime.timedelta(days=1)

with _Monkey(MUT, _NOW=lambda: dummy_utcnow):
result = self._call_fut(expiration_as_delta)

self.assertEqual(result, utc_seconds + 86400)
Loading

0 comments on commit 5db3a82

Please sign in to comment.