Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:add chacha20 cipher + z85 encoding #50

Merged
merged 12 commits into from
Jan 3, 2025
14 changes: 8 additions & 6 deletions hivemind_bus_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from hivemind_bus_client.message import HiveMessage, HiveMessageType
from hivemind_bus_client.serialization import HiveMindBinaryPayloadType
from hivemind_bus_client.serialization import get_bitstring, decode_bitstring
from hivemind_bus_client.util import serialize_message, \
encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin
from hivemind_bus_client.util import serialize_message
from hivemind_bus_client.encryption import encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin, JsonCiphers, BinaryCiphers
from poorman_handshake.asymmetric.utils import encrypt_RSA, load_RSA_key, sign_RSA


Expand Down Expand Up @@ -104,6 +104,8 @@ def __init__(self, key: Optional[str] = None,
internal_bus: Optional[OVOSBusClient] = None,
bin_callbacks: BinaryDataCallbacks = BinaryDataCallbacks()):
self.bin_callbacks = bin_callbacks
self.json_cipher = JsonCiphers.JSON_HEX_AES_GCM_128 # server defaults before it was made configurable
self.bin_cipher = BinaryCiphers.BINARY_AES_GCM_128 # server defaults before it was made configurable
JarbasAl marked this conversation as resolved.
Show resolved Hide resolved

self.identity = identity or None
self._password = password
Expand Down Expand Up @@ -271,11 +273,11 @@ def on_message(self, *args):
if self.crypto_key:
# handle binary encryption
if isinstance(message, bytes):
message = decrypt_bin(self.crypto_key, message)
message = decrypt_bin(self.crypto_key, message, cipher=self.bin_cipher)
# handle json encryption
elif "ciphertext" in message:
# LOG.debug(f"got encrypted message: {len(message)}")
message = decrypt_from_json(self.crypto_key, message)
message = decrypt_from_json(self.crypto_key, message, cipher=self.json_cipher)
else:
LOG.debug("Message was unencrypted")

Expand Down Expand Up @@ -367,14 +369,14 @@ def emit(self, message: Union[MycroftMessage, HiveMessage],
binary_type=binary_type,
hivemeta=message.metadata)
if self.crypto_key:
ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes)
ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes, cipher=self.bin_cipher)
else:
ws_payload = bitstr.bytes
self.client.send(ws_payload, ABNF.OPCODE_BINARY)
else:
ws_payload = serialize_message(message)
if self.crypto_key:
ws_payload = encrypt_as_json(self.crypto_key, ws_payload)
ws_payload = encrypt_as_json(self.crypto_key, ws_payload, cipher=self.json_cipher)
self.client.send(ws_payload)

except WebSocketConnectionClosedException:
Expand Down
298 changes: 298 additions & 0 deletions hivemind_bus_client/encryption.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
import enum
import json
from binascii import hexlify, unhexlify
from typing import Union, Optional, Dict, Any

import pybase64
from Cryptodome.Cipher import AES, ChaCha20_Poly1305

from cpuinfo import get_cpu_info
from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError, InvalidCipher


def cpu_supports_AES() -> bool:
return "aes" in get_cpu_info()["flags"]


class JsonCiphers(str, enum.Enum):
"""
Enum representing JSON-based encryption ciphers.
"""
JSON_B64_AES_GCM_128 = "JSON-B64-AES-GCM-128" # JSON text output with Base64 encoding
JSON_HEX_AES_GCM_128 = "JSON-HEX-AES-GCM-128" # JSON text output with Hex encoding
JSON_B64_CHACHA20_POLY1305 = "JSON-B64-CHACHA20-POLY1305" # JSON text output with Base64 encoding
JSON_HEX_CHACHA20_POLY1305 = "JSON-HEX-CHACHA20-POLY1305" # JSON text output with Hex encoding


class BinaryCiphers(str, enum.Enum):
"""
Enum representing binary encryption ciphers.

Specifications:
- AES - http://csrc.nist.gov/publications/fips/fips197/fips-197.pdf
- GCM - http://csrc.nist.gov/publications/nistpubs/800-38D/SP-800-38D.pdf
- CHACHA20-POLY1305 - https://datatracker.ietf.org/doc/html/rfc7539
"""
BINARY_AES_GCM_128 = "BINARY-AES-GCM-128" # Binary output
BINARY_CHACHA20_POLY1305 = "BINARY-CHACHA20-POLY1305" # specified in RFC7539.


def encrypt_as_json(key: Union[str, bytes], data: Union[str, Dict[str, Any]],
cipher: JsonCiphers = JsonCiphers.JSON_B64_AES_GCM_128) -> str:
"""
Encrypts the given data and outputs it as a JSON string.

Args:
key (Union[str, bytes]): The encryption key, up to 16 bytes. Longer keys will be truncated.
data (Union[str, Dict[str, Any]]): The data to encrypt. If a dictionary, it will be serialized to JSON.
cipher (JsonCiphers): The encryption cipher. Supported options:
- JSON-B64-AES-GCM-128: Outputs Base64-encoded JSON.
- JSON-HEX-AES-GCM-128: Outputs Hex-encoded JSON.

Returns:
str: A JSON string containing the encrypted data, nonce, and tag.

Raises:
InvalidCipher: If an unsupported cipher is provided.
"""
if cipher not in JsonCiphers:
raise InvalidCipher(f"Invalid JSON cipher: {str(cipher)}")

