diff --git a/gcloud/credentials.py b/gcloud/credentials.py index f4f6222be3eea..d71722959e939 100644 --- a/gcloud/credentials.py +++ b/gcloud/credentials.py @@ -29,6 +29,16 @@ from oauth2client import service_account import pytz +try: + from google.appengine.api import app_identity +except ImportError: + app_identity = None + +try: + from oauth2client.appengine import AppAssertionCredentials as _GAECreds +except ImportError: + _GAECreds = None + def get_credentials(): """Gets credentials implicitly from the current environment. @@ -160,7 +170,66 @@ def _get_pem_key(credentials): return RSA.importKey(pem_text) -def _get_signed_query_params(credentials, expiration, signature_string): +def _get_signature_bytes(credentials, string_to_sign): + """Uses crypto attributes of credentials to sign a string/bytes. + + :type credentials: :class:`client.SignedJwtAssertionCredentials`, + :class:`service_account._ServiceAccountCredentials`, + :class:`_GAECreds` + :param credentials: The credentials used for signing text (typically + involves the creation of an RSA key). + + :type string_to_sign: string + :param string_to_sign: The string to be signed by the credentials. + + :rtype: bytes + :returns: Signed bytes produced by the credentials. + """ + if _GAECreds is not None and isinstance(credentials, _GAECreds): + _, signed_bytes = app_identity.sign_blob(string_to_sign) + return signed_bytes + else: + pem_key = _get_pem_key(credentials) + # Sign the string with the RSA key. + signer = PKCS1_v1_5.new(pem_key) + if not isinstance(string_to_sign, six.binary_type): + string_to_sign = string_to_sign.encode('utf-8') + signature_hash = SHA256.new(string_to_sign) + return signer.sign(signature_hash) + + +def _get_service_account_name(credentials): + """Determines service account name from a credentials object. + + :type credentials: :class:`client.SignedJwtAssertionCredentials`, + :class:`service_account._ServiceAccountCredentials`, + :class:`_GAECreds` + :param credentials: The credentials used to determine the service + account name. + + :type string_to_sign: string + :param string_to_sign: The string to be signed by the credentials. + + :rtype: bytes + :returns: Signed bytes produced by the credentials. + :raises: :class:`ValueError` if the credentials are not a valid service + account type + """ + service_account_name = None + if isinstance(credentials, client.SignedJwtAssertionCredentials): + service_account_name = credentials.service_account_name + elif isinstance(credentials, service_account._ServiceAccountCredentials): + service_account_name = credentials._service_account_email + elif _GAECreds is not None and isinstance(credentials, _GAECreds): + service_account_name = app_identity.get_service_account_name() + + if service_account_name is None: + raise ValueError('Service account name could not be determined ' + 'from credentials') + return service_account_name + + +def _get_signed_query_params(credentials, expiration, string_to_sign): """Gets query parameters for creating a signed URL. :type credentials: :class:`client.SignedJwtAssertionCredentials`, @@ -171,27 +240,16 @@ def _get_signed_query_params(credentials, expiration, signature_string): :type expiration: int or long :param expiration: When the signed URL should expire. - :type signature_string: string - :param signature_string: The string to be signed by the credentials. + :type string_to_sign: string + :param string_to_sign: The string to be signed by the credentials. :rtype: dict :returns: Query parameters matching the signing credentials with a signed payload. """ - pem_key = _get_pem_key(credentials) - # Sign the string with the RSA key. - signer = PKCS1_v1_5.new(pem_key) - if not isinstance(signature_string, six.binary_type): - signature_string = signature_string.encode('utf-8') - signature_hash = SHA256.new(signature_string) - signature_bytes = signer.sign(signature_hash) + signature_bytes = _get_signature_bytes(credentials, string_to_sign) signature = base64.b64encode(signature_bytes) - - if isinstance(credentials, client.SignedJwtAssertionCredentials): - service_account_name = credentials.service_account_name - elif isinstance(credentials, service_account._ServiceAccountCredentials): - service_account_name = credentials._service_account_email - # We know one of the above must occur since `_get_pem_key` fails if not. + service_account_name = _get_service_account_name(credentials) return { 'GoogleAccessId': service_account_name, 'Expires': str(expiration), @@ -277,7 +335,7 @@ def generate_signed_url(credentials, resource, expiration, expiration = _get_expiration_seconds(expiration) # Generate the string to sign. - signature_string = '\n'.join([ + string_to_sign = '\n'.join([ method, content_md5 or '', content_type or '', @@ -287,7 +345,7 @@ def generate_signed_url(credentials, resource, expiration, # Set the right query parameters. query_params = _get_signed_query_params(credentials, expiration, - signature_string) + string_to_sign) # Return the built URL. return '{endpoint}{resource}?{querystring}'.format( diff --git a/gcloud/test_credentials.py b/gcloud/test_credentials.py index 732861adbf1a0..25760c8611983 100644 --- a/gcloud/test_credentials.py +++ b/gcloud/test_credentials.py @@ -15,6 +15,51 @@ import unittest2 +def _setup_appengine_import(test_case, app_identity): + import sys + import types + + GOOGLE = types.ModuleType('google') + GAE = types.ModuleType('appengine') + GAE_API = types.ModuleType('api') + GAE_EXT = types.ModuleType('ext') + GAE_EXT_WEBAPP = types.ModuleType('webapp') + GAE_EXT_WEBAPP_UTIL = types.ModuleType('util') + + GOOGLE.appengine = GAE + GAE.api = GAE_API + GAE.api.app_identity = app_identity + GAE.api.memcache = None + GAE.api.users = None + GAE.ext = GAE_EXT + GAE.ext.db = _MockDB + GAE.ext.webapp = GAE_EXT_WEBAPP + GAE.ext.webapp.util = GAE_EXT_WEBAPP_UTIL + GAE.ext.webapp.util.login_required = None + GAE.ext.webapp.util.run_wsgi_app = None + + test_case._PREV_GOOGLE_MODULE = sys.modules['google'] + + sys.modules['google'] = GOOGLE + sys.modules['google.appengine'] = GAE + sys.modules['google.appengine.api'] = GAE_API + sys.modules['google.appengine.ext'] = GAE_EXT + sys.modules['google.appengine.ext.webapp'] = GAE_EXT_WEBAPP + sys.modules['google.appengine.ext.webapp.util'] = GAE_EXT_WEBAPP_UTIL + + +def _teardown_appengine_import(test_case): + import sys + sys.modules.pop('google') + sys.modules.pop('google.appengine') + sys.modules.pop('google.appengine.api') + sys.modules.pop('google.appengine.ext') + sys.modules.pop('google.appengine.ext.webapp') + sys.modules.pop('google.appengine.ext.webapp.util') + + sys.modules['google'] = test_case._PREV_GOOGLE_MODULE + + class TestCredentials(unittest2.TestCase): def test_get_for_service_account_p12_wo_scope(self): @@ -153,34 +198,22 @@ def _get_signed_query_params(*args): self.assertEqual(frag, '') -class Test__get_signed_query_params(unittest2.TestCase): +class Test__get_signature_bytes(unittest2.TestCase): - def _callFUT(self, credentials, expiration, signature_string): - from gcloud.credentials import _get_signed_query_params - return _get_signed_query_params(credentials, expiration, - signature_string) + def setUp(self): + SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' + self.APP_IDENTITY = _AppIdentity(SERVICE_ACCOUNT_NAME) + _setup_appengine_import(self, self.APP_IDENTITY) - def test_wrong_type(self): - from gcloud._testing import _Monkey - from gcloud import credentials as MUT - - pkcs_v1_5 = _PKCS1_v1_5() - rsa = _RSA() - sha256 = _SHA256() + def tearDown(self): + _teardown_appengine_import(self) - def _get_pem_key(credentials): - return credentials + def _callFUT(self, credentials, string_to_sign): + from gcloud.credentials import _get_signature_bytes + return _get_signature_bytes(credentials, string_to_sign) - BAD_CREDENTIALS = None - EXPIRATION = '100' - SIGNATURE_STRING = 'dummy_signature' - with _Monkey(MUT, RSA=rsa, PKCS1_v1_5=pkcs_v1_5, - SHA256=sha256, _get_pem_key=_get_pem_key): - self.assertRaises(UnboundLocalError, self._callFUT, - BAD_CREDENTIALS, EXPIRATION, SIGNATURE_STRING) - - def _run_test_with_credentials(self, credentials, account_name, - signature_string=None): + def _run_with_fake_crypto(self, credentials, private_key_text, + string_to_sign): import base64 import six from gcloud._testing import _Monkey @@ -191,68 +224,183 @@ def _run_test_with_credentials(self, credentials, account_name, rsa = _RSA() sha256 = _SHA256() - EXPIRATION = '100' - SIGNATURE_STRING = signature_string or b'dummy_signature' with _Monkey(MUT, crypt=crypt, RSA=rsa, PKCS1_v1_5=pkcs_v1_5, SHA256=sha256): - result = self._callFUT(credentials, EXPIRATION, SIGNATURE_STRING) + result = self._callFUT(credentials, string_to_sign) if crypt._pkcs12_key_as_pem_called: self.assertEqual(crypt._private_key_text, - base64.b64encode(b'dummy_private_key_text')) + base64.b64encode(private_key_text)) self.assertEqual(crypt._private_key_password, 'notasecret') - # sha256._signature_string is always bytes. - if isinstance(SIGNATURE_STRING, six.binary_type): - self.assertEqual(sha256._signature_string, SIGNATURE_STRING) + # sha256._string_to_sign is always bytes. + if isinstance(string_to_sign, six.binary_type): + self.assertEqual(sha256._string_to_sign, string_to_sign) else: - self.assertEqual(sha256._signature_string, - SIGNATURE_STRING.encode('utf-8')) - SIGNED = base64.b64encode(b'DEADBEEF') - expected_query = { - 'Expires': EXPIRATION, - 'GoogleAccessId': account_name, - 'Signature': SIGNED, - } - self.assertEqual(result, expected_query) - - def test_signed_jwt_for_p12(self): - from oauth2client import client + self.assertEqual(sha256._string_to_sign, + string_to_sign.encode('utf-8')) + self.assertEqual(result, b'DEADBEEF') - scopes = [] + def test_p12_type(self): + from oauth2client.client import SignedJwtAssertionCredentials ACCOUNT_NAME = 'dummy_service_account_name' - credentials = client.SignedJwtAssertionCredentials( - ACCOUNT_NAME, b'dummy_private_key_text', scopes) - self._run_test_with_credentials(credentials, ACCOUNT_NAME) + PRIVATE_KEY_TEXT = b'dummy_private_key_text' + STRING_TO_SIGN = b'dummy_signature' + CREDENTIALS = SignedJwtAssertionCredentials( + ACCOUNT_NAME, PRIVATE_KEY_TEXT, []) + self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, + STRING_TO_SIGN) + + def test_p12_type_non_bytes_to_sign(self): + from oauth2client.client import SignedJwtAssertionCredentials + ACCOUNT_NAME = 'dummy_service_account_name' + PRIVATE_KEY_TEXT = b'dummy_private_key_text' + STRING_TO_SIGN = u'dummy_signature' + CREDENTIALS = SignedJwtAssertionCredentials( + ACCOUNT_NAME, PRIVATE_KEY_TEXT, []) + self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, + STRING_TO_SIGN) + + def test_json_type(self): + from oauth2client import service_account + from gcloud._testing import _Monkey - def test_signature_non_bytes(self): - from oauth2client import client + PRIVATE_KEY_TEXT = 'dummy_private_key_pkcs8_text' + STRING_TO_SIGN = b'dummy_signature' - scopes = [] - ACCOUNT_NAME = 'dummy_service_account_name' - SIGNATURE_STRING = u'dummy_signature' - credentials = client.SignedJwtAssertionCredentials( - ACCOUNT_NAME, b'dummy_private_key_text', scopes) - self._run_test_with_credentials(credentials, ACCOUNT_NAME, - signature_string=SIGNATURE_STRING) + def _get_private_key(private_key_pkcs8_text): + return private_key_pkcs8_text - def test_service_account_via_json_key(self): - from oauth2client import service_account + with _Monkey(service_account, _get_private_key=_get_private_key): + CREDENTIALS = service_account._ServiceAccountCredentials( + 'dummy_service_account_id', 'dummy_service_account_email', + 'dummy_private_key_id', PRIVATE_KEY_TEXT, []) + + self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, + STRING_TO_SIGN) + + def test_gae_type(self): + # Relies on setUp fixing up App Engine imports. + from oauth2client.appengine import AppAssertionCredentials from gcloud._testing import _Monkey + from gcloud import credentials - scopes = [] + APP_IDENTITY = self.APP_IDENTITY + CREDENTIALS = AppAssertionCredentials([]) + STRING_TO_SIGN = b'STRING_TO_SIGN' - PRIVATE_TEXT = 'dummy_private_key_pkcs8_text' + with _Monkey(credentials, _GAECreds=AppAssertionCredentials, + app_identity=APP_IDENTITY): + signed_bytes = self._callFUT(CREDENTIALS, b'STRING_TO_SIGN') + + self.assertEqual(signed_bytes, STRING_TO_SIGN) + self.assertEqual(APP_IDENTITY._strings_signed, [STRING_TO_SIGN]) + + +class Test__get_service_account_name(unittest2.TestCase): + + def setUp(self): + SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' + self.APP_IDENTITY = _AppIdentity(SERVICE_ACCOUNT_NAME) + _setup_appengine_import(self, self.APP_IDENTITY) + + def tearDown(self): + _teardown_appengine_import(self) + + def _callFUT(self, credentials): + from gcloud.credentials import _get_service_account_name + return _get_service_account_name(credentials) + + def test_bad_type(self): + from oauth2client.client import OAuth2Credentials + CREDENTIALS = OAuth2Credentials('bogus_token', 'bogus_id', + 'bogus_secret', 'bogus_refresh', + None, None, None) + self.assertRaises(ValueError, self._callFUT, CREDENTIALS) + + def test_p12_type(self): + from oauth2client.client import SignedJwtAssertionCredentials + SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' + CREDENTIALS = SignedJwtAssertionCredentials(SERVICE_ACCOUNT_NAME, + b'bogus_key', []) + found = self._callFUT(CREDENTIALS) + self.assertEqual(found, SERVICE_ACCOUNT_NAME) + + def test_json_type(self): + from oauth2client import service_account + from gcloud._testing import _Monkey def _get_private_key(private_key_pkcs8_text): return private_key_pkcs8_text - ACCOUNT_NAME = 'dummy_service_account_email' + SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' with _Monkey(service_account, _get_private_key=_get_private_key): - credentials = service_account._ServiceAccountCredentials( - 'dummy_service_account_id', ACCOUNT_NAME, - 'dummy_private_key_id', PRIVATE_TEXT, scopes) + CREDENTIALS = service_account._ServiceAccountCredentials( + 'bogus_id', SERVICE_ACCOUNT_NAME, 'bogus_id', + 'bogus_key_text', []) + + found = self._callFUT(CREDENTIALS) + self.assertEqual(found, SERVICE_ACCOUNT_NAME) + + def test_gae_type(self): + # Relies on setUp fixing up App Engine imports. + from oauth2client.appengine import AppAssertionCredentials + from gcloud._testing import _Monkey + from gcloud import credentials - self._run_test_with_credentials(credentials, ACCOUNT_NAME) + APP_IDENTITY = self.APP_IDENTITY + SERVICE_ACCOUNT_NAME = APP_IDENTITY.service_account_name + + CREDENTIALS = AppAssertionCredentials([]) + + with _Monkey(credentials, _GAECreds=AppAssertionCredentials, + app_identity=APP_IDENTITY): + found = self._callFUT(CREDENTIALS) + + self.assertEqual(found, SERVICE_ACCOUNT_NAME) + + +class Test__get_signed_query_params(unittest2.TestCase): + + def _callFUT(self, credentials, expiration, string_to_sign): + from gcloud.credentials import _get_signed_query_params + return _get_signed_query_params(credentials, expiration, + string_to_sign) + + def test_it(self): + import base64 + from gcloud._testing import _Monkey + from gcloud import credentials as MUT + + _called_get_sig = [] + SIG_BYTES = b'DEADBEEF' + + def mock_get_sig_bytes(creds, string_to_sign): + _called_get_sig.append((creds, string_to_sign)) + return SIG_BYTES + + _called_get_name = [] + ACCOUNT_NAME = object() + + def mock_get_name(creds): + _called_get_name.append((creds,)) + return ACCOUNT_NAME + + CREDENTIALS = object() + EXPIRATION = 100 + STRING_TO_SIGN = 'dummy_signature' + with _Monkey(MUT, _get_signature_bytes=mock_get_sig_bytes, + _get_service_account_name=mock_get_name): + result = self._callFUT(CREDENTIALS, EXPIRATION, + STRING_TO_SIGN) + + self.assertEqual(result, { + 'GoogleAccessId': ACCOUNT_NAME, + 'Expires': str(EXPIRATION), + 'Signature': base64.b64encode(b'DEADBEEF'), + }) + self.assertEqual(_called_get_sig, + [(CREDENTIALS, STRING_TO_SIGN)]) + self.assertEqual(_called_get_name, [(CREDENTIALS,)]) class Test__get_pem_key(unittest2.TestCase): @@ -452,8 +600,39 @@ def sign(self, signature_hash): class _SHA256(object): - _signature_string = None + _string_to_sign = None - def new(self, signature_string): - self._signature_string = signature_string + def new(self, string_to_sign): + self._string_to_sign = string_to_sign return self + + +class _AppIdentity(object): + + def __init__(self, service_account_name): + self._strings_signed = [] + self.service_account_name = service_account_name + + def get_service_account_name(self): + return self.service_account_name + + def sign_blob(self, string_to_sign): + self._strings_signed.append(string_to_sign) + throwaway = object() + return throwaway, string_to_sign + + +class _MockDB(object): + + Model = object + Property = object + StringProperty = object + _stored = [] + + @staticmethod + def non_transactional(*args, **kwargs): + _MockDB._stored.append((args, kwargs)) # To please lint. + + def do_nothing_wrapper(func): + return func + return do_nothing_wrapper