Skip to content

Commit

Permalink
fix: mypy error
Browse files Browse the repository at this point in the history
  • Loading branch information
Ernst79 committed Aug 18, 2022
1 parent e2548b0 commit 1364971
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 54 deletions.
25 changes: 25 additions & 0 deletions src/bthome_ble/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import dataclasses

from sensor_state_data import SensorLibrary, description


@dataclasses.dataclass
class MeasTypes:
meas_format: description.BaseSensorDescription
factor: float


MEAS_TYPES: dict[int, MeasTypes] = {
0x01: MeasTypes(meas_format=SensorLibrary.BATTERY__PERCENTAGE, factor=1),
0x02: MeasTypes(meas_format=SensorLibrary.TEMPERATURE__CELSIUS, factor=0.01),
0x03: MeasTypes(meas_format=SensorLibrary.HUMIDITY__PERCENTAGE, factor=0.01),
0x04: MeasTypes(meas_format=SensorLibrary.PRESSURE__MBAR, factor=0.01),
0x05: MeasTypes(meas_format=SensorLibrary.LIGHT__LIGHT_LUX, factor=0.01),
0x0A: MeasTypes(meas_format=SensorLibrary.ENERGY__ENERGY_KILO_WATT_HOUR, factor=0.001),
0x0B: MeasTypes(meas_format=SensorLibrary.POWER__POWER_WATT, factor=0.01),
0x0C: MeasTypes(meas_format=SensorLibrary.VOLTAGE__ELECTRIC_POTENTIAL_VOLT, factor=0.001),
0x0D: MeasTypes(meas_format=SensorLibrary.PM25__CONCENTRATION_MICROGRAMS_PER_CUBIC_METER, factor=1),
0x0E: MeasTypes(meas_format=SensorLibrary.PM10__CONCENTRATION_MICROGRAMS_PER_CUBIC_METER, factor=1),
0x12: MeasTypes(meas_format=SensorLibrary.CO2__CONCENTRATION_PARTS_PER_MILLION, factor=1),
0x13: MeasTypes(meas_format=SensorLibrary.VOLATILE_ORGANIC_COMPOUNDS__CONCENTRATION_PARTS_PER_MILLION, factor=1),
}
96 changes: 42 additions & 54 deletions src/bthome_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from bluetooth_sensor_state_data import BluetoothData
from Cryptodome.Cipher import AES
from home_assistant_bluetooth import BluetoothServiceInfo
from sensor_state_data import SensorLibrary

from .const import MEAS_TYPES

_LOGGER = logging.getLogger(__name__)

Expand All @@ -36,23 +37,23 @@ def to_mac(addr: bytes) -> str:
return ":".join(f"{i:02X}" for i in addr)


def parse_uint(data_obj: bytes, factor: float = 1) -> float:
def parse_uint(data_obj: bytes, factor: float = 1.0) -> float:
"""convert bytes (as unsigned integer) and factor to float"""
decimal_places = -int(f"{factor:e}".split("e")[-1])
return round(
int.from_bytes(data_obj, "little", signed=False) * factor, decimal_places
)


def parse_int(data_obj: bytes, factor: float = 1) -> float:
def parse_int(data_obj: bytes, factor: float = 1.0) -> float:
"""convert bytes (as signed integer) and factor to float"""
decimal_places = -int(f"{factor:e}".split("e")[-1])
return round(
int.from_bytes(data_obj, "little", signed=True) * factor, decimal_places
)


def parse_float(data_obj: bytes, factor: float = 1) -> float | None:
def parse_float(data_obj: bytes, factor: float = 1.0) -> float | None:
"""convert bytes (as float) and factor to float"""
decimal_places = -int(f"{factor:e}".split("e")[-1])
if len(data_obj) == 2:
Expand Down Expand Up @@ -81,32 +82,6 @@ def parse_mac(data_obj: bytes) -> bytes | None:
return None


dispatch = {
0x00: parse_uint,
0x01: parse_int,
0x02: parse_float,
}


DATA_MEAS_DICT = {
0x01: [SensorLibrary.BATTERY__PERCENTAGE, 1],
0x02: [SensorLibrary.TEMPERATURE__CELSIUS, 0.01],
0x03: [SensorLibrary.HUMIDITY__PERCENTAGE, 0.01],
0x04: [SensorLibrary.PRESSURE__MBAR, 0.01],
0x05: [SensorLibrary.LIGHT__LIGHT_LUX, 0.01],
0x0A: [SensorLibrary.ENERGY__ENERGY_KILO_WATT_HOUR, 0.001],
0x0B: [SensorLibrary.POWER__POWER_WATT, 0.01],
0x0C: [SensorLibrary.VOLTAGE__ELECTRIC_POTENTIAL_VOLT, 0.001],
0x0D: [SensorLibrary.PM25__CONCENTRATION_MICROGRAMS_PER_CUBIC_METER, 1],
0x0E: [SensorLibrary.PM10__CONCENTRATION_MICROGRAMS_PER_CUBIC_METER, 1],
0x12: [SensorLibrary.CO2__CONCENTRATION_PARTS_PER_MILLION, 1],
0x13: [
SensorLibrary.VOLATILE_ORGANIC_COMPOUNDS__CONCENTRATION_PARTS_PER_MILLION,
1,
],
}


