Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for > 1 DUT for TC-DA-1.7 #25928

Merged
merged 1 commit into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 48 additions & 29 deletions src/python_testing/TC_DA_1_7.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,21 @@ def extract_akid(cert: Certificate) -> Optional[bytes]:
class TC_DA_1_7(MatterBaseTest):
@async_test_body
async def test_TC_DA_1_7(self):
# For real tests, we require more than one DUT
# On the CI, this doesn't make sense to do since all the examples use the same DAC
# To specify more than 1 DUT, use a list of discriminators and passcodes
allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)
if allow_sdk_dac:
asserts.assert_equal(len(self.matter_test_config.discriminator), 1, "Only one device can be tested with SDK DAC")
if not allow_sdk_dac:
asserts.assert_equal(len(self.matter_test_config.discriminator), 2, "This test requires 2 DUTs")
pk = []
for i in range(len(self.matter_test_config.dut_node_id)):
pk.append(await self.single_DUT(i, self.matter_test_config.dut_node_id[i]))
Comment on lines +78 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for idx, node_id in enumerate(self.matter_test_config.dut_node_id):


asserts.assert_equal(len(pk), len(set(pk)), "Found matching public keys in different DUTs")

async def single_DUT(self, dut_index: int, dut_node_id: int) -> bytes:
# Option to allow SDK roots (skip step 4 check 2)
allow_sdk_dac = self.user_params.get("allow_sdk_dac", False)

Expand All @@ -74,52 +89,56 @@ async def test_TC_DA_1_7(self):
paa_by_skid = load_all_paa(conf.paa_trust_store_path)
logging.info("Found %d PAAs" % len(paa_by_skid))

logging.info("Step 1: Commissioning, already done")
logging.info("DUT {} Step 1: Commissioning, already done".format(dut_index))
dev_ctrl = self.default_controller

