diff --git a/httpsig/sign.py b/httpsig/sign.py index eb62e60..1a0a3fe 100644 --- a/httpsig/sign.py +++ b/httpsig/sign.py @@ -1,16 +1,14 @@ +from __future__ import print_function import base64 import six from Crypto.Hash import HMAC from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 -from Crypto.Signature import PKCS1_PSS - +from .sign_algorithms import SIGN_ALGORITHMS from .utils import * - DEFAULT_SIGN_ALGORITHM = "hs2019" -DEFAULT_SALT_LENGTH = None class Signer(object): @@ -20,14 +18,15 @@ class Signer(object): Password-protected keyfiles are not supported. """ - def __init__(self, secret, algorithm=None, sign_algorithm=None, salt_length=None): + + def __init__(self, secret, algorithm=None, sign_algorithm=None): if algorithm is None: algorithm = DEFAULT_SIGN_ALGORITHM - if salt_length is None: - salt_length = DEFAULT_SALT_LENGTH assert algorithm in ALGORITHMS, "Unknown algorithm" - assert sign_algorithm is None or sign_algorithm in SIGN_ALGORITHMS, "Unsupported digital signature algorithm" + + if sign_algorithm is not None and sign_algorithm.__class__.__name__ not in SIGN_ALGORITHMS: + raise HttpSigException("Unsupported digital signature algorithm") if algorithm != DEFAULT_SIGN_ALGORITHM: print("Algorithm: {} is deprecated please update to {}".format(algorithm, DEFAULT_SIGN_ALGORITHM)) @@ -38,13 +37,13 @@ def __init__(self, secret, algorithm=None, sign_algorithm=None, salt_length=None self._rsa = None self._hash = None self.algorithm = algorithm + self.secret = secret if "-" in algorithm: self.sign_algorithm, self.hash_algorithm = algorithm.split('-') elif algorithm == "hs2019": assert sign_algorithm is not None, "Required digital signature algorithm not specified" self.sign_algorithm = sign_algorithm - self.hash_algorithm = "sha512" if self.sign_algorithm == 'rsa': try: @@ -58,14 +57,6 @@ def __init__(self, secret, algorithm=None, sign_algorithm=None, salt_length=None self._hash = HMAC.new(secret, digestmod=HASHES[self.hash_algorithm]) - elif self.sign_algorithm == "PSS": - try: - rsa_key = RSA.importKey(secret) - self._rsa = PKCS1_PSS.new(rsa_key, saltLen=salt_length) - self._hash = HASHES[self.hash_algorithm] - except ValueError: - raise HttpSigException("Invalid key.") - def _sign_rsa(self, data): if isinstance(data, six.string_types): data = data.encode("ascii") @@ -88,6 +79,8 @@ def sign(self, data): signed = self._sign_rsa(data) elif self._hash: signed = self._sign_hmac(data) + elif self.sign_algorithm.__class__.__name__ in SIGN_ALGORITHMS: + signed = self.sign_algorithm.sign(self.secret, data) if not signed: raise SystemError('No valid encryptor found.') return base64.b64encode(signed).decode("ascii") @@ -111,14 +104,15 @@ class HeaderSigner(Signer): :param sign_header: header used to include signature, defaulting to 'authorization'. """ - def __init__(self, key_id, secret, algorithm=None, sign_algorithm=None, salt_length=None, headers=None, sign_header='authorization'): + + def __init__(self, key_id, secret, algorithm=None, sign_algorithm=None, headers=None, sign_header='authorization'): if algorithm is None: algorithm = DEFAULT_SIGN_ALGORITHM - super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm, sign_algorithm=sign_algorithm, salt_length=salt_length) + super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm, sign_algorithm=sign_algorithm) self.headers = headers or ['date'] self.signature_template = build_signature_template( - key_id, algorithm, headers, sign_header) + key_id, algorithm, headers, sign_header) self.sign_header = sign_header def sign(self, headers, host=None, method=None, path=None): @@ -134,7 +128,7 @@ def sign(self, headers, host=None, method=None, path=None): headers = CaseInsensitiveDict(headers) required_headers = self.headers or ['date'] signable = generate_message( - required_headers, headers, host, method, path) + required_headers, headers, host, method, path) signature = super(HeaderSigner, self).sign(signable) headers[self.sign_header] = self.signature_template % signature diff --git a/httpsig/sign_algorithms.py b/httpsig/sign_algorithms.py new file mode 100644 index 0000000..8018175 --- /dev/null +++ b/httpsig/sign_algorithms.py @@ -0,0 +1,53 @@ +import base64 + +import six +from Crypto.PublicKey import RSA +from Crypto.Signature import PKCS1_PSS +from httpsig.utils import HttpSigException, HASHES + +DEFAULT_HASH_ALGORITHM = "sha512" + + +class PSS(object): + + def __init__(self, hash_algorithm=DEFAULT_HASH_ALGORITHM, salt_length=None, mgfunc=None): + if hash_algorithm not in HASHES: + raise HttpSigException("Unsupported hash algorithm") + + if hash_algorithm != DEFAULT_HASH_ALGORITHM: + raise HttpSigException( + "Hash algorithm: {} is deprecated. Please use: {}".format(hash_algorithm, DEFAULT_HASH_ALGORITHM)) + + self.hash_algorithm = HASHES[hash_algorithm] + self.salt_length = salt_length + self.mgfunc = mgfunc + + def _create_pss(self, key): + try: + rsa_key = RSA.importKey(key) + pss = PKCS1_PSS.new(rsa_key, saltLen=self.salt_length, mgfunc=self.mgfunc) + except ValueError: + raise HttpSigException("Invalid key.") + return pss + + def sign(self, private_key, data): + pss = self._create_pss(private_key) + + if isinstance(data, six.string_types): + data = data.encode("ascii") + + h = self.hash_algorithm.new() + h.update(data) + return pss.sign(h) + + def verify(self, public_key, data, signature): + pss = self._create_pss(public_key) + + h = self.hash_algorithm.new() + h.update(data) + return pss.verify(h, base64.b64decode(signature)) + + +SIGN_ALGORITHMS = frozenset([ + "PSS" +]) diff --git a/httpsig/tests/test_signature.py b/httpsig/tests/test_signature.py index e03cb31..00a7fed 100755 --- a/httpsig/tests/test_signature.py +++ b/httpsig/tests/test_signature.py @@ -4,9 +4,11 @@ import unittest -import httpsig.sign as sign -from httpsig.utils import parse_authorization_header +import pytest +import httpsig.sign as sign +from httpsig.sign_algorithms import PSS +from httpsig.utils import parse_authorization_header, HttpSigException sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) @@ -34,7 +36,7 @@ def setUp(self): self.key_1024 = f.read() def test_default(self): - hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm="PSS", salt_length=0) + hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS(hash_algorithm="sha512", salt_length=0)) unsigned = { 'Date': self.header_date } @@ -52,7 +54,7 @@ def test_default(self): self.assertEqual(params['signature'], 'T8+Cj3Zp2cBDm2r8/loPgfHUSSFXXyZJNxxbNx1NvKVz/r5T4z6pVxhl9rqk8WfYHMdlh2aT5hCrYKvhs88Jy0DDmeUP4nELWRsO1BF0oAqHfcrbEikZQL7jA6z0guVaLr0S5QRGmd1K5HUEkP/vYEOns+FRL+JrFG4dNJNESvG5iyKUoaXfoZCFdqtzLlIteEAL7dW/kaX/dE116wfpbem1eCABuGopRhuFtjqLKVjuUVwyP/zSYTqd9j+gDhinkAifTJPxbGMh0b5LZdNCqw5irT9NkTcTFRXDp8ioX8r805Z9QhjT7H+rSo350U2LsAFoQ9ttryPBOoMPCiQTlw==') # noqa: E501 def test_basic(self): - hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm="PSS", salt_length=0, headers=[ + hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS(salt_length=0), headers=[ '(request-target)', 'host', 'date', @@ -79,7 +81,7 @@ def test_basic(self): self.assertEqual(params['signature'], 'KkF4oeOJJH9TaYjQdaU634G7AVmM5Bf3fnfJCBZ7G0H5puW5XlQTpduA+TgouKOJhbv4aRRpunPzCHUxUjEvrR3TSALqW1EOsBwCVIusE9CnrhL7vUOvciIDai/jI15RsfR9+XyTmOSFbsI07E8mmywr3nLeWX6AAFDMO2vWc21zZxrSc13vFfAkVvFhXLxO4g0bBm6Z4m5/9ytWtdE0Gf3St2kY8aZTedllRCS8cMx8GVAIw/qYGeIlGKUCZKxrFxnviN7gfxixwova6lcxpppIo+WXxEiwMJfSQBlx0WGn3A3twCv6TsIxPOVUEW4jcogDh+jGFf1aGdVyHquTRQ==') # noqa: E501 def test_all(self): - hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm="PSS", salt_length=0, headers=[ + hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha512", salt_length=0), headers=[ '(request-target)', 'host', 'date', @@ -129,3 +131,13 @@ def test_default_deprecated_256(self): self.assertEqual(params['keyId'], 'Test') self.assertEqual(params['algorithm'], 'rsa-sha256') self.assertEqual(params['signature'], 'jKyvPcxB4JbmYY4mByyBY7cZfNl4OW9HpFQlG7N4YcJPteKTu4MWCLyk+gIr0wDgqtLWf9NLpMAMimdfsH7FSWGfbMFSrsVTHNTk0rK3usrfFnti1dxsM4jl0kYJCKTGI/UWkqiaxwNiKqGcdlEDrTcUhhsFsOIo8VhddmZTZ8w=') # noqa: E501 + + def test_unsupported_hash_algorithm(self): + with pytest.raises(HttpSigException) as e: + sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha123", salt_length=0)) + self.assertEqual(str(e.value), "Unsupported hash algorithm") + + def test_deprecated_hash_algorithm(self): + with pytest.raises(HttpSigException) as e: + sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha256", salt_length=0)) + self.assertEqual(str(e.value), "Hash algorithm: sha256 is deprecated. Please use: sha512") \ No newline at end of file diff --git a/httpsig/tests/test_verify.py b/httpsig/tests/test_verify.py index fe8853b..d5f9785 100755 --- a/httpsig/tests/test_verify.py +++ b/httpsig/tests/test_verify.py @@ -4,9 +4,9 @@ import unittest from httpsig.sign import HeaderSigner, Signer +from httpsig.sign_algorithms import PSS from httpsig.verify import HeaderVerifier, Verifier - sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) @@ -48,7 +48,7 @@ def setUp(self): def test_basic_sign(self): signer = Signer(secret=self.sign_secret, algorithm=self.algorithm, sign_algorithm=self.sign_algorithm) verifier = Verifier( - secret=self.verify_secret, algorithm=self.algorithm, sign_algorithm=self.sign_algorithm) + secret=self.verify_secret, algorithm=self.algorithm, sign_algorithm=self.sign_algorithm) GOOD = b"this is a test" BAD = b"this is not the signature you were looking for..." @@ -76,19 +76,19 @@ def test_signed_headers(self): METHOD = self.test_method PATH = self.test_path hs = HeaderSigner( - key_id="Test", - secret=self.sign_secret, - algorithm=self.algorithm, - sign_header=self.sign_header, - headers=[ - '(request-target)', - 'host', - 'date', - 'content-type', - 'digest', - 'content-length' - ], - sign_algorithm=self.sign_algorithm) + key_id="Test", + secret=self.sign_secret, + algorithm=self.algorithm, + sign_header=self.sign_header, + headers=[ + '(request-target)', + 'host', + 'date', + 'content-type', + 'digest', + 'content-length' + ], + sign_algorithm=self.sign_algorithm) unsigned = { 'Host': HOST, 'Date': self.header_date, @@ -99,9 +99,9 @@ def test_signed_headers(self): signed = hs.sign(unsigned, method=METHOD, path=PATH) hv = HeaderVerifier( - headers=signed, secret=self.verify_secret, - host=HOST, method=METHOD, path=PATH, - sign_header=self.sign_header, sign_algorithm=self.sign_algorithm) + headers=signed, secret=self.verify_secret, + host=HOST, method=METHOD, path=PATH, + sign_header=self.sign_header, sign_algorithm=self.sign_algorithm) self.assertTrue(hv.verify()) def test_incorrect_headers(self): @@ -141,18 +141,18 @@ def test_extra_auth_headers(self): METHOD = "POST" PATH = '/foo?param=value&pet=dog' hs = HeaderSigner( - key_id="Test", - secret=self.sign_secret, - sign_header=self.sign_header, - algorithm=self.algorithm, headers=[ - '(request-target)', - 'host', - 'date', - 'content-type', - 'digest', - 'content-length' - ], - sign_algorithm=self.sign_algorithm) + key_id="Test", + secret=self.sign_secret, + sign_header=self.sign_header, + algorithm=self.algorithm, headers=[ + '(request-target)', + 'host', + 'date', + 'content-type', + 'digest', + 'content-length' + ], + sign_algorithm=self.sign_algorithm) unsigned = { 'Host': HOST, 'Date': self.header_date, @@ -162,13 +162,13 @@ def test_extra_auth_headers(self): } signed = hs.sign(unsigned, method=METHOD, path=PATH) hv = HeaderVerifier( - headers=signed, - secret=self.verify_secret, - method=METHOD, - path=PATH, - sign_header=self.sign_header, - required_headers=['date', '(request-target)'], - sign_algorithm=self.sign_algorithm) + headers=signed, + secret=self.verify_secret, + method=METHOD, + path=PATH, + sign_header=self.sign_header, + required_headers=['date', '(request-target)'], + sign_algorithm=self.sign_algorithm) self.assertTrue(hv.verify()) @@ -190,13 +190,13 @@ class TestVerifyRSASHA1(TestVerifyHMACSHA1): def setUp(self): private_key_path = os.path.join( - os.path.dirname(__file__), + os.path.dirname(__file__), 'rsa_private_1024.pem') with open(private_key_path, 'rb') as f: private_key = f.read() public_key_path = os.path.join( - os.path.dirname(__file__), + os.path.dirname(__file__), 'rsa_public_1024.pem') with open(public_key_path, 'rb') as f: public_key = f.read() @@ -229,15 +229,11 @@ class TestVerifyRSASHA512ChangeHeader(TestVerifyRSASHA1): class TestVerifyHS2019PSS(TestVerifyHMACSHA1): def setUp(self): - private_key_path = os.path.join( - os.path.dirname(__file__), - 'rsa_private_2048.pem') + private_key_path = os.path.join(os.path.dirname(__file__), 'rsa_private_2048.pem') with open(private_key_path, 'rb') as f: private_key = f.read() - public_key_path = os.path.join( - os.path.dirname(__file__), - 'rsa_public_2048.pem') + public_key_path = os.path.join(os.path.dirname(__file__), 'rsa_public_2048.pem') with open(public_key_path, 'rb') as f: public_key = f.read() @@ -245,5 +241,4 @@ def setUp(self): self.algorithm = "hs2019" self.sign_secret = private_key self.verify_secret = public_key - self.sign_algorithm = "PSS" - + self.sign_algorithm = PSS(salt_length=0) diff --git a/httpsig/utils.py b/httpsig/utils.py index 135926f..9cef5e5 100644 --- a/httpsig/utils.py +++ b/httpsig/utils.py @@ -25,10 +25,6 @@ 'sha256': SHA256, 'sha512': SHA512} -SIGN_ALGORITHMS = frozenset([ - "PSS" -]) - class HttpSigException(Exception): pass diff --git a/httpsig/verify.py b/httpsig/verify.py index 7501daf..b6bc26e 100644 --- a/httpsig/verify.py +++ b/httpsig/verify.py @@ -5,6 +5,7 @@ import six from .sign import Signer, DEFAULT_SIGN_ALGORITHM +from .sign_algorithms import SIGN_ALGORITHMS from .utils import * @@ -37,10 +38,8 @@ def _verify(self, data, signature): s = base64.b64decode(signature) return ct_bytes_compare(h, s) - elif self.sign_algorithm == 'PSS': - h = self._hash.new() - h.update(data) - return self._rsa.verify(h, base64.b64decode(signature)) + elif self.sign_algorithm.__class__.__name__ in SIGN_ALGORITHMS: + return self.sign_algorithm.verify(self.secret, data, signature) else: raise HttpSigException("Unsupported algorithm.") @@ -98,7 +97,7 @@ def __init__(self, headers, secret, required_headers=None, method=None, raise HttpSigException("Required sign algorithm for {} algorithm not set".format(DEFAULT_SIGN_ALGORITHM)) super(HeaderVerifier, self).__init__( - secret, algorithm=self.auth_dict['algorithm'], sign_algorithm=sign_algorithm, salt_length=salt_length) + secret, algorithm=self.auth_dict['algorithm'], sign_algorithm=sign_algorithm) def verify(self): """