if isinstance(data, dict):
data = json.dumps(data)

aes_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_HEX_AES_GCM_128}
b64_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_B64_CHACHA20_POLY1305}

bcipher = BinaryCiphers.BINARY_AES_GCM_128 if cipher in aes_ciphers else BinaryCiphers.BINARY_CHACHA20_POLY1305

ciphertext = encrypt_bin(key, data, cipher=bcipher)

# extract nonce/tag depending on cipher, sizes are different
if cipher in aes_ciphers:
nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:]
else:
nonce, ciphertext, tag = ciphertext[:12], ciphertext[12:-16], ciphertext[-16:]

encoder = pybase64.b64encode if cipher in b64_ciphers else hexlify

return json.dumps({
"ciphertext": encoder(ciphertext).decode('utf-8'),
"tag": encoder(tag).decode('utf-8'),
"nonce": encoder(nonce).decode('utf-8')
})


def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: JsonCiphers) -> str:
"""
Decrypts data from a JSON string.

Args:
key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated.
data (Union[str, bytes]): The encrypted data as a JSON string or bytes.
cipher (Optional[JsonCiphers]): The cipher used for encryption. If None, it is auto-detected.

Returns:
str: The decrypted plaintext data.

Raises:
InvalidCipher: If an unsupported cipher is provided.
DecryptionKeyError: If decryption fails due to an invalid key or corrupted data.
"""

aes_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_HEX_AES_GCM_128}
b64_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_B64_CHACHA20_POLY1305}

if isinstance(data, str):
data = json.loads(data)

decoder = pybase64.b64decode if cipher in b64_ciphers else unhexlify
bcipher = BinaryCiphers.BINARY_AES_GCM_128 if cipher in aes_ciphers else BinaryCiphers.BINARY_CHACHA20_POLY1305

ciphertext = decoder(data["ciphertext"])
if "tag" not in data: # web crypto compatibility
ciphertext, tag = ciphertext[:-16], ciphertext[-16:]
else:
tag = decoder(data["tag"])
nonce = decoder(data["nonce"])

decryptor = decrypt_AES_GCM_128 if bcipher == BinaryCiphers.BINARY_AES_GCM_128 else decrypt_ChaCha20_Poly1305
try:
plaintext = decryptor(key, ciphertext, tag, nonce)
return plaintext.decode("utf-8")
except ValueError as e:
raise DecryptionKeyError from e


def encrypt_bin(key: Union[str, bytes], data: Union[str, bytes], cipher: BinaryCiphers) -> bytes:
"""
Encrypts the given data and returns it as binary.

Args:
key (Union[str, bytes]): The encryption key, up to 16 bytes. Longer keys will be truncated.
data (Union[str, bytes]): The data to encrypt. Strings will be encoded as UTF-8.
cipher (BinaryCiphers): The encryption cipher. Only BINARY_AES_GCM_128 is supported.
JarbasAl marked this conversation as resolved.
Show resolved Hide resolved

Returns:
bytes: The encrypted data, including the nonce and tag.

Raises:
InvalidCipher: If an unsupported cipher is provided.
EncryptionKeyError: If encryption fails.
"""
if cipher not in BinaryCiphers:
raise InvalidCipher(f"Invalid binary cipher: {str(cipher)}")

encryptor = encrypt_AES_GCM_128 if cipher == BinaryCiphers.BINARY_AES_GCM_128 else encrypt_ChaCha20_Poly1305
try:
ciphertext, tag, nonce = encryptor(key, data)
except Exception as e:
raise EncryptionKeyError from e

return nonce + ciphertext + tag


def decrypt_bin(key: Union[str, bytes], ciphertext: bytes, cipher: BinaryCiphers) -> bytes:
"""
Decrypts binary data.

Args:
key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated.
ciphertext (bytes): The binary encrypted data. Must include nonce and tag.
cipher (BinaryCiphers): The cipher used for encryption. Only BINARY_AES_GCM_128 is supported.

Returns:
bytes: The decrypted plaintext data.

Raises:
InvalidCipher: If an unsupported cipher is provided.
DecryptionKeyError: If decryption fails due to an invalid key or corrupted data.
"""
if cipher not in BinaryCiphers:
raise InvalidCipher(f"Invalid binary cipher: {str(cipher)}")

# extract nonce/tag depending on cipher, sizes are different
if cipher == BinaryCiphers.BINARY_AES_GCM_128:
nonce, ciphertext, tag = ciphertext[:16], ciphertext[16:-16], ciphertext[-16:]
else:
nonce, ciphertext, tag = ciphertext[:12], ciphertext[12:-16], ciphertext[-16:]

