Skip to content

Commit

Permalink
Update TC_RR_1_1.py to match test spec changes.
Browse files Browse the repository at this point in the history
Step 1 was updated to remove any existing extra fabrics.
Step 12 varied from the spec for the verified value.
  • Loading branch information
cletnick committed Jan 26, 2023
1 parent 09ea936 commit 9574707
Showing 1 changed file with 60 additions and 27 deletions.
87 changes: 60 additions & 27 deletions src/python_testing/TC_RR_1_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def test_TC_RR_1_1(self):
capability_minima = await self.read_single_attribute(dev_ctrl, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.BasicInformation.Attributes.CapabilityMinima)
asserts.assert_greater_equal(capability_minima.caseSessionsPerFabric, 3)

# Step 1: Commission 5 fabrics with maximized NOC chains
# Step 1: Commission 5 fabrics with maximized NOC chains. 1a and 1b have already been completed at this time.
logging.info(f"Step 1: use existing fabric to configure new fabrics so that total is {num_fabrics_to_commission} fabrics")

# Generate Node IDs for subsequent controllers start at 200, follow 200, 300, ...
Expand All @@ -132,7 +132,37 @@ async def test_TC_RR_1_1(self):
controller.name = all_names.pop(0)
client_list.extend(new_controllers)

# Prepare clients for subsequent fabrics
# Step 1c - Ensure there are no leftover fabrics from another process.
commissioned_fabrics: int = await self.read_single_attribute(dev_ctrl, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.CommissionedFabrics)

# Insert a fabric to self-test the next step without having to add a dedicated test run.
if commissioned_fabrics == 1:
logging.info("Commissioning fabric for TH test.")
new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority()
new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2)

new_admin_ctrl = new_fabric_admin.NewController(nodeId=dev_ctrl.nodeId, catTags=[0x0001_0001])
new_admin_ctrl.name = "THTF"
await CommissioningBuildingBlocks.AddNOCForNewFabricFromExisting(commissionerDevCtrl=dev_ctrl, newFabricDevCtrl=new_admin_ctrl, existingNodeId=self.dut_node_id, newNodeId=self.dut_node_id)

commissioned_fabrics = await self.read_single_attribute(dev_ctrl, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.CommissionedFabrics)
asserts.assert_not_equal(commissioned_fabrics, 1, "TH Error: failed to add fabric for testing TH.")

# Step 1c - perform removal.
if commissioned_fabrics > 1:
logging.info("Removing extra fabrics from device.")
fabrics: List[Clusters.OperationalCredentials.Structs.FabricDescriptorStruct] = await self.read_single_attribute(dev_ctrl, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.Fabrics)
for fabric in fabrics:
if fabric.fabricID == dev_ctrl.fabricId():
continue

# This is not the initial client's fabric, so remove it.
await dev_ctrl.SendCommand(self.dut_node_id, 0, Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=fabric.fabricIndex))

commissioned_fabrics = await self.read_single_attribute(dev_ctrl, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.CommissionedFabrics)
asserts.assert_not_equal(commissioned_fabrics, 1, "Failed to remove extra fabrics from DUT.")

# Prepare clients for subsequent fabrics (step 1d)
for i in range(num_fabrics_to_commission - 1):
admin_index = 2 + i
logging.info("Commissioning fabric %d/%d" % (admin_index, num_fabrics_to_commission))
Expand All @@ -155,6 +185,9 @@ async def test_TC_RR_1_1(self):
asserts.assert_equal(len(client_list), num_fabrics_to_commission *
num_controllers_per_fabric, "Must have the right number of clients")

commissioned_fabrics = await self.read_single_attribute(dev_ctrl, node_id=self.dut_node_id, endpoint=0, attribute=Clusters.OperationalCredentials.Attributes.CommissionedFabrics)
asserts.assert_equal(commissioned_fabrics, num_fabrics_to_commission, "Must have the right number of fabrics commissioned")

client_by_name = {client.name: client for client in client_list}
local_session_id_by_client_name = {client.name: client.GetConnectedDeviceSync(
self.dut_node_id).localSessionId for client in client_list}
Expand Down Expand Up @@ -469,32 +502,32 @@ async def fill_and_validate_group_key_sets(self,
for fabric_idx in range(fabrics):
client: Any = clients[fabric_idx]

# Read, skip the IPK key set.
for group_key_cluster_idx in range(1, keys_per_fabric):
group_key_list_idx: int = group_key_cluster_idx - 1
logging.info("Step 12: Reading back group keys on fabric %d" % (fabric_idx+1))
resp = await client.SendCommand(self.dut_node_id, 0,
Clusters.GroupKeyManagement.Commands.KeySetReadAllIndices(),
responseType=Clusters.GroupKeyManagement.Commands.KeySetReadAllIndicesResponse)

group_key_ids: List[int] = resp.groupKeySetIDs
asserts.assert_equal(keys_per_fabric, len(group_key_ids),
"KeySetReadAllIndicesResponse length does not match the key support indicated: %d." % (keys_per_fabric))

known_entries: int = 0
unknown_entries: int = 0
for key_sset in group_keys[fabric_idx]:
if key_sset not in group_key_ids:
unknown_entries += 1
continue

known_entries += 1

asserts.assert_equal(unknown_entries, 1,
"Read more than 1 key ID that did not match written values after IPK (only expected 1 for IPK).")

asserts.assert_equal(unknown_entries + known_entries, keys_per_fabric,
"Key IDs read did not match written qauntity.")

logging.info("Step 12: Reading back group key on fabric %d at index ''%d'" % (fabric_idx+1, group_key_cluster_idx))
key_set = await client.SendCommand(self.dut_node_id, 0,
Clusters.GroupKeyManagement.Commands.KeySetRead(
group_keys[fabric_idx][group_key_list_idx].groupKeySetID),
responseType=Clusters.GroupKeyManagement.Commands.KeySetReadResponse)

asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].groupKeySetID,
key_set.groupKeySet.groupKeySetID, "Received incorrect key set.")
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].groupKeySecurityPolicy,
key_set.groupKeySet.groupKeySecurityPolicy)
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime0,
key_set.groupKeySet.epochStartTime0)
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime1,
key_set.groupKeySet.epochStartTime1)
asserts.assert_equal(group_keys[fabric_idx][group_key_list_idx].epochStartTime2,
key_set.groupKeySet.epochStartTime2)
asserts.assert_equal(NullValue, key_set.groupKeySet.epochKey0,
"Value for epochKey0 included in KeySetReadResponse. It must not be.")
asserts.assert_equal(NullValue, key_set.groupKeySet.epochKey1,
"Value for epochKey1 included in KeySetReadResponse. It must not be.")
asserts.assert_equal(NullValue, key_set.groupKeySet.epochKey2,
"Value for epochKey2 included in KeySetReadResponse. It must not be.")
asserts.assert_equal(len(group_key_ids), len(set(group_key_ids)),
"Key IDs read contain duplicates.")

return group_keys

Expand Down

0 comments on commit 9574707

Please sign in to comment.