From 1182191f09c3b13ac08eff56b13dacbd1396f881 Mon Sep 17 00:00:00 2001 From: C Freeman Date: Mon, 28 Aug 2023 11:12:33 -0400 Subject: [PATCH] TC-SM-1.2: Implement parts list topology test (#28871) * TC-SM-1.2: Implement parts list topology test * Address review comments --- .../TC_DeviceBasicComposition.py | 137 +++++++++++++++++- .../TestMatterTestingSupport.py | 112 ++++++++++++++ 2 files changed, 247 insertions(+), 2 deletions(-) diff --git a/src/python_testing/TC_DeviceBasicComposition.py b/src/python_testing/TC_DeviceBasicComposition.py index 67f5bc4f952b8c..40ae82be4bad70 100644 --- a/src/python_testing/TC_DeviceBasicComposition.py +++ b/src/python_testing/TC_DeviceBasicComposition.py @@ -157,6 +157,81 @@ def check_non_empty_list_of_ints_in_range(min_value: int, max_value: int, max_si return check_list_of_ints_in_range(min_value, max_value, min_size=1, max_size=max_size, allow_null=allow_null) +def separate_endpoint_types(endpoint_dict: dict[int, Any]) -> tuple[list[int], list[int]]: + """Returns a tuple containing the list of flat endpoints and a list of tree endpoints""" + flat = [] + tree = [] + for endpoint_id, endpoint in endpoint_dict.items(): + if endpoint_id == 0: + continue + aggregator_id = 0x000e + device_types = [d.deviceType for d in endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]] + if aggregator_id in device_types: + flat.append(endpoint_id) + else: + tree.append(endpoint_id) + return (flat, tree) + + +def get_all_children(endpoint_id, endpoint_dict: dict[int, Any]) -> set[int]: + """Returns all the children (include subchildren) of the given endpoint + This assumes we've already checked that there are no cycles, so we can do the dumb things and just trace the tree + """ + children = set() + + def add_children(endpoint_id, children): + immediate_children = endpoint_dict[endpoint_id][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList] + if not immediate_children: + return + children.update(set(immediate_children)) + for child in immediate_children: + add_children(child, children) + + add_children(endpoint_id, children) + return children + + +def find_tree_roots(tree_endpoints: list[int], endpoint_dict: dict[int, Any]) -> set[int]: + """Returns a set of all the endpoints in tree_endpoints that are roots for a tree (not include singletons)""" + tree_roots = set() + + def find_tree_root(current_id): + for endpoint_id, endpoint in endpoint_dict.items(): + if endpoint_id not in tree_endpoints: + continue + if current_id in endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]: + # this is not the root, move up + return find_tree_root(endpoint_id) + return current_id + + for endpoint_id in tree_endpoints: + root = find_tree_root(endpoint_id) + if root != endpoint_id: + tree_roots.add(root) + return tree_roots + + +def parts_list_cycles(tree_endpoints: list[int], endpoint_dict: dict[int, Any]) -> list[int]: + """Returns a list of all the endpoints in the tree_endpoints list that contain cycles""" + def parts_list_cycle_detect(visited: set, current_id: int) -> bool: + if current_id in visited: + return True + visited.add(current_id) + for child in endpoint_dict[current_id][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]: + child_has_cycles = parts_list_cycle_detect(visited, child) + if child_has_cycles: + return True + return False + + cycles = [] + # This is quick enough that we can do all the endpoints wihtout searching for the roots + for endpoint_id in tree_endpoints: + visited = set() + if parts_list_cycle_detect(visited, endpoint_id): + cycles.append(endpoint_id) + return cycles + + class TC_DeviceBasicComposition(MatterBaseTest): @async_test_body async def setup_class(self): @@ -425,8 +500,66 @@ def test_all_endpoints_have_valid_composition(self): asserts.skip( "TODO: Make a test that verifies each endpoint has valid set of device types, and that the device type conformance is respected for each") - def test_topology_is_valid(self): - asserts.skip("TODO: Make a test that verifies each endpoint only lists direct descendants, except Root Node and Aggregator endpoints that list all their descendants") + def test_TC_SM_1_2(self): + self.print_step(1, "Wildcard read of device - already done") + + self.print_step(2, "Verify the Descriptor cluster PartsList on endpoint 0 exactly lists all the other (non-0) endpoints on the DUT") + parts_list_0 = self.endpoints[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList] + cluster_id = Clusters.Descriptor.id + attribute_id = Clusters.Descriptor.Attributes.PartsList.attribute_id + location = AttributePathLocation(endpoint_id=0, cluster_id=cluster_id, attribute_id=attribute_id) + if len(self.endpoints.keys()) != len(set(self.endpoints.keys())): + self.record_error(self.get_test_name(), location=location, + problem='duplicate endpoint ids found in the returned data', spec_location="PartsList Attribute") + self.fail_current_test() + + if len(parts_list_0) != len(set(parts_list_0)): + self.record_error(self.get_test_name(), location=location, + problem='Duplicate endpoint ids found in the parts list on ep0', spec_location="PartsList Attribute") + self.fail_current_test() + + expected_parts = set(self.endpoints.keys()) + expected_parts.remove(0) + if set(parts_list_0) != expected_parts: + self.record_error(self.get_test_name(), location=location, + problem='EP0 Descriptor parts list does not match the set of returned endpoints', spec_location="PartsList Attribute") + self.fail_current_test() + + self.print_step( + 3, "For each endpoint on the DUT (including EP 0), verify the PartsList in the Descriptor cluster on that endpoint does not include itself") + for endpoint_id, endpoint in self.endpoints.items(): + if endpoint_id in endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]: + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id) + self.record_error(self.get_test_name(), location=location, + problem=f"Endpoint {endpoint_id} parts list includes itself", spec_location="PartsList Attribute") + self.fail_current_test() + + self.print_step(4, "Separate endpoints into flat and tree style") + flat, tree = separate_endpoint_types(self.endpoints) + + self.print_step(5, "Check for cycles in the tree endpoints") + cycles = parts_list_cycles(tree, self.endpoints) + if len(cycles) != 0: + for id in cycles: + location = AttributePathLocation(endpoint_id=id, cluster_id=cluster_id, attribute_id=attribute_id) + self.record_error(self.get_test_name(), location=location, + problem=f"Endpoint {id} parts list includes a cycle", spec_location="PartsList Attribute") + self.fail_current_test() + + self.print_step(6, "Check flat lists include all sub ids") + ok = True + for endpoint_id in flat: + # ensure that every sub-id in the parts list is included in the parent + sub_children = [] + for child in self.endpoints[endpoint_id][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]: + sub_children.update(get_all_children(child)) + if not all(item in sub_children for item in self.endpoints[endpoint_id][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]): + location = AttributePathLocation(endpoint_id=endpoint_id, cluster_id=cluster_id, attribute_id=attribute_id) + self.record_error(self.get_test_name(), location=location, + problem='Flat parts list does not include all the sub-parts', spec_location='Endpoint composition') + ok = False + if not ok: + self.fail_current_test() def test_TC_PS_3_1(self): BRIDGED_NODE_DEVICE_TYPE_ID = 0x13 diff --git a/src/python_testing/TestMatterTestingSupport.py b/src/python_testing/TestMatterTestingSupport.py index ce9f8b88403786..9e63cb9ae58206 100644 --- a/src/python_testing/TestMatterTestingSupport.py +++ b/src/python_testing/TestMatterTestingSupport.py @@ -25,6 +25,7 @@ from matter_testing_support import (MatterBaseTest, async_test_body, compare_time, default_matter_test_main, get_wait_seconds_from_set_time, parse_pics, type_matches, utc_time_in_matter_epoch) from mobly import asserts, signals +from TC_DeviceBasicComposition import find_tree_roots, get_all_children, parts_list_cycles, separate_endpoint_types def get_raw_type_list(): @@ -200,6 +201,117 @@ def test_get_wait_time_function(self): secs = get_wait_seconds_from_set_time(th_utc, 15) asserts.assert_equal(secs, 14) + def create_example_topology(self): + """Creates a limited example of a wildcard read that contains only the descriptor cluster parts list and device types""" + def create_endpoint(parts_list: list[uint], device_types: list[uint]): + endpoint = {} + device_types_structs = [] + for device_type in device_types: + device_types_structs.append(Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=device_type, revision=1)) + endpoint[Clusters.Descriptor] = {Clusters.Descriptor.Attributes.PartsList: parts_list, + Clusters.Descriptor.Attributes.DeviceTypeList: device_types_structs} + return endpoint + + endpoints = {} + # Root node is 0 + # We have two trees in the root node and two trees in the aggregator + # 2 - 1 + # - 3 - 4 + # - 5 - 9 + # 6 - 7 + # - 8 + # 10 + # 11 (aggregator - all remaining are under it) + # 13 - 12 + # - 14 - 15 + # - 16 + # 17 - 18 + # - 19 + # 20 + # 21 + endpoints[0] = create_endpoint([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21], [22]) + endpoints[1] = create_endpoint([], [1]) # Just using a random device id, as long as it's not the aggregator it's fine + endpoints[2] = create_endpoint([1, 3], [1]) + endpoints[3] = create_endpoint([4, 5], [1]) + endpoints[4] = create_endpoint([], [1]) + endpoints[5] = create_endpoint([9], [1]) + endpoints[6] = create_endpoint([7, 8], [1]) + endpoints[7] = create_endpoint([], [1]) + endpoints[8] = create_endpoint([], [1]) + endpoints[9] = create_endpoint([], [1]) + endpoints[10] = create_endpoint([], [1]) + endpoints[11] = create_endpoint([12, 13, 14, 15, 16, 17, 18, 19, 20, 21], [0xe]) # aggregator device type + endpoints[12] = create_endpoint([], [1]) + endpoints[13] = create_endpoint([12, 14], [1]) + endpoints[14] = create_endpoint([15, 16], [1]) + endpoints[15] = create_endpoint([], [1]) + endpoints[16] = create_endpoint([], [1]) + endpoints[17] = create_endpoint([18, 19], [1]) + endpoints[18] = create_endpoint([], [1]) + endpoints[19] = create_endpoint([], [1]) + endpoints[20] = create_endpoint([], [1]) + endpoints[21] = create_endpoint([], [1]) + + return endpoints + + def test_cycle_detection_and_splitting(self): + # Example topology has no cycles + endpoints = self.create_example_topology() + flat, tree = separate_endpoint_types(endpoints) + asserts.assert_equal(len(flat), len(set(flat)), "Duplicate endpoints found in flat list") + asserts.assert_equal(len(tree), len(set(tree)), "Duplicate endpoints found in tree list") + asserts.assert_equal(set(flat), {11}, "Aggregator node not found in list") + asserts.assert_equal(set(tree), {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}) + + cycles = parts_list_cycles(tree, endpoints) + asserts.assert_equal(len(cycles), 0, "Found cycles in the example tree") + + # Add in several cycles and make sure we detect them all + # ep 10 refers back to itself (0 level cycle) on 10 + endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(10) + cycles = parts_list_cycles(tree, endpoints) + asserts.assert_equal(cycles, [10]) + endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(10) + print(endpoints[10]) + + # ep 4 refers back to 3 (1 level cycle) on 3 (will include 2, 3 and 4 in the cycles list) + endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(3) + cycles = parts_list_cycles(tree, endpoints) + asserts.assert_equal(cycles, [2, 3, 4]) + endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(3) + + # ep 16 refers back to 13 (2 level cycle) on 13 (will include 13, 14 and 16 in cycles) + endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(13) + cycles = parts_list_cycles(tree, endpoints) + asserts.assert_equal(cycles, [13, 14, 16]) + endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(13) + + # ep 9 refers back to 2 (3 level cycle) on 2 (includes 2, 3, 5, and 9) + endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(2) + cycles = parts_list_cycles(tree, endpoints) + asserts.assert_equal(cycles, [2, 3, 5, 9]) + endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].remove(2) + + # make sure we get them all + endpoints[10][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(10) + endpoints[4][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(3) + endpoints[16][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(13) + endpoints[9][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList].append(2) + cycles = parts_list_cycles(tree, endpoints) + asserts.assert_equal(cycles, [2, 3, 4, 5, 9, 10, 13, 14, 16]) + + def test_get_all_children(self): + endpoints = self.create_example_topology() + asserts.assert_equal(get_all_children(2, endpoints), {1, 3, 4, 5, 9}, "Child list for ep2 is incorrect") + asserts.assert_equal(get_all_children(6, endpoints), {7, 8}, "Child list for ep6 is incorrect") + asserts.assert_equal(get_all_children(13, endpoints), {12, 14, 15, 16}, "Child list for ep13 is incorrect") + asserts.assert_equal(get_all_children(17, endpoints), {18, 19}, "Child list for ep17 is incorrect") + + def test_get_tree_roots(self): + endpoints = self.create_example_topology() + _, tree = separate_endpoint_types(endpoints) + asserts.assert_equal(find_tree_roots(tree, endpoints), {2, 6, 13, 17}, "Incorrect tree root list") + if __name__ == "__main__": default_matter_test_main()