Skip to content
This repository has been archived by the owner on Apr 28, 2022. It is now read-only.

Commit

Permalink
Create sign_algorithms with first PSS class
Browse files Browse the repository at this point in the history
  • Loading branch information
fulder committed Aug 25, 2020
1 parent 0a57e36 commit c4e36fb
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 82 deletions.
36 changes: 15 additions & 21 deletions httpsig/sign.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from __future__ import print_function
import base64
import six

from Crypto.Hash import HMAC
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Signature import PKCS1_PSS

from .sign_algorithms import SIGN_ALGORITHMS
from .utils import *


DEFAULT_SIGN_ALGORITHM = "hs2019"
DEFAULT_SALT_LENGTH = None


class Signer(object):
Expand All @@ -20,14 +18,15 @@ class Signer(object):

Password-protected keyfiles are not supported.
"""
def __init__(self, secret, algorithm=None, sign_algorithm=None, salt_length=None):

def __init__(self, secret, algorithm=None, sign_algorithm=None):
if algorithm is None:
algorithm = DEFAULT_SIGN_ALGORITHM
if salt_length is None:
salt_length = DEFAULT_SALT_LENGTH

assert algorithm in ALGORITHMS, "Unknown algorithm"
assert sign_algorithm is None or sign_algorithm in SIGN_ALGORITHMS, "Unsupported digital signature algorithm"

if sign_algorithm is not None and sign_algorithm.__class__.__name__ not in SIGN_ALGORITHMS:
raise HttpSigException("Unsupported digital signature algorithm")

if algorithm != DEFAULT_SIGN_ALGORITHM:
print("Algorithm: {} is deprecated please update to {}".format(algorithm, DEFAULT_SIGN_ALGORITHM))
Expand All @@ -38,13 +37,13 @@ def __init__(self, secret, algorithm=None, sign_algorithm=None, salt_length=None
self._rsa = None
self._hash = None
self.algorithm = algorithm
self.secret = secret

if "-" in algorithm:
self.sign_algorithm, self.hash_algorithm = algorithm.split('-')
elif algorithm == "hs2019":
assert sign_algorithm is not None, "Required digital signature algorithm not specified"
self.sign_algorithm = sign_algorithm
self.hash_algorithm = "sha512"

if self.sign_algorithm == 'rsa':
try:
Expand All @@ -58,14 +57,6 @@ def __init__(self, secret, algorithm=None, sign_algorithm=None, salt_length=None
self._hash = HMAC.new(secret,
digestmod=HASHES[self.hash_algorithm])

elif self.sign_algorithm == "PSS":
try:
rsa_key = RSA.importKey(secret)
self._rsa = PKCS1_PSS.new(rsa_key, saltLen=salt_length)
self._hash = HASHES[self.hash_algorithm]
except ValueError:
raise HttpSigException("Invalid key.")

def _sign_rsa(self, data):
if isinstance(data, six.string_types):
data = data.encode("ascii")
Expand All @@ -88,6 +79,8 @@ def sign(self, data):
signed = self._sign_rsa(data)
elif self._hash:
signed = self._sign_hmac(data)
elif self.sign_algorithm.__class__.__name__ in SIGN_ALGORITHMS:
signed = self.sign_algorithm.sign(self.secret, data)
if not signed:
raise SystemError('No valid encryptor found.')
return base64.b64encode(signed).decode("ascii")
Expand All @@ -111,14 +104,15 @@ class HeaderSigner(Signer):
:param sign_header: header used to include signature, defaulting to
'authorization'.
"""
def __init__(self, key_id, secret, algorithm=None, sign_algorithm=None, salt_length=None, headers=None, sign_header='authorization'):

def __init__(self, key_id, secret, algorithm=None, sign_algorithm=None, headers=None, sign_header='authorization'):
if algorithm is None:
algorithm = DEFAULT_SIGN_ALGORITHM

