Skip to content

Commit

Permalink
Add backup validation (#52)
Browse files Browse the repository at this point in the history
This patch adds the Backup and EncryptedBackup classes.
EncryptedBackup.parse can be used to parse and validate the format of a
backup file.  EncryptedBackup.decrypt can be used to decrypt a backup
and validate the decrypted data.

This also adds validation of the backup file to the unit tests.

Fixes: #49
  • Loading branch information
robin-nitrokey authored Oct 27, 2023
1 parent bd090a7 commit ea83d38
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 16 deletions.
117 changes: 117 additions & 0 deletions nethsm/backup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Based on export_backup.py from the NetHSM source by Sven Anderson <sven@anderson.de>

import hashlib
import struct
from dataclasses import dataclass, field

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


def _get_length(data: bytes) -> tuple[int, bytes]:
if len(data) < 3:
raise ValueError("Failed to read field length: unexpected EOF")
high, low = struct.unpack(">B H", data[:3])
return ((high << 16) + low, data[3:])


def _get_field(data: bytes) -> tuple[bytes, bytes]:
n, data = _get_length(data)
if len(data) < n:
raise ValueError("Failed to extract field: unexpected EOF")
return data[:n], data[n:]


def _decrypt(key: bytes, adata: bytes, data: bytes) -> bytes:
iv_size = 12
ciphertext = data[iv_size:-16]
tag = data[-16:]
nonce = data[:iv_size]

cipher = Cipher(
algorithms.AES(key), modes.GCM(nonce, tag), backend=default_backend()
)
decryptor = cipher.decryptor()
decryptor.authenticate_additional_data(adata)

try:
decrypted_data = decryptor.update(ciphertext) + decryptor.finalize()
return decrypted_data
except InvalidTag:
raise ValueError(
"Authentication tag verification failed. The data may be tampered."
)


@dataclass
class Backup:
version: int
domain_key: bytes
data: dict[str, bytes] = field(default_factory=dict)


@dataclass
class EncryptedBackup:
version: int
salt: bytes
encrypted_version: bytes
encrypted_domain_key: bytes
encrypted_data: list[bytes] = field(default_factory=list)

@classmethod
def parse(cls, data: bytes) -> "EncryptedBackup":
header = b"_NETHSM_BACKUP_"
header_len = len(header)
if len(data) < header_len + 1 or data[:header_len] != header:
raise ValueError("Data does not contain a NetHSM header")
version = data[header_len]
data = data[header_len + 1 :]

if version != 0:
raise ValueError(
f"Version mismatch on export, provided backup version is {version}, this tool expects 0"
)

salt, data = _get_field(data)
encrypted_version, data = _get_field(data)
encrypted_domain_key, data = _get_field(data)

backup = cls(
version=version,
salt=salt,
encrypted_version=encrypted_version,
encrypted_domain_key=encrypted_domain_key,
)

while data:
item, data = _get_field(data)
backup.encrypted_data.append(item)

return backup

def _key(self, passphrase: str) -> bytes:
return hashlib.scrypt(
password=passphrase.encode(), salt=self.salt, n=16384, r=8, p=1, dklen=32
)

def decrypt(self, passphrase: str) -> Backup:
key = self._key(passphrase)
version_bytes = _decrypt(key, b"backup-version", self.encrypted_version)
if len(version_bytes) != 1:
raise ValueError(f"Overlong version: {version_bytes!r}")
version = version_bytes[0]
if version != self.version:
raise ValueError(
f"Internal and external version mismatch ({version} != {self.version})."
)
domain_key = _decrypt(key, b"domain-key", self.encrypted_domain_key)

backup = Backup(version=version, domain_key=domain_key)

for item in self.encrypted_data:
key_value_pair = _decrypt(key, b"backup", item)
k, v = _get_field(key_value_pair)
backup.data[k.decode()] = v

return backup
32 changes: 16 additions & 16 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ classifiers = ["License :: OSI Approved :: Apache Software License"]
dynamic = ["version", "description"]
dependencies = [
"certifi",
"cryptography >=41.0",
"python-dateutil",
"typing_extensions ~= 4.3.0",
"urllib3 >=2.0,<2.1",
Expand Down Expand Up @@ -59,24 +60,23 @@ extend_skip = ["nethsm/client"]

[tool.mypy]
show_error_codes = true
strict = true
python_version = "3.9"
warn_unused_configs = true
warn_redundant_casts = true

# enable strict checks for the manually written code, see
# disable strict checks for the auto-generated code, see
# - https://github.com/python/mypy/issues/11401
# - https://mypy.readthedocs.io/en/stable/existing_code.html#introduce-stricter-options
[[tool.mypy.overrides]]
module = "nethsm"
check_untyped_defs = true
disallow_any_generics = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_reexport = true
strict_concatenate = true
strict_equality = true
warn_unused_ignores = true
warn_return_any = true
module = "nethsm.client.*"
check_untyped_defs = false
disallow_any_generics = false
disallow_incomplete_defs = false
disallow_subclassing_any = false
disallow_untyped_calls = false
disallow_untyped_decorators = false
disallow_untyped_defs = false
no_implicit_reexport = false
strict_concatenate = false
strict_equality = false
warn_unused_ignores = false
warn_return_any = false
4 changes: 4 additions & 0 deletions tests/test_nethsm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
update,
)

from nethsm.backup import Backup, EncryptedBackup

"""######################### Preparation for the Tests #########################
To run these test on Ubuntu like systems in Terminal you need sudo rights.
Expand Down Expand Up @@ -67,6 +69,8 @@ def test_passphrase_add_user_retrieve_backup(nethsm):
if os.path.exists(C.FILENAME_BACKUP):
os.remove(C.FILENAME_BACKUP)
data = nethsm.backup()
backup = EncryptedBackup.parse(data).decrypt(C.BACKUP_PASSPHRASE)
assert f"/key/{C.KEY_ID_GENERATED}" in backup.data
try:
with open(C.FILENAME_BACKUP, "xb") as f:
f.write(data)
Expand Down

0 comments on commit ea83d38

Please sign in to comment.