From ea83d385b7b8da7f9ce9125da87fda95cdd6bcd4 Mon Sep 17 00:00:00 2001 From: Robin Krahl Date: Fri, 27 Oct 2023 19:02:04 +0200 Subject: [PATCH] Add backup validation (#52) This patch adds the Backup and EncryptedBackup classes. EncryptedBackup.parse can be used to parse and validate the format of a backup file. EncryptedBackup.decrypt can be used to decrypt a backup and validate the decrypted data. This also adds validation of the backup file to the unit tests. Fixes: https://github.com/Nitrokey/nethsm-sdk-py/issues/49 --- nethsm/backup.py | 117 ++++++++++++++++++++++++++++++++++++ pyproject.toml | 32 +++++----- tests/test_nethsm_system.py | 4 ++ 3 files changed, 137 insertions(+), 16 deletions(-) create mode 100644 nethsm/backup.py diff --git a/nethsm/backup.py b/nethsm/backup.py new file mode 100644 index 0000000..1ede915 --- /dev/null +++ b/nethsm/backup.py @@ -0,0 +1,117 @@ +# Based on export_backup.py from the NetHSM source by Sven Anderson + +import hashlib +import struct +from dataclasses import dataclass, field + +from cryptography.exceptions import InvalidTag +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + +def _get_length(data: bytes) -> tuple[int, bytes]: + if len(data) < 3: + raise ValueError("Failed to read field length: unexpected EOF") + high, low = struct.unpack(">B H", data[:3]) + return ((high << 16) + low, data[3:]) + + +def _get_field(data: bytes) -> tuple[bytes, bytes]: + n, data = _get_length(data) + if len(data) < n: + raise ValueError("Failed to extract field: unexpected EOF") + return data[:n], data[n:] + + +def _decrypt(key: bytes, adata: bytes, data: bytes) -> bytes: + iv_size = 12 + ciphertext = data[iv_size:-16] + tag = data[-16:] + nonce = data[:iv_size] + + cipher = Cipher( + algorithms.AES(key), modes.GCM(nonce, tag), backend=default_backend() + ) + decryptor = cipher.decryptor() + decryptor.authenticate_additional_data(adata) + + try: + decrypted_data = decryptor.update(ciphertext) + decryptor.finalize() + return decrypted_data + except InvalidTag: + raise ValueError( + "Authentication tag verification failed. The data may be tampered." + ) + + +@dataclass +class Backup: + version: int + domain_key: bytes + data: dict[str, bytes] = field(default_factory=dict) + + +@dataclass +class EncryptedBackup: + version: int + salt: bytes + encrypted_version: bytes + encrypted_domain_key: bytes + encrypted_data: list[bytes] = field(default_factory=list) + + @classmethod + def parse(cls, data: bytes) -> "EncryptedBackup": + header = b"_NETHSM_BACKUP_" + header_len = len(header) + if len(data) < header_len + 1 or data[:header_len] != header: + raise ValueError("Data does not contain a NetHSM header") + version = data[header_len] + data = data[header_len + 1 :] + + if version != 0: + raise ValueError( + f"Version mismatch on export, provided backup version is {version}, this tool expects 0" + ) + + salt, data = _get_field(data) + encrypted_version, data = _get_field(data) + encrypted_domain_key, data = _get_field(data) + + backup = cls( + version=version, + salt=salt, + encrypted_version=encrypted_version, + encrypted_domain_key=encrypted_domain_key, + ) + + while data: + item, data = _get_field(data) + backup.encrypted_data.append(item) + + return backup + + def _key(self, passphrase: str) -> bytes: + return hashlib.scrypt( + password=passphrase.encode(), salt=self.salt, n=16384, r=8, p=1, dklen=32 + ) + + def decrypt(self, passphrase: str) -> Backup: + key = self._key(passphrase) + version_bytes = _decrypt(key, b"backup-version", self.encrypted_version) + if len(version_bytes) != 1: + raise ValueError(f"Overlong version: {version_bytes!r}") + version = version_bytes[0] + if version != self.version: + raise ValueError( + f"Internal and external version mismatch ({version} != {self.version})." + ) + domain_key = _decrypt(key, b"domain-key", self.encrypted_domain_key) + + backup = Backup(version=version, domain_key=domain_key) + + for item in self.encrypted_data: + key_value_pair = _decrypt(key, b"backup", item) + k, v = _get_field(key_value_pair) + backup.data[k.decode()] = v + + return backup diff --git a/pyproject.toml b/pyproject.toml index 483852c..3f1fdb3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ classifiers = ["License :: OSI Approved :: Apache Software License"] dynamic = ["version", "description"] dependencies = [ "certifi", + "cryptography >=41.0", "python-dateutil", "typing_extensions ~= 4.3.0", "urllib3 >=2.0,<2.1", @@ -59,24 +60,23 @@ extend_skip = ["nethsm/client"] [tool.mypy] show_error_codes = true +strict = true python_version = "3.9" -warn_unused_configs = true -warn_redundant_casts = true -# enable strict checks for the manually written code, see +# disable strict checks for the auto-generated code, see # - https://github.com/python/mypy/issues/11401 # - https://mypy.readthedocs.io/en/stable/existing_code.html#introduce-stricter-options [[tool.mypy.overrides]] -module = "nethsm" -check_untyped_defs = true -disallow_any_generics = true -disallow_incomplete_defs = true -disallow_subclassing_any = true -disallow_untyped_calls = true -disallow_untyped_decorators = true -disallow_untyped_defs = true -no_implicit_reexport = true -strict_concatenate = true -strict_equality = true -warn_unused_ignores = true -warn_return_any = true +module = "nethsm.client.*" +check_untyped_defs = false +disallow_any_generics = false +disallow_incomplete_defs = false +disallow_subclassing_any = false +disallow_untyped_calls = false +disallow_untyped_decorators = false +disallow_untyped_defs = false +no_implicit_reexport = false +strict_concatenate = false +strict_equality = false +warn_unused_ignores = false +warn_return_any = false diff --git a/tests/test_nethsm_system.py b/tests/test_nethsm_system.py index ec33aa4..4aaffaa 100644 --- a/tests/test_nethsm_system.py +++ b/tests/test_nethsm_system.py @@ -17,6 +17,8 @@ update, ) +from nethsm.backup import Backup, EncryptedBackup + """######################### Preparation for the Tests ######################### To run these test on Ubuntu like systems in Terminal you need sudo rights. @@ -67,6 +69,8 @@ def test_passphrase_add_user_retrieve_backup(nethsm): if os.path.exists(C.FILENAME_BACKUP): os.remove(C.FILENAME_BACKUP) data = nethsm.backup() + backup = EncryptedBackup.parse(data).decrypt(C.BACKUP_PASSPHRASE) + assert f"/key/{C.KEY_ID_GENERATED}" in backup.data try: with open(C.FILENAME_BACKUP, "xb") as f: f.write(data)