super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm, sign_algorithm=sign_algorithm, salt_length=salt_length)
super(HeaderSigner, self).__init__(secret=secret, algorithm=algorithm, sign_algorithm=sign_algorithm)
self.headers = headers or ['date']
self.signature_template = build_signature_template(
key_id, algorithm, headers, sign_header)
key_id, algorithm, headers, sign_header)
self.sign_header = sign_header

def sign(self, headers, host=None, method=None, path=None):
Expand All @@ -134,7 +128,7 @@ def sign(self, headers, host=None, method=None, path=None):
headers = CaseInsensitiveDict(headers)
required_headers = self.headers or ['date']
signable = generate_message(
required_headers, headers, host, method, path)
required_headers, headers, host, method, path)

signature = super(HeaderSigner, self).sign(signable)
headers[self.sign_header] = self.signature_template % signature
Expand Down
53 changes: 53 additions & 0 deletions httpsig/sign_algorithms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import base64

import six
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_PSS
from httpsig.utils import HttpSigException, HASHES

DEFAULT_HASH_ALGORITHM = "sha512"


class PSS(object):

def __init__(self, hash_algorithm=DEFAULT_HASH_ALGORITHM, salt_length=None, mgfunc=None):
if hash_algorithm not in HASHES:
raise HttpSigException("Unsupported hash algorithm")

if hash_algorithm != DEFAULT_HASH_ALGORITHM:
raise HttpSigException(
"Hash algorithm: {} is deprecated. Please use: {}".format(hash_algorithm, DEFAULT_HASH_ALGORITHM))

self.hash_algorithm = HASHES[hash_algorithm]
self.salt_length = salt_length
self.mgfunc = mgfunc

def _create_pss(self, key):
try:
rsa_key = RSA.importKey(key)
pss = PKCS1_PSS.new(rsa_key, saltLen=self.salt_length, mgfunc=self.mgfunc)
except ValueError:
raise HttpSigException("Invalid key.")
return pss

def sign(self, private_key, data):
pss = self._create_pss(private_key)

if isinstance(data, six.string_types):
data = data.encode("ascii")

h = self.hash_algorithm.new()
h.update(data)
return pss.sign(h)

def verify(self, public_key, data, signature):
pss = self._create_pss(public_key)

h = self.hash_algorithm.new()
h.update(data)
return pss.verify(h, base64.b64decode(signature))


SIGN_ALGORITHMS = frozenset([
"PSS"
])
22 changes: 17 additions & 5 deletions httpsig/tests/test_signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

import unittest

import httpsig.sign as sign
from httpsig.utils import parse_authorization_header
import pytest

import httpsig.sign as sign
from httpsig.sign_algorithms import PSS
from httpsig.utils import parse_authorization_header, HttpSigException

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

Expand Down Expand Up @@ -34,7 +36,7 @@ def setUp(self):
self.key_1024 = f.read()

def test_default(self):
hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm="PSS", salt_length=0)
hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS(hash_algorithm="sha512", salt_length=0))
unsigned = {
'Date': self.header_date
}
Expand All @@ -52,7 +54,7 @@ def test_default(self):
self.assertEqual(params['signature'], 'T8+Cj3Zp2cBDm2r8/loPgfHUSSFXXyZJNxxbNx1NvKVz/r5T4z6pVxhl9rqk8WfYHMdlh2aT5hCrYKvhs88Jy0DDmeUP4nELWRsO1BF0oAqHfcrbEikZQL7jA6z0guVaLr0S5QRGmd1K5HUEkP/vYEOns+FRL+JrFG4dNJNESvG5iyKUoaXfoZCFdqtzLlIteEAL7dW/kaX/dE116wfpbem1eCABuGopRhuFtjqLKVjuUVwyP/zSYTqd9j+gDhinkAifTJPxbGMh0b5LZdNCqw5irT9NkTcTFRXDp8ioX8r805Z9QhjT7H+rSo350U2LsAFoQ9ttryPBOoMPCiQTlw==') # noqa: E501