class BThomeBluetoothDeviceData(BluetoothData):
"""Date for BThome Bluetooth devices."""

Expand Down Expand Up @@ -170,21 +145,21 @@ def _parse_bthome(
# Encrypted BThome BLE format
try:
payload = self._decrypt_bthome(data, source_mac)
except TypeError:
except (ValueError, TypeError):
return False
firmware = "BThome BLE (encrypted)"
packet_id = parse_uint(data[-8:-4]) # noqa: F841
else:
return False

if not payload:
return False

payload_length = len(payload)
payload_start = 0
result = False

while payload_length >= payload_start + 1:
meas_float = None
meas_str = None

obj_control_byte = payload[payload_start]
obj_data_length = (obj_control_byte >> 0) & 31 # 5 bits (0-4)
obj_data_format = (obj_control_byte >> 5) & 7 # 3 bits (5-7)
Expand All @@ -196,21 +171,38 @@ def _parse_bthome(

if obj_data_length != 0:
if obj_data_format <= 3:
if obj_meas_type in DATA_MEAS_DICT:
if obj_meas_type in MEAS_TYPES:
next_payload = payload_start + 2
meas_data = payload[next_payload:next_start]
meas_type = DATA_MEAS_DICT[obj_meas_type][0]
meas_factor = DATA_MEAS_DICT[obj_meas_type][1]
if obj_data_format == 3:
meas = parse_string(meas_data)
meas_type = MEAS_TYPES[obj_meas_type]
meas_format = meas_type.meas_format
meas_factor = meas_type.factor

if obj_data_format == 0:
meas_float = parse_uint(meas_data, meas_factor)
elif obj_data_format == 1:
meas_float = parse_int(meas_data, meas_factor)
elif obj_data_format == 2:
meas_float = parse_float(meas_data, meas_factor)
elif obj_data_format == 3:
meas_str = parse_string(meas_data)

if meas_float:
self.update_predefined_sensor(meas_format, meas_float)
result = True
elif meas_str:
_LOGGER.debug(
"String data type not supported yet! Adv: %s",
data.hex(),
)
else:
meas = dispatch[obj_data_format](meas_data, meas_factor)

self.update_predefined_sensor(meas_type, meas)
result = True
_LOGGER.debug(
"UNKNOWN dataobject in BThome BLE payload! Adv: %s",
data.hex(),
)
else:
_LOGGER.debug(
"UNKNOWN dataobject in BThome BLE payload! Adv: %s",
"UNKNOWN measurement type in BThome BLE payload! Adv: %s",
data.hex(),
)
elif obj_data_format == 4:
Expand All @@ -228,11 +220,6 @@ def _parse_bthome(
payload_start = next_start

if not result:
_LOGGER.info(
"BLE ADV from UNKNOWN BThome BLE DEVICE: MAC: %s, ADV: %s",
to_mac(source_mac),
data.hex(),
)
return False

identifier = service_info.address
Expand All @@ -241,21 +228,22 @@ def _parse_bthome(
self.set_device_sw_version(firmware)
return True

def _decrypt_bthome(self, data: bytes, bthome_mac: bytes) -> bytes | None:
def _decrypt_bthome(self, data: bytes, bthome_mac: bytes) -> bytes:
"""Decrypt encrypted BThome BLE advertisements"""
if not self.bindkey:
self.bindkey_verified = False
_LOGGER.debug("Encryption key not set and adv is encrypted")
return None
raise ValueError

if not self.bindkey or len(self.bindkey) != 16:
self.bindkey_verified = False
_LOGGER.error("Encryption key should be 16 bytes (32 characters) long")
return None
raise ValueError

# check for minimum length of encrypted advertisement
if len(data) < 15:
_LOGGER.debug("Invalid data length (for decryption), adv: %s", data.hex())
raise ValueError

# prepare the data for decryption
uuid = b"\x1e\x18"
Expand All @@ -276,14 +264,14 @@ def _decrypt_bthome(self, data: bytes, bthome_mac: bytes) -> bytes | None:
_LOGGER.debug("mic: %s", mic.hex())
_LOGGER.debug("nonce: %s", nonce.hex())
_LOGGER.debug("encrypted_payload: %s", encrypted_payload.hex())
return None
raise ValueError
if decrypted_payload is None:
self.bindkey_verified = False
_LOGGER.error(
"Decryption failed for %s, decrypted payload is None",
to_mac(bthome_mac),
)
return None
raise ValueError
self.bindkey_verified = True

return decrypted_payload

0 comments on commit 1364971

Please sign in to comment.