diff --git a/hivemind_bus_client/client.py b/hivemind_bus_client/client.py index 0ecfcb4..e68e4b5 100644 --- a/hivemind_bus_client/client.py +++ b/hivemind_bus_client/client.py @@ -1,7 +1,7 @@ import json import ssl from threading import Event -from typing import Union, Optional, Callable +from typing import Union, Optional, Callable, Literal import pybase64 from Cryptodome.PublicKey import RSA @@ -17,8 +17,8 @@ from hivemind_bus_client.message import HiveMessage, HiveMessageType from hivemind_bus_client.serialization import HiveMindBinaryPayloadType from hivemind_bus_client.serialization import get_bitstring, decode_bitstring -from hivemind_bus_client.util import serialize_message, \ - encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin +from hivemind_bus_client.util import serialize_message +from hivemind_bus_client.encryption import encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin, JsonCiphers, BinaryCiphers from poorman_handshake.asymmetric.utils import encrypt_RSA, load_RSA_key, sign_RSA @@ -102,8 +102,13 @@ def __init__(self, key: Optional[str] = None, binarize: bool = True, identity: NodeIdentity = None, internal_bus: Optional[OVOSBusClient] = None, - bin_callbacks: BinaryDataCallbacks = BinaryDataCallbacks()): + bin_callbacks: BinaryDataCallbacks = BinaryDataCallbacks(), + json_cipher: Literal[JsonCiphers] = JsonCiphers.JSON_HEX_AES_GCM_128, # TODO - default to b64 at some point + bin_cipher: Literal[BinaryCiphers] = BinaryCiphers.BINARY_AES_GCM_128, # TODO - ChaCha20 if certain hardware detected + ): self.bin_callbacks = bin_callbacks + self.json_cipher = json_cipher + self.bin_cipher = bin_cipher self.identity = identity or None self._password = password @@ -271,11 +276,11 @@ def on_message(self, *args): if self.crypto_key: # handle binary encryption if isinstance(message, bytes): - message = decrypt_bin(self.crypto_key, message) + message = decrypt_bin(self.crypto_key, message, cipher=self.bin_cipher) # handle json encryption elif "ciphertext" in message: # LOG.debug(f"got encrypted message: {len(message)}") - message = decrypt_from_json(self.crypto_key, message) + message = decrypt_from_json(self.crypto_key, message, cipher=self.json_cipher) else: LOG.debug("Message was unencrypted") @@ -367,14 +372,14 @@ def emit(self, message: Union[MycroftMessage, HiveMessage], binary_type=binary_type, hivemeta=message.metadata) if self.crypto_key: - ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes) + ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes, cipher=self.bin_cipher) else: ws_payload = bitstr.bytes self.client.send(ws_payload, ABNF.OPCODE_BINARY) else: ws_payload = serialize_message(message) if self.crypto_key: - ws_payload = encrypt_as_json(self.crypto_key, ws_payload) + ws_payload = encrypt_as_json(self.crypto_key, ws_payload, cipher=self.json_cipher) self.client.send(ws_payload) except WebSocketConnectionClosedException: diff --git a/hivemind_bus_client/encryption.py b/hivemind_bus_client/encryption.py new file mode 100644 index 0000000..b1c4ea6 --- /dev/null +++ b/hivemind_bus_client/encryption.py @@ -0,0 +1,298 @@ +import enum +import json +from binascii import hexlify, unhexlify +from typing import Union, Optional, Dict, Any + +import pybase64 +from Cryptodome.Cipher import AES, ChaCha20_Poly1305 + +from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError, InvalidCipher + + +class JsonCiphers(str, enum.Enum): + """ + Enum representing JSON-based encryption ciphers. + """ + JSON_B64_AES_GCM_128 = "JSON-B64-AES-GCM-128" # JSON text output with Base64 encoding + JSON_HEX_AES_GCM_128 = "JSON-HEX-AES-GCM-128" # JSON text output with Hex encoding + JSON_B64_CHACHA20_POLY1305 = "JSON-B64-CHACHA20-POLY1305" # JSON text output with Base64 encoding + JSON_HEX_CHACHA20_POLY1305 = "JSON-HEX-CHACHA20-POLY1305" # JSON text output with Hex encoding + + +class BinaryCiphers(str, enum.Enum): + """ + Enum representing binary encryption ciphers. + + Specifications: + - AES - http://csrc.nist.gov/publications/fips/fips197/fips-197.pdf + - GCM - http://csrc.nist.gov/publications/nistpubs/800-38D/SP-800-38D.pdf + - CHACHA20-POLY1305 - https://datatracker.ietf.org/doc/html/rfc7539 + """ + BINARY_AES_GCM_128 = "BINARY-AES-GCM-128" # Binary output + BINARY_CHACHA20_POLY1305 = "BINARY-CHACHA20-POLY1305" # specified in RFC7539. + + +class TextCiphers(str, enum.Enum): + """ + Enum representing text-based encryption ciphers. + """ + B64_AES_GCM_128 = "B64-AES-GCM-128" # Text output with Base64 encoding + HEX_AES_GCM_128 = "HEX-AES-GCM-128" # Text output with Hex encoding + B64_CHACHA20_POLY1305 = "B64-CHACHA20-POLY1305" # Text output with Base64 encoding + HEX_CHACHA20_POLY1305 = "HEX-CHACHA20-POLY1305" # Text output with Hex encoding + + +def encrypt_as_json(key: Union[str, bytes], data: Union[str, Dict[str, Any]], + cipher: JsonCiphers = JsonCiphers.JSON_B64_AES_GCM_128) -> str: + """ + Encrypts the given data and outputs it as a JSON string. + + Args: + key (Union[str, bytes]): The encryption key, up to 16 bytes. Longer keys will be truncated. + data (Union[str, Dict[str, Any]]): The data to encrypt. If a dictionary, it will be serialized to JSON. + cipher (JsonCiphers): The encryption cipher. Supported options: + - JSON-B64-AES-GCM-128: Outputs Base64-encoded JSON. + - JSON-HEX-AES-GCM-128: Outputs Hex-encoded JSON. + + Returns: + str: A JSON string containing the encrypted data, nonce, and tag. + + Raises: + InvalidCipher: If an unsupported cipher is provided. + """ + if cipher not in JsonCiphers: + raise InvalidCipher(f"Invalid JSON cipher: {str(cipher)}") + + if isinstance(data, dict): + data = json.dumps(data) + + aes_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_HEX_AES_GCM_128} + b64_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_B64_CHACHA20_POLY1305} + + bcipher = BinaryCiphers.BINARY_AES_GCM_128 if cipher in aes_ciphers else BinaryCiphers.BINARY_CHACHA20_POLY1305 + + ciphertext = encrypt_bin(key, data, cipher=bcipher) + + # TODO - adjust depending on cipher, sizes are different + nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:] + + encoder = pybase64.b64encode if cipher in b64_ciphers else hexlify + + return json.dumps({ + "ciphertext": encoder(ciphertext).decode('utf-8'), + "tag": encoder(tag).decode('utf-8'), + "nonce": encoder(nonce).decode('utf-8') + }) + + +def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: JsonCiphers) -> str: + """ + Decrypts data from a JSON string. + + Args: + key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated. + data (Union[str, bytes]): The encrypted data as a JSON string or bytes. + cipher (Optional[JsonCiphers]): The cipher used for encryption. If None, it is auto-detected. + + Returns: + str: The decrypted plaintext data. + + Raises: + InvalidCipher: If an unsupported cipher is provided. + DecryptionKeyError: If decryption fails due to an invalid key or corrupted data. + """ + + aes_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_HEX_AES_GCM_128} + b64_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_B64_CHACHA20_POLY1305} + + if isinstance(data, str): + data = json.loads(data) + + decoder = pybase64.b64decode if cipher in b64_ciphers else unhexlify + bcipher = BinaryCiphers.BINARY_AES_GCM_128 if cipher in aes_ciphers else BinaryCiphers.BINARY_CHACHA20_POLY1305 + + ciphertext = decoder(data["ciphertext"]) + if "tag" not in data: # web crypto compatibility + # TODO - adjust depending on cipher, sizes are different + ciphertext, tag = ciphertext[:-16], ciphertext[-16:] + else: + tag = decoder(data["tag"]) + nonce = decoder(data["nonce"]) + + decryptor = decrypt_AES_GCM_128 if bcipher == BinaryCiphers.BINARY_AES_GCM_128 else decrypt_ChaCha20_Poly1305 + try: + plaintext = decryptor(key, ciphertext, tag, nonce) + return plaintext.decode("utf-8") + except ValueError as e: + raise DecryptionKeyError from e + + +def encrypt_bin(key: Union[str, bytes], data: Union[str, bytes], cipher: BinaryCiphers) -> bytes: + """ + Encrypts the given data and returns it as binary. + + Args: + key (Union[str, bytes]): The encryption key, up to 16 bytes. Longer keys will be truncated. + data (Union[str, bytes]): The data to encrypt. Strings will be encoded as UTF-8. + cipher (BinaryCiphers): The encryption cipher. Only BINARY_AES_GCM_128 is supported. + + Returns: + bytes: The encrypted data, including the nonce and tag. + + Raises: + InvalidCipher: If an unsupported cipher is provided. + EncryptionKeyError: If encryption fails. + """ + if cipher not in BinaryCiphers: + raise InvalidCipher(f"Invalid binary cipher: {str(cipher)}") + + encryptor = encrypt_AES_GCM_128 if cipher == BinaryCiphers.BINARY_AES_GCM_128 else encrypt_ChaCha20_Poly1305 + try: + ciphertext, tag, nonce = encryptor(key, data) + except Exception as e: + raise EncryptionKeyError from e + + return nonce + ciphertext + tag + + +def decrypt_bin(key: Union[str, bytes], ciphertext: bytes, cipher: BinaryCiphers) -> bytes: + """ + Decrypts binary data. + + Args: + key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated. + ciphertext (bytes): The binary encrypted data. Must include nonce and tag. + cipher (BinaryCiphers): The cipher used for encryption. Only BINARY_AES_GCM_128 is supported. + + Returns: + bytes: The decrypted plaintext data. + + Raises: + InvalidCipher: If an unsupported cipher is provided. + DecryptionKeyError: If decryption fails due to an invalid key or corrupted data. + """ + if cipher not in BinaryCiphers: + raise InvalidCipher(f"Invalid binary cipher: {str(cipher)}") + + # TODO - adjust depending on cipher, sizes are different + nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:] + + decryptor = decrypt_AES_GCM_128 if cipher == BinaryCiphers.BINARY_AES_GCM_128 else decrypt_ChaCha20_Poly1305 + try: + return decryptor(key, ciphertext, tag, nonce) + except ValueError as e: + raise DecryptionKeyError from e + + +############################# +# Cipher Implementations +def encrypt_AES_GCM_128(key: Union[str, bytes], text: Union[str, bytes], + nonce: Optional[bytes] = None) -> tuple[bytes, bytes, bytes]: + """ + Encrypts plaintext using AES-GCM-128. + + Args: + key (Union[str, bytes]): The encryption key. Strings will be encoded as UTF-8. + text (Union[str, bytes]): The plaintext to encrypt. + nonce (Optional[bytes]): An optional nonce. If None, a new one is generated. + + Returns: + tuple[bytes, bytes, bytes]: The ciphertext, authentication tag, and nonce. + """ + if not isinstance(text, bytes): + text = bytes(text, encoding="utf-8") + if not isinstance(key, bytes): + key = bytes(key, encoding="utf-8") + cipher = AES.new(key, AES.MODE_GCM, nonce=nonce) + ciphertext, tag = cipher.encrypt_and_digest(text) + return ciphertext, tag, cipher.nonce + + +def decrypt_AES_GCM_128(key: Union[str, bytes], ciphertext: bytes, tag: bytes, nonce: bytes) -> bytes: + """ + Decrypts ciphertext encrypted using AES-GCM-128. + + Args: + key (Union[str, bytes]): The decryption key. Strings will be encoded as UTF-8. + ciphertext (bytes): The encrypted ciphertext. + tag (bytes): The authentication tag. + nonce (bytes): The nonce used during encryption. + + Returns: + str: The decrypted plaintext. + + Raises: + ValueError: If decryption or authentication fails. + """ + if not isinstance(key, bytes): + key = bytes(key, encoding="utf-8") + cipher = AES.new(key, AES.MODE_GCM, nonce) + return cipher.decrypt_and_verify(ciphertext, tag) + + +def encrypt_ChaCha20_Poly1305(key: Union[str, bytes], + text: Union[str, bytes], + nonce: Optional[bytes] = None) -> tuple[bytes, bytes, bytes]: + """ + Encrypts plaintext using AES-GCM-128. + + Args: + key (Union[str, bytes]): The encryption key. Strings will be encoded as UTF-8. + text (Union[str, bytes]): The plaintext to encrypt. + nonce (Optional[bytes]): An optional nonce. If None, a new one is generated. + + Returns: + tuple[bytes, bytes, bytes]: The ciphertext, authentication tag, and nonce. + """ + if not isinstance(text, bytes): + text = bytes(text, encoding="utf-8") + if not isinstance(key, bytes): + key = bytes(key, encoding="utf-8") + assert len(key) == 32 # ChaCha20 uses 256 bit/32 byte keys + if nonce: + assert len(nonce) == 12 # 92bits/12bytes bytes per RFC7539 + cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce) + ciphertext, tag = cipher.encrypt_and_digest(text) + return ciphertext, tag, cipher.nonce + + +def decrypt_ChaCha20_Poly1305(key: Union[str, bytes], + ciphertext: bytes, + tag: bytes, + nonce: bytes) -> bytes: + """ + Decrypts ciphertext encrypted using AES-GCM-128. + + Args: + key (Union[str, bytes]): The decryption key. Strings will be encoded as UTF-8. + ciphertext (bytes): The encrypted ciphertext. + tag (bytes): The authentication tag. + nonce (bytes): The nonce used during encryption. + + Returns: + str: The decrypted plaintext. + + Raises: + ValueError: If decryption or authentication fails. + """ + if not isinstance(key, bytes): + key = bytes(key, encoding="utf-8") + + assert len(key) == 32 # ChaCha20 uses 256 bit/32 byte keys + if nonce: + assert len(nonce) == 12 # 92bits/12bytes per RFC7539 + cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce) + return cipher.decrypt_and_verify(ciphertext, tag) + + +if __name__ == "__main__": + from Cryptodome.Random import get_random_bytes + + print("JSON-B64-AES-GCM-128" == JsonCiphers.JSON_B64_AES_GCM_128) + + key = get_random_bytes(32) + plaintext = b'Attack at dawn' + ciphertext, tag, nonce = encrypt_ChaCha20_Poly1305(key, plaintext) + recovered = decrypt_ChaCha20_Poly1305(key, ciphertext, tag, nonce) + print(recovered) + assert recovered == plaintext diff --git a/hivemind_bus_client/exceptions.py b/hivemind_bus_client/exceptions.py index 9beeedc..c767563 100644 --- a/hivemind_bus_client/exceptions.py +++ b/hivemind_bus_client/exceptions.py @@ -11,6 +11,10 @@ class UnauthorizedKeyError(HiveMindException): """ Invalid Key provided """ +class InvalidCipher(HiveMindException): + """unknown encryption scheme requested""" + + class WrongEncryptionKey(HiveMindException): """ Wrong Encryption Key""" diff --git a/hivemind_bus_client/protocol.py b/hivemind_bus_client/protocol.py index 0fc1aae..9be4bb8 100644 --- a/hivemind_bus_client/protocol.py +++ b/hivemind_bus_client/protocol.py @@ -9,6 +9,7 @@ from ovos_utils.log import LOG from hivemind_bus_client.client import HiveMessageBusClient +from hivemind_bus_client.encryption import JsonCiphers, BinaryCiphers from hivemind_bus_client.identity import NodeIdentity from hivemind_bus_client.message import HiveMessage, HiveMessageType from poorman_handshake import HandShake, PasswordHandShake @@ -143,14 +144,15 @@ def start_handshake(self): else: LOG.info("hivemind does not support binarization protocol") + payload = {"binarize": self.binarize, + "json_ciphers": list(c for c in JsonCiphers), + "binary_ciphers": list(c for c in BinaryCiphers)} if self.pswd_handshake is not None: - envelope = self.pswd_handshake.generate_handshake() - msg = HiveMessage(HiveMessageType.HANDSHAKE, {"envelope": envelope, - "binarize": self.binarize}) + payload["envelope"] = self.pswd_handshake.generate_handshake() else: - msg = HiveMessage(HiveMessageType.HANDSHAKE, {"pubkey": self.handshake.pubkey, - "binarize": self.binarize}) - self.hm.emit(msg) + payload["pubkey"] = self.handshake.pubkey + + self.hm.emit(HiveMessage(HiveMessageType.HANDSHAKE, payload)) def receive_handshake(self, envelope): if self.pswd_handshake is not None: diff --git a/hivemind_bus_client/util.py b/hivemind_bus_client/util.py index 96bcc33..ff0cd26 100644 --- a/hivemind_bus_client/util.py +++ b/hivemind_bus_client/util.py @@ -1,12 +1,7 @@ import json import zlib -from binascii import hexlify, unhexlify from typing import Union, Dict -import pybase64 -from ovos_utils.security import encrypt, decrypt, AES - -from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError from hivemind_bus_client.message import HiveMessage, HiveMessageType, Message @@ -102,87 +97,6 @@ def get_mycroft_msg(pload: Union[HiveMessage, str, Dict]) -> Message: return pload -def encrypt_as_json(key, data, b64=False) -> str: - # TODO default b64 to True in a future release - # we dont want clients to update before servers, otherwise servers won't be able to decode - # after a reasonable time all servers should support decoding both schemes and the default can change - if isinstance(data, dict): - data = json.dumps(data) - if len(key) > 16: - key = key[0:16] - ciphertext = encrypt_bin(key, data) - nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:] - if b64: - return json.dumps({"ciphertext": pybase64.b64encode(ciphertext).decode('utf-8'), - "tag": pybase64.b64encode(tag).decode('utf-8'), - "nonce": pybase64.b64encode(nonce).decode('utf-8')}) - return json.dumps({"ciphertext": hexlify(ciphertext).decode('utf-8'), - "tag": hexlify(tag).decode('utf-8'), - "nonce": hexlify(nonce).decode('utf-8')}) - - -def decrypt_from_json(key, data: Union[str, bytes]): - if isinstance(data, str): - data = json.loads(data) - if len(key) > 16: - key = key[0:16] - - # payloads can be either hex encoded (old style) - # or b64 encoded (new style) - def decode(b64=False): - if b64: - decoder = pybase64.b64decode - else: - decoder = unhexlify - - ciphertext = decoder(data["ciphertext"]) - if data.get("tag") is None: # web crypto - ciphertext, tag = ciphertext[:-16], ciphertext[-16:] - else: - tag = decoder(data["tag"]) - nonce = decoder(data["nonce"]) - return ciphertext, tag, nonce - - is_b64 = any(a.isupper() for a in str(data)) # if any letter is uppercase, it must be b64 - ciphertext, tag, nonce = decode(is_b64) - try: - return decrypt(key, ciphertext, tag, nonce) - except ValueError as e: - if not is_b64: - try: # maybe it was b64 after all? unlikely but.... - ciphertext, tag, nonce = decode(b64=True) - return decrypt(key, ciphertext, tag, nonce) - except ValueError: - pass - raise DecryptionKeyError from e - - -def encrypt_bin(key, data: Union[str, bytes]): - if len(key) > 16: - key = key[0:16] - try: - ciphertext, tag, nonce = encrypt(key, data) - except: - raise EncryptionKeyError - - return nonce + ciphertext + tag - - -def decrypt_bin(key, ciphertext: bytes): - if len(key) > 16: - key = key[0:16] - - nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:] - - try: - if not isinstance(key, bytes): - key = bytes(key, encoding="utf-8") - cipher = AES.new(key, AES.MODE_GCM, nonce) - return cipher.decrypt_and_verify(ciphertext, tag) - except ValueError: - raise DecryptionKeyError - - def compress_payload(text: Union[str, bytes]) -> bytes: # Compressing text if isinstance(text, str): @@ -192,14 +106,7 @@ def compress_payload(text: Union[str, bytes]) -> bytes: return zlib.compress(decompressed) -def decompress_payload(compressed: Union[str, bytes]) -> bytes: - # Decompressing text - if isinstance(compressed, str): # we really should be getting bytes here and not a str - if any(a.isupper() for a in compressed): - decoder = pybase64.b64decode - else: # assume hex - decoder = unhexlify - compressed = decoder(compressed) +def decompress_payload(compressed: bytes) -> bytes: return zlib.decompress(compressed) @@ -221,6 +128,62 @@ def bytes2str(payload: bytes, compressed=False) -> str: return payload.decode("utf-8") +############### +# deprecated +import warnings + + +def encrypt_as_json(key, data, b64=False) -> str: + warnings.warn( + "encrypt_as_json is deprecated and will be removed in future versions. " + "Use 'from hivemind_bus_client.encryption import encrypt_as_json' instead", + DeprecationWarning, + stacklevel=2 + ) + from hivemind_bus_client.encryption import encrypt_as_json as _ej, JsonCiphers + c = JsonCiphers.JSON_B64_AES_GCM_128 if b64 else JsonCiphers.JSON_HEX_AES_GCM_128 + return _ej(key, data, c) + + +def decrypt_from_json(key, data: Union[str, bytes]): + warnings.warn( + "decrypt_from_json is deprecated and will be removed in future versions. " + "Use 'from hivemind_bus_client.encryption import decrypt_from_json' instead", + DeprecationWarning, + stacklevel=2 + ) + from hivemind_bus_client.encryption import decrypt_from_json as _dj, JsonCiphers + try: + return _dj(key, data, JsonCiphers.JSON_HEX_AES_GCM_128) + except Exception as e: + try: + return _dj(key, data, JsonCiphers.JSON_B64_AES_GCM_128) + except: + raise e + + +def encrypt_bin(key, data: Union[str, bytes]): + warnings.warn( + "encrypt_bin is deprecated and will be removed in future versions. " + "Use 'from hivemind_bus_client.encryption import encrypt_bin' instead", + DeprecationWarning, + stacklevel=2 + ) + from hivemind_bus_client.encryption import encrypt_bin as _eb, BinaryCiphers + return _eb(key, data, BinaryCiphers.BINARY_AES_GCM_128) + + +def decrypt_bin(key, ciphertext: bytes): + warnings.warn( + "decrypt_bin is deprecated and will be removed in future versions. " + "Use 'from hivemind_bus_client.encryption import decrypt_bin' instead", + DeprecationWarning, + stacklevel=2 + ) + from hivemind_bus_client.encryption import decrypt_bin as _db, BinaryCiphers + return _db(key, ciphertext, BinaryCiphers.BINARY_AES_GCM_128) + + if __name__ == "__main__": k = "*" * 16 test = "this is a test text for checking size of encryption and stuff"