From 1186123900229ec808367bf34fd36427d1cd7d15 Mon Sep 17 00:00:00 2001 From: C Freeman Date: Tue, 18 Jul 2023 13:28:36 -0400 Subject: [PATCH] TC-TIMESYNC-events test automations (#27917) * TC-TIMESYNC-events test automations Adds automation for all the event check test plans for timesync NOTE: test plan updates for these test cases added in https://github.com/CHIP-Specifications/chip-test-plans/pull/3051 * Remove unused includes * Restyled by isort --------- Co-authored-by: Restyled.io --- src/python_testing/TC_TIMESYNC_2_10.py | 126 ++++++++++++++ src/python_testing/TC_TIMESYNC_2_11.py | 155 ++++++++++++++++++ src/python_testing/TC_TIMESYNC_2_12.py | 136 +++++++++++++++ src/python_testing/TC_TIMESYNC_2_13.py | 96 +++++++++++ .../TestMatterTestingSupport.py | 20 ++- src/python_testing/matter_testing_support.py | 25 +++ 6 files changed, 556 insertions(+), 2 deletions(-) create mode 100644 src/python_testing/TC_TIMESYNC_2_10.py create mode 100644 src/python_testing/TC_TIMESYNC_2_11.py create mode 100644 src/python_testing/TC_TIMESYNC_2_12.py create mode 100644 src/python_testing/TC_TIMESYNC_2_13.py diff --git a/src/python_testing/TC_TIMESYNC_2_10.py b/src/python_testing/TC_TIMESYNC_2_10.py new file mode 100644 index 00000000000000..4b66b707c01113 --- /dev/null +++ b/src/python_testing/TC_TIMESYNC_2_10.py @@ -0,0 +1,126 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import queue +import time +import typing +from datetime import datetime, timedelta, timezone + +import chip.clusters as Clusters +from chip.clusters.Types import NullValue +from chip.interaction_model import InteractionModelError +from chip.tlv import uint +from matter_testing_support import (MatterBaseTest, SimpleEventCallback, async_test_body, default_matter_test_main, + utc_time_in_matter_epoch) +from mobly import asserts + + +def get_wait_seconds_from_set_time(set_time_matter_us: int, wait_seconds: int): + seconds_passed = int((utc_time_in_matter_epoch() - set_time_matter_us)/1000000) + return wait_seconds - seconds_passed + + +class TC_TIMESYNC_2_10(MatterBaseTest): + async def send_set_time_zone_cmd(self, tz: typing.List[Clusters.Objects.TimeSynchronization.Structs.TimeZoneStruct]) -> Clusters.Objects.TimeSynchronization.Commands.SetTimeZoneResponse: + ret = await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetTimeZone(timeZone=tz), endpoint=self.endpoint) + return ret + + async def send_set_dst_cmd(self, dst: typing.List[Clusters.Objects.TimeSynchronization.Structs.DSTOffsetStruct]) -> None: + await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetDSTOffset(DSTOffset=dst)) + + async def send_set_utc_cmd(self, utc: uint) -> None: + await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetUTCTime(UTCTime=utc, granularity=Clusters.Objects.TimeSynchronization.Enums.GranularityEnum.kMillisecondsGranularity)) + + @async_test_body + async def test_TC_TIMESYNC_2_10(self): + + self.endpoint = 0 + + self.print_step(0, "Commissioning, already done") + time_cluster = Clusters.Objects.TimeSynchronization + tz_struct = time_cluster.Structs.TimeZoneStruct + dst_struct = time_cluster.Structs.DSTOffsetStruct + + self.print_step(1, "Send SetUTCCommand") + # It doesn't actually matter if this succeeds. The DUT is free to reject this command and use its own time. + # If the DUT fails to get the time completely, all other tests will fail. + try: + await self.send_set_utc_cmd(utc_time_in_matter_epoch()) + except InteractionModelError: + pass + + self.print_step(2, "Send SetTimeZone command") + tz = [tz_struct(offset=7200, validAt=0)] + ret = await self.send_set_time_zone_cmd(tz) + asserts.assert_true(ret.DSTOffsetRequired, "DSTOffsetRequired not set to true") + + self.print_step(3, "Send SetDSTOffset command") + dst = [dst_struct(offset=3600, validStarting=0, validUntil=NullValue)] + await self.send_set_dst_cmd(dst) + + self.print_step(4, "Subscribe to DSTTableEmpy event") + event = time_cluster.Events.DSTTableEmpty + q = queue.Queue() + cb = SimpleEventCallback("DSTTableEmpty", event.cluster_id, event.event_id, q) + urgent = 1 + subscription = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(self.endpoint, event, urgent)], reportInterval=[1, 3]) + subscription.SetEventUpdateCallback(callback=cb) + + self.print_step(5, "Send SetTimeZone command") + tz = [tz_struct(offset=3600, validAt=0)] + ret = await self.send_set_time_zone_cmd(tz) + asserts.assert_true(ret.DSTOffsetRequired, "DSTOffsetRequired not set to true") + + self.print_step(6, "Wait for DSTTableEmpty event") + try: + q.get(block=True, timeout=5) + except queue.Empty: + asserts.fail("Did not receive DSTTableEmpy event") + + self.print_step(7, "Set DSTOffset to expire in 10 seconds") + th_utc = utc_time_in_matter_epoch(datetime.now(tz=timezone.utc)) + expiry = utc_time_in_matter_epoch(datetime.now(tz=timezone.utc) + timedelta(seconds=10)) + dst = [dst_struct(offset=3600, validStarting=0, validUntil=expiry)] + await self.send_set_dst_cmd(dst) + + self.print_step(8, "Wait until th_utc + 15s") + time.sleep(get_wait_seconds_from_set_time(th_utc, 15)) + + self.print_step(9, "Read LocalTime from the DUT") + await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, + attribute=Clusters.TimeSynchronization.Attributes.LocalTime) + + self.print_step(10, "Wait for DSTTableEmpty event") + timeout = get_wait_seconds_from_set_time(th_utc, 20) + try: + q.get(block=True, timeout=timeout) + except queue.Empty: + asserts.fail("Did not receive DSTTableEmpy event") + pass + + self.print_step(11, "Set time zone back to 0") + tz = [tz_struct(offset=0, validAt=0)] + ret = await self.send_set_time_zone_cmd(tz) + asserts.assert_true(ret.DSTOffsetRequired, "DSTOffsetRequired not set to true") + + self.print_step(12, "Set DST back to 0") + dst = [dst_struct(offset=0, validStarting=0, validUntil=NullValue)] + await self.send_set_dst_cmd(dst) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_TIMESYNC_2_11.py b/src/python_testing/TC_TIMESYNC_2_11.py new file mode 100644 index 00000000000000..5d9258044df9d6 --- /dev/null +++ b/src/python_testing/TC_TIMESYNC_2_11.py @@ -0,0 +1,155 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import queue +import time +import typing +from datetime import datetime, timedelta, timezone + +import chip.clusters as Clusters +from chip.clusters.Types import NullValue +from chip.interaction_model import InteractionModelError +from chip.tlv import uint +from matter_testing_support import (MatterBaseTest, SimpleEventCallback, async_test_body, default_matter_test_main, + get_wait_seconds_from_set_time, type_matches, utc_time_in_matter_epoch) +from mobly import asserts + + +class TC_TIMESYNC_2_11(MatterBaseTest): + async def send_set_time_zone_cmd(self, tz: typing.List[Clusters.Objects.TimeSynchronization.Structs.TimeZoneStruct]) -> Clusters.Objects.TimeSynchronization.Commands.SetTimeZoneResponse: + ret = await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetTimeZone(timeZone=tz), endpoint=self.endpoint) + return ret + + async def send_set_dst_cmd(self, dst: typing.List[Clusters.Objects.TimeSynchronization.Structs.DSTOffsetStruct]) -> None: + await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetDSTOffset(DSTOffset=dst)) + + async def send_set_utc_cmd(self, utc: uint) -> None: + await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetUTCTime(UTCTime=utc, granularity=Clusters.Objects.TimeSynchronization.Enums.GranularityEnum.kMillisecondsGranularity)) + + def wait_for_dst_status(self, th_utc, wait_s, expect_active): + timeout = get_wait_seconds_from_set_time(th_utc, wait_s) + try: + ret = self.q.get(block=True, timeout=timeout) + asserts.assert_true(type_matches(received_value=ret.Data, + desired_type=Clusters.TimeSynchronization.Events.DSTStatus), "Unexpected event type returned") + asserts.assert_equal(ret.Data.DSTOffsetActive, expect_active, "Unexpected value for DSTOffsetActive") + except queue.Empty: + asserts.fail("Did not receive DSTStatus event") + pass + + @async_test_body + async def test_TC_TIMESYNC_2_11(self): + + self.endpoint = 0 + + self.print_step(0, "Commissioning, already done") + time_cluster = Clusters.Objects.TimeSynchronization + tz_struct = time_cluster.Structs.TimeZoneStruct + dst_struct = time_cluster.Structs.DSTOffsetStruct + + self.print_step(1, "Send SetUTCCommand") + # It doesn't actually matter if this succeeds. The DUT is free to reject this command and use its own time. + # If the DUT fails to get the time completely, all other tests will fail. + try: + await self.send_set_utc_cmd(utc_time_in_matter_epoch()) + except InteractionModelError: + pass + + self.print_step(2, "Send SetTimeZone command") + tz = [tz_struct(offset=7200, validAt=0)] + ret = await self.send_set_time_zone_cmd(tz) + asserts.assert_true(ret.DSTOffsetRequired, "DSTOffsetRequired not set to true") + + self.print_step(3, "Subscribe to DSTStatus event") + event = time_cluster.Events.DSTStatus + self.q = queue.Queue() + cb = SimpleEventCallback("DSTStatus", event.cluster_id, event.event_id, self.q) + urgent = 1 + subscription = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(self.endpoint, event, urgent)], reportInterval=[1, 3]) + subscription.SetEventUpdateCallback(callback=cb) + + self.print_step(4, "TH reads the DSTOffsetListMaxSize") + dst_list_size = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.DSTOffsetListMaxSize) + asserts.assert_greater_equal(dst_list_size, 1, "Invalid dst list size") + + self.print_step(5, "TH sets two DST items if dst_list_size > 1") + th_utc = utc_time_in_matter_epoch(datetime.now(tz=timezone.utc)) + expiry_first = utc_time_in_matter_epoch(datetime.now(tz=timezone.utc) + timedelta(seconds=10)) + dst_first = dst_struct(offset=3600, validStarting=0, validUntil=expiry_first) + if dst_list_size > 1: + start_second = utc_time_in_matter_epoch(datetime.now(tz=timezone.utc) + timedelta(seconds=25)) + expiry_second = utc_time_in_matter_epoch(datetime.now(tz=timezone.utc) + timedelta(seconds=40)) + dst_second = dst_struct(offset=3600, validStarting=start_second, validUntil=expiry_second) + dst = [dst_first, dst_second] + await self.send_set_dst_cmd(dst) + + self.print_step(6, "TH sets 1 DST item if dst_list_size == 1") + if dst_list_size == 1: + dst = [dst_first] + await self.send_set_dst_cmd(dst) + + self.print_step(7, "TH reads LocalTime") + await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.LocalTime) + + self.print_step(8, "TH waits for DSTStatus event until th_utc + 5s") + self.wait_for_dst_status(th_utc, 5, True) + + self.print_step(9, "TH waits until th_utc + 15s") + time.sleep(get_wait_seconds_from_set_time(th_utc, 15)) + + self.print_step(10, "TH reads LocalTime") + await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.LocalTime) + + self.print_step(11, "TH waits for DSTStatus event until th_utc + 20s") + self.wait_for_dst_status(th_utc, 20, False) + + self.print_step(12, "If dst_list_size > 1, TH waits until th_utc + 30s") + if dst_list_size > 1: + time.sleep(get_wait_seconds_from_set_time(th_utc, 30)) + + self.print_step(13, "If dst_list_size > 1, TH reads LocalTime") + if dst_list_size > 1: + await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.LocalTime) + + self.print_step(14, "If dst_list_size > 1, TH waits for a DSTStatus event until th_utc + 35s") + if dst_list_size > 1: + self.wait_for_dst_status(th_utc, 35, True) + + self.print_step(15, "If dst_list_size > 1, TH waits until th_utc + 45s") + if dst_list_size > 1: + time.sleep(get_wait_seconds_from_set_time(th_utc, 45)) + + self.print_step(16, "If dst_list_size > 1, TH reads the LocalTime") + if dst_list_size > 1: + await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.LocalTime) + + self.print_step(17, "If dst_list_size > 1, TH waits for a DSTStatus event until th_utc + 50s") + if dst_list_size > 1: + self.wait_for_dst_status(th_utc, 50, False) + + self.print_step(18, "Set time zone back to 0") + tz = [tz_struct(offset=0, validAt=0)] + ret = await self.send_set_time_zone_cmd(tz) + asserts.assert_true(ret.DSTOffsetRequired, "DSTOffsetRequired not set to true") + + self.print_step(19, "Set DST back to 0") + dst = [dst_struct(offset=0, validStarting=0, validUntil=NullValue)] + await self.send_set_dst_cmd(dst) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_TIMESYNC_2_12.py b/src/python_testing/TC_TIMESYNC_2_12.py new file mode 100644 index 00000000000000..500fda6f985abb --- /dev/null +++ b/src/python_testing/TC_TIMESYNC_2_12.py @@ -0,0 +1,136 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import queue +import time +import typing +from datetime import datetime, timedelta, timezone + +import chip.clusters as Clusters +from chip.clusters.Types import NullValue +from chip.interaction_model import InteractionModelError +from chip.tlv import uint +from matter_testing_support import (MatterBaseTest, SimpleEventCallback, async_test_body, default_matter_test_main, + get_wait_seconds_from_set_time, type_matches, utc_time_in_matter_epoch) +from mobly import asserts + + +class TC_TIMESYNC_2_12(MatterBaseTest): + async def send_set_time_zone_cmd(self, tz: typing.List[Clusters.Objects.TimeSynchronization.Structs.TimeZoneStruct]) -> Clusters.Objects.TimeSynchronization.Commands.SetTimeZoneResponse: + ret = await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetTimeZone(timeZone=tz), endpoint=self.endpoint) + return ret + + async def send_set_dst_cmd(self, dst: typing.List[Clusters.Objects.TimeSynchronization.Structs.DSTOffsetStruct]) -> None: + await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetDSTOffset(DSTOffset=dst)) + + async def send_set_utc_cmd(self, utc: uint) -> None: + await self.send_single_cmd(cmd=Clusters.Objects.TimeSynchronization.Commands.SetUTCTime(UTCTime=utc, granularity=Clusters.Objects.TimeSynchronization.Enums.GranularityEnum.kMillisecondsGranularity)) + + def wait_for_tz_status(self, th_utc, wait_s, expected_offset, expected_name): + timeout = get_wait_seconds_from_set_time(th_utc, wait_s) + try: + ret = self.q.get(block=True, timeout=timeout) + asserts.assert_true(type_matches(received_value=ret.Data, + desired_type=Clusters.TimeSynchronization.Events.TimeZoneStatus), "Incorrect type received for event") + asserts.assert_equal(ret.Data.offset, expected_offset, "Unexpected offset returned") + asserts.assert_equal(ret.Data.name, expected_name, "Unexpected name returned") + except queue.Empty: + asserts.fail("Did not receive TZStatus event") + + @async_test_body + async def test_TC_TIMESYNC_2_12(self): + + self.endpoint = 0 + + self.print_step(0, "Commissioning, already done") + time_cluster = Clusters.Objects.TimeSynchronization + tz_struct = time_cluster.Structs.TimeZoneStruct + dst_struct = time_cluster.Structs.DSTOffsetStruct + + self.print_step(1, "Send SetUTCCommand") + # It doesn't actually matter if this succeeds. The DUT is free to reject this command and use its own time. + # If the DUT fails to get the time completely, all other tests will fail. + try: + await self.send_set_utc_cmd(utc_time_in_matter_epoch()) + except InteractionModelError: + pass + + self.print_step(2, "Send SetTimeZone command") + tz = [tz_struct(offset=7200, validAt=0)] + ret = await self.send_set_time_zone_cmd(tz) + asserts.assert_true(ret.DSTOffsetRequired, "DSTOffsetRequired not set to true") + + self.print_step(3, "Subscribe to TimeZoneStatus event") + event = time_cluster.Events.TimeZoneStatus + self.q = queue.Queue() + cb = SimpleEventCallback("TimeZoneStatus", event.cluster_id, event.event_id, self.q) + urgent = 1 + subscription = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(self.endpoint, event, urgent)], reportInterval=[1, 3]) + subscription.SetEventUpdateCallback(callback=cb) + + self.print_step(4, "TH reads the TimeZoneListMaxSize") + tz_list_size = await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.TimeZoneListMaxSize) + asserts.assert_greater_equal(tz_list_size, 1, "Invalid tz list size") + + self.print_step(5, "TH sets two TZ items if dst_list_size > 1") + th_utc = utc_time_in_matter_epoch(datetime.now(tz=timezone.utc)) + tz_first = tz_struct(offset=3600, validAt=0, name="Not/Real") + if tz_list_size > 1: + valid_second = utc_time_in_matter_epoch(datetime.now(tz=timezone.utc) + timedelta(seconds=10)) + tz_second = tz_struct(offset=7200, validAt=valid_second, name="Un/Real") + tz = [tz_first, tz_second] + await self.send_set_time_zone_cmd(tz) + + self.print_step(6, "TH sets 1 TZ item if tz_list_size == 1") + if tz_list_size == 1: + tz = [tz_first] + await self.send_set_time_zone_cmd(tz) + + self.print_step(7, "TH sets DST offsets") + dst = [dst_struct(offset=0, validStarting=0, validUntil=NullValue)] + await self.send_set_dst_cmd(dst) + + self.print_step(8, "TH reads LocalTime") + await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.LocalTime) + + self.print_step(9, "TH waits for TimeZoneStatus event until th_utc + 5s") + self.wait_for_tz_status(th_utc, 5, 3600, "Not/Real") + + self.print_step(10, "If tz_list_size > 1, TH waits until th_utc + 15s") + if tz_list_size > 1: + time.sleep(get_wait_seconds_from_set_time(th_utc, 15)) + + self.print_step(11, "If tz_list_size > 1, TH reads LocalTime") + if tz_list_size > 1: + await self.read_single_attribute_check_success(cluster=Clusters.TimeSynchronization, attribute=Clusters.TimeSynchronization.Attributes.LocalTime) + + self.print_step(12, "if tz_list_size > 1, TH waits for a TimeZoneStatus event until th_utc + 20s") + if tz_list_size > 1: + self.wait_for_tz_status(th_utc, 20, 7200, "Un/Real") + + self.print_step(13, "Set time zone back to 0") + tz = [tz_struct(offset=0, validAt=0)] + ret = await self.send_set_time_zone_cmd(tz) + asserts.assert_true(ret.DSTOffsetRequired, "DSTOffsetRequired not set to true") + + self.print_step(14, "Set DST back to 0") + dst = [dst_struct(offset=0, validStarting=0, validUntil=NullValue)] + await self.send_set_dst_cmd(dst) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TC_TIMESYNC_2_13.py b/src/python_testing/TC_TIMESYNC_2_13.py new file mode 100644 index 00000000000000..ddebc27d287162 --- /dev/null +++ b/src/python_testing/TC_TIMESYNC_2_13.py @@ -0,0 +1,96 @@ +# +# Copyright (c) 2023 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import queue +import time + +import chip.clusters as Clusters +from chip import ChipDeviceCtrl +from chip.clusters.Types import NullValue +from matter_testing_support import MatterBaseTest, SimpleEventCallback, async_test_body, default_matter_test_main, type_matches +from mobly import asserts + + +class TC_TIMESYNC_2_13(MatterBaseTest): + def wait_for_trusted_time_souce_event(self, timeout): + try: + ret = self.q.get(block=True, timeout=timeout) + asserts.assert_true(type_matches(received_value=ret.Data, + desired_type=Clusters.TimeSynchronization.Events.MissingTrustedTimeSource), "Incorrect type received for event") + except queue.Empty: + asserts.fail("Did not receive MissingTrustedTimeSouce event") + + @async_test_body + async def test_TC_TIMESYNC_2_13(self): + + self.endpoint = 0 + + self.print_step(0, "Commissioning, already done") + + self.print_step(1, "TH1 opens a commissioning window") + params = self.default_controller.OpenCommissioningWindow( + nodeid=self.dut_node_id, timeout=600, iteration=10000, discriminator=1234, option=1) + + self.print_step(2, "Commission to TH2") + new_certificate_authority = self.certificate_authority_manager.NewCertificateAuthority() + new_fabric_admin = new_certificate_authority.NewFabricAdmin(vendorId=0xFFF1, fabricId=2) + TH2 = new_fabric_admin.NewController(nodeId=112233) + + errcode = TH2.CommissionOnNetwork( + nodeId=self.dut_node_id, setupPinCode=params.setupPinCode, + filterType=ChipDeviceCtrl.DiscoveryFilterType.LONG_DISCRIMINATOR, filter=1234) + asserts.assert_true(errcode.is_success, 'Commissioning on TH2 did not complete successfully') + + self.print_step(3, "TH2 reads the current fabric") + th2_fabric_idx = await self.read_single_attribute_check_success( + dev_ctrl=TH2, cluster=Clusters.OperationalCredentials, attribute=Clusters.OperationalCredentials.Attributes.CurrentFabricIndex) + + self.print_step(4, "TH2 sends the SetTrustedTimeSource command to the DUT with its nodeID") + tts = Clusters.TimeSynchronization.Structs.FabricScopedTrustedTimeSourceStruct(nodeID=TH2.nodeId, endpoint=0) + await self.send_single_cmd(dev_ctrl=TH2, cmd=Clusters.TimeSynchronization.Commands.SetTrustedTimeSource(trustedTimeSource=tts)) + + self.print_step(5, "TH1 subscribeds to the MissingTrustedTimeSource event") + event = Clusters.TimeSynchronization.Events.MissingTrustedTimeSource + self.q = queue.Queue() + cb = SimpleEventCallback("MissingTrustedTimeSource", event.cluster_id, event.event_id, self.q) + urgent = 1 + subscription = await self.default_controller.ReadEvent(nodeid=self.dut_node_id, events=[(self.endpoint, event, urgent)], reportInterval=[1, 3]) + subscription.SetEventUpdateCallback(callback=cb) + + self.print_step(6, "TH1 removes the TH2 fabric") + await self.send_single_cmd(cmd=Clusters.OperationalCredentials.Commands.RemoveFabric(fabricIndex=th2_fabric_idx)) + + self.print_step(7, "TH1 waits for the MissingTrustedTimeSource event with a timeout of 5 seconds") + self.wait_for_trusted_time_souce_event(5) + + self.print_step(8, "TH1 sends a SetTrusteTimeSource command") + tts = Clusters.TimeSynchronization.Structs.FabricScopedTrustedTimeSourceStruct( + nodeID=self.default_controller.nodeId, endpoint=0) + await self.send_single_cmd(cmd=Clusters.TimeSynchronization.Commands.SetTrustedTimeSource(trustedTimeSource=tts)) + + self.print_step(9, "TH1 waits 5 seconds") + time.sleep(5) + + self.print_step(10, "TH1 sends the SetTrustedTimeSource command with TrustedTimeSource set to NULL") + await self.send_single_cmd(cmd=Clusters.TimeSynchronization.Commands.SetTrustedTimeSource(NullValue)) + + self.print_step(11, "TH1 waits for the MissingTrustedTimeSource event with a timeout of 5 seconds") + self.wait_for_trusted_time_souce_event(5) + + +if __name__ == "__main__": + default_matter_test_main() diff --git a/src/python_testing/TestMatterTestingSupport.py b/src/python_testing/TestMatterTestingSupport.py index 8762e9d8ff15bc..ce9f8b88403786 100644 --- a/src/python_testing/TestMatterTestingSupport.py +++ b/src/python_testing/TestMatterTestingSupport.py @@ -15,14 +15,15 @@ # limitations under the License. # +import time import typing from datetime import datetime, timedelta, timezone import chip.clusters as Clusters from chip.clusters.Types import Nullable, NullValue from chip.tlv import uint -from matter_testing_support import (MatterBaseTest, async_test_body, compare_time, default_matter_test_main, parse_pics, - type_matches, utc_time_in_matter_epoch) +from matter_testing_support import (MatterBaseTest, async_test_body, compare_time, default_matter_test_main, + get_wait_seconds_from_set_time, parse_pics, type_matches, utc_time_in_matter_epoch) from mobly import asserts, signals @@ -184,6 +185,21 @@ def test_time_compare_function(self): compare_time(received=timedelta(seconds=3600).total_seconds() * 1000000, offset=timedelta(seconds=3605), utc=0, tolerance=timedelta(seconds=5)) + def test_get_wait_time_function(self): + th_utc = utc_time_in_matter_epoch() + secs = get_wait_seconds_from_set_time(th_utc, 5) + asserts.assert_equal(secs, 5) + # If we've pass less than a second, we still want to wait 5 + time.sleep(0.5) + secs = get_wait_seconds_from_set_time(th_utc, 5) + asserts.assert_equal(secs, 5) + + time.sleep(0.5) + secs = get_wait_seconds_from_set_time(th_utc, 5) + asserts.assert_equal(secs, 4) + secs = get_wait_seconds_from_set_time(th_utc, 15) + asserts.assert_equal(secs, 14) + if __name__ == "__main__": default_matter_test_main() diff --git a/src/python_testing/matter_testing_support.py b/src/python_testing/matter_testing_support.py index 8688504b6c090d..b51da99aed15e6 100644 --- a/src/python_testing/matter_testing_support.py +++ b/src/python_testing/matter_testing_support.py @@ -20,8 +20,10 @@ import builtins import json import logging +import math import os import pathlib +import queue import re import sys import typing @@ -46,6 +48,7 @@ import chip.logging import chip.native from chip.ChipStack import ChipStack +from chip.clusters.Attribute import EventReadResult, SubscriptionTransaction from chip.interaction_model import InteractionModelError, Status from chip.storage import PersistentStorage from mobly import asserts, base_test, signals, utils @@ -195,6 +198,28 @@ def compare_time(received: int, offset: timedelta = timedelta(), utc: int = None asserts.assert_less_equal(delta, tolerance, "Received time is out of tolerance") +def get_wait_seconds_from_set_time(set_time_matter_us: int, wait_seconds: int): + seconds_passed = math.floor((utc_time_in_matter_epoch() - set_time_matter_us)/1000000) + return wait_seconds - seconds_passed + + +class SimpleEventCallback: + def __init__(self, name: str, expected_cluster_id: int, expected_event_id: int, output_queue: queue.SimpleQueue): + self._name = name + self._expected_cluster_id = expected_cluster_id + self._expected_event_id = expected_event_id + self._output_queue = output_queue + + def __call__(self, event_result: EventReadResult, transaction: SubscriptionTransaction): + if (self._expected_cluster_id == event_result.Header.ClusterId and + self._expected_event_id == event_result.Header.EventId): + self._output_queue.put(event_result) + + @property + def name(self) -> str: + return self._name + + @dataclass class MatterTestConfig: storage_path: pathlib.Path = pathlib.Path(".")