diff --git a/itss.py b/itss.py index 9b0c7cf..8d7f653 100755 --- a/itss.py +++ b/itss.py @@ -53,10 +53,17 @@ def __init__(self, directory, ea_url, aa_url, hsm): elif hsm == 'cicada': import itss.hsm_cicada self.HSM = itss.hsm_cicada.CicadaHSM() + elif hsm == 'yubikey': + import itss.hsm_yubikey + self.HSM = itss.hsm_yubikey.YubikeyHSM() else: raise RuntimeError("Unknown/unsupported HSM '{}'".format(hsm)) + def close(self): + self.HSM.close() + + def generate_private_key(self): self.HSM.generate_private_key() self.EC = None @@ -230,6 +237,8 @@ def authorize(self): # Send request to Authorization Authority r = requests.put(self.AA_url + '/cits/ts_102941_v111/aa/approve', data=encoded_ar) + if (r.status_code != 200): + raise RuntimeError("Server error when calling AA approve: {}".format(r.status_code)) AuthorizationResponse = self.asn1.decode('AuthorizationResponse', r.content) if AuthorizationResponse[0] not in ('successfulExplicitAuthorization', 'successfulImplicitAuthorization'): @@ -335,7 +344,7 @@ def main(): parser.add_argument('-e', '--ea-url', default="https://via.teskalabs.com/croads/demo-ca", help='URL of the Enrollment Authority') parser.add_argument('-a', '--aa-url', default="https://via.teskalabs.com/croads/demo-ca", help='URL of the Authorization Authority') parser.add_argument('-i', '--enrollment-id', help='Specify a custom enrollment ID') - parser.add_argument('-H', '--hsm', default="emulated", choices=['cicada', 'emulated'], help='Use the HSM to store a private key.') + parser.add_argument('-H', '--hsm', default="emulated", choices=['cicada', 'yubikey', 'emulated'], help='Use the HSM to store a private key.') parser.add_argument('--g5-sim', default="224.1.1.1 5007 32 auto", help='Configuration of G5 simulator') args = parser.parse_args() @@ -402,6 +411,7 @@ async def periodic_sender(): except KeyboardInterrupt: pass + itss.close() loop.close() diff --git a/itss/hsm_abc.py b/itss/hsm_abc.py index 28e557f..68ef789 100644 --- a/itss/hsm_abc.py +++ b/itss/hsm_abc.py @@ -3,6 +3,11 @@ class HSM(abc.ABC): + + def close(self): + pass + + def load(self): ''' Load from persistent storage. diff --git a/itss/hsm_cicada.py b/itss/hsm_cicada.py index 7647102..051d31e 100644 --- a/itss/hsm_cicada.py +++ b/itss/hsm_cicada.py @@ -8,6 +8,8 @@ def __init__(self): self._PrivateKey = None self.P11URI = None + self.Label = "test-key" + backend = cryptography.hazmat.backends.default_backend() backend._lib.ENGINE_load_dynamic() engine = backend._lib.ENGINE_by_id(b"dynamic"); @@ -25,11 +27,14 @@ def __init__(self): def generate_private_key(self): + ''' + pkcs11-tool --module cicada-pkcs11.so --keypairgen --key-type EC:secp256r1 --label "test-key" --usage-sign + ''' print("Not implemented yet!") def load(self): - p11uri = "pkcs11:object=test-key;type=private" + p11uri = "pkcs11:object={};type=private".format(self.Label) backend = cryptography.hazmat.backends.default_backend() pkey = backend._lib.ENGINE_load_private_key( self.Engine, diff --git a/itss/hsm_yubikey.py b/itss/hsm_yubikey.py new file mode 100644 index 0000000..68fad01 --- /dev/null +++ b/itss/hsm_yubikey.py @@ -0,0 +1,57 @@ +import PyKCS11 + +import cryptography.hazmat.primitives.serialization + +from .hsm_abc import HSM + +class YubikeyHSM(HSM): + + def __init__(self): + self._privateKey = None + self._pkcs11 = PyKCS11.PyKCS11Lib() + self._pkcs11.load(pkcs11dll_filename="/Applications/YubiKey PIV Manager.app/Contents/MacOS/libykcs11.1.dylib") + self._session = None + + + def close(self): + if self._session is None: + self._session.logout() + self._session.closeSession() + + + def generate_private_key(self): + print("Not implemented yet!") + + + def load(self): + self._slot = self._pkcs11.getSlotList(tokenPresent=True)[0] + self._session = self._pkcs11.openSession(self._slot, PyKCS11.CKF_SERIAL_SESSION) + self._session.login("11223344") + + self._publicKey = self._session.findObjects([(PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY)])[0] + self._privateKey = self._session.findObjects([(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)])[0] + + return True + + + def get_public_key(self): + + ec_point = self._session.getAttributeValue(self._publicKey, [PyKCS11.CKA_EC_POINT])[0] + assert(ec_point[0] == 0x04) # OCTEC STRING + assert(ec_point[1] == 0x41) # Length 64+1 bytes + assert(ec_point[2] == 0x04) # Uncompresses + + public_key_numbers = cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicNumbers( + int.from_bytes(ec_point[3:32+3], "big"), # X + int.from_bytes(ec_point[32+3:], "big"), # Y + cryptography.hazmat.primitives.asymmetric.ec.SECP256R1() + ) + + backend = cryptography.hazmat.backends.default_backend() + return public_key_numbers.public_key(backend) + + + def sign(self, payload): + rs = self._session.sign(self._privateKey, payload, mecha=PyKCS11.Mechanism(PyKCS11.CKM_ECDSA_SHA256, None)) + r, s = int.from_bytes(rs[:32], "big"), int.from_bytes(rs[32:], "big") + return r, s diff --git a/itss/secure_message.py b/itss/secure_message.py index 100ae71..93394a4 100644 --- a/itss/secure_message.py +++ b/itss/secure_message.py @@ -80,6 +80,9 @@ def verify(self, itss): else: raise RuntimeError("Unsupported SignerInfo type {}".format(self.Headers['SignerInfo']['type'])) + if signer_certificate is None: + raise RuntimeError("Unable to find the authorization ticket for a received message.") + s = int.from_bytes(self.Signature['ecdsa_signature']['s'], byteorder='big') r = self.Signature['ecdsa_signature']['R']['x'] @@ -92,9 +95,6 @@ def verify(self, itss): ) ) - if signer_certificate is None: - raise RuntimeError("Unable to find the authorization ticket for a received message.") - return signer_certificate