Skip to content

Commit

Permalink
Add a support for Cicada HSM (from a CryptaLabs).
Browse files Browse the repository at this point in the history
  • Loading branch information
ateska committed Aug 16, 2018
1 parent 69d1f5a commit 1026f35
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 45 deletions.
58 changes: 21 additions & 37 deletions itss.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class ITSS(object):
asn1 = asn1tools.compile_files(fname, 'der')


def __init__(self, directory, ea_url, aa_url):
self.PrivateKey = None
def __init__(self, directory, ea_url, aa_url, hsm):
self.EC = None # Enrollment credentials
self.AT = None # Authorization ticket

Expand All @@ -48,13 +47,20 @@ def __init__(self, directory, ea_url, aa_url):
if not os.path.isdir(cert_dir):
os.mkdir(cert_dir)

if hsm == 'emulated':
import itss.hsm_emulated
self.HSM = itss.hsm_emulated.EmulatedHSM(self.Directory)
elif hsm == 'cicada':
import itss.hsm_cicada
self.HSM = itss.hsm_cicada.CicadaHSM()
else:
raise RuntimeError("Unknown/unsupported HSM '{}'".format(hsm))


def generate_private_key(self):
self.PrivateKey = cryptography.hazmat.primitives.asymmetric.ec.generate_private_key(
cryptography.hazmat.primitives.asymmetric.ec.SECP256R1(),
cryptography.hazmat.backends.default_backend()
)
self.HSM.generate_private_key()
self.EC = None
self.AT = None


