diff --git a/README.rst b/README.rst index bcc9c96..466507f 100644 --- a/README.rst +++ b/README.rst @@ -7,7 +7,7 @@ httpsig .. image:: https://travis-ci.org/ahknight/httpsig.svg?branch=develop :target: https://travis-ci.org/ahknight/httpsig -Sign HTTP requests with secure signatures according to the IETF HTTP Signatures specification (`Draft 8`_). This is a fork of the original module_ to fully support both RSA and HMAC schemes as well as unit test both schemes to prove they work. It's being used in production and is actively-developed. +Sign HTTP requests with secure signatures according to the IETF HTTP Signatures specification (`Draft 12`_). This is a fork of the original module_ to fully support both RSA and HMAC schemes as well as unit test both schemes to prove they work. It's being used in production and is actively-developed. See the original project_, original Python module_, original spec_, and `current IETF draft`_ for more details on the signing scheme. @@ -15,7 +15,7 @@ See the original project_, original Python module_, original spec_, and `current .. _module: https://github.com/zzsnzmn/py-http-signature .. _spec: https://github.com/joyent/node-http-signature/blob/master/http_signing.md .. _`current IETF draft`: https://datatracker.ietf.org/doc/draft-cavage-http-signatures/ -.. _`Draft 8`: http://tools.ietf.org/html/draft-cavage-http-signatures-08 +.. _`Draft 12`: http://tools.ietf.org/html/draft-cavage-http-signatures-12 Requirements ------------ @@ -49,7 +49,7 @@ For simple raw signing: secret = open('rsa_private.pem', 'rb').read() - sig_maker = httpsig.Signer(secret=secret, algorithm='rsa-sha256') + sig_maker = httpsig.Signer(secret=secret, algorithm='hs2019', sign_algorithm=httpsig.PSS()) sig_maker.sign('hello world!') For general use with web frameworks: @@ -59,9 +59,9 @@ For general use with web frameworks: import httpsig key_id = "Some Key ID" - secret = b'some big secret' + secret = open('rsa_private.pem', 'rb').read() - hs = httpsig.HeaderSigner(key_id, secret, algorithm="hmac-sha256", headers=['(request-target)', 'host', 'date']) + hs = httpsig.HeaderSigner(key_id, secret, algorithm="hs2019", sign_algorithm=httpsig.PSS(), headers=['(request-target)', 'host', 'date']) signed_headers_dict = hs.sign({"Date": "Tue, 01 Jan 2014 01:01:01 GMT", "Host": "example.com"}, method="GET", path="/api/1/object/1") For use with requests: @@ -74,9 +74,9 @@ For use with requests: secret = open('rsa_private.pem', 'rb').read() - auth = HTTPSignatureAuth(key_id='Test', secret=secret) + auth = HTTPSignatureAuth(key_id='Test', secret=secret, sign_algorithm=httpsig.PSS()) z = requests.get('https://api.example.com/path/to/endpoint', - auth=auth, headers={'X-Api-Version': '~6.5'}) + auth=auth, headers={'X-Api-Version': '~6.5', 'Date': 'Tue, 01 Jan 2014 01:01:01 GMT') Class initialization parameters ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -85,20 +85,22 @@ Note that keys and secrets should be bytes objects. At attempt will be made to .. code:: python - httpsig.Signer(secret, algorithm='rsa-sha256') + httpsig.Signer(secret, algorithm='hs2019', sign_algorithm=httpsig.PSS()) ``secret``, in the case of an RSA signature, is a string containing private RSA pem. In the case of HMAC, it is a secret password. -``algorithm`` is one of the six allowed signatures: ``rsa-sha1``, ``rsa-sha256``, ``rsa-sha512``, ``hmac-sha1``, ``hmac-sha256``, +``algorithm`` should be set to 'hs2019' the other six signatures are now deprecated: ``rsa-sha1``, ``rsa-sha256``, ``rsa-sha512``, ``hmac-sha1``, ``hmac-sha256``, ``hmac-sha512``. +``sign_algorithm`` The digital signature algorithm derived from ``keyId``. Currently supported algorithms: ``httpsig.PSS`` .. code:: python - httpsig.requests_auth.HTTPSignatureAuth(key_id, secret, algorithm='rsa-sha256', headers=None) + httpsig.requests_auth.HTTPSignatureAuth(key_id, secret, algorithm='hs2019', sign_algorithm=httpsig.PSS(), headers=None) -``key_id`` is the label by which the server system knows your RSA signature or password. +``key_id`` is the label by which the server system knows your secret. ``headers`` is the list of HTTP headers that are concatenated and used as signing objects. By default it is the specification's minimum, the ``Date`` HTTP header. ``secret`` and ``algorithm`` are as above. +``sign_algorithm`` The digital signature algorithm derived from ``keyId``. Currently supported algorithms: ``httpsig.PSS`` Tests ----- diff --git a/httpsig/__init__.py b/httpsig/__init__.py index 01cb860..b723018 100644 --- a/httpsig/__init__.py +++ b/httpsig/__init__.py @@ -2,6 +2,7 @@ from .sign import Signer, HeaderSigner from .verify import Verifier, HeaderVerifier +from .sign_algorithms import * try: __version__ = get_distribution(__name__).version diff --git a/httpsig/requests_auth.py b/httpsig/requests_auth.py index 8a00310..b504e9c 100644 --- a/httpsig/requests_auth.py +++ b/httpsig/requests_auth.py @@ -21,11 +21,11 @@ class HTTPSignatureAuth(requests.auth.AuthBase): headers is a list of http headers to be included in the signing string, defaulting to "Date" alone. """ - def __init__(self, key_id='', secret='', algorithm=None, headers=None): + def __init__(self, key_id='', secret='', algorithm=None, sign_algorithm=None, headers=None): headers = headers or [] self.header_signer = HeaderSigner( key_id=key_id, secret=secret, - algorithm=algorithm, headers=headers) + algorithm=algorithm, sign_algorithm=sign_algorithm, headers=headers) self.uses_host = 'host' in [h.lower() for h in headers] def __call__(self, r): diff --git a/httpsig/sign.py b/httpsig/sign.py index 94e2180..0b1ec07 100644 --- a/httpsig/sign.py +++ b/httpsig/sign.py @@ -1,14 +1,14 @@ +from __future__ import print_function import base64 import six from Crypto.Hash import HMAC from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 - +from .sign_algorithms import SignAlgorithm from .utils import * - -DEFAULT_SIGN_ALGORITHM = "hmac-sha256" +DEFAULT_ALGORITHM = "hs2019" class Signer(object): @@ -18,17 +18,32 @@ class Signer(object): Password-protected keyfiles are not supported. """ - def __init__(self, secret, algorithm=None): + + def __init__(self, secret, algorithm=None, sign_algorithm=None): if algorithm is None: - algorithm = DEFAULT_SIGN_ALGORITHM + algorithm = DEFAULT_ALGORITHM assert algorithm in ALGORITHMS, "Unknown algorithm" + + if sign_algorithm is not None and not issubclass(type(sign_algorithm), SignAlgorithm): + raise HttpSigException("Unsupported digital signature algorithm") + + if algorithm != DEFAULT_ALGORITHM: + print("Algorithm: {} is deprecated please update to {}".format(algorithm, DEFAULT_ALGORITHM)) + if isinstance(secret, six.string_types): secret = secret.encode("ascii") self._rsa = None self._hash = None - self.sign_algorithm, self.hash_algorithm = algorithm.split('-') + self.algorithm = algorithm + self.secret = secret + + if "-" in algorithm: + self.sign_algorithm, self.hash_algorithm = algorithm.split('-') + elif algorithm == "hs2019": + assert sign_algorithm is not None, "Required digital signature algorithm not specified" + self.sign_algorithm = sign_algorithm if self.sign_algorithm == 'rsa': try: @@ -42,10 +57,6 @@ def __init__(self, secret, algorithm=None): self._hash = HMAC.new(secret, digestmod=HASHES[self.hash_algorithm]) - @property - def algorithm(self): - return '%s-%s' % (self.sign_algorithm, self.hash_algorithm) - def _sign_rsa(self, data): if isinstance(data, six.string_types): data = data.encode("ascii") @@ -68,6 +79,8 @@ def sign(self, data): signed = self._sign_rsa(data) elif self._hash: signed = self._sign_hmac(data) + elif issubclass(type(self.sign_algorithm), SignAlgorithm): + signed = self.sign_algorithm.sign(self.secret, data) if not signed: raise SystemError('No valid encryptor found.') return base64.b64encode(signed).decode("ascii") @@ -83,20 +96,22 @@ class HeaderSigner(Signer): to use :arg secret: a PEM-encoded RSA private key or an HMAC secret (must match the algorithm) - :arg algorithm: one of the six specified algorithms - :arg headers: a list of http headers to be included in the signing + :param algorithm: one of the seven specified algorithms + :param sign_algorithm: required for 'hs2019' algorithm. Sign algorithm for the secret + :param headers: a list of http headers to be included in the signing string, defaulting to ['date']. - :arg sign_header: header used to include signature, defaulting to + :param sign_header: header used to include signature, defaulting to 'authorization'. """ - def __init__(self, key_id, secret, algorithm=None, headers=None, sign_header='authorization'): + + def __init__(self, key_id, secret, algorithm=None, sign_algorithm=None, headers=None, sign_header='authorization'): if algorithm is None: - algorithm = DEFAULT_SIGN_ALGORITHM + algorithm = DEFAULT_ALGORITHM - super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm) + super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm, sign_algorithm=sign_algorithm) self.headers = headers or ['date'] self.signature_template = build_signature_template( - key_id, algorithm, headers, sign_header) + key_id, algorithm, headers, sign_header) self.sign_header = sign_header def sign(self, headers, host=None, method=None, path=None): @@ -112,7 +127,7 @@ def sign(self, headers, host=None, method=None, path=None): headers = CaseInsensitiveDict(headers) required_headers = self.headers or ['date'] signable = generate_message( - required_headers, headers, host, method, path) + required_headers, headers, host, method, path) signature = super(HeaderSigner, self).sign(signable) headers[self.sign_header] = self.signature_template % signature diff --git a/httpsig/sign_algorithms.py b/httpsig/sign_algorithms.py new file mode 100644 index 0000000..e4d2656 --- /dev/null +++ b/httpsig/sign_algorithms.py @@ -0,0 +1,61 @@ +import base64 + +import six +from Crypto.PublicKey import RSA +from Crypto.Signature import PKCS1_PSS +from httpsig.utils import HttpSigException, HASHES +from abc import ABCMeta, abstractmethod + +DEFAULT_HASH_ALGORITHM = "sha512" + + +class SignAlgorithm(object): + __metaclass__ = ABCMeta + + @abstractmethod + def sign(self, private, data): + raise NotImplementedError() + + @abstractmethod + def verify(self, public, data, signature): + raise NotImplementedError() + + +class PSS(SignAlgorithm): + + def __init__(self, hash_algorithm=DEFAULT_HASH_ALGORITHM, salt_length=None, mgfunc=None): + if hash_algorithm not in HASHES: + raise HttpSigException("Unsupported hash algorithm") + + if hash_algorithm != DEFAULT_HASH_ALGORITHM: + raise HttpSigException( + "Hash algorithm: {} is deprecated. Please use: {}".format(hash_algorithm, DEFAULT_HASH_ALGORITHM)) + + self.hash_algorithm = HASHES[hash_algorithm] + self.salt_length = salt_length + self.mgfunc = mgfunc + + def _create_pss(self, key): + try: + rsa_key = RSA.importKey(key) + pss = PKCS1_PSS.new(rsa_key, saltLen=self.salt_length, mgfunc=self.mgfunc) + except ValueError: + raise HttpSigException("Invalid key.") + return pss + + def sign(self, private_key, data): + pss = self._create_pss(private_key) + + if isinstance(data, six.string_types): + data = data.encode("ascii") + + h = self.hash_algorithm.new() + h.update(data) + return pss.sign(h) + + def verify(self, public_key, data, signature): + pss = self._create_pss(public_key) + + h = self.hash_algorithm.new() + h.update(data) + return pss.verify(h, base64.b64decode(signature)) diff --git a/httpsig/tests/rsa_private.pem b/httpsig/tests/rsa_private_1024.pem similarity index 100% rename from httpsig/tests/rsa_private.pem rename to httpsig/tests/rsa_private_1024.pem diff --git a/httpsig/tests/rsa_private_2048.pem b/httpsig/tests/rsa_private_2048.pem new file mode 100644 index 0000000..153a8ec --- /dev/null +++ b/httpsig/tests/rsa_private_2048.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQB7eXXK+gSpDXsvZkcXd19X85iemJd0KywRH+/W+1J1j8pd+O1l +H2He8GLaDFCwFijTvTmptfMYB2XyvG8/tPpaSzaIbSBlKXWxSo1fdUMf2e7SbqVr +Fi5DolPrIfRpVqw4iqnTZZ46Y2vfa57Ee3NRF5zoagMS9BM7nfuCKvzZcUK81V75 +hup5kpMHW1ofBZAPwQMm8CoXD1bpM+acN1N+63vgTY2QyUq2yJOI3HJvyFZTw+Sj +/ialYtDvDTluBH98i4504OIA6z0SCijF11irvAOSPc0GVXB8HjtUlqbD0BD6Hyqg +MeXgi9nGJhJDnJDiCVlPwg6Ni+h3nW/sXXopAgMBAAECggEANkOg8v2CAtG7647l +e3io3DxgPIMPPKykhzoj67Uz/hqdc0MtAZ4TIyk+KFn1NA3pD3U/3EfseAj4Uv9h +XPwqcnhPlRFwhUT9RldfXi5ou5zJio26ASAUYQD8JIAdrBW9RnQaQp+MNFjxVZU0 +h2FBwse/25yLkU7XDQJXQFOoH988Dpozz1y8q11NxurakR67+xtqO5KG7FZdwCsN +W2Z7gTm7T59NYdHevFi2b91hdBdLWCn9RPduEvRViQY5KzzkT6cg493G3vCPXxCy +9C9aCNF7PXghy/im7dLz+H28xYls3KPOJve2dmvox2+aPH66TgXkfj/kfULJmHZq +el3dIQKBgQDAxiqPcEF1Fq4UOoipCvcpiyz0gdFFw1x58km9GOpDdDK1bqcFc2z/ +GEoauWVl/PZZJdmht1zzkg4R3Izpbsg1IFxd3m7KbcfOK2bA9h2QPmjW8OwSu4/h +/l8mDsNF5crOdBnUHacgHhL1SJx323Yu3z9PmiN9wLW1gyYkh82SzQKBgQCj+LWP +1DZdsHOs224CjGjfj02PsaV5RNgD7Qqk5VcQFHzmJTAqoroPzJNjUD1sUnXXJHI0 +JL533giIsxQxnyca1qtxaO6KA4baykQtKKQqKTWhE2oowS1howHRbLShq1Hxvw9S +QSS0ZAo5DyjZLMkVnlB+v7sXJR8X0Ru8qHKczQKBgQCBMEy1c/VqEpj21YNgRgj9 +vleSRK2KozIGR2lDYL8eFXEmRdGIxaH2EsEWx8g8YRp3A/aleczBLtBfB/8nMSba +86TzA24cGxYcBNoH1uhZEnoQEcUjiK8UNPRu/NXAsg8H7KaikHy/+WebGd5CNMEv +CE3VeubuD4e27P1S3e/WwQKBgDzgGjASvjhcSSXUtWv2yvyszEPb1S5Hk9cpSvlb +N859fL1I8y/xCBjTf6iwYo1zs9Iy8r9PIPOJmCuAKLAfgToilrXdGipdEtTpoRQO +8ZvBfuqVNaV5yqpkBUnGDO20mBCjOUH1c3YRagYzDZxLV0BSbVoRPpliK8AA30ZU +V3DFAoGAfaPc8p6o7tCaPMpRxynIAvgIqg4sIBJdX/G4Q+SZeZR/mFlfpuhY4kzh +CL+RKAhOyOaYsSxlk4vB954y4UZFl6/t2W6gNxouelA77TgV2/rjx/fLk06J+RIF +QQkiAXwUZ2xpmdnUk+UREBwrB3LoU9kZM6fKX/LB4QEZuOmbERQ= +-----END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/httpsig/tests/rsa_public.pem b/httpsig/tests/rsa_public_1024.pem similarity index 90% rename from httpsig/tests/rsa_public.pem rename to httpsig/tests/rsa_public_1024.pem index b3bbf6c..80135fd 100644 --- a/httpsig/tests/rsa_public.pem +++ b/httpsig/tests/rsa_public_1024.pem @@ -3,4 +3,4 @@ MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDCFENGw33yGihy92pDjZQhl0C3 6rPJj+CvfSC8+q28hxA161QFNUd13wuCTUcq0Qd2qsBe/2hFyc2DCJJg0h1L78+6 Z4UMR7EOcpfdUE9Hf3m/hs+FUR45uBJeDK1HSFHD8bHKD6kv8FPGfJTotc+2xjJw oYi+1hqp1fIekaxsyQIDAQAB ------END PUBLIC KEY----- +-----END PUBLIC KEY----- \ No newline at end of file diff --git a/httpsig/tests/rsa_public_2048.pem b/httpsig/tests/rsa_public_2048.pem new file mode 100644 index 0000000..4675c47 --- /dev/null +++ b/httpsig/tests/rsa_public_2048.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBITANBgkqhkiG9w0BAQEFAAOCAQ4AMIIBCQKCAQB7eXXK+gSpDXsvZkcXd19X +85iemJd0KywRH+/W+1J1j8pd+O1lH2He8GLaDFCwFijTvTmptfMYB2XyvG8/tPpa +SzaIbSBlKXWxSo1fdUMf2e7SbqVrFi5DolPrIfRpVqw4iqnTZZ46Y2vfa57Ee3NR +F5zoagMS9BM7nfuCKvzZcUK81V75hup5kpMHW1ofBZAPwQMm8CoXD1bpM+acN1N+ +63vgTY2QyUq2yJOI3HJvyFZTw+Sj/ialYtDvDTluBH98i4504OIA6z0SCijF11ir +vAOSPc0GVXB8HjtUlqbD0BD6HyqgMeXgi9nGJhJDnJDiCVlPwg6Ni+h3nW/sXXop +AgMBAAE= +-----END PUBLIC KEY----- \ No newline at end of file diff --git a/httpsig/tests/test_signature.py b/httpsig/tests/test_signature.py index b8b4c90..ffc4369 100755 --- a/httpsig/tests/test_signature.py +++ b/httpsig/tests/test_signature.py @@ -4,13 +4,15 @@ import unittest -import httpsig.sign as sign -from httpsig.utils import parse_authorization_header +import pytest +import httpsig.sign as sign +from httpsig.sign_algorithms import PSS +from httpsig.utils import parse_authorization_header, HttpSigException sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) -sign.DEFAULT_SIGN_ALGORITHM = "rsa-sha256" +sign.DEFAULT_ALGORITHM = "hs2019" class TestSign(unittest.TestCase): @@ -23,13 +25,18 @@ class TestSign(unittest.TestCase): header_content_length = '18' def setUp(self): - self.key_path = os.path.join( - os.path.dirname(__file__), 'rsa_private.pem') - with open(self.key_path, 'rb') as f: - self.key = f.read() + self.key_path_2048 = os.path.join( + os.path.dirname(__file__), 'rsa_private_2048.pem') + with open(self.key_path_2048, 'rb') as f: + self.key_2048 = f.read() + + self.key_path_1024 = os.path.join( + os.path.dirname(__file__), 'rsa_private_1024.pem') + with open(self.key_path_1024, 'rb') as f: + self.key_1024 = f.read() def test_default(self): - hs = sign.HeaderSigner(key_id='Test', secret=self.key) + hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS(hash_algorithm="sha512", salt_length=0)) unsigned = { 'Date': self.header_date } @@ -43,11 +50,11 @@ def test_default(self): self.assertIn('algorithm', params) self.assertIn('signature', params) self.assertEqual(params['keyId'], 'Test') - self.assertEqual(params['algorithm'], 'rsa-sha256') - self.assertEqual(params['signature'], 'jKyvPcxB4JbmYY4mByyBY7cZfNl4OW9HpFQlG7N4YcJPteKTu4MWCLyk+gIr0wDgqtLWf9NLpMAMimdfsH7FSWGfbMFSrsVTHNTk0rK3usrfFnti1dxsM4jl0kYJCKTGI/UWkqiaxwNiKqGcdlEDrTcUhhsFsOIo8VhddmZTZ8w=') # noqa: E501 + self.assertEqual(params['algorithm'], 'hs2019') + self.assertEqual(params['signature'], 'T8+Cj3Zp2cBDm2r8/loPgfHUSSFXXyZJNxxbNx1NvKVz/r5T4z6pVxhl9rqk8WfYHMdlh2aT5hCrYKvhs88Jy0DDmeUP4nELWRsO1BF0oAqHfcrbEikZQL7jA6z0guVaLr0S5QRGmd1K5HUEkP/vYEOns+FRL+JrFG4dNJNESvG5iyKUoaXfoZCFdqtzLlIteEAL7dW/kaX/dE116wfpbem1eCABuGopRhuFtjqLKVjuUVwyP/zSYTqd9j+gDhinkAifTJPxbGMh0b5LZdNCqw5irT9NkTcTFRXDp8ioX8r805Z9QhjT7H+rSo350U2LsAFoQ9ttryPBOoMPCiQTlw==') # noqa: E501 def test_basic(self): - hs = sign.HeaderSigner(key_id='Test', secret=self.key, headers=[ + hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS(salt_length=0), headers=[ '(request-target)', 'host', 'date', @@ -68,13 +75,13 @@ def test_basic(self): self.assertIn('algorithm', params) self.assertIn('signature', params) self.assertEqual(params['keyId'], 'Test') - self.assertEqual(params['algorithm'], 'rsa-sha256') + self.assertEqual(params['algorithm'], 'hs2019') self.assertEqual( params['headers'], '(request-target) host date') - self.assertEqual(params['signature'], 'HUxc9BS3P/kPhSmJo+0pQ4IsCo007vkv6bUm4Qehrx+B1Eo4Mq5/6KylET72ZpMUS80XvjlOPjKzxfeTQj4DiKbAzwJAb4HX3qX6obQTa00/qPDXlMepD2JtTw33yNnm/0xV7fQuvILN/ys+378Ysi082+4xBQFwvhNvSoVsGv4=') # noqa: E501 + self.assertEqual(params['signature'], 'KkF4oeOJJH9TaYjQdaU634G7AVmM5Bf3fnfJCBZ7G0H5puW5XlQTpduA+TgouKOJhbv4aRRpunPzCHUxUjEvrR3TSALqW1EOsBwCVIusE9CnrhL7vUOvciIDai/jI15RsfR9+XyTmOSFbsI07E8mmywr3nLeWX6AAFDMO2vWc21zZxrSc13vFfAkVvFhXLxO4g0bBm6Z4m5/9ytWtdE0Gf3St2kY8aZTedllRCS8cMx8GVAIw/qYGeIlGKUCZKxrFxnviN7gfxixwova6lcxpppIo+WXxEiwMJfSQBlx0WGn3A3twCv6TsIxPOVUEW4jcogDh+jGFf1aGdVyHquTRQ==') # noqa: E501 def test_all(self): - hs = sign.HeaderSigner(key_id='Test', secret=self.key, headers=[ + hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha512", salt_length=0), headers=[ '(request-target)', 'host', 'date', @@ -101,8 +108,36 @@ def test_all(self): self.assertIn('algorithm', params) self.assertIn('signature', params) self.assertEqual(params['keyId'], 'Test') - self.assertEqual(params['algorithm'], 'rsa-sha256') + self.assertEqual(params['algorithm'], 'hs2019') self.assertEqual( params['headers'], '(request-target) host date content-type digest content-length') - self.assertEqual(params['signature'], 'Ef7MlxLXoBovhil3AlyjtBwAL9g4TN3tibLj7uuNB3CROat/9KaeQ4hW2NiJ+pZ6HQEOx9vYZAyi+7cmIkmJszJCut5kQLAwuX+Ms/mUFvpKlSo9StS2bMXDBNjOh4Auj774GFj4gwjS+3NhFeoqyr/MuN6HsEnkvn6zdgfE2i0=') # noqa: E501 + self.assertEqual(params['signature'], 'bxWyLDB/Tuhzxd/tWG2g60l3Goyk9XJZzj2ouNKizZuZoe1Ngj+19N11bhK7FABHJ7lSzH5g6fp5LkN894ivIv6N29L2sPssuAkqgzNXyvYkp4KWOr5j7sVpApmRH7gf7THljcXosmrYk5gdBTspixpJJJ5LGkkPKCRAFurmi/LqopSH6cJbLJNIccTu2dTMGEeDOqqNterVmfonpZyPeBsEEwoeOo6d8zgHzB/1Xxk7dfELFbA1c0LE5kZbwEIEFPmS01YFz6EJW7Aj8kzvzwQRyvgDobi25niGOy/D7JVHvtDjBIaJedFuFJSb8rZ2DGryBQ6NwchMp3f2MUoTGg==') # noqa: E501 + + def test_default_deprecated_256(self): + hs = sign.HeaderSigner(key_id='Test', secret=self.key_1024, algorithm="rsa-sha256") + unsigned = { + 'Date': self.header_date + } + signed = hs.sign(unsigned) + self.assertIn('Date', signed) + self.assertEqual(unsigned['Date'], signed['Date']) + self.assertIn('Authorization', signed) + auth = parse_authorization_header(signed['authorization']) + params = auth[1] + self.assertIn('keyId', params) + self.assertIn('algorithm', params) + self.assertIn('signature', params) + self.assertEqual(params['keyId'], 'Test') + self.assertEqual(params['algorithm'], 'rsa-sha256') + self.assertEqual(params['signature'], 'jKyvPcxB4JbmYY4mByyBY7cZfNl4OW9HpFQlG7N4YcJPteKTu4MWCLyk+gIr0wDgqtLWf9NLpMAMimdfsH7FSWGfbMFSrsVTHNTk0rK3usrfFnti1dxsM4jl0kYJCKTGI/UWkqiaxwNiKqGcdlEDrTcUhhsFsOIo8VhddmZTZ8w=') # noqa: E501 + + def test_unsupported_hash_algorithm(self): + with pytest.raises(HttpSigException) as e: + sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha123", salt_length=0)) + self.assertEqual(str(e.value), "Unsupported hash algorithm") + + def test_deprecated_hash_algorithm(self): + with pytest.raises(HttpSigException) as e: + sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha256", salt_length=0)) + self.assertEqual(str(e.value), "Hash algorithm: sha256 is deprecated. Please use: sha512") \ No newline at end of file diff --git a/httpsig/tests/test_utils.py b/httpsig/tests/test_utils.py index aa53acd..9263659 100755 --- a/httpsig/tests/test_utils.py +++ b/httpsig/tests/test_utils.py @@ -11,7 +11,7 @@ class TestUtils(unittest.TestCase): def test_get_fingerprint(self): with open(os.path.join( - os.path.dirname(__file__), 'rsa_public.pem'), 'r') as k: + os.path.dirname(__file__), 'rsa_public_1024.pem'), 'r') as k: key = k.read() fingerprint = get_fingerprint(key) self.assertEqual( diff --git a/httpsig/tests/test_verify.py b/httpsig/tests/test_verify.py index 6e6d9eb..d5f9785 100755 --- a/httpsig/tests/test_verify.py +++ b/httpsig/tests/test_verify.py @@ -4,9 +4,9 @@ import unittest from httpsig.sign import HeaderSigner, Signer +from httpsig.sign_algorithms import PSS from httpsig.verify import HeaderVerifier, Verifier - sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) @@ -43,11 +43,12 @@ def setUp(self): self.algorithm = "hmac-sha1" self.sign_secret = secret self.verify_secret = secret + self.sign_algorithm = None def test_basic_sign(self): - signer = Signer(secret=self.sign_secret, algorithm=self.algorithm) + signer = Signer(secret=self.sign_secret, algorithm=self.algorithm, sign_algorithm=self.sign_algorithm) verifier = Verifier( - secret=self.verify_secret, algorithm=self.algorithm) + secret=self.verify_secret, algorithm=self.algorithm, sign_algorithm=self.sign_algorithm) GOOD = b"this is a test" BAD = b"this is not the signature you were looking for..." @@ -64,10 +65,10 @@ def test_default(self): hs = HeaderSigner( key_id="Test", secret=self.sign_secret, algorithm=self.algorithm, - sign_header=self.sign_header) + sign_header=self.sign_header, sign_algorithm=self.sign_algorithm) signed = hs.sign(unsigned) hv = HeaderVerifier( - headers=signed, secret=self.verify_secret, sign_header=self.sign_header) + headers=signed, secret=self.verify_secret, sign_header=self.sign_header, sign_algorithm=self.sign_algorithm) self.assertTrue(hv.verify()) def test_signed_headers(self): @@ -75,18 +76,19 @@ def test_signed_headers(self): METHOD = self.test_method PATH = self.test_path hs = HeaderSigner( - key_id="Test", - secret=self.sign_secret, - algorithm=self.algorithm, - sign_header=self.sign_header, - headers=[ - '(request-target)', - 'host', - 'date', - 'content-type', - 'digest', - 'content-length' - ]) + key_id="Test", + secret=self.sign_secret, + algorithm=self.algorithm, + sign_header=self.sign_header, + headers=[ + '(request-target)', + 'host', + 'date', + 'content-type', + 'digest', + 'content-length' + ], + sign_algorithm=self.sign_algorithm) unsigned = { 'Host': HOST, 'Date': self.header_date, @@ -97,9 +99,9 @@ def test_signed_headers(self): signed = hs.sign(unsigned, method=METHOD, path=PATH) hv = HeaderVerifier( - headers=signed, secret=self.verify_secret, - host=HOST, method=METHOD, path=PATH, - sign_header=self.sign_header) + headers=signed, secret=self.verify_secret, + host=HOST, method=METHOD, path=PATH, + sign_header=self.sign_header, sign_algorithm=self.sign_algorithm) self.assertTrue(hv.verify()) def test_incorrect_headers(self): @@ -116,7 +118,8 @@ def test_incorrect_headers(self): 'date', 'content-type', 'digest', - 'content-length']) + 'content-length'], + sign_algorithm=self.sign_algorithm) unsigned = { 'Host': HOST, 'Date': self.header_date, @@ -129,7 +132,7 @@ def test_incorrect_headers(self): hv = HeaderVerifier(headers=signed, secret=self.verify_secret, required_headers=["some-other-header"], host=HOST, method=METHOD, path=PATH, - sign_header=self.sign_header) + sign_header=self.sign_header, sign_algorithm=self.sign_algorithm) with self.assertRaises(Exception): hv.verify() @@ -138,17 +141,18 @@ def test_extra_auth_headers(self): METHOD = "POST" PATH = '/foo?param=value&pet=dog' hs = HeaderSigner( - key_id="Test", - secret=self.sign_secret, - sign_header=self.sign_header, - algorithm=self.algorithm, headers=[ - '(request-target)', - 'host', - 'date', - 'content-type', - 'digest', - 'content-length' - ]) + key_id="Test", + secret=self.sign_secret, + sign_header=self.sign_header, + algorithm=self.algorithm, headers=[ + '(request-target)', + 'host', + 'date', + 'content-type', + 'digest', + 'content-length' + ], + sign_algorithm=self.sign_algorithm) unsigned = { 'Host': HOST, 'Date': self.header_date, @@ -158,12 +162,13 @@ def test_extra_auth_headers(self): } signed = hs.sign(unsigned, method=METHOD, path=PATH) hv = HeaderVerifier( - headers=signed, - secret=self.verify_secret, - method=METHOD, - path=PATH, - sign_header=self.sign_header, - required_headers=['date', '(request-target)']) + headers=signed, + secret=self.verify_secret, + method=METHOD, + path=PATH, + sign_header=self.sign_header, + required_headers=['date', '(request-target)'], + sign_algorithm=self.sign_algorithm) self.assertTrue(hv.verify()) @@ -185,14 +190,14 @@ class TestVerifyRSASHA1(TestVerifyHMACSHA1): def setUp(self): private_key_path = os.path.join( - os.path.dirname(__file__), - 'rsa_private.pem') + os.path.dirname(__file__), + 'rsa_private_1024.pem') with open(private_key_path, 'rb') as f: private_key = f.read() public_key_path = os.path.join( - os.path.dirname(__file__), - 'rsa_public.pem') + os.path.dirname(__file__), + 'rsa_public_1024.pem') with open(public_key_path, 'rb') as f: public_key = f.read() @@ -200,6 +205,7 @@ def setUp(self): self.algorithm = "rsa-sha1" self.sign_secret = private_key self.verify_secret = public_key + self.sign_algorithm = None class TestVerifyRSASHA256(TestVerifyRSASHA1): @@ -218,3 +224,21 @@ def setUp(self): class TestVerifyRSASHA512ChangeHeader(TestVerifyRSASHA1): sign_header = 'Signature' + + +class TestVerifyHS2019PSS(TestVerifyHMACSHA1): + + def setUp(self): + private_key_path = os.path.join(os.path.dirname(__file__), 'rsa_private_2048.pem') + with open(private_key_path, 'rb') as f: + private_key = f.read() + + public_key_path = os.path.join(os.path.dirname(__file__), 'rsa_public_2048.pem') + with open(public_key_path, 'rb') as f: + public_key = f.read() + + self.keyId = "Test" + self.algorithm = "hs2019" + self.sign_secret = private_key + self.verify_secret = public_key + self.sign_algorithm = PSS(salt_length=0) diff --git a/httpsig/utils.py b/httpsig/utils.py index 5f80ef0..9cef5e5 100644 --- a/httpsig/utils.py +++ b/httpsig/utils.py @@ -19,7 +19,8 @@ 'rsa-sha512', 'hmac-sha1', 'hmac-sha256', - 'hmac-sha512']) + 'hmac-sha512', + 'hs2019']) HASHES = {'sha1': SHA, 'sha256': SHA256, 'sha512': SHA512} diff --git a/httpsig/verify.py b/httpsig/verify.py index 17e313d..055a74a 100644 --- a/httpsig/verify.py +++ b/httpsig/verify.py @@ -4,7 +4,8 @@ import base64 import six -from .sign import Signer +from .sign import Signer, DEFAULT_ALGORITHM +from .sign_algorithms import SignAlgorithm from .utils import * @@ -37,6 +38,9 @@ def _verify(self, data, signature): s = base64.b64decode(signature) return ct_bytes_compare(h, s) + elif issubclass(type(self.sign_algorithm), SignAlgorithm): + return self.sign_algorithm.verify(self.secret, data, signature) + else: raise HttpSigException("Unsupported algorithm.") @@ -47,7 +51,7 @@ class HeaderVerifier(Verifier): """ def __init__(self, headers, secret, required_headers=None, method=None, - path=None, host=None, sign_header='authorization'): + path=None, host=None, sign_header='authorization', sign_algorithm=None): """ Instantiate a HeaderVerifier object. @@ -66,6 +70,8 @@ def __init__(self, headers, secret, required_headers=None, method=None, header, if not supplied in :param:headers. :param sign_header: Optional. The header where the signature is. Default is 'authorization'. + :param sign_algorithm: Required for 'hs2019' algorithm, specifies the + digital signature algorithm (derived from keyId) to use. """ required_headers = required_headers or ['date'] self.headers = CaseInsensitiveDict(headers) @@ -84,8 +90,13 @@ def __init__(self, headers, secret, required_headers=None, method=None, self.path = path self.host = host + if self.auth_dict['algorithm'] != DEFAULT_ALGORITHM: + print("Algorithm: {} is deprecated please update to {}".format(self.auth_dict['algorithm'], DEFAULT_ALGORITHM)) + elif self.auth_dict['algorithm'] == DEFAULT_ALGORITHM and sign_algorithm is None: + raise HttpSigException("Required sign algorithm for {} algorithm not set".format(DEFAULT_ALGORITHM)) + super(HeaderVerifier, self).__init__( - secret, algorithm=self.auth_dict['algorithm']) + secret, algorithm=self.auth_dict['algorithm'], sign_algorithm=sign_algorithm) def verify(self): """