logging.info("Step 2: Get PAI of DUT1 with certificate chain request")
result = await dev_ctrl.SendCommand(self.dut_node_id, 0,
logging.info("DUT {} Step 2: Get PAI of DUT1 with certificate chain request".format(dut_index))
result = await dev_ctrl.SendCommand(dut_node_id, 0,
Clusters.OperationalCredentials.Commands.CertificateChainRequest(2))
pai_1 = result.certificate
asserts.assert_less_equal(len(pai_1), 600, "PAI cert must be at most 600 bytes")
self.record_data({"pai_1": hex_from_bytes(pai_1)})
pai = result.certificate
asserts.assert_less_equal(len(pai), 600, "PAI cert must be at most 600 bytes")
key = 'pai_{}'.format(dut_index)
self.record_data({key: hex_from_bytes(pai)})

logging.info("Step 3: Get DAC of DUT1 with certificate chain request")
result = await dev_ctrl.SendCommand(self.dut_node_id, 0,
logging.info("DUT {} Step 3: Get DAC of DUT1 with certificate chain request".format(dut_index))
result = await dev_ctrl.SendCommand(dut_node_id, 0,
Clusters.OperationalCredentials.Commands.CertificateChainRequest(1))
dac_1 = result.certificate
asserts.assert_less_equal(len(dac_1), 600, "DAC cert must be at most 600 bytes")
self.record_data({"dac_1": hex_from_bytes(dac_1)})

logging.info("Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid")
pai1_cert = load_der_x509_certificate(pai_1)
pai1_akid = extract_akid(pai1_cert)
if pai1_akid not in paa_by_skid:
asserts.fail("DUT1's PAI (%s) not matched in PAA trust store" % hex_from_bytes(pai1_akid))

filename, paa_cert = paa_by_skid[pai1_akid]
dac = result.certificate
asserts.assert_less_equal(len(dac), 600, "DAC cert must be at most 600 bytes")
key = 'dac_{}'.format(dut_index)
self.record_data({key: hex_from_bytes(dac)})

logging.info("DUT {} Step 4 check 1: Ensure PAI's AKID matches a PAA and signature is valid".format(dut_index))
pai_cert = load_der_x509_certificate(pai)
pai_akid = extract_akid(pai_cert)
if pai_akid not in paa_by_skid:
asserts.fail("DUT %d PAI (%s) not matched in PAA trust store" % (dut_index, hex_from_bytes(pai_akid)))

filename, paa_cert = paa_by_skid[pai_akid]
logging.info("Matched PAA file %s, subject: %s" % (filename, paa_cert.subject))
public_key = paa_cert.public_key()

try:
public_key.verify(signature=pai1_cert.signature, data=pai1_cert.tbs_certificate_bytes,
public_key.verify(signature=pai_cert.signature, data=pai_cert.tbs_certificate_bytes,
signature_algorithm=ec.ECDSA(hashes.SHA256()))
except InvalidSignature as e:
asserts.fail("Failed to verify PAI signature against PAA public key: %s" % str(e))
asserts.fail("DUT %d: Failed to verify PAI signature against PAA public key: %s" % (dut_index, str(e)))
logging.info("Validated PAI signature against PAA")

logging.info("Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs")
logging.info("DUT {} Step 4 check 2: Verify PAI AKID not in denylist of SDK PAIs".format(dut_index))
if allow_sdk_dac:
logging.warn("===> TEST STEP SKIPPED: Allowing SDK DACs!")
else:
for candidate in FORBIDDEN_AKID:
asserts.assert_not_equal(hex_from_bytes(pai1_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist")

logging.info("Step 5: Extract subject public key of DAC and save")
dac1_cert = load_der_x509_certificate(dac_1)
pk_1 = dac1_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
logging.info("Subject public key pk_1: %s" % hex_from_bytes(pk_1))
self.record_data({"pk_1": hex_from_bytes(pk_1)})
asserts.assert_not_equal(hex_from_bytes(pai_akid), hex_from_bytes(candidate), "PAI AKID must not be in denylist")

logging.info("DUT {} Step 5: Extract subject public key of DAC and save".format(dut_index))
dac_cert = load_der_x509_certificate(dac)
pk = dac_cert.public_key().public_bytes(encoding=Encoding.X962, format=PublicFormat.UncompressedPoint)
logging.info("Subject public key pk: %s" % hex_from_bytes(pk))
key = 'pk_{}'.format(dut_index)
self.record_data({key: hex_from_bytes(pk)})
return pk


if __name__ == "__main__":
Expand Down
73 changes: 48 additions & 25 deletions src/python_testing/matter_testing_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ class MatterTestConfig:
tests: List[str] = field(default_factory=list)

commissioning_method: str = None
discriminator: int = None
setup_passcode: int = None
discriminator: List[int] = None
setup_passcode: List[int] = None
commissionee_ip_address_just_for_testing: str = None
maximize_cert_chains: bool = False

Expand All @@ -145,7 +145,7 @@ class MatterTestConfig:
thread_operational_dataset: str = None

# Node ID for basic DUT
dut_node_id: int = _DEFAULT_DUT_NODE_ID
dut_node_id: List[int] = None
# Node ID to use for controller/commissioner
controller_node_id: int = _DEFAULT_CONTROLLER_NODE_ID
# CAT Tags for default controller/commissioner
Expand Down Expand Up @@ -264,7 +264,7 @@ def certificate_authority_manager(self) -> chip.CertificateAuthority.Certificate

@property
def dut_node_id(self) -> int:
return self.matter_test_config.dut_node_id
return self.matter_test_config.dut_node_id[0]

async def read_single_attribute(
self, dev_ctrl: ChipDeviceCtrl, node_id: int, endpoint: int, attribute: object, fabricFiltered: bool = True) -> object:
Expand Down Expand Up @@ -489,6 +489,28 @@ def populate_commissioning_args(args: argparse.Namespace, config: MatterTestConf
print("error: Cannot have both --qr-code and --manual-code present!")
return False

if len(config.discriminator) != len(config.setup_passcode):
print("error: supplied number of discriminators does not match number of passcodes")
return False

if len(config.dut_node_id) > len(config.discriminator):
print("error: More node IDs provided than discriminators")
return False

if len(config.dut_node_id) < len(config.discriminator):
missing = len(config.discriminator) - len(config.dut_node_id)
for i in range(missing):
config.dut_node_id.append(config.dut_node_id[-1] + 1)

if len(config.dut_node_id) != len(set(config.dut_node_id)):
print("error: Duplicate values in node id list")
return False

if len(config.discriminator) != len(set(config.discriminator)):
print("error: Duplicate value in discriminator list")
return False

# TODO: this should also allow multiple once QR and manual codes are supported.
config.qr_code_content = args.qr_code
config.manual_code = args.manual_code

Expand Down Expand Up @@ -591,9 +613,9 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig:
default=_DEFAULT_CONTROLLER_NODE_ID,
help='NodeID to use for initial/default controller (default: %d)' % _DEFAULT_CONTROLLER_NODE_ID)
basic_group.add_argument('-n', '--dut-node-id', type=int_decimal_or_hex,
metavar='NODE_ID', default=_DEFAULT_DUT_NODE_ID,
metavar='NODE_ID', default=[_DEFAULT_DUT_NODE_ID],
help='Node ID for primary DUT communication, '
'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID)
'and NodeID to assign if commissioning (default: %d)' % _DEFAULT_DUT_NODE_ID, nargs="+")

commission_group = parser.add_argument_group(title="Commissioning", description="Arguments to commission a node")

Expand All @@ -603,13 +625,13 @@ def parse_matter_test_args(argv: List[str]) -> MatterTestConfig:
help='Name of commissioning method to use')
commission_group.add_argument('-d', '--discriminator', type=int_decimal_or_hex,
metavar='LONG_DISCRIMINATOR',
help='Discriminator to use for commissioning')
help='Discriminator to use for commissioning', nargs="+")
commission_group.add_argument('-p', '--passcode', type=int_decimal_or_hex,
metavar='PASSCODE',
help='PAKE passcode to use')
help='PAKE passcode to use', nargs="+")
commission_group.add_argument('-i', '--ip-addr', type=str,
metavar='RAW_IP_ADDRESS',
help='IP address to use (only for method "on-network-ip". ONLY FOR LOCAL TESTING!')
help='IP address to use (only for method "on-network-ip". ONLY FOR LOCAL TESTING!', nargs="+")

commission_group.add_argument('--wifi-ssid', type=str,
metavar='SSID',
Expand Down Expand Up @@ -692,46 +714,47 @@ class CommissionDeviceTest(MatterBaseTest):

def test_run_commissioning(self):
conf = self.matter_test_config
logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" %
(conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id))
logging.info("Commissioning method: %s" % conf.commissioning_method)
for i in range(len(conf.dut_node_id)):
logging.info("Starting commissioning for root index %d, fabric ID 0x%016X, node ID 0x%016X" %
(conf.root_of_trust_index, conf.fabric_id, conf.dut_node_id[i]))
logging.info("Commissioning method: %s" % conf.commissioning_method)

if not self._commission_device():
raise signals.TestAbortAll("Failed to commission node")
if not self._commission_device(i):
raise signals.TestAbortAll("Failed to commission node")

def _commission_device(self) -> bool:
def _commission_device(self, i) -> bool:
dev_ctrl = self.default_controller
conf = self.matter_test_config

# TODO: support by manual code and QR

if conf.commissioning_method == "on-network":
return dev_ctrl.CommissionOnNetwork(
nodeId=conf.dut_node_id,
setupPinCode=conf.setup_passcode,
nodeId=conf.dut_node_id[i],
setupPinCode=conf.setup_passcode[i],
filterType=DiscoveryFilterType.LONG_DISCRIMINATOR,
filter=conf.discriminator
filter=conf.discriminator[i]
)
elif conf.commissioning_method == "ble-wifi":
return dev_ctrl.CommissionWiFi(
conf.discriminator,
conf.setup_passcode,
conf.dut_node_id,
conf.discriminator[i],
conf.setup_passcode[i],
conf.dut_node_id[i],
conf.wifi_ssid,
conf.wifi_passphrase
)
elif conf.commissioning_method == "ble-thread":
return dev_ctrl.CommissionThread(
conf.discriminator,
conf.setup_passcode,
conf.dut_node_id,
conf.discriminator[i],
conf.setup_passcode[i],
conf.dut_node_id[i],
conf.thread_operational_dataset
)
elif conf.commissioning_method == "on-network-ip":
logging.warning("==== USING A DIRECT IP COMMISSIONING METHOD NOT SUPPORTED IN THE LONG TERM ====")
return dev_ctrl.CommissionIP(
ipaddr=conf.commissionee_ip_address_just_for_testing,
setupPinCode=conf.setup_passcode, nodeid=conf.dut_node_id
setupPinCode=conf.setup_passcode[i], nodeid=conf.dut_node_id[i]
)
else:
raise ValueError("Invalid commissioning method %s!" % conf.commissioning_method)
Expand Down