diff --git a/src/python_testing/TC_OCC_3_1.py b/src/python_testing/TC_OCC_3_1.py index 4380866333f339..3fd082a62bc974 100644 --- a/src/python_testing/TC_OCC_3_1.py +++ b/src/python_testing/TC_OCC_3_1.py @@ -16,11 +16,11 @@ # # === BEGIN CI TEST ARGUMENTS === # test-runner-runs: run1 -# test-runner-run/run1/app: ${TYPE_OF_APP} +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} # test-runner-run/run1/factoryreset: True # test-runner-run/run1/quiet: True # test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json -# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto +# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --endpoint 1 # === END CI TEST ARGUMENTS === # There are CI issues to be followed up for the test cases below that implements manually controlling sensor device for # the occupancy state ON/OFF change. @@ -29,19 +29,29 @@ import logging import time +from typing import Any, Optional import chip.clusters as Clusters -from chip import ChipDeviceCtrl from chip.interaction_model import Status from matter_testing_support import MatterBaseTest, TestStep, async_test_body, default_matter_test_main from mobly import asserts class TC_OCC_3_1(MatterBaseTest): - async def read_occ_attribute_expect_success(self, endpoint, attribute): + async def read_occ_attribute_expect_success(self, attribute): cluster = Clusters.Objects.OccupancySensing + endpoint = self.matter_test_config.endpoint return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) + async def write_hold_time(self, hold_time: Optional[Any]) -> Status: + dev_ctrl = self.default_controller + node_id = self.dut_node_id + endpoint = self.matter_test_config.endpoint + + cluster = Clusters.OccupancySensing + write_result = await dev_ctrl.WriteAttribute(node_id, [(endpoint, cluster.Attributes.HoldTime(hold_time))]) + return write_result[0].Status + def desc_TC_OCC_3_1(self) -> str: return "[TC-OCC-3.1] Primary functionality with server as DUT" @@ -63,39 +73,37 @@ def pics_TC_OCC_3_1(self) -> list[str]: @async_test_body async def test_TC_OCC_3_1(self): - - endpoint = self.user_params.get("endpoint", 1) - node_id = self.matter_test_config.dut_node_ids[0] hold_time = 10 # 10 seconds for occupancy state hold time self.step(1) # commissioning and getting cluster attribute list - attributes = Clusters.OccupancySensing.Attributes - attribute_list = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) + cluster = Clusters.OccupancySensing + attributes = cluster.Attributes + attribute_list = await self.read_occ_attribute_expect_success(attribute=attributes.AttributeList) - self.step(2) - if attributes.HoldTime.attribute_id in attribute_list: - # write 10 as a HoldTime attibute - write_res = await ChipDeviceCtrl.WriteAttribute(node_id, [(endpoint, attributes.HoldTime(hold_time))]) - asserts.assert_equal(write_res[0].status, Status.Success, "Write HoldTime failed") + has_hold_time = attributes.HoldTime.attribute_id in attribute_list + self.step(2) + if has_hold_time: + # write 10 as a HoldTime attribute + await self.write_single_attribute(cluster.Attributes.HoldTime(hold_time)) else: logging.info("No HoldTime attribute supports. Will test only occupancy attribute triggering functionality") self.step(3) # check if Occupancy attribute is 0 - occupancy_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.Occupancy) + occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) # if occupancy is on, then wait until the sensor occupancy state is 0. if occupancy_dut == 1: # Don't trigger occupancy sensor to render occupancy attribute to 0 - if attributes.HoldTime.attribute_id in attribute_list: - time.sleep(hold_time + 2) # add some extra 2 seconds to ensure hold time has passed. + if has_hold_time: + time.sleep(hold_time + 2.0) # add some extra 2 seconds to ensure hold time has passed. else: # a user wait until a sensor specific time to change occupancy attribute to 0. This is the case where the sensor doesn't support HoldTime. self.wait_for_user_input( prompt_msg="Type any letter and press ENTER after the sensor occupancy is detection ready state (occupancy attribute = 0)") # check sensor occupancy state is 0 for the next test step - occupancy_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.Occupancy) + occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) asserts.assert_equal(occupancy_dut, 0, "Occupancy attribute is still 1.") self.step(4) @@ -103,18 +111,18 @@ async def test_TC_OCC_3_1(self): self.wait_for_user_input(prompt_msg="Type any letter and press ENTER after a sensor occupancy is triggered.") # And then check if Occupancy attribute has changed. - occupancy_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.Occupancy) + occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) asserts.assert_equal(occupancy_dut, 1, "Occupancy state is not changed to 1") self.step(5) # check if Occupancy attribute is back to 0 after HoldTime attribute period # Tester should not be triggering the sensor for this test step. - if attributes.HoldTime.attribute_id in attribute_list: + if has_hold_time: # Start a timer based on HoldTime - time.sleep(hold_time+2) # add some extra 2 seconds to ensure hold time has passed. + time.sleep(hold_time + 2.0) # add some extra 2 seconds to ensure hold time has passed. - occupancy_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.Occupancy) + occupancy_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) asserts.assert_equal(occupancy_dut, 0, "Occupancy state is not 0 after HoldTime period") else: diff --git a/src/python_testing/TC_OCC_3_2.py b/src/python_testing/TC_OCC_3_2.py index 3624f1044c40e8..7e811362e2e2a4 100644 --- a/src/python_testing/TC_OCC_3_2.py +++ b/src/python_testing/TC_OCC_3_2.py @@ -19,72 +19,31 @@ # # === BEGIN CI TEST ARGUMENTS === # test-runner-runs: run1 -# test-runner-run/run1/app: ${TYPE_OF_APP} +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} # test-runner-run/run1/factoryreset: True # test-runner-run/run1/quiet: True # test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json -# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto +# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto --endpoint 1 # === END CI TEST ARGUMENTS === + # TODO: There are CI issues to be followed up for the test cases below that implements manually controlling sensor device for # the occupancy state ON/OFF change. # [TC-OCC-3.1] test procedure step 4 # [TC-OCC-3.2] test precedure step 3a, 3c import logging -import queue -import time -from typing import Any import chip.clusters as Clusters -from chip import ChipDeviceCtrl -from chip.clusters.Attribute import TypedAttributePath -from matter_testing_support import (AttributeValue, ClusterAttributeChangeAccumulator, MatterBaseTest, TestStep, async_test_body, - default_matter_test_main) +from matter_testing_support import (ClusterAttributeChangeAccumulator, MatterBaseTest, TestStep, async_test_body, + await_sequence_of_reports, default_matter_test_main) from mobly import asserts class TC_OCC_3_2(MatterBaseTest): - async def read_occ_attribute_expect_success(self, endpoint, attribute): + async def read_occ_attribute_expect_success(self, attribute): cluster = Clusters.Objects.OccupancySensing - return await self.read_single_attribute_check_success(endpoint=endpoint, cluster=cluster, attribute=attribute) - - def _await_sequence_of_reports(self, report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float): - start_time = time.time() - elapsed = 0.0 - time_remaining = timeout_sec - - sequence_idx = 0 - actual_values = [] - - while time_remaining > 0: - expected_value = sequence[sequence_idx] - logging.info(f"Expecting value {expected_value} for attribute {attribute} on endpoint {endpoint_id}") - try: - item: AttributeValue = report_queue.get(block=True, timeout=time_remaining) - - # Track arrival of all values for the given attribute. - if item.endpoint_id == endpoint_id and item.attribute == attribute: - actual_values.append(item.value) - - if item.value == expected_value: - logging.info(f"Got expected attribute change {sequence_idx+1}/{len(sequence)} for attribute {attribute}") - sequence_idx += 1 - else: - asserts.assert_equal(item.value, expected_value, - msg="Did not get expected attribute value in correct sequence.") - - # We are done waiting when we have accumulated all results. - if sequence_idx == len(sequence): - logging.info("Got all attribute changes, done waiting.") - return - except queue.Empty: - # No error, we update timeouts and keep going - pass - - elapsed = time.time() - start_time - time_remaining = timeout_sec - elapsed - - asserts.fail(f"Did not get full sequence {sequence} in {timeout_sec:.1f} seconds. Got {actual_values} before time-out.") + endpoint_id = self.matter_test_config.endpoint + return await self.read_single_attribute_check_success(endpoint=endpoint_id, cluster=cluster, attribute=attribute) def desc_TC_OCC_3_2(self) -> str: return "[TC-OCC-3.2] Subscription Report Verification with server as DUT" @@ -124,22 +83,32 @@ def pics_TC_OCC_3_2(self) -> list[str]: @async_test_body async def test_TC_OCC_3_2(self): - - endpoint = self.user_params.get("endpoint", 1) endpoint_id = self.matter_test_config.endpoint - node_id = self.matter_test_config.dut_node_ids[0] + node_id = self.dut_node_id + dev_ctrl = self.default_controller + post_prompt_settle_delay_seconds = 10.0 cluster = Clusters.Objects.OccupancySensing - attributes = Clusters.OccupancySensing.Attributes - occupancy_sensor_type_bitmap_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.OccupancySensorTypeBitmap) + attributes = cluster.Attributes self.step(1) - attribute_list = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.AttributeList) + + occupancy_sensor_type_bitmap_dut = await self.read_occ_attribute_expect_success(attribute=attributes.OccupancySensorTypeBitmap) + has_type_pir = ((occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kPir) != 0) or \ + ((occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kPIRAndUltrasonic) != 0) + has_type_ultrasonic = ((occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kUltrasonic) != 0) or \ + ((occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kPIRAndUltrasonic) != 0) + has_type_contact = (occupancy_sensor_type_bitmap_dut & cluster.Enums.OccupancySensorTypeEnum.kPhysicalContact) != 0 + + attribute_list = await self.read_occ_attribute_expect_success(attribute=attributes.AttributeList) + has_pir_timing_attrib = attributes.PIROccupiedToUnoccupiedDelay.attribute_id in attribute_list + has_ultrasonic_timing_attrib = attributes.UltrasonicOccupiedToUnoccupiedDelay.attribute_id in attribute_list + has_contact_timing_attrib = attributes.PhysicalContactOccupiedToUnoccupiedDelay.attribute_id in attribute_list self.step(2) # min interval = 0, and max interval = 30 seconds attrib_listener = ClusterAttributeChangeAccumulator(Clusters.Objects.OccupancySensing) - await attrib_listener.start(ChipDeviceCtrl, node_id, endpoint=endpoint_id) + await attrib_listener.start(dev_ctrl, node_id, endpoint=endpoint_id, min_interval_sec=0, max_interval_sec=30) # TODO - Will add Namepiped to assimilate the manual sensor untrigger here self.step("3a") @@ -147,7 +116,7 @@ async def test_TC_OCC_3_2(self): self.step("3b") if attributes.Occupancy.attribute_id in attribute_list: - initial_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.Occupancy) + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.Occupancy) asserts.assert_equal(initial_dut, 0, "Occupancy attribute is still detected state") # TODO - Will add Namepiped to assimilate the manual sensor trigger here @@ -156,8 +125,8 @@ async def test_TC_OCC_3_2(self): prompt_msg="Type any letter and press ENTER after the sensor occupancy is triggered and its occupancy state changed.") self.step("3d") - self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.Occupancy, sequence=[ - 0, 1], timeout_sec=post_prompt_settle_delay_seconds) + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.Occupancy, sequence=[ + 0, 1], timeout_sec=post_prompt_settle_delay_seconds) self.step("4a") if attributes.HoldTime.attribute_id not in attribute_list: @@ -165,83 +134,73 @@ async def test_TC_OCC_3_2(self): self.skip_all_remaining_steps("4b") self.step("4b") - initial_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.HoldTime) + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.HoldTime) self.step("4c") - # write a different a HoldTime attibute + # write a different a HoldTime attribute value diff_val = 12 - await ChipDeviceCtrl.WriteAttribute(node_id, [(endpoint, attributes.HoldTime(diff_val))]) + await self.write_single_attribute(attributes.HoldTime(diff_val)) self.step("4d") - self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.HoldTime, sequence=[ - initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.HoldTime, sequence=[ + initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) self.step("5a") - if (occupancy_sensor_type_bitmap_dut & Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPir) == 0: - logging.info("No PIR timing attribute supports. Skip this test cases, 5b, 5c, 5d") + if not has_type_pir or not has_pir_timing_attrib: + logging.info("No PIR timing attribute support. Skip this steps 5b, 5c, 5d") self.skip_step("5b") - self.skip_test("5c") + self.skip_step("5c") self.skip_step("5d") else: self.step("5b") - if attributes.PIROccupiedToUnoccupiedDelay.attribute_id in attribute_list: - initial_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.PIROccupiedToUnoccupiedDelay) - - else: - logging.info("No PIROccupiedToUnoccupiedDelay attribute supports. Terminate this test case") + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.PIROccupiedToUnoccupiedDelay) self.step("5c") # write the new attribute value diff_val = 11 - await ChipDeviceCtrl.WriteAttribute(node_id, [(endpoint, attributes.PIROccupiedToUnoccupiedDelay(diff_val))]) + await self.write_single_attribute(attributes.PIROccupiedToUnoccupiedDelay(diff_val)) self.step("5d") - self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.PIROccupiedToUnoccupiedDelay, sequence=[ - initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.PIROccupiedToUnoccupiedDelay, sequence=[ + initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) self.step("6a") - if (occupancy_sensor_type_bitmap_dut & Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kUltrasonic) == 0: - logging.info("No Ultrasonic timing attribute supports. Skip this test cases, 6b, 6c, 6d") + if not has_type_ultrasonic or not has_ultrasonic_timing_attrib: + logging.info("No Ultrasonic timing attribute supports. Skip steps 6b, 6c, 6d") self.skip_step("6b") - self.skip_test("6c") + self.skip_step("6c") self.skip_step("6d") else: self.step("6b") - if attributes.UltrasonicOccupiedToUnoccupiedDelay.attribute_id in attribute_list: - initial_dut = await self.read_occ_attribute_expect_success(endpoint=endpoint, attribute=attributes.UltrasonicOccupiedToUnoccupiedDelay) - - else: - logging.info("No UltrasonicOccupiedToUnoccupiedDelay attribute supports. Skip this test case") + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.UltrasonicOccupiedToUnoccupiedDelay) self.step("6c") # write the new attribute value diff_val = 14 - await ChipDeviceCtrl.WriteAttribute(node_id, [(endpoint, attributes.UltrasonicOccupiedToUnoccupiedDelay(diff_val))]) + await self.write_single_attribute(attributes.UltrasonicOccupiedToUnoccupiedDelay(diff_val)) self.step("6d") - self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.UltrasonicOccupiedToUnoccupiedDelay, sequence=[ - initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.UltrasonicOccupiedToUnoccupiedDelay, sequence=[ + initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) self.step("7a") - if (occupancy_sensor_type_bitmap_dut & Clusters.OccupancySensing.Enums.OccupancySensorTypeEnum.kPhysicalContact) == 0: + if not has_type_contact or not has_contact_timing_attrib: logging.info("No Physical contact timing attribute supports. Skip this test case") - self.skip_all_remaining_steps("7b") - - self.step("7b") - if attributes.PhysicalContactOccupiedToUnoccupiedDelay.attribute_id in attribute_list: - initial_dut = await self.t_success(endpoint=endpoint, attribute=attributes.PhysicalContactOccupiedToUnoccupiedDelay) - + self.skip_step("7b") + self.skip_step("7c") + self.skip_step("7d") else: - logging.info("No PhysicalContactOccupiedToUnoccupiedDelay attribute supports. Skip this test case") + self.step("7b") + initial_dut = await self.read_occ_attribute_expect_success(attribute=attributes.PhysicalContactOccupiedToUnoccupiedDelay) - self.step("7c") - # write the new attribute value - diff_val = 9 - await ChipDeviceCtrl.WriteAttribute(node_id, [(endpoint, attributes.PhysicalContactOccupiedToUnoccupiedDelay(diff_val))]) + self.step("7c") + # write the new attribute value + diff_val = 9 + await self.write_single_attribute(attributes.PhysicalContactOccupiedToUnoccupiedDelay(diff_val)) - self.step("7d") - self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.PhysicalContactOccupiedToUnoccupiedDelay, sequence=[ - initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) + self.step("7d") + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.PhysicalContactOccupiedToUnoccupiedDelay, sequence=[ + initial_dut, diff_val], timeout_sec=post_prompt_settle_delay_seconds) if __name__ == "__main__": diff --git a/src/python_testing/TC_SWTCH.py b/src/python_testing/TC_SWTCH.py index 08bf23d91c30d4..e2ef307973003b 100644 --- a/src/python_testing/TC_SWTCH.py +++ b/src/python_testing/TC_SWTCH.py @@ -36,10 +36,10 @@ import chip.clusters as Clusters import test_plan_support from chip.clusters import ClusterObjects as ClusterObjects -from chip.clusters.Attribute import EventReadResult, TypedAttributePath +from chip.clusters.Attribute import EventReadResult from chip.tlv import uint -from matter_testing_support import (AttributeValue, ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest, - TestStep, default_matter_test_main, has_feature, per_endpoint_test) +from matter_testing_support import (ClusterAttributeChangeAccumulator, EventChangeCallback, MatterBaseTest, TestStep, + await_sequence_of_reports, default_matter_test_main, has_feature, per_endpoint_test) from mobly import asserts logger = logging.getLogger(__name__) @@ -170,44 +170,6 @@ def _ask_for_release(self): else: time.sleep(self.keep_pressed_delay/1000) - def _await_sequence_of_reports(self, report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float): - start_time = time.time() - elapsed = 0.0 - time_remaining = timeout_sec - - sequence_idx = 0 - actual_values = [] - - while time_remaining > 0: - expected_value = sequence[sequence_idx] - logging.info(f"Expecting value {expected_value} for attribute {attribute} on endpoint {endpoint_id}") - try: - item: AttributeValue = report_queue.get(block=True, timeout=time_remaining) - - # Track arrival of all values for the given attribute. - if item.endpoint_id == endpoint_id and item.attribute == attribute: - actual_values.append(item.value) - - if item.value == expected_value: - logging.info(f"Got expected attribute change {sequence_idx+1}/{len(sequence)} for attribute {attribute}") - sequence_idx += 1 - else: - asserts.assert_equal(item.value, expected_value, - msg="Did not get expected attribute value in correct sequence.") - - # We are done waiting when we have accumulated all results. - if sequence_idx == len(sequence): - logging.info("Got all attribute changes, done waiting.") - return - except queue.Empty: - # No error, we update timeouts and keep going - pass - - elapsed = time.time() - start_time - time_remaining = timeout_sec - elapsed - - asserts.fail(f"Did not get full sequence {sequence} in {timeout_sec:.1f} seconds. Got {actual_values} before time-out.") - def _await_sequence_of_events(self, event_queue: queue.Queue, endpoint_id: int, sequence: list[ClusterObjects.ClusterEvent], timeout_sec: float): start_time = time.time() elapsed = 0.0 @@ -511,8 +473,8 @@ async def test_TC_SWTCH_2_4(self): # - TH expects report of CurrentPosition 1, followed by a report of Current Position 0. logging.info( f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for CurrentPosition to go {switch_pressed_position}, then 0.") - self._await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.CurrentPosition, sequence=[ - switch_pressed_position, 0], timeout_sec=post_prompt_settle_delay_seconds) + await_sequence_of_reports(report_queue=attrib_listener.attribute_queue, endpoint_id=endpoint_id, attribute=cluster.Attributes.CurrentPosition, sequence=[ + switch_pressed_position, 0], timeout_sec=post_prompt_settle_delay_seconds) # - TH expects at least InitialPress with NewPosition = 1 logging.info(f"Starting to wait for {post_prompt_settle_delay_seconds:.1f} seconds for InitialPress event.") diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 273ffe94b5f821..8275b8f0f83326 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -27,6 +27,7 @@ import random import re import sys +import time import typing import uuid from binascii import hexlify, unhexlify @@ -331,6 +332,58 @@ def wait_for_report(self): asserts.fail("[AttributeChangeCallback] Attribute {expected_attribute} not found in returned report") +def await_sequence_of_reports(report_queue: queue.Queue, endpoint_id: int, attribute: TypedAttributePath, sequence: list[Any], timeout_sec: float): + """Given a queue.Queue hooked-up to an attribute change accumulator, await a given expected sequence of attribute reports. + + Args: + - report_queue: the queue that receives all the reports. + - endpoint_id: endpoint ID to match for reports to check. + - attribute: attribute to match for reports to check. + - sequence: list of attribute values in order that are expected. + - timeout_sec: number of seconds to wait for. + + This will fail current Mobly test with assertion failure if the data is not as expected in order. + + Returns nothing on success so the test can go on. + """ + start_time = time.time() + elapsed = 0.0 + time_remaining = timeout_sec + + sequence_idx = 0 + actual_values = [] + + while time_remaining > 0: + expected_value = sequence[sequence_idx] + logging.info(f"Expecting value {expected_value} for attribute {attribute} on endpoint {endpoint_id}") + try: + item: AttributeValue = report_queue.get(block=True, timeout=time_remaining) + + # Track arrival of all values for the given attribute. + if item.endpoint_id == endpoint_id and item.attribute == attribute: + actual_values.append(item.value) + + if item.value == expected_value: + logging.info(f"Got expected attribute change {sequence_idx+1}/{len(sequence)} for attribute {attribute}") + sequence_idx += 1 + else: + asserts.assert_equal(item.value, expected_value, + msg="Did not get expected attribute value in correct sequence.") + + # We are done waiting when we have accumulated all results. + if sequence_idx == len(sequence): + logging.info("Got all attribute changes, done waiting.") + return + except queue.Empty: + # No error, we update timeouts and keep going + pass + + elapsed = time.time() - start_time + time_remaining = timeout_sec - elapsed + + asserts.fail(f"Did not get full sequence {sequence} in {timeout_sec:.1f} seconds. Got {actual_values} before time-out.") + + @dataclass class AttributeValue: endpoint_id: int @@ -360,7 +413,7 @@ async def start(self, dev_ctrl, node_id: int, endpoint: int, fabric_filtered: bo self._subscription = await dev_ctrl.ReadAttribute( nodeid=node_id, attributes=[(endpoint, self._expected_cluster)], - reportInterval=(min_interval_sec, max_interval_sec), + reportInterval=(int(min_interval_sec), int(max_interval_sec)), fabricFiltered=fabric_filtered, keepSubscriptions=True ) @@ -977,6 +1030,25 @@ async def read_single_attribute_expect_error( return attr_ret + async def write_single_attribute(self, attribute_value: object, endpoint_id: int = None, expect_success: bool = True) -> Status: + """Write a single `attribute_value` on a given `endpoint_id` and assert on failure. + + If `endpoint_id` is None, the default DUT endpoint for the test is selected. + + If `expect_success` is True, a test assertion fails on error status codes + + Status code is returned. + """ + dev_ctrl = self.default_controller + node_id = self.dut_node_id + endpoint = self.matter_test_config.endpoint if endpoint_id is None else endpoint_id + + write_result = await dev_ctrl.WriteAttribute(node_id, [(endpoint, attribute_value)]) + if expect_success: + asserts.assert_equal(write_result[0].Status, Status.Success, + f"Expected write success for write to attribute {attribute_value} on endpoint {endpoint}") + return write_result[0].Status + async def send_single_cmd( self, cmd: Clusters.ClusterObjects.ClusterCommand, dev_ctrl: ChipDeviceCtrl = None, node_id: int = None, endpoint: int = None,