Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving bytes signing helpers from core to storage. #3668

Merged
merged 1 commit into from
Jul 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 0 additions & 173 deletions core/google/cloud/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,7 @@

"""A simple wrapper around the OAuth2 credentials library."""

import base64
import datetime

import six
from six.moves.urllib.parse import urlencode

import google.auth
import google.auth.credentials
from google.cloud._helpers import _microseconds_from_datetime
from google.cloud._helpers import _NOW
from google.cloud._helpers import UTC


def get_credentials():
Expand All @@ -38,166 +28,3 @@ def get_credentials():
"""
credentials, _ = google.auth.default()
return credentials


def _get_signed_query_params(credentials, expiration, string_to_sign):
"""Gets query parameters for creating a signed URL.
:type credentials: :class:`google.auth.credentials.Signer`
:param credentials: The credentials used to create a private key
for signing text.
:type expiration: int or long
:param expiration: When the signed URL should expire.
:type string_to_sign: str
:param string_to_sign: The string to be signed by the credentials.
:raises AttributeError: If :meth: sign_blob is unavailable.
:rtype: dict
:returns: Query parameters matching the signing credentials with a
signed payload.
"""
if not isinstance(credentials, google.auth.credentials.Signing):
auth_uri = ('https://google-cloud-python.readthedocs.io/en/latest/'
'core/auth.html?highlight=authentication#setting-up-'
'a-service-account')
raise AttributeError('you need a private key to sign credentials.'
'the credentials you are currently using %s '
'just contains a token. see %s for more '
'details.' % (type(credentials), auth_uri))

signature_bytes = credentials.sign_bytes(string_to_sign)
signature = base64.b64encode(signature_bytes)
service_account_name = credentials.signer_email
return {
'GoogleAccessId': service_account_name,
'Expires': str(expiration),
'Signature': signature,
}


def _get_expiration_seconds(expiration):
"""Convert 'expiration' to a number of seconds in the future.
:type expiration: int, long, datetime.datetime, datetime.timedelta
:param expiration: When the signed URL should expire.
:raises TypeError: When expiration is not an integer.
:rtype: int
:returns: a timestamp as an absolute number of seconds.
"""
# If it's a timedelta, add it to `now` in UTC.
if isinstance(expiration, datetime.timedelta):
now = _NOW().replace(tzinfo=UTC)
expiration = now + expiration

# If it's a datetime, convert to a timestamp.
if isinstance(expiration, datetime.datetime):
micros = _microseconds_from_datetime(expiration)
expiration = micros // 10**6

if not isinstance(expiration, six.integer_types):
raise TypeError('Expected an integer timestamp, datetime, or '
'timedelta. Got %s' % type(expiration))
return expiration


def generate_signed_url(credentials, resource, expiration,
api_access_endpoint='',
method='GET', content_md5=None,
content_type=None, response_type=None,
response_disposition=None, generation=None):
"""Generate signed URL to provide query-string auth'n to a resource.
.. note::
Assumes ``credentials`` implements the
:class:`google.auth.credentials.Signing` interface. Also assumes
``credentials`` has a ``service_account_email`` property which
identifies the credentials.
.. note::
If you are on Google Compute Engine, you can't generate a signed URL.
Follow `Issue 922`_ for updates on this. If you'd like to be able to
generate a signed URL from GCE, you can use a standard service account
from a JSON file rather than a GCE service account.
See headers `reference`_ for more details on optional arguments.
.. _Issue 922: https://github.com/GoogleCloudPlatform/\
google-cloud-python/issues/922
.. _reference: https://cloud.google.com/storage/docs/reference-headers
:type credentials: :class:`google.auth.credentials.Signing`
:param credentials: Credentials object with an associated private key to
sign text.
:type resource: str
:param resource: A pointer to a specific resource
(typically, ``/bucket-name/path/to/blob.txt``).
:type expiration: :class:`int`, :class:`long`, :class:`datetime.datetime`,
:class:`datetime.timedelta`
:param expiration: When the signed URL should expire.
:type api_access_endpoint: str
:param api_access_endpoint: Optional URI base. Defaults to empty string.
:type method: str
:param method: The HTTP verb that will be used when requesting the URL.
Defaults to ``'GET'``.
:type content_md5: str
:param content_md5: (Optional) The MD5 hash of the object referenced by
``resource``.
:type content_type: str
:param content_type: (Optional) The content type of the object referenced
by ``resource``.
:type response_type: str
:param response_type: (Optional) Content type of responses to requests for
the signed URL. Used to over-ride the content type of
the underlying resource.
:type response_disposition: str
:param response_disposition: (Optional) Content disposition of responses to
requests for the signed URL.
:type generation: str
:param generation: (Optional) A value that indicates which generation of
the resource to fetch.
:rtype: str
:returns: A signed URL you can use to access the resource
until expiration.
"""
expiration = _get_expiration_seconds(expiration)

# Generate the string to sign.
string_to_sign = '\n'.join([
method,
content_md5 or '',
content_type or '',
str(expiration),
resource])

# Set the right query parameters.
query_params = _get_signed_query_params(credentials,
expiration,
string_to_sign)
if response_type is not None:
query_params['response-content-type'] = response_type
if response_disposition is not None:
query_params['response-content-disposition'] = response_disposition
if generation is not None:
query_params['generation'] = generation

# Return the built URL.
return '{endpoint}{resource}?{querystring}'.format(
endpoint=api_access_endpoint, resource=resource,
querystring=urlencode(query_params))
199 changes: 0 additions & 199 deletions core/tests/unit/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import unittest

import mock
import six


class Test_get_credentials(unittest.TestCase):
Expand All @@ -33,201 +32,3 @@ def test_it(self):

self.assertIs(found, mock.sentinel.credentials)
default.assert_called_once_with()


class Test_generate_signed_url(unittest.TestCase):

def _call_fut(self, *args, **kwargs):
from google.cloud.credentials import generate_signed_url

return generate_signed_url(*args, **kwargs)

def _generate_helper(self, response_type=None, response_disposition=None,
generation=None):
import base64
from six.moves.urllib.parse import parse_qs
from six.moves.urllib.parse import urlsplit
import google.auth.credentials
from google.cloud._testing import _Monkey
from google.cloud import credentials as MUT

ENDPOINT = 'http://api.example.com'
RESOURCE = '/name/path'
SIGNED = base64.b64encode(b'DEADBEEF')
CREDENTIALS = mock.Mock(spec=google.auth.credentials.Signing)
CREDENTIALS.signer_email = 'service@example.com'

def _get_signed_query_params(*args):
credentials, expiration = args[:2]
return {
'GoogleAccessId': credentials.signer_email,
'Expires': str(expiration),
'Signature': SIGNED,
}

with _Monkey(MUT, _get_signed_query_params=_get_signed_query_params):
url = self._call_fut(CREDENTIALS, RESOURCE, 1000,
api_access_endpoint=ENDPOINT,
response_type=response_type,
response_disposition=response_disposition,
generation=generation)

scheme, netloc, path, qs, frag = urlsplit(url)
self.assertEqual(scheme, 'http')
self.assertEqual(netloc, 'api.example.com')
self.assertEqual(path, RESOURCE)
params = parse_qs(qs)
# In Py3k, parse_qs gives us text values:
self.assertEqual(params.pop('Signature'), [SIGNED.decode('ascii')])
self.assertEqual(params.pop('Expires'), ['1000'])
self.assertEqual(params.pop('GoogleAccessId'),
[CREDENTIALS.signer_email])
if response_type is not None:
self.assertEqual(params.pop('response-content-type'),
[response_type])
if response_disposition is not None:
self.assertEqual(params.pop('response-content-disposition'),
[response_disposition])
if generation is not None:
self.assertEqual(params.pop('generation'), [generation])
# Make sure we have checked them all.
self.assertEqual(len(params), 0)
self.assertEqual(frag, '')

def test_w_expiration_int(self):
self._generate_helper()

def test_w_custom_fields(self):
response_type = 'text/plain'
response_disposition = 'attachment; filename=blob.png'
generation = '123'
self._generate_helper(response_type=response_type,
response_disposition=response_disposition,
generation=generation)


class Test_generate_signed_url_exception(unittest.TestCase):
def test_with_google_credentials(self):
import time
import google.auth.credentials
from google.cloud.credentials import generate_signed_url

RESOURCE = '/name/path'

credentials = mock.Mock(spec=google.auth.credentials.Credentials)
expiration = int(time.time() + 5)
self.assertRaises(AttributeError, generate_signed_url, credentials,
resource=RESOURCE, expiration=expiration)


class Test__get_signed_query_params(unittest.TestCase):

def _call_fut(self, credentials, expiration, string_to_sign):
from google.cloud.credentials import _get_signed_query_params

return _get_signed_query_params(credentials, expiration,
string_to_sign)

def test_it(self):
import base64
import google.auth.credentials

SIG_BYTES = b'DEADBEEF'
ACCOUNT_NAME = mock.sentinel.service_account_email
CREDENTIALS = mock.Mock(spec=google.auth.credentials.Signing)
CREDENTIALS.signer_email = ACCOUNT_NAME
CREDENTIALS.sign_bytes.return_value = SIG_BYTES
EXPIRATION = 100
STRING_TO_SIGN = 'dummy_signature'
result = self._call_fut(CREDENTIALS, EXPIRATION,
STRING_TO_SIGN)

self.assertEqual(result, {
'GoogleAccessId': ACCOUNT_NAME,
'Expires': str(EXPIRATION),
'Signature': base64.b64encode(b'DEADBEEF'),
})
CREDENTIALS.sign_bytes.assert_called_once_with(STRING_TO_SIGN)


class Test__get_expiration_seconds(unittest.TestCase):

def _call_fut(self, expiration):
from google.cloud.credentials import _get_expiration_seconds

return _get_expiration_seconds(expiration)

def _utc_seconds(self, when):
import calendar

return int(calendar.timegm(when.timetuple()))

def test_w_invalid(self):
self.assertRaises(TypeError, self._call_fut, object())
self.assertRaises(TypeError, self._call_fut, None)

def test_w_int(self):
self.assertEqual(self._call_fut(123), 123)

def test_w_long(self):
if six.PY3:
raise unittest.SkipTest('No long on Python 3')

self.assertEqual(self._call_fut(long(123)), 123) # noqa: F821

def test_w_naive_datetime(self):
import datetime

expiration_no_tz = datetime.datetime(2004, 8, 19, 0, 0, 0, 0)
utc_seconds = self._utc_seconds(expiration_no_tz)
self.assertEqual(self._call_fut(expiration_no_tz), utc_seconds)

def test_w_utc_datetime(self):
import datetime
from google.cloud._helpers import UTC

expiration_utc = datetime.datetime(2004, 8, 19, 0, 0, 0, 0, UTC)
utc_seconds = self._utc_seconds(expiration_utc)
self.assertEqual(self._call_fut(expiration_utc), utc_seconds)

def test_w_other_zone_datetime(self):
import datetime
from google.cloud._helpers import _UTC

class CET(_UTC):
_tzname = 'CET'
_utcoffset = datetime.timedelta(hours=1)

zone = CET()
expiration_other = datetime.datetime(2004, 8, 19, 0, 0, 0, 0, zone)
utc_seconds = self._utc_seconds(expiration_other)
cet_seconds = utc_seconds - (60 * 60) # CET one hour earlier than UTC
self.assertEqual(self._call_fut(expiration_other), cet_seconds)

def test_w_timedelta_seconds(self):
import datetime
from google.cloud._testing import _Monkey
from google.cloud import credentials as MUT

dummy_utcnow = datetime.datetime(2004, 8, 19, 0, 0, 0, 0)
utc_seconds = self._utc_seconds(dummy_utcnow)
expiration_as_delta = datetime.timedelta(seconds=10)

with _Monkey(MUT, _NOW=lambda: dummy_utcnow):
result = self._call_fut(expiration_as_delta)

self.assertEqual(result, utc_seconds + 10)

def test_w_timedelta_days(self):
import datetime
from google.cloud._testing import _Monkey
from google.cloud import credentials as MUT

dummy_utcnow = datetime.datetime(2004, 8, 19, 0, 0, 0, 0)
utc_seconds = self._utc_seconds(dummy_utcnow)
expiration_as_delta = datetime.timedelta(days=1)

with _Monkey(MUT, _NOW=lambda: dummy_utcnow):
result = self._call_fut(expiration_as_delta)

self.assertEqual(result, utc_seconds + 86400)
Loading