decryptor = decrypt_AES_GCM_128 if cipher == BinaryCiphers.BINARY_AES_GCM_128 else decrypt_ChaCha20_Poly1305
try:
return decryptor(key, ciphertext, tag, nonce)
except ValueError as e:
raise DecryptionKeyError from e


#############################
# Cipher Implementations
def encrypt_AES_GCM_128(key: Union[str, bytes], text: Union[str, bytes],
nonce: Optional[bytes] = None) -> tuple[bytes, bytes, bytes]:
"""
Encrypts plaintext using AES-GCM-128.

Args:
key (Union[str, bytes]): The encryption key. Strings will be encoded as UTF-8.
text (Union[str, bytes]): The plaintext to encrypt.
nonce (Optional[bytes]): An optional nonce. If None, a new one is generated.

Returns:
tuple[bytes, bytes, bytes]: The ciphertext, authentication tag, and nonce.
"""
if not isinstance(text, bytes):
text = bytes(text, encoding="utf-8")
if not isinstance(key, bytes):
key = bytes(key, encoding="utf-8")
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
ciphertext, tag = cipher.encrypt_and_digest(text)
return ciphertext, tag, cipher.nonce
JarbasAl marked this conversation as resolved.
Show resolved Hide resolved


def decrypt_AES_GCM_128(key: Union[str, bytes], ciphertext: bytes, tag: bytes, nonce: bytes) -> bytes:
"""
Decrypts ciphertext encrypted using AES-GCM-128.

Args:
key (Union[str, bytes]): The decryption key. Strings will be encoded as UTF-8.
ciphertext (bytes): The encrypted ciphertext.
tag (bytes): The authentication tag.
nonce (bytes): The nonce used during encryption.

Returns:
str: The decrypted plaintext.

Raises:
ValueError: If decryption or authentication fails.
"""
if not isinstance(key, bytes):
key = bytes(key, encoding="utf-8")
cipher = AES.new(key, AES.MODE_GCM, nonce)
return cipher.decrypt_and_verify(ciphertext, tag)


def encrypt_ChaCha20_Poly1305(key: Union[str, bytes],
text: Union[str, bytes],
nonce: Optional[bytes] = None) -> tuple[bytes, bytes, bytes]:
"""
Encrypts plaintext using AES-GCM-128.
JarbasAl marked this conversation as resolved.
Show resolved Hide resolved

Args:
key (Union[str, bytes]): The encryption key. Strings will be encoded as UTF-8.
text (Union[str, bytes]): The plaintext to encrypt.
nonce (Optional[bytes]): An optional nonce. If None, a new one is generated.

Returns:
tuple[bytes, bytes, bytes]: The ciphertext, authentication tag, and nonce.
"""
if not isinstance(text, bytes):
text = bytes(text, encoding="utf-8")
if not isinstance(key, bytes):
key = bytes(key, encoding="utf-8")
assert len(key) == 32 # ChaCha20 uses 256 bit/32 byte keys
if nonce:
assert len(nonce) == 12 # 92bits/12bytes bytes per RFC7539
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
JarbasAl marked this conversation as resolved.
Show resolved Hide resolved
ciphertext, tag = cipher.encrypt_and_digest(text)
return ciphertext, tag, cipher.nonce


def decrypt_ChaCha20_Poly1305(key: Union[str, bytes],
ciphertext: bytes,
tag: bytes,
nonce: bytes) -> bytes:
"""
Decrypts ciphertext encrypted using AES-GCM-128.

Args:
key (Union[str, bytes]): The decryption key. Strings will be encoded as UTF-8.
ciphertext (bytes): The encrypted ciphertext.
tag (bytes): The authentication tag.
nonce (bytes): The nonce used during encryption.

Returns:
str: The decrypted plaintext.

Raises:
ValueError: If decryption or authentication fails.
"""
if not isinstance(key, bytes):
key = bytes(key, encoding="utf-8")

assert len(key) == 32 # ChaCha20 uses 256 bit/32 byte keys
if nonce:
assert len(nonce) == 12 # 92bits/12bytes per RFC7539
cipher = ChaCha20_Poly1305.new(key=key, nonce=nonce)
return cipher.decrypt_and_verify(ciphertext, tag)


if __name__ == "__main__":
from Cryptodome.Random import get_random_bytes

print("JSON-B64-AES-GCM-128" == JsonCiphers.JSON_B64_AES_GCM_128)

key = get_random_bytes(32)
plaintext = b'Attack at dawn'
ciphertext, tag, nonce = encrypt_ChaCha20_Poly1305(key, plaintext)
recovered = decrypt_ChaCha20_Poly1305(key, ciphertext, tag, nonce)
print(recovered)
assert recovered == plaintext
4 changes: 4 additions & 0 deletions hivemind_bus_client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ class UnauthorizedKeyError(HiveMindException):
""" Invalid Key provided """


class InvalidCipher(HiveMindException):
"""unknown encryption scheme requested"""


class WrongEncryptionKey(HiveMindException):
""" Wrong Encryption Key"""

Expand Down
Loading
Loading