def test_basic(self):
hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm="PSS", salt_length=0, headers=[
hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS(salt_length=0), headers=[
'(request-target)',
'host',
'date',
Expand All @@ -79,7 +81,7 @@ def test_basic(self):
self.assertEqual(params['signature'], 'KkF4oeOJJH9TaYjQdaU634G7AVmM5Bf3fnfJCBZ7G0H5puW5XlQTpduA+TgouKOJhbv4aRRpunPzCHUxUjEvrR3TSALqW1EOsBwCVIusE9CnrhL7vUOvciIDai/jI15RsfR9+XyTmOSFbsI07E8mmywr3nLeWX6AAFDMO2vWc21zZxrSc13vFfAkVvFhXLxO4g0bBm6Z4m5/9ytWtdE0Gf3St2kY8aZTedllRCS8cMx8GVAIw/qYGeIlGKUCZKxrFxnviN7gfxixwova6lcxpppIo+WXxEiwMJfSQBlx0WGn3A3twCv6TsIxPOVUEW4jcogDh+jGFf1aGdVyHquTRQ==') # noqa: E501

def test_all(self):
hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm="PSS", salt_length=0, headers=[
hs = sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha512", salt_length=0), headers=[
'(request-target)',
'host',
'date',
Expand Down Expand Up @@ -129,3 +131,13 @@ def test_default_deprecated_256(self):
self.assertEqual(params['keyId'], 'Test')
self.assertEqual(params['algorithm'], 'rsa-sha256')
self.assertEqual(params['signature'], 'jKyvPcxB4JbmYY4mByyBY7cZfNl4OW9HpFQlG7N4YcJPteKTu4MWCLyk+gIr0wDgqtLWf9NLpMAMimdfsH7FSWGfbMFSrsVTHNTk0rK3usrfFnti1dxsM4jl0kYJCKTGI/UWkqiaxwNiKqGcdlEDrTcUhhsFsOIo8VhddmZTZ8w=') # noqa: E501

def test_unsupported_hash_algorithm(self):
with pytest.raises(HttpSigException) as e:
sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha123", salt_length=0))
self.assertEqual(str(e.value), "Unsupported hash algorithm")

def test_deprecated_hash_algorithm(self):
with pytest.raises(HttpSigException) as e:
sign.HeaderSigner(key_id='Test', secret=self.key_2048, sign_algorithm=PSS("sha256", salt_length=0))
self.assertEqual(str(e.value), "Hash algorithm: sha256 is deprecated. Please use: sha512")
89 changes: 42 additions & 47 deletions httpsig/tests/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import unittest

from httpsig.sign import HeaderSigner, Signer
from httpsig.sign_algorithms import PSS
from httpsig.verify import HeaderVerifier, Verifier


sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))


Expand Down Expand Up @@ -48,7 +48,7 @@ def setUp(self):
def test_basic_sign(self):
signer = Signer(secret=self.sign_secret, algorithm=self.algorithm, sign_algorithm=self.sign_algorithm)
verifier = Verifier(
secret=self.verify_secret, algorithm=self.algorithm, sign_algorithm=self.sign_algorithm)
secret=self.verify_secret, algorithm=self.algorithm, sign_algorithm=self.sign_algorithm)

GOOD = b"this is a test"
BAD = b"this is not the signature you were looking for..."
Expand Down Expand Up @@ -76,19 +76,19 @@ def test_signed_headers(self):
METHOD = self.test_method
PATH = self.test_path
hs = HeaderSigner(
key_id="Test",
secret=self.sign_secret,
algorithm=self.algorithm,
sign_header=self.sign_header,
headers=[
'(request-target)',
'host',
'date',
'content-type',
'digest',
'content-length'
],
sign_algorithm=self.sign_algorithm)
key_id="Test",
secret=self.sign_secret,
algorithm=self.algorithm,
sign_header=self.sign_header,
headers=[
'(request-target)',
'host',
'date',
'content-type',
'digest',
'content-length'
],
sign_algorithm=self.sign_algorithm)
unsigned = {
'Host': HOST,
'Date': self.header_date,
Expand All @@ -99,9 +99,9 @@ def test_signed_headers(self):
signed = hs.sign(unsigned, method=METHOD, path=PATH)

hv = HeaderVerifier(
headers=signed, secret=self.verify_secret,
host=HOST, method=METHOD, path=PATH,
sign_header=self.sign_header, sign_algorithm=self.sign_algorithm)
headers=signed, secret=self.verify_secret,
host=HOST, method=METHOD, path=PATH,
sign_header=self.sign_header, sign_algorithm=self.sign_algorithm)
self.assertTrue(hv.verify())

