Skip to content

Commit

Permalink
feat: multiple measurements of the same type
Browse files Browse the repository at this point in the history
  • Loading branch information
Ernst79 committed Oct 16, 2022
1 parent 4a4fe68 commit e57ed3d
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 22 deletions.
76 changes: 54 additions & 22 deletions src/bthome_ble/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,48 +295,77 @@ def _parse_payload(self, payload: bytes) -> bool:
payload_length = len(payload)
next_obj_start = 0
result = False
measurements: list[dict[str, Any]] = []
device_id_dict: dict[int, int] = {}

# Create a list with all individual objects
while payload_length >= next_obj_start + 1:
obj_start = next_obj_start

obj_control_byte = payload[obj_start]
obj_meas_type = payload[obj_start + 1]
obj_data_length = (obj_control_byte >> 0) & 31 # 5 bits (0-4)
obj_data_format = (obj_control_byte >> 5) & 7 # 3 bits (5-7)
obj_meas_type = payload[obj_start + 1]

obj_end = obj_start + obj_data_length
next_obj_start = obj_end + 1
if obj_data_length == 0:
continue

if payload_length < next_obj_start:
_LOGGER.debug("Invalid payload data length, payload: %s", payload.hex())
break

if obj_data_length == 0:
continue
obj_data_start = obj_start + 2
obj_end = obj_start + obj_data_length
next_obj_start = obj_end + 1
measurements.append(
{
"data format": obj_data_format,
"data length": obj_data_length,
"measurement type": obj_meas_type,
"measurement data": payload[obj_data_start:next_obj_start],
"device id": None,
}
)

if obj_meas_type not in MEAS_TYPES:
# Get a list of measurement types that are included more than once.
meas_list = []
for meas in measurements:
meas_list.append(meas["measurement type"])
dup_meas_types = {x for x in meas_list if meas_list.count(x) > 1}

# Parse each object into readable information
for meas in measurements:
if meas["measurement type"] not in MEAS_TYPES:
_LOGGER.debug(
"UNKNOWN measurement type %s in BTHome BLE payload! Adv: %s",
obj_meas_type,
meas["measurement type"],
payload.hex(),
)
continue

meas_data_start = obj_start + 2
meas_data = payload[meas_data_start:next_obj_start]
meas_type = MEAS_TYPES[obj_meas_type]
if meas["measurement type"] in dup_meas_types:
# Add a device_id for advertisements with multiple measurements of the same type
if meas["measurement type"] not in device_id_dict:
device_id_counter = 1
else:
device_id_counter = device_id_dict[meas["measurement type"]] + 1
device_id_dict.update({meas["measurement type"]: device_id_counter})
device_id = str(device_id_counter)
else:
device_id = None

meas_type = MEAS_TYPES[meas["measurement type"]]
meas_format = meas_type.meas_format
meas_factor = meas_type.factor
value: None | str | int | float

if obj_data_format == 0:
value = parse_uint(meas_data, meas_factor)
elif obj_data_format == 1:
value = parse_int(meas_data, meas_factor)
elif obj_data_format == 2:
value = parse_float(meas_data, meas_factor)
elif obj_data_format == 3:
value = parse_string(meas_data)
if meas["data format"] == 0:
value = parse_uint(meas["measurement data"], meas_factor)
elif meas["data format"] == 1:
value = parse_int(meas["measurement data"], meas_factor)
elif meas["data format"] == 2:
value = parse_float(meas["measurement data"], meas_factor)
elif meas["data format"] == 3:
value = parse_string(meas["measurement data"])
else:
_LOGGER.error(
"UNKNOWN dataobject in BTHome BLE payload! Adv: %s",
Expand All @@ -354,6 +383,7 @@ def _parse_payload(self, payload: bytes) -> bool:
native_unit_of_measurement=meas_format.native_unit_of_measurement,
native_value=value,
device_class=meas_format.device_class,
device_id=device_id,
)
elif (
type(meas_format) == BaseBinarySensorDescription
Expand All @@ -363,18 +393,20 @@ def _parse_payload(self, payload: bytes) -> bool:
key=str(meas_format.device_class),
device_class=meas_format.device_class,
native_value=bool(value),
device_id=device_id,
)
elif type(meas_format) == EventDeviceKeys:
event_type = EVENT_TYPES[meas_data[0]]
event_type = EVENT_TYPES[meas["measurement data"][0]]
event_properties = None
if len(meas_data) >= 2:
if len(meas["measurement data"]) >= 2:
event_properties = parse_event_properties(
event_type, meas_data[1:]
event_type, meas["measurement data"][1:]
)
self.fire_event(
key=str(meas_format),
event_type=event_type,
event_properties=event_properties,
device_id=device_id,
)
result = True
else:
Expand Down
150 changes: 150 additions & 0 deletions tests/test_parser_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -1169,3 +1169,153 @@ def test_bthome_event_dimmer_rotate_left_3_steps(caplog):
),
},
)


def test_bthome_double_temperature(caplog):
"""Test BTHome parser for double temperature reading without encryption."""
data_string = b"B\x01\x00#\x02\xca\t#\x02\xcf\t"
advertisement = bytes_to_service_info(
data_string, local_name="A4:C1:38:8D:18:B2", address="A4:C1:38:8D:18:B2"
)

