diff --git a/hivemind_bus_client/client.py b/hivemind_bus_client/client.py index 0ecfcb4..e8d1a6c 100644 --- a/hivemind_bus_client/client.py +++ b/hivemind_bus_client/client.py @@ -17,8 +17,9 @@ from hivemind_bus_client.message import HiveMessage, HiveMessageType from hivemind_bus_client.serialization import HiveMindBinaryPayloadType from hivemind_bus_client.serialization import get_bitstring, decode_bitstring -from hivemind_bus_client.util import serialize_message, \ - encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin +from hivemind_bus_client.util import serialize_message +from hivemind_bus_client.encryption import (encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin, + SupportedEncodings, SupportedCiphers) from poorman_handshake.asymmetric.utils import encrypt_RSA, load_RSA_key, sign_RSA @@ -104,6 +105,8 @@ def __init__(self, key: Optional[str] = None, internal_bus: Optional[OVOSBusClient] = None, bin_callbacks: BinaryDataCallbacks = BinaryDataCallbacks()): self.bin_callbacks = bin_callbacks + self.json_encoding = SupportedEncodings.JSON_HEX # server defaults before it was made configurable + self.cipher = SupportedCiphers.AES_GCM # server defaults before it was made configurable self.identity = identity or None self._password = password @@ -271,11 +274,12 @@ def on_message(self, *args): if self.crypto_key: # handle binary encryption if isinstance(message, bytes): - message = decrypt_bin(self.crypto_key, message) + message = decrypt_bin(self.crypto_key, message, cipher=self.cipher) # handle json encryption elif "ciphertext" in message: # LOG.debug(f"got encrypted message: {len(message)}") - message = decrypt_from_json(self.crypto_key, message) + message = decrypt_from_json(self.crypto_key, message, + cipher=self.cipher, encoding=self.json_encoding) else: LOG.debug("Message was unencrypted") @@ -367,14 +371,15 @@ def emit(self, message: Union[MycroftMessage, HiveMessage], binary_type=binary_type, hivemeta=message.metadata) if self.crypto_key: - ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes) + ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes, cipher=self.cipher) else: ws_payload = bitstr.bytes self.client.send(ws_payload, ABNF.OPCODE_BINARY) else: ws_payload = serialize_message(message) if self.crypto_key: - ws_payload = encrypt_as_json(self.crypto_key, ws_payload) + ws_payload = encrypt_as_json(self.crypto_key, ws_payload, + cipher=self.cipher, encoding=self.json_encoding) self.client.send(ws_payload) except WebSocketConnectionClosedException: diff --git a/hivemind_bus_client/encryption.py b/hivemind_bus_client/encryption.py new file mode 100644 index 0000000..61c9ac3 --- /dev/null +++ b/hivemind_bus_client/encryption.py @@ -0,0 +1,487 @@ +import enum +import json +from binascii import hexlify, unhexlify +from typing import Union, Optional, Dict, Any, Literal, List, Callable + +import pybase64 +from hivemind_bus_client.z85b import Z85B +from Cryptodome.Cipher import AES, ChaCha20_Poly1305 +from cpuinfo import get_cpu_info + +from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError, InvalidEncoding, InvalidCipher, \ + InvalidKeySize + +# Cipher-specific constants +AES_KEY_SIZES = [16, 24, 32] # poorman_handshake generates 32 bit secrets +AES_NONCE_SIZE = 16 +AES_TAG_SIZE = 16 +CHACHA20_KEY_SIZE = 32 +CHACHA20_NONCE_SIZE = 12 +CHACHA20_TAG_SIZE = 16 + + +def cpu_supports_AES() -> bool: + """ + Check if the CPU supports AES encryption. + + This function checks the CPU flags to determine if the hardware supports + AES encryption. It does so by querying the CPU information and checking + if the 'aes' flag is present. + + Returns: + bool: True if AES is supported by the CPU, False otherwise. + """ + return "aes" in get_cpu_info()["flags"] + + +class SupportedEncodings(str, enum.Enum): + """ + Enum representing JSON-based encryption encodings. + + Ciphers output binary data, and JSON needs to transmit that data as plaintext. + The supported encodings include Base64 and Hex encoding. + """ + JSON_Z85B = "JSON-Z85B" # JSON text output with Z85B encoding + JSON_B64 = "JSON-B64" # JSON text output with Base64 encoding + JSON_HEX = "JSON-HEX" # JSON text output with Hex encoding + + +def get_encoder(encoding: SupportedEncodings) -> Callable[[bytes], bytes]: + encoding = _norm_encoding(encoding) + if encoding == SupportedEncodings.JSON_B64: + return pybase64.b64encode + if encoding == SupportedEncodings.JSON_HEX: + return hexlify + if encoding == SupportedEncodings.JSON_Z85B: + return Z85B.encode + raise InvalidEncoding(f"Invalid encoding: {encoding}") + + +def get_decoder(encoding: SupportedEncodings) -> Callable[[bytes], bytes]: + encoding = _norm_encoding(encoding) + if encoding == SupportedEncodings.JSON_B64: + return pybase64.b64decode + if encoding == SupportedEncodings.JSON_HEX: + return unhexlify + if encoding == SupportedEncodings.JSON_Z85B: + return Z85B.decode + raise InvalidEncoding(f"Invalid encoding: {encoding}") + + +class SupportedCiphers(str, enum.Enum): + """ + Enum representing binary encryption ciphers. + + Specifications: + - AES - http://csrc.nist.gov/publications/fips/fips197/fips-197.pdf + - GCM - http://csrc.nist.gov/publications/nistpubs/800-38D/SP-800-38D.pdf + - CHACHA20-POLY1305 - https://datatracker.ietf.org/doc/html/rfc7539 + """ + AES_GCM = "AES-GCM" + CHACHA20_POLY1305 = "CHACHA20-POLY1305" # specified in RFC7539. + + +AES_CIPHERS = {c for c in SupportedCiphers if "AES" in c} +BLOCK_CIPHERS = AES_CIPHERS # Blowfish etc can be added in the future + + +def optimal_ciphers() -> List[SupportedCiphers]: + """ + Determine the optimal ciphers based on CPU support. + + This function checks if the CPU supports AES encryption. If it does, it + returns a list of ciphers with AES first, followed by other supported ciphers. + If AES is not supported, the function returns a list of ciphers with + ChaCha20-Poly1305 first. + + Returns: + List[SupportedCiphers]: A list of optimal ciphers based on CPU support. + """ + if not cpu_supports_AES(): + return [SupportedCiphers.CHACHA20_POLY1305, SupportedCiphers.AES_GCM] + return [SupportedCiphers.AES_GCM, SupportedCiphers.CHACHA20_POLY1305] + + +def _norm_cipher(cipher: Union[SupportedCiphers, str]) -> SupportedCiphers: + """ + Normalize a cipher to an enum member. + + This function takes either a cipher string or an enum member and ensures it + is converted to the corresponding enum member of SupportedCiphers. If the input + is invalid, an InvalidCipher exception is raised. + + Args: + cipher (Union[SupportedCiphers, str]): The cipher to normalize, either a string or an enum member. + + Returns: + SupportedCiphers: The corresponding enum member of SupportedCiphers. + + Raises: + InvalidCipher: If the cipher is invalid. + """ + if isinstance(cipher, SupportedCiphers): + return cipher # If already an enum member, just return it + + # Convert string to enum member by matching the value + for member in SupportedCiphers: + if member.value == cipher: + return member + + raise InvalidCipher(f"Invalid cipher: {cipher}") + + +def _norm_encoding(encoding: Union[SupportedEncodings, str]) -> SupportedEncodings: + """ + Normalize an encoding to an enum member. + + This function takes either an encoding string or an enum member and ensures it + is converted to the corresponding enum member of SupportedEncodings. If the input + is invalid, an InvalidEncoding exception is raised. + + Args: + encoding (Union[SupportedEncodings, str]): The encoding to normalize, either a string or an enum member. + + Returns: + SupportedEncodings: The corresponding enum member of SupportedEncodings. + + Raises: + InvalidEncoding: If the encoding is invalid. + """ + if isinstance(encoding, SupportedEncodings): + return encoding # If already an enum member, just return it + + # Convert string to enum member by matching the value + for member in SupportedEncodings: + if member.value == encoding: + return member + + raise InvalidEncoding(f"Invalid JSON encoding: {encoding}") + + +def encrypt_as_json( + key: Union[str, bytes], + plaintext: Union[str, Dict[str, Any]], + cipher: Union[SupportedCiphers, str] = SupportedCiphers.AES_GCM, + encoding: Union[SupportedEncodings, str] = SupportedEncodings.JSON_B64 +) -> str: + """ + Encrypts the given data and outputs it as a JSON string. + + Args: + key (Union[str, bytes]): The encryption key, up to 16 bytes. Longer keys will be truncated. + plaintext (Union[str, Dict[str, Any]]): The data to encrypt. If a dictionary, it will be serialized to JSON. + cipher (Union[SupportedCiphers, str]): The encryption cipher. Supported options: + - AES-GCM (default) + encoding (Union[SupportedEncodings, str]): The encoding type for JSON. Supported options: + - JSON-B64 (default) + + Returns: + str: A JSON string containing the encrypted data, nonce, and tag. + + Raises: + InvalidCipher: If an unsupported cipher is provided. + InvalidEncoding: If an unsupported encoding is provided. + """ + + cipher = _norm_cipher(cipher) + encoding = _norm_encoding(encoding) + + # If plaintext is a dictionary, convert it to a JSON string + if isinstance(plaintext, dict): + plaintext = json.dumps(plaintext) + + try: + ciphertext = encrypt_bin(key=key, plaintext=plaintext, cipher=cipher) + except InvalidKeySize as e: + raise e + except Exception as e: + raise EncryptionKeyError from e + + # Extract nonce/tag depending on cipher, sizes are different + if cipher in AES_CIPHERS: + nonce, ciphertext, tag = ( + ciphertext[:AES_NONCE_SIZE], + ciphertext[AES_NONCE_SIZE:-AES_TAG_SIZE], + ciphertext[-AES_TAG_SIZE:] + ) + else: + nonce, ciphertext, tag = ( + ciphertext[:CHACHA20_NONCE_SIZE], + ciphertext[CHACHA20_NONCE_SIZE:-CHACHA20_TAG_SIZE], + ciphertext[-CHACHA20_TAG_SIZE:] + ) + + # Choose encoder based on the encoding + encoder = get_encoder(encoding) + + # Return the JSON-encoded ciphertext, tag, and nonce + return json.dumps({ + "ciphertext": encoder(ciphertext).decode('utf-8'), + "tag": encoder(tag).decode('utf-8'), + "nonce": encoder(nonce).decode('utf-8') + }) + + +def decrypt_from_json(key: Union[str, bytes], ciphertext_json: Union[str, bytes], + cipher: Union[SupportedCiphers, str] = SupportedCiphers.AES_GCM, + encoding: Union[SupportedEncodings, str] = SupportedEncodings.JSON_B64) -> str: + """ + Decrypts data from a JSON string. + + Args: + key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated. + ciphertext_json (Union[str, bytes]): The encrypted data as a JSON string or bytes. + cipher (SupportedEncodings): The cipher used for encryption. + + Returns: + str: The decrypted plaintext data. + + Raises: + InvalidCipher: If an unsupported cipher is provided. + InvalidEncoding: If an unsupported encoding is provided. + DecryptionKeyError: If decryption fails due to an invalid key or corrupted data. + """ + cipher = _norm_cipher(cipher) + encoding = _norm_encoding(encoding) + + if isinstance(ciphertext_json, str): + ciphertext_json = json.loads(ciphertext_json) + + decoder = get_decoder(encoding) + + ciphertext: bytes = decoder(ciphertext_json["ciphertext"]) + + if "tag" not in ciphertext_json: # web crypto compatibility + if cipher in AES_CIPHERS: + ciphertext, tag = ciphertext[:-AES_TAG_SIZE], ciphertext[-AES_TAG_SIZE:] + else: + ciphertext, tag = ciphertext[:-CHACHA20_TAG_SIZE], ciphertext[-CHACHA20_TAG_SIZE:] + else: + tag = decoder(ciphertext_json["tag"]) + nonce = decoder(ciphertext_json["nonce"]) + + try: + ciphertext = decrypt_bin(key=key, + ciphertext=nonce + ciphertext + tag, + cipher=cipher) + return ciphertext.decode("utf-8") + except InvalidKeySize as e: + raise e + except Exception as e: + raise DecryptionKeyError from e + + +def encrypt_bin(key: Union[str, bytes], plaintext: Union[str, bytes], cipher: Union[SupportedCiphers, str]) -> bytes: + """ + Encrypts the given data and returns it as binary. + + Args: + key (Union[str, bytes]): The encryption key, up to 16 bytes. Longer keys will be truncated. + plaintext (Union[str, bytes]): The data to encrypt. Strings will be encoded as UTF-8. + cipher (SupportedCiphers): The encryption cipher. Supported options: + - AES_GCM: AES-GCM with 128-bit/256-bit key + - CHACHA20_POLY1305: ChaCha20-Poly1305 with 256-bit key + + Returns: + bytes: The encrypted data, including the nonce and tag. + + Raises: + InvalidCipher: If an unsupported cipher is provided. + InvalidKeySize: If an invalid key size is provided. + """ + cipher = _norm_cipher(cipher) + + encryptor = encrypt_AES if cipher in AES_CIPHERS else encrypt_ChaCha20_Poly1305 + + try: + if cipher in BLOCK_CIPHERS: + if cipher == SupportedCiphers.AES_GCM: + mode = AES.MODE_GCM + else: + raise ValueError("invalid block cipher mode") + ciphertext, tag, nonce = encryptor(key, plaintext, mode=mode) + else: + ciphertext, tag, nonce = encryptor(key, plaintext) + except InvalidKeySize as e: + raise e + except Exception as e: + raise EncryptionKeyError from e + + return nonce + ciphertext + tag + + +def decrypt_bin(key: Union[str, bytes], ciphertext: Union[str, bytes], cipher: Union[SupportedCiphers, str]) -> bytes: + """ + Decrypts the given binary data. + + Args: + key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated. + ciphertext (Union[str, bytes]): The data to decrypt, including the nonce and tag. + cipher (SupportedCiphers): The cipher used for encryption. + + Returns: + bytes: The decrypted data. + + Raises: + InvalidCipher: If an unsupported cipher is provided. + DecryptionKeyError: If decryption fails due to an invalid key or corrupted data. + """ + cipher = _norm_cipher(cipher) + + # extract nonce/tag depending on cipher, sizes are different + if cipher == SupportedCiphers.AES_GCM: + nonce, ciphertext, tag = (ciphertext[:AES_NONCE_SIZE], + ciphertext[AES_NONCE_SIZE:-AES_TAG_SIZE], + ciphertext[-AES_TAG_SIZE:]) + else: + nonce, ciphertext, tag = (ciphertext[:CHACHA20_NONCE_SIZE], + ciphertext[CHACHA20_NONCE_SIZE:-CHACHA20_TAG_SIZE], + ciphertext[-CHACHA20_TAG_SIZE:]) + + decryptor = decrypt_AES_128 if cipher in AES_CIPHERS else decrypt_ChaCha20_Poly1305 + try: + if cipher in BLOCK_CIPHERS: + if cipher == SupportedCiphers.AES_GCM: + mode = AES.MODE_GCM + else: + raise ValueError("invalid block cipher mode") + return decryptor(key, ciphertext, tag, nonce, mode=mode) + return decryptor(key, ciphertext, tag, nonce) + except InvalidKeySize as e: + raise e + except Exception as e: + raise DecryptionKeyError from e + + +############################# +# Cipher Implementations +def encrypt_AES(key: Union[str, bytes], text: Union[str, bytes], + nonce: Optional[bytes] = None, + mode: Literal[AES.MODE_GCM] = AES.MODE_GCM) -> tuple[bytes, bytes, bytes]: + """ + Encrypts plaintext using AES-GCM-128. + + Args: + key (Union[str, bytes]): The encryption key. Strings will be encoded as UTF-8. + text (Union[str, bytes]): The plaintext to encrypt. + nonce (Optional[bytes]): An optional nonce. If None, a new one is generated. + + Returns: + tuple[bytes, bytes, bytes]: The ciphertext, authentication tag, and nonce. + """ + if not isinstance(text, bytes): + text = bytes(text, encoding="utf-8") + if not isinstance(key, bytes): + key = bytes(key, encoding="utf-8") + # AES-128 uses 128 bit/16 byte keys + # AES-256 uses 256 bit/32 byte keys + if len(key) not in AES_KEY_SIZES: + raise InvalidKeySize("AES-GCM requires a 16/24/32 bytes key") + cipher = AES.new(key, mode, nonce=nonce) + ciphertext, tag = cipher.encrypt_and_digest(text) + return ciphertext, tag, cipher.nonce + + +def decrypt_AES_128(key: Union[str, bytes], + ciphertext: bytes, + tag: bytes, + nonce: bytes, + mode: Literal[AES.MODE_GCM] = AES.MODE_GCM) -> bytes: + """ + Decrypts ciphertext encrypted using AES-GCM-128. + + Args: + key (Union[str, bytes]): The decryption key. Strings will be encoded as UTF-8. + ciphertext (bytes): The encrypted ciphertext. + tag (bytes): The authentication tag. + nonce (bytes): The nonce used during encryption. + + Returns: + str: The decrypted plaintext. + + Raises: + InvalidKeySize: If key size is not valid + ValueError: If decryption or authentication fails. + """ + if isinstance(key, str): + key = key.encode("utf-8") + # AES-128 uses 128 bit/16 byte keys + # AES-256 uses 256 bit/32 byte keys + if len(key) not in AES_KEY_SIZES: + raise InvalidKeySize("AES-GCM requires a 16/24/32 bytes key") + cipher = AES.new(key, mode, nonce) + return cipher.decrypt_and_verify(ciphertext, tag) + + +def encrypt_ChaCha20_Poly1305(key: Union[str, bytes], + text: Union[str, bytes], + nonce: Optional[bytes] = None) -> tuple[bytes, bytes, bytes]: + """ + Encrypts plaintext using ChaCha20-Poly1305. + + Args: + key (Union[str, bytes]): The encryption key. Strings will be encoded as UTF-8. + text (Union[str, bytes]): The plaintext to encrypt. + nonce (Optional[bytes]): An optional nonce. If None, a new one is generated. + + Returns: + tuple[bytes, bytes, bytes]: The ciphertext, authentication tag, and nonce. + """ + if isinstance(text, str): + text = text.encode("utf-8") + if isinstance(key, str): + key = key.encode("utf-8") + + if len(key) != CHACHA20_KEY_SIZE: # ChaCha20 uses 256 bit/32 byte keys + raise InvalidKeySize("CHACHA20-POLY1305 requires a 32-byte key") + if nonce: + if len(nonce) != CHACHA20_NONCE_SIZE: # 92bits/12bytes per RFC7539 + raise InvalidKeySize("CHACHA20-POLY1305 requires a 12-byte nonce per RFC7539") + cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce) + ciphertext, tag = cipher.encrypt_and_digest(text) + return ciphertext, tag, cipher.nonce + + +def decrypt_ChaCha20_Poly1305(key: Union[str, bytes], + ciphertext: bytes, + tag: bytes, + nonce: bytes) -> bytes: + """ + Decrypts ciphertext encrypted using AES-GCM-128. + + Args: + key (Union[str, bytes]): The decryption key. Strings will be encoded as UTF-8. + ciphertext (bytes): The encrypted ciphertext. + tag (bytes): The authentication tag. + nonce (bytes): The nonce used during encryption. + + Returns: + str: The decrypted plaintext. + + Raises: + InvalidKeySize: + ValueError: If decryption or authentication fails. + """ + if isinstance(key, str): + key = key.encode("utf-8") + + if len(key) != CHACHA20_KEY_SIZE: # ChaCha20 uses 256 bit/32 byte keys + raise InvalidKeySize("CHACHA20-POLY1305 requires a 32-byte key") + if nonce: + if len(nonce) != CHACHA20_NONCE_SIZE: # 92bits/12bytes per RFC7539 + raise InvalidKeySize("CHACHA20-POLY1305 requires a 12-byte nonce per RFC7539") + cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce) + return cipher.decrypt_and_verify(ciphertext, tag) + + +if __name__ == "__main__": + from Cryptodome.Random import get_random_bytes + + print("JSON-B64" == SupportedEncodings.JSON_B64) + + key = get_random_bytes(CHACHA20_KEY_SIZE) + plaintext = b'Attack at dawn' + ciphertext, tag, nonce = encrypt_ChaCha20_Poly1305(key, plaintext) + recovered = decrypt_ChaCha20_Poly1305(key, ciphertext, tag, nonce) + print(recovered) + assert recovered == plaintext diff --git a/hivemind_bus_client/exceptions.py b/hivemind_bus_client/exceptions.py index 9beeedc..2073669 100644 --- a/hivemind_bus_client/exceptions.py +++ b/hivemind_bus_client/exceptions.py @@ -11,6 +11,18 @@ class UnauthorizedKeyError(HiveMindException): """ Invalid Key provided """ +class InvalidCipher(HiveMindException): + """unknown encryption scheme requested""" + + +class InvalidEncoding(HiveMindException): + """unknown encoding scheme requested""" + + +class InvalidKeySize(HiveMindException): + """ Encryption Key size does not obey specification""" + + class WrongEncryptionKey(HiveMindException): """ Wrong Encryption Key""" @@ -33,3 +45,8 @@ class SecureConnectionFailed(HiveMindConnectionError): class HiveMindEntryPointNotFound(HiveMindConnectionError): """ can not connect to provided address """ + + +class Z85DecodeError(Exception): + """Exception raised for errors in decoding Z85b.""" + pass diff --git a/hivemind_bus_client/message.py b/hivemind_bus_client/message.py index ff0e5e2..7bc9f18 100644 --- a/hivemind_bus_client/message.py +++ b/hivemind_bus_client/message.py @@ -163,7 +163,7 @@ def as_dict(self) -> dict: @property def as_json(self) -> str: - return json.dumps(self.as_dict) + return json.dumps(self.as_dict, ensure_ascii=False) def serialize(self) -> str: return self.as_json diff --git a/hivemind_bus_client/protocol.py b/hivemind_bus_client/protocol.py index 0fc1aae..7efda07 100644 --- a/hivemind_bus_client/protocol.py +++ b/hivemind_bus_client/protocol.py @@ -1,6 +1,6 @@ import pybase64 from dataclasses import dataclass -from typing import Optional +from typing import Optional, Tuple from ovos_bus_client import Message as MycroftMessage from ovos_bus_client import MessageBusClient @@ -9,6 +9,7 @@ from ovos_utils.log import LOG from hivemind_bus_client.client import HiveMessageBusClient +from hivemind_bus_client.encryption import SupportedEncodings, SupportedCiphers, optimal_ciphers from hivemind_bus_client.identity import NodeIdentity from hivemind_bus_client.message import HiveMessage, HiveMessageType from poorman_handshake import HandShake, PasswordHandShake @@ -118,7 +119,6 @@ def node_id(self): # this is how ovos-core bus refers to this slave's master return self.internal_protocol.node_id - # TODO - handshake handlers # hivemind events def handle_illegal_msg(self, message: HiveMessage): # this should not happen, @@ -143,14 +143,17 @@ def start_handshake(self): else: LOG.info("hivemind does not support binarization protocol") + payload = {"binarize": self.binarize, + "encodings": [SupportedEncodings.JSON_B64, + SupportedEncodings.JSON_Z85B, + SupportedEncodings.JSON_HEX], + "ciphers": optimal_ciphers()} if self.pswd_handshake is not None: - envelope = self.pswd_handshake.generate_handshake() - msg = HiveMessage(HiveMessageType.HANDSHAKE, {"envelope": envelope, - "binarize": self.binarize}) + payload["envelope"] = self.pswd_handshake.generate_handshake() else: - msg = HiveMessage(HiveMessageType.HANDSHAKE, {"pubkey": self.handshake.pubkey, - "binarize": self.binarize}) - self.hm.emit(msg) + payload["pubkey"] = self.handshake.pubkey + + self.hm.emit(HiveMessage(HiveMessageType.HANDSHAKE, payload)) def receive_handshake(self, envelope): if self.pswd_handshake is not None: @@ -182,7 +185,12 @@ def handle_handshake(self, message: HiveMessage): # master is performing the handshake if "envelope" in message.payload: envelope = message.payload["envelope"] + self.hm.json_encoding = message.payload.get("encoding") or SupportedEncodings.JSON_HEX + self.hm.cipher = message.payload.get("cipher") or SupportedCiphers.AES_GCM self.receive_handshake(envelope) + LOG.debug(f"Encoding: {self.hm.json_encoding}") + LOG.debug(f"Cipher: {self.hm.cipher}") + LOG.debug(f"Key size: {len(self.pswd_handshake.secret) * 8}bit") # master is requesting handshake start else: @@ -191,13 +199,17 @@ def handle_handshake(self, message: HiveMessage): # self.hm.handshake_event.set() # don't wait # return + encodings = message.payload.get("encodings") or [SupportedEncodings.JSON_HEX] + ciphers = message.payload.get("ciphers") or [SupportedCiphers.AES_GCM] + LOG.debug(f"Server supported encodings: {encodings}") + LOG.debug(f"Server supported ciphers: {ciphers}") if message.payload.get("crypto_key") and self.hm.crypto_key: pass # we can use the pre-shared key instead of handshake # TODO - flag to give preference to pre-shared key over handshake self.binarize = message.payload.get("binarize", False) - # TODO - flag to give preference to / require password or not + # TODO - flag to give preference to / require password or use RSA handshake # currently if password is set then it is always used if message.payload.get("password") and self.identity.password: self.pswd_handshake = PasswordHandShake(self.identity.password) diff --git a/hivemind_bus_client/scripts.py b/hivemind_bus_client/scripts.py index e1eabb8..e7c4f0e 100644 --- a/hivemind_bus_client/scripts.py +++ b/hivemind_bus_client/scripts.py @@ -198,8 +198,8 @@ def test_identity(): node.close() -@hmclient_cmds.command(help="recreate the PGP key for inter-node communication", name="reset-pgp") -def reset_pgp_key(): +@hmclient_cmds.command(help="recreate the private RSA key for inter-node communication", name="reset-pgp") +def reset_keys(): identity = NodeIdentity() identity.create_keys() print("PUBKEY:", identity.public_key) diff --git a/hivemind_bus_client/util.py b/hivemind_bus_client/util.py index 96bcc33..c41540d 100644 --- a/hivemind_bus_client/util.py +++ b/hivemind_bus_client/util.py @@ -1,12 +1,8 @@ import json import zlib -from binascii import hexlify, unhexlify from typing import Union, Dict -import pybase64 -from ovos_utils.security import encrypt, decrypt, AES - -from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError +from hivemind_bus_client.encryption import SupportedEncodings, SupportedCiphers from hivemind_bus_client.message import HiveMessage, HiveMessageType, Message @@ -102,87 +98,6 @@ def get_mycroft_msg(pload: Union[HiveMessage, str, Dict]) -> Message: return pload -def encrypt_as_json(key, data, b64=False) -> str: - # TODO default b64 to True in a future release - # we dont want clients to update before servers, otherwise servers won't be able to decode - # after a reasonable time all servers should support decoding both schemes and the default can change - if isinstance(data, dict): - data = json.dumps(data) - if len(key) > 16: - key = key[0:16] - ciphertext = encrypt_bin(key, data) - nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:] - if b64: - return json.dumps({"ciphertext": pybase64.b64encode(ciphertext).decode('utf-8'), - "tag": pybase64.b64encode(tag).decode('utf-8'), - "nonce": pybase64.b64encode(nonce).decode('utf-8')}) - return json.dumps({"ciphertext": hexlify(ciphertext).decode('utf-8'), - "tag": hexlify(tag).decode('utf-8'), - "nonce": hexlify(nonce).decode('utf-8')}) - - -def decrypt_from_json(key, data: Union[str, bytes]): - if isinstance(data, str): - data = json.loads(data) - if len(key) > 16: - key = key[0:16] - - # payloads can be either hex encoded (old style) - # or b64 encoded (new style) - def decode(b64=False): - if b64: - decoder = pybase64.b64decode - else: - decoder = unhexlify - - ciphertext = decoder(data["ciphertext"]) - if data.get("tag") is None: # web crypto - ciphertext, tag = ciphertext[:-16], ciphertext[-16:] - else: - tag = decoder(data["tag"]) - nonce = decoder(data["nonce"]) - return ciphertext, tag, nonce - - is_b64 = any(a.isupper() for a in str(data)) # if any letter is uppercase, it must be b64 - ciphertext, tag, nonce = decode(is_b64) - try: - return decrypt(key, ciphertext, tag, nonce) - except ValueError as e: - if not is_b64: - try: # maybe it was b64 after all? unlikely but.... - ciphertext, tag, nonce = decode(b64=True) - return decrypt(key, ciphertext, tag, nonce) - except ValueError: - pass - raise DecryptionKeyError from e - - -def encrypt_bin(key, data: Union[str, bytes]): - if len(key) > 16: - key = key[0:16] - try: - ciphertext, tag, nonce = encrypt(key, data) - except: - raise EncryptionKeyError - - return nonce + ciphertext + tag - - -def decrypt_bin(key, ciphertext: bytes): - if len(key) > 16: - key = key[0:16] - - nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:] - - try: - if not isinstance(key, bytes): - key = bytes(key, encoding="utf-8") - cipher = AES.new(key, AES.MODE_GCM, nonce) - return cipher.decrypt_and_verify(ciphertext, tag) - except ValueError: - raise DecryptionKeyError - - def compress_payload(text: Union[str, bytes]) -> bytes: # Compressing text if isinstance(text, str): @@ -192,14 +107,7 @@ def compress_payload(text: Union[str, bytes]) -> bytes: return zlib.compress(decompressed) -def decompress_payload(compressed: Union[str, bytes]) -> bytes: - # Decompressing text - if isinstance(compressed, str): # we really should be getting bytes here and not a str - if any(a.isupper() for a in compressed): - decoder = pybase64.b64decode - else: # assume hex - decoder = unhexlify - compressed = decoder(compressed) +def decompress_payload(compressed: bytes) -> bytes: return zlib.decompress(compressed) @@ -221,6 +129,62 @@ def bytes2str(payload: bytes, compressed=False) -> str: return payload.decode("utf-8") +############### +# deprecated +import warnings + + +def encrypt_as_json(key, data, b64=False) -> str: + warnings.warn( + "encrypt_as_json is deprecated and will be removed in future versions. " + "Use 'from hivemind_bus_client.encryption import encrypt_as_json' instead", + DeprecationWarning, + stacklevel=2 + ) + from hivemind_bus_client.encryption import encrypt_as_json as _ej + c = SupportedEncodings.JSON_B64 if b64 else SupportedEncodings.JSON_HEX + return _ej(key, data, encoding=c, cipher=SupportedCiphers.AES_GCM) + + +def decrypt_from_json(key, data: Union[str, bytes]): + warnings.warn( + "decrypt_from_json is deprecated and will be removed in future versions. " + "Use 'from hivemind_bus_client.encryption import decrypt_from_json' instead", + DeprecationWarning, + stacklevel=2 + ) + from hivemind_bus_client.encryption import decrypt_from_json as _dj + try: + return _dj(key, data, encoding=SupportedEncodings.JSON_HEX, cipher=SupportedCiphers.AES_GCM) + except Exception as e: + try: + return _dj(key, data, encoding=SupportedEncodings.JSON_B64, cipher=SupportedCiphers.AES_GCM) + except: + raise e + + +def encrypt_bin(key, data: Union[str, bytes]): + warnings.warn( + "encrypt_bin is deprecated and will be removed in future versions. " + "Use 'from hivemind_bus_client.encryption import encrypt_bin' instead", + DeprecationWarning, + stacklevel=2 + ) + from hivemind_bus_client.encryption import encrypt_bin as _eb + return _eb(key, data, cipher=SupportedCiphers.AES_GCM) + + +def decrypt_bin(key, ciphertext: bytes): + warnings.warn( + "decrypt_bin is deprecated and will be removed in future versions. " + "Use 'from hivemind_bus_client.encryption import decrypt_bin' instead", + DeprecationWarning, + stacklevel=2 + ) + from hivemind_bus_client.encryption import decrypt_bin as _db + return _db(key, ciphertext, SupportedCiphers.AES_GCM) + + if __name__ == "__main__": k = "*" * 16 test = "this is a test text for checking size of encryption and stuff" diff --git a/hivemind_bus_client/z85b.py b/hivemind_bus_client/z85b.py new file mode 100644 index 0000000..1697d7b --- /dev/null +++ b/hivemind_bus_client/z85b.py @@ -0,0 +1,102 @@ +""" +Python implementation of Z85b 85-bit encoding. + +Z85b is a variation of ZMQ RFC 32 Z85 85-bit encoding with the following differences: +1. Little-endian encoding (to facilitate alignment with lower byte indices). +2. No requirement for a multiple of 4/5 length. +3. `decode_z85b()` eliminates whitespace from the input. +4. `decode_z85b()` raises a clear exception if invalid characters are encountered. + +This file is a derivative work of z85.py from pyzmq. + +Copyright (c) 2013 Brian Granger, Min Ragan-Kelley +Distributed under the terms of the New BSD License. +""" +import re +import struct +from typing import Union + +from hivemind_bus_client.exceptions import Z85DecodeError + + +class Z85B: + # Z85CHARS is the base 85 symbol table + Z85CHARS = bytearray(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#") + + # Z85MAP maps integers in [0, 84] to the appropriate character in Z85CHARS + Z85MAP = {char: idx for idx, char in enumerate(Z85CHARS)} + + # Powers of 85 for encoding/decoding + _85s = [85 ** i for i in range(5)] + + # Padding lengths for encoding and decoding + _E_PADDING = [0, 3, 2, 1] + _D_PADDING = [0, 4, 3, 2, 1] + + @classmethod + def encode(cls, rawbytes: Union[str, bytes]) -> bytes: + """ + Encode raw bytes into Z85b format. + + Args: + rawbytes (Union[str, bytes]): Input data to encode. + + Returns: + bytes: Z85b-encoded bytes. + """ + rawbytes = bytearray(rawbytes) if isinstance(rawbytes, (bytes, str)) else rawbytes + padding = cls._E_PADDING[len(rawbytes) % 4] + rawbytes += b'\x00' * padding + nvalues = len(rawbytes) // 4 + + # Pack the raw bytes into little-endian 32-bit integers + values = struct.unpack(f'<{nvalues}I', rawbytes) + encoded = bytearray() + + for value in values: + for offset in cls._85s: + encoded.append(cls.Z85CHARS[(value // offset) % 85]) + + # Remove padding characters from the encoded output + if padding: + encoded = encoded[:-padding] + return bytes(encoded) + + @classmethod + def decode(cls, z85bytes: Union[str, bytes]) -> bytes: + """ + Decode Z85b-encoded bytes into raw bytes. + + Args: + z85bytes (Union[str, bytes]): Z85b-encoded data. + + Returns: + bytes: Decoded raw bytes. + + Raises: + Z85DecodeError: If invalid characters are encountered during decoding. + """ + # Normalize input by removing whitespace + z85bytes = bytearray(re.sub(rb'\s+', b'', z85bytes if isinstance(z85bytes, bytes) else z85bytes.encode())) + padding = cls._D_PADDING[len(z85bytes) % 5] + nvalues = (len(z85bytes) + padding) // 5 + + values = [] + for i in range(0, len(z85bytes), 5): + value = 0 + for j, offset in enumerate(cls._85s): + try: + value += cls.Z85MAP[z85bytes[i + j]] * offset + except IndexError: + break # End of input reached + except KeyError as e: + raise Z85DecodeError(f"Invalid byte code: {e.args[0]!r}") + values.append(value) + + # Unpack the values back into raw bytes + decoded = struct.pack(f'<{nvalues}I', *values) + + # Remove padding from the decoded output + if padding: + decoded = decoded[:-padding] + return decoded diff --git a/requirements.txt b/requirements.txt index b2dac71..cacd61b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ ovos_utils>=0.0.38 bitstring>=4.1.1 cryptography>=41.0.1 pycryptodomex>=3.18.0 -pybase64 \ No newline at end of file +pybase64 +py-cpuinfo \ No newline at end of file