def test_incorrect_headers(self):
Expand Down Expand Up @@ -141,18 +141,18 @@ def test_extra_auth_headers(self):
METHOD = "POST"
PATH = '/foo?param=value&pet=dog'
hs = HeaderSigner(
key_id="Test",
secret=self.sign_secret,
sign_header=self.sign_header,
algorithm=self.algorithm, headers=[
'(request-target)',
'host',
'date',
'content-type',
'digest',
'content-length'
],
sign_algorithm=self.sign_algorithm)
key_id="Test",
secret=self.sign_secret,
sign_header=self.sign_header,
algorithm=self.algorithm, headers=[
'(request-target)',
'host',
'date',
'content-type',
'digest',
'content-length'
],
sign_algorithm=self.sign_algorithm)
unsigned = {
'Host': HOST,
'Date': self.header_date,
Expand All @@ -162,13 +162,13 @@ def test_extra_auth_headers(self):
}
signed = hs.sign(unsigned, method=METHOD, path=PATH)
hv = HeaderVerifier(
headers=signed,
secret=self.verify_secret,
method=METHOD,
path=PATH,
sign_header=self.sign_header,
required_headers=['date', '(request-target)'],
sign_algorithm=self.sign_algorithm)
headers=signed,
secret=self.verify_secret,
method=METHOD,
path=PATH,
sign_header=self.sign_header,
required_headers=['date', '(request-target)'],
sign_algorithm=self.sign_algorithm)
self.assertTrue(hv.verify())


Expand All @@ -190,13 +190,13 @@ class TestVerifyRSASHA1(TestVerifyHMACSHA1):

def setUp(self):
private_key_path = os.path.join(
os.path.dirname(__file__),
os.path.dirname(__file__),
'rsa_private_1024.pem')
with open(private_key_path, 'rb') as f:
private_key = f.read()

public_key_path = os.path.join(
os.path.dirname(__file__),
os.path.dirname(__file__),
'rsa_public_1024.pem')
with open(public_key_path, 'rb') as f:
public_key = f.read()
Expand Down Expand Up @@ -229,21 +229,16 @@ class TestVerifyRSASHA512ChangeHeader(TestVerifyRSASHA1):
class TestVerifyHS2019PSS(TestVerifyHMACSHA1):

def setUp(self):
private_key_path = os.path.join(
os.path.dirname(__file__),
'rsa_private_2048.pem')
private_key_path = os.path.join(os.path.dirname(__file__), 'rsa_private_2048.pem')
with open(private_key_path, 'rb') as f:
private_key = f.read()

public_key_path = os.path.join(
os.path.dirname(__file__),
'rsa_public_2048.pem')
public_key_path = os.path.join(os.path.dirname(__file__), 'rsa_public_2048.pem')
with open(public_key_path, 'rb') as f:
public_key = f.read()

self.keyId = "Test"
self.algorithm = "hs2019"
self.sign_secret = private_key
self.verify_secret = public_key
self.sign_algorithm = "PSS"

self.sign_algorithm = PSS(salt_length=0)
4 changes: 0 additions & 4 deletions httpsig/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
'sha256': SHA256,
'sha512': SHA512}

SIGN_ALGORITHMS = frozenset([
"PSS"
])


class HttpSigException(Exception):
pass
Expand Down
Loading

0 comments on commit c4e36fb

Please sign in to comment.