device = BTHomeBluetoothDeviceData()
assert device.update(advertisement) == SensorUpdate(
title="ATC Temperature/Humidity Sensor 18B2",
devices={
None: SensorDeviceInfo(
name="ATC Temperature/Humidity Sensor 18B2",
manufacturer="pvvx",
model="LYWSD03MMC",
sw_version="BTHome BLE v2",
hw_version=None,
)
},
entity_descriptions={
DeviceKey(key="temperature", device_id="1"): SensorDescription(
device_key=DeviceKey(key="temperature", device_id="1"),
device_class=SensorDeviceClass.TEMPERATURE,
native_unit_of_measurement=Units.TEMP_CELSIUS,
),
DeviceKey(key="temperature", device_id="2"): SensorDescription(
device_key=DeviceKey(key="temperature", device_id="2"),
device_class=SensorDeviceClass.TEMPERATURE,
native_unit_of_measurement=Units.TEMP_CELSIUS,
),
KEY_SIGNAL_STRENGTH: SensorDescription(
device_key=KEY_SIGNAL_STRENGTH,
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
),
},
entity_values={
DeviceKey(key="temperature", device_id="1"): SensorValue(
device_key=DeviceKey(key="temperature", device_id="1"),
name="Temperature",
native_value=25.06,
),
DeviceKey(key="temperature", device_id="2"): SensorValue(
device_key=DeviceKey(key="temperature", device_id="2"),
name="Temperature",
native_value=25.11,
),
KEY_SIGNAL_STRENGTH: SensorValue(
device_key=KEY_SIGNAL_STRENGTH, name="Signal Strength", native_value=-60
),
},
)


def test_bthome_tripple_temperature_double_humidity_battery(caplog):
"""
Test BTHome parser for triple temperature, double humidity and
single battery reading without encryption.
"""
data_string = (
b"B\x01\x00#\x02\xca\t#\x02\xcf\t#\x02"
b"\xcf\x08\x03\x03\xb7\x18\x03\x03\xb7\x17\x02\x01]"
)
advertisement = bytes_to_service_info(
data_string, local_name="A4:C1:38:8D:18:B2", address="A4:C1:38:8D:18:B2"
)

device = BTHomeBluetoothDeviceData()
assert device.update(advertisement) == SensorUpdate(
title="ATC Temperature/Humidity Sensor 18B2",
devices={
None: SensorDeviceInfo(
name="ATC Temperature/Humidity Sensor 18B2",
manufacturer="pvvx",
model="LYWSD03MMC",
sw_version="BTHome BLE v2",
hw_version=None,
)
},
entity_descriptions={
DeviceKey(key="temperature", device_id="1"): SensorDescription(
device_key=DeviceKey(key="temperature", device_id="1"),
device_class=SensorDeviceClass.TEMPERATURE,
native_unit_of_measurement=Units.TEMP_CELSIUS,
),
DeviceKey(key="temperature", device_id="2"): SensorDescription(
device_key=DeviceKey(key="temperature", device_id="2"),
device_class=SensorDeviceClass.TEMPERATURE,
native_unit_of_measurement=Units.TEMP_CELSIUS,
),
DeviceKey(key="temperature", device_id="3"): SensorDescription(
device_key=DeviceKey(key="temperature", device_id="3"),
device_class=SensorDeviceClass.TEMPERATURE,
native_unit_of_measurement=Units.TEMP_CELSIUS,
),
DeviceKey(key="humidity", device_id="1"): SensorDescription(
device_key=DeviceKey(key="humidity", device_id="1"),
device_class=SensorDeviceClass.HUMIDITY,
native_unit_of_measurement=Units.PERCENTAGE,
),
DeviceKey(key="humidity", device_id="2"): SensorDescription(
device_key=DeviceKey(key="humidity", device_id="2"),
device_class=SensorDeviceClass.HUMIDITY,
native_unit_of_measurement=Units.PERCENTAGE,
),
KEY_BATTERY: SensorDescription(
device_key=KEY_BATTERY,
device_class=SensorDeviceClass.BATTERY,
native_unit_of_measurement=Units.PERCENTAGE,
),
KEY_SIGNAL_STRENGTH: SensorDescription(
device_key=KEY_SIGNAL_STRENGTH,
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
native_unit_of_measurement=Units.SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
),
},
entity_values={
DeviceKey(key="temperature", device_id="1"): SensorValue(
device_key=DeviceKey(key="temperature", device_id="1"),
name="Temperature",
native_value=25.06,
),
DeviceKey(key="temperature", device_id="2"): SensorValue(
device_key=DeviceKey(key="temperature", device_id="2"),
name="Temperature",
native_value=25.11,
),
DeviceKey(key="temperature", device_id="3"): SensorValue(
device_key=DeviceKey(key="temperature", device_id="3"),
name="Temperature",
native_value=22.55,
),
DeviceKey(key="humidity", device_id="1"): SensorValue(
device_key=DeviceKey(key="humidity", device_id="1"),
name="Humidity",
native_value=63.27,
),
DeviceKey(key="humidity", device_id="2"): SensorValue(
device_key=DeviceKey(key="humidity", device_id="2"),
name="Humidity",
native_value=60.71,
),
KEY_BATTERY: SensorValue(KEY_BATTERY, name="Battery", native_value=93),
KEY_SIGNAL_STRENGTH: SensorValue(
device_key=KEY_SIGNAL_STRENGTH, name="Signal Strength", native_value=-60
),
},
)

0 comments on commit e57ed3d

Please sign in to comment.