def enroll(self, enrollment_id):
Expand All @@ -63,7 +69,7 @@ def enroll(self, enrollment_id):
The process is described in CITS / ETSI TS 102 941 V1.1.1
'''

verification_public_key = self.PrivateKey.public_key()
verification_public_key = self.HSM.get_public_key()
verification_public_numbers = verification_public_key.public_numbers()

response_encryption_private_key = cryptography.hazmat.primitives.asymmetric.ec.generate_private_key(
Expand Down Expand Up @@ -136,13 +142,7 @@ def enroll(self, enrollment_id):
encoded_er += struct.pack(">B", 0xa1) + self.asn1.encode('ToBeSignedEnrolmentCertificateRequest', EnrolmentRequest['enrolCertRequest'])[1:]

# Sign with ecdsa_nistp256_with_sha256
signature_RFC3279 = self.PrivateKey.sign(
encoded_er,
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(
cryptography.hazmat.primitives.hashes.SHA256()
)
)
r, s = cryptography.hazmat.primitives.asymmetric.utils.decode_dss_signature(signature_RFC3279)
r, s = self.HSM.sign(encoded_er)

EnrolmentRequest['signature'] = {
'r': {
Expand Down Expand Up @@ -215,13 +215,7 @@ def authorize(self):
encoded_ar += struct.pack(">B", 0xa1) + itss.encode_der_length(len(acr)) + acr

# Sign with ecdsa_nistp256_with_sha256
signature_RFC3279 = self.PrivateKey.sign(
encoded_ar,
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(
cryptography.hazmat.primitives.hashes.SHA256()
)
)
r, s = cryptography.hazmat.primitives.asymmetric.utils.decode_dss_signature(signature_RFC3279)
r, s = self.HSM.sign(encoded_ar)

AuthorizationRequest['signature'] = {
'r': {
Expand Down Expand Up @@ -251,12 +245,7 @@ def authorize(self):


def store(self):
x = self.PrivateKey.private_bytes(
encoding=cryptography.hazmat.primitives.serialization.Encoding.DER,
format=cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8,
encryption_algorithm=cryptography.hazmat.primitives.serialization.BestAvailableEncryption(b'strong-and-secret :-)')
)
open(os.path.join(self.Directory, 'itss.key'),'wb').write(x)
self.HSM.store()

if self.EC is not None:
open(os.path.join(self.Directory, 'itss.ec'),'wb').write(self.EC.Data)
Expand All @@ -270,17 +259,11 @@ def store(self):


def load(self):
assert(self.PrivateKey is None)
assert(self.EC is None)
assert(self.AT is None)

try:
self.PrivateKey = cryptography.hazmat.primitives.serialization.load_der_private_key(
open(os.path.join(self.Directory, 'itss.key'),'rb').read(),
password=b'strong-and-secret :-)',
backend=cryptography.hazmat.backends.default_backend()
)
except:
ok = self.HSM.load()
if not ok:
return False

try:
Expand Down Expand Up @@ -352,11 +335,12 @@ def main():
parser.add_argument('-e', '--ea-url', default="https://via.teskalabs.com/croads/demo-ca", help='URL of the Enrollment Authority')
parser.add_argument('-a', '--aa-url', default="https://via.teskalabs.com/croads/demo-ca", help='URL of the Authorization Authority')
parser.add_argument('-i', '--enrollment-id', help='Specify a custom enrollment ID')
parser.add_argument('-H', '--hsm', default="emulated", choices=['cicada', 'emulated'], help='Use the HSM to store a private key.')
parser.add_argument('--g5-sim', default="224.1.1.1 5007 32 auto", help='Configuration of G5 simulator')

args = parser.parse_args()

itss_obj = ITSS(args.DIR, args.ea_url, args.aa_url)
itss_obj = ITSS(args.DIR, args.ea_url, args.aa_url, args.hsm)
ok = itss_obj.load()
store = False
if not ok:
Expand Down Expand Up @@ -404,7 +388,7 @@ def datagram_received(self, data, addr):
async def periodic_sender():
while True:
smb = itss.CITS103097v121SecureMessageBuilder()
msg = smb.finish(itss_obj.AT, itss_obj.PrivateKey, "payload from '{}'".format(platform.node()))
msg = smb.finish(itss_obj.AT, itss_obj.HSM, "payload from '{}'".format(platform.node()))

g5sim.send(msg)
await asyncio.sleep(1)
Expand Down
42 changes: 42 additions & 0 deletions itss/hsm_abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import abc


class HSM(abc.ABC):

def load(self):
'''
Load from persistent storage.
Return True if OK (private key exists) or False if privake key doesn't exists
'''
return False


def store(self):
'''
Store to persistent storage. Can be NOOP.
'''
pass


@abc.abstractmethod
def generate_private_key(self):
'''
Generate a private key.
'''
pass


@abc.abstractmethod
def get_public_key(self):
'''
Get a public key for a private key
'''
pass


@abc.abstractmethod
def sign(self, payload):
'''
Sign a payload, return 'r' and 's'.
'''
pass
59 changes: 59 additions & 0 deletions itss/hsm_cicada.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import cryptography.hazmat.backends

from .hsm_abc import HSM

class CicadaHSM(HSM):

def __init__(self):
self._PrivateKey = None
self.P11URI = None

backend = cryptography.hazmat.backends.default_backend()
backend._lib.ENGINE_load_dynamic()
engine = backend._lib.ENGINE_by_id(b"dynamic");
backend.openssl_assert(engine != backend._ffi.NULL)
engine = backend._ffi.gc(engine, backend._lib.ENGINE_free)

backend._lib.ENGINE_ctrl_cmd_string(engine, b"SO_PATH", b"/usr/lib/arm-linux-gnueabihf/engines-1.1/libpkcs11.so", 0)
backend._lib.ENGINE_ctrl_cmd_string(engine, b"ID", b"pkcs11", 0)
backend._lib.ENGINE_ctrl_cmd_string(engine, b"LOAD", backend._ffi.NULL, 0)
backend._lib.ENGINE_ctrl_cmd_string(engine, b"MODULE_PATH", b"/usr/lib/cicada-pkcs11.so", 0)
res = backend._lib.ENGINE_init(engine)
backend.openssl_assert(res > 0)

self.Engine = engine


def generate_private_key(self):
print("Not implemented yet!")


def load(self):
p11uri = "pkcs11:object=test-key;type=private"
backend = cryptography.hazmat.backends.default_backend()
pkey = backend._lib.ENGINE_load_private_key(
self.Engine,
p11uri.encode("utf-8"),
backend._ffi.NULL,
backend._ffi.NULL
)

backend.openssl_assert(pkey != backend._ffi.NULL)
pkey = backend._ffi.gc(pkey, backend._lib.EVP_PKEY_free)
self.P11URI = p11uri
self._PrivateKey = backend._evp_pkey_to_private_key(pkey)


def get_public_key(self):
return self._PrivateKey.public_key()


def sign(self, payload):
signature_RFC3279 = self._PrivateKey.sign(
payload,
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(
cryptography.hazmat.primitives.hashes.SHA256()
)
)
r, s = cryptography.hazmat.primitives.asymmetric.utils.decode_dss_signature(signature_RFC3279)
return r, s
56 changes: 56 additions & 0 deletions itss/hsm_emulated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import os.path

import cryptography.hazmat.primitives.asymmetric.ec
import cryptography.hazmat.primitives.serialization

from .hsm_abc import HSM


class EmulatedHSM(HSM):

def __init__(self, directory):
self.Directory = directory
self._PrivateKey = None


def load(self):
try:
self._PrivateKey = cryptography.hazmat.primitives.serialization.load_der_private_key(
open(os.path.join(self.Directory, 'itss.key'),'rb').read(),
password=b'strong-and-secret :-)',
backend=cryptography.hazmat.backends.default_backend()
)
except:
return False
return True


def store(self):
x = self._PrivateKey.private_bytes(
encoding=cryptography.hazmat.primitives.serialization.Encoding.DER,
format=cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8,
encryption_algorithm=cryptography.hazmat.primitives.serialization.BestAvailableEncryption(b'strong-and-secret :-)')
)
open(os.path.join(self.Directory, 'itss.key'),'wb').write(x)


def generate_private_key(self):
self._PrivateKey = cryptography.hazmat.primitives.asymmetric.ec.generate_private_key(
cryptography.hazmat.primitives.asymmetric.ec.SECP256R1(),
cryptography.hazmat.backends.default_backend()
)


def get_public_key(self):
return self._PrivateKey.public_key()


def sign(self, payload):
signature_RFC3279 = self._PrivateKey.sign(
payload,
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(
cryptography.hazmat.primitives.hashes.SHA256()
)
)
r, s = cryptography.hazmat.primitives.asymmetric.utils.decode_dss_signature(signature_RFC3279)
return r, s
10 changes: 2 additions & 8 deletions itss/secure_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def _build_Trailer(self, r, s):
return covered_by_signature, build_var_length_vectors_with_variable_length_encoding(tf)


def finish(self, autorization_ticket, private_key, payload):
def finish(self, autorization_ticket, hsm, payload):
msg = struct.pack(">B", self.Version)
msg += self._build_HeaderField(autorization_ticket)

Expand All @@ -142,13 +142,7 @@ def finish(self, autorization_ticket, private_key, payload):
covered_by_signature, trailer = self._build_Trailer(0, 0)

# Trailer with signature / ecdsa_nistp256_with_sha256
signature_RFC3279 = private_key.sign(
msg + trailer[:covered_by_signature],
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(
cryptography.hazmat.primitives.hashes.SHA256()
)
)
r, s = cryptography.hazmat.primitives.asymmetric.utils.decode_dss_signature(signature_RFC3279)
r, s = hsm.sign(msg + trailer[:covered_by_signature])
_, trailer = self._build_Trailer(r, s)

return msg + trailer

0 comments on commit 1026f35

Please sign in to comment.