diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index 19e1ced153334c..7ba7e9b9b1fb35 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -458,6 +458,16 @@ void ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchange Status status = Status::Success; mExchange.Grab(apExchangeContext); + // + // Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received. + // This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid + // session to us, of which there could be multiple. + // + // Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible + // to maximize our chances of success later. + // + mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle()); + CHIP_ERROR err = ProcessReportData(std::move(aPayload)); if (err != CHIP_NO_ERROR) { @@ -576,14 +586,14 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload) { MoveToState(ClientState::AwaitingSubscribeResponse); } - else if (IsSubscriptionActive()) + else if (IsSubscriptionActive() && err == CHIP_NO_ERROR) { // // Only refresh the liveness check timer if we've successfully established // a subscription and have a valid value for mMaxInterval which the function // relies on. // - RefreshLivenessCheckTimer(); + err = RefreshLivenessCheckTimer(); } } @@ -753,7 +763,11 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout) { mLivenessTimeoutOverride = aLivenessTimeout; - RefreshLivenessCheckTimer(); + auto err = RefreshLivenessCheckTimer(); + if (err != CHIP_NO_ERROR) + { + Close(err); + } } CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() @@ -784,11 +798,6 @@ CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( timeout, OnLivenessTimeoutCallback, this); - if (err != CHIP_NO_ERROR) - { - Close(err); - } - return err; } @@ -854,7 +863,7 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP mNumRetries = 0; - RefreshLivenessCheckTimer(); + ReturnErrorOnFailure(RefreshLivenessCheckTimer()); return CHIP_NO_ERROR; } @@ -874,6 +883,7 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(const ReadPrepareParams & aReadPrepa { VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds, CHIP_ERROR_INVALID_ARGUMENT); + return SendSubscribeRequestImpl(aReadPrepareParams); } @@ -881,6 +891,11 @@ CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadP { VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE); + if (&aReadPrepareParams != &mReadPrepareParams) + { + mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder; + } + mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds; // Todo: Remove the below, Update span in ReadPrepareParams diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 2b161a7b62e6f5..0f5a72c72eb714 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -516,6 +516,12 @@ class ReadClient : public Messaging::ExchangeDelegate ReadClient * mpNext = nullptr; InteractionModelEngine * mpImEngine = nullptr; + + // + // This stores the params associated with the interaction in a specific set of cases: + // 1. Stores all parameters when used with subscriptions initiated using SendAutoResubscribeRequest. + // 2. Stores just the SessionHolder when used with any subscriptions. + // ReadPrepareParams mReadPrepareParams; uint32_t mNumRetries = 0; diff --git a/src/app/tests/TestReadInteraction.cpp b/src/app/tests/TestReadInteraction.cpp index 660cd8ef8020c5..1123f5ca10674f 100644 --- a/src/app/tests/TestReadInteraction.cpp +++ b/src/app/tests/TestReadInteraction.cpp @@ -1752,7 +1752,7 @@ void TestReadInteraction::TestSubscribeWildcard(nlTestSuite * apSuite, void * ap readPrepareParams.mAttributePathParamsListSize = 2; readPrepareParams.mMinIntervalFloorSeconds = 0; - readPrepareParams.mMaxIntervalCeilingSeconds = 0; + readPrepareParams.mMaxIntervalCeilingSeconds = 1; printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId); { @@ -1857,7 +1857,7 @@ void TestReadInteraction::TestSubscribePartialOverlap(nlTestSuite * apSuite, voi readPrepareParams.mAttributePathParamsListSize = 1; readPrepareParams.mMinIntervalFloorSeconds = 0; - readPrepareParams.mMaxIntervalCeilingSeconds = 0; + readPrepareParams.mMaxIntervalCeilingSeconds = 1; printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId); { @@ -1933,7 +1933,7 @@ void TestReadInteraction::TestSubscribeSetDirtyFullyOverlap(nlTestSuite * apSuit readPrepareParams.mAttributePathParamsListSize = 1; readPrepareParams.mMinIntervalFloorSeconds = 0; - readPrepareParams.mMaxIntervalCeilingSeconds = 0; + readPrepareParams.mMaxIntervalCeilingSeconds = 1; printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId); { @@ -2059,7 +2059,7 @@ void TestReadInteraction::TestSubscribeInvalidAttributePathRoundtrip(nlTestSuite readPrepareParams.mSessionHolder.Grab(ctx.GetSessionBobToAlice()); readPrepareParams.mMinIntervalFloorSeconds = 0; - readPrepareParams.mMaxIntervalCeilingSeconds = 0; + readPrepareParams.mMaxIntervalCeilingSeconds = 1; printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId); { diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 0dda7e31483ce4..cc314e60469cd2 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -246,6 +246,7 @@ class TestReadInteraction : public app::ReadHandler::ApplicationCallback static void TestReadAttributeError(nlTestSuite * apSuite, void * apContext); static void TestReadAttributeTimeout(nlTestSuite * apSuite, void * apContext); static void TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext); + static void TestResubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext); static void TestReadEventResponse(nlTestSuite * apSuite, void * apContext); static void TestReadFabricScopedWithoutFabricFilter(nlTestSuite * apSuite, void * apContext); static void TestReadFabricScopedWithFabricFilter(nlTestSuite * apSuite, void * apContext); @@ -1488,20 +1489,12 @@ class TestResubscriptionCallback : public app::ReadClient::Callback mLastError = aError; } - void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override - { - mOnSubscriptionEstablishedCount++; - - // - // Set the liveness timeout to a super small number that isn't 0 to - // force the liveness timeout to fire. - // - mpReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(10)); - } + void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablishedCount++; } CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { mOnResubscriptionsAttempted++; + mLastError = aTerminationCause; return apReadClient->ScheduleResubscription(apReadClient->ComputeTimeTillNextSubscription(), NullOptional, false); } @@ -1532,11 +1525,13 @@ class TestResubscriptionCallback : public app::ReadClient::Callback // TODO: This does not validate the CASE establishment pathways since we're limited by the PASE-centric TestContext. // // -void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext) +void TestReadInteraction::TestResubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext) { TestContext & ctx = *static_cast(apContext); auto sessionHandle = ctx.GetSessionBobToAlice(); + ctx.SetMRPMode(Test::MessagingContext::MRPMode::kResponsive); + { TestResubscriptionCallback callback; app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback, @@ -1556,27 +1551,115 @@ void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, v readPrepareParams.mMaxIntervalCeilingSeconds = 1; - readClient.SendAutoResubscribeRequest(std::move(readPrepareParams)); + auto err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams)); + NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR); // - // Drive servicing IO till we have established a subscription at least 2 times. + // Drive servicing IO till we have established a subscription. // - ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(2), - [&]() { return callback.mOnSubscriptionEstablishedCount > 1; }); + ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000), + [&]() { return callback.mOnSubscriptionEstablishedCount >= 1; }); + NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1); + NL_TEST_ASSERT(apSuite, callback.mOnError == 0); + NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 0); - NL_TEST_ASSERT(apSuite, callback.mOnDone == 0); + // + // Disable packet transmission, and drive IO till we have reported a re-subscription attempt. + // + // 1.5s should cover the liveness timeout in the client of 1s max interval + 50ms ACK timeout. + // + ctx.GetLoopback().mNumMessagesToDrop = Test::LoopbackTransport::kUnlimitedMessageCount; + ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(1500), + [&]() { return callback.mOnResubscriptionsAttempted > 0; }); + + NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1); + NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_ERROR_TIMEOUT); + + ctx.GetLoopback().mNumMessagesToDrop = 0; + callback.ClearCounters(); // - // With re-sub enabled, we shouldn't encounter any errors. + // Drive servicing IO till we have established a subscription. + // + ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000), + [&]() { return callback.mOnSubscriptionEstablishedCount == 1; }); + NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1); + + // + // With re-sub enabled, we shouldn't have encountered any errors // NL_TEST_ASSERT(apSuite, callback.mOnError == 0); + NL_TEST_ASSERT(apSuite, callback.mOnDone == 0); + } + + ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault); + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); +} + +// +// This validates a vanilla subscription with re-susbcription disabled timing out correctly on the client +// side and triggering the OnError callback with the right error code. +// +void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast(apContext); + auto sessionHandle = ctx.GetSessionBobToAlice(); + + ctx.SetMRPMode(Test::MessagingContext::MRPMode::kResponsive); + + { + TestResubscriptionCallback callback; + app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback, + app::ReadClient::InteractionType::Subscribe); + + callback.SetReadClient(&readClient); + + app::ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice()); + + app::AttributePathParams attributePathParams[1]; + readPrepareParams.mpAttributePathParamsList = attributePathParams; + readPrepareParams.mAttributePathParamsListSize = ArraySize(attributePathParams); + attributePathParams[0].mClusterId = app::Clusters::TestCluster::Id; + attributePathParams[0].mAttributeId = app::Clusters::TestCluster::Attributes::Boolean::Id; // - // We should have attempted just one re-subscription. + // Request a max interval that's very small to reduce time to discovering a liveness failure. // - NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1); + readPrepareParams.mMaxIntervalCeilingSeconds = 1; + + auto err = readClient.SendRequest(readPrepareParams); + NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR); + + // + // Drive servicing IO till we have established a subscription. + // + ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000), + [&]() { return callback.mOnSubscriptionEstablishedCount >= 1; }); + NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1); + + // + // Request we drop all further messages. + // + ctx.GetLoopback().mNumMessagesToDrop = Test::LoopbackTransport::kUnlimitedMessageCount; + + // + // Drive IO until we get an error on the subscription, which should be caused + // by the liveness timer firing within ~1s of the establishment of the subscription. + // + // 1.5s should cover the liveness timeout in the client of 1s max interval + 50ms ACK timeout. + // + ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(1500), [&]() { return callback.mOnError >= 1; }); + + NL_TEST_ASSERT(apSuite, callback.mOnError == 1); + NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_ERROR_TIMEOUT); + NL_TEST_ASSERT(apSuite, callback.mOnDone == 1); + NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 0); } + ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault); + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } @@ -1645,6 +1728,7 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * ap NL_TEST_ASSERT(apSuite, gTestReadInteraction.mNumActiveSubscriptions == 0); NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault); app::InteractionModelEngine::GetInstance()->UnregisterReadHandlerAppCallback(); } @@ -4334,6 +4418,7 @@ const nlTest sTests[] = NL_TEST_DEF("TestReadAttribute_ManyDataValues", TestReadInteraction::TestReadAttribute_ManyDataValues), NL_TEST_DEF("TestReadAttribute_ManyDataValuesWrongPath", TestReadInteraction::TestReadAttribute_ManyDataValuesWrongPath), NL_TEST_DEF("TestReadAttribute_ManyErrors", TestReadInteraction::TestReadAttribute_ManyErrors), + NL_TEST_DEF("TestResubscribeAttributeTimeout", TestReadInteraction::TestResubscribeAttributeTimeout), NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout), NL_TEST_SENTINEL() }; diff --git a/src/messaging/tests/MessagingContext.cpp b/src/messaging/tests/MessagingContext.cpp index c31e66b7c0e47f..00fd5329925474 100644 --- a/src/messaging/tests/MessagingContext.cpp +++ b/src/messaging/tests/MessagingContext.cpp @@ -16,6 +16,7 @@ */ #include "MessagingContext.h" +#include "system/SystemClock.h" #include #include @@ -99,6 +100,28 @@ void MessagingContext::ShutdownAndRestoreExisting(MessagingContext & existing) existing.mTransport->SetSessionManager(&existing.GetSecureSessionManager()); } +void MessagingContext::SetMRPMode(MRPMode mode) +{ + if (mode == MRPMode::kDefault) + { + mSessionBobToAlice->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig()); + mSessionAliceToBob->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig()); + mSessionCharlieToDavid->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig()); + mSessionDavidToCharlie->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig()); + } + else + { + mSessionBobToAlice->AsSecureSession()->SetRemoteMRPConfig( + ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10))); + mSessionAliceToBob->AsSecureSession()->SetRemoteMRPConfig( + ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10))); + mSessionCharlieToDavid->AsSecureSession()->SetRemoteMRPConfig( + ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10))); + mSessionDavidToCharlie->AsSecureSession()->SetRemoteMRPConfig( + ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10))); + } +} + CHIP_ERROR MessagingContext::CreateSessionBobToAlice() { return mSessionManager.InjectPaseSessionWithTestKey(mSessionBobToAlice, kBobKeyId, GetAliceFabric()->GetNodeId(), kAliceKeyId, diff --git a/src/messaging/tests/MessagingContext.h b/src/messaging/tests/MessagingContext.h index 559a0466b7d930..f4108ffb96949c 100644 --- a/src/messaging/tests/MessagingContext.h +++ b/src/messaging/tests/MessagingContext.h @@ -73,6 +73,17 @@ class PlatformMemoryUser class MessagingContext : public PlatformMemoryUser { public: + enum MRPMode + { + kDefault = 1, // This adopts the default MRP values for idle/active as per the spec. + // i.e IDLE = 4s, ACTIVE = 300ms + + kResponsive = 2, // This adopts values that are better suited for loopback tests that + // don't actually go over a network interface, and are tuned much lower + // to permit more responsive tests. + // i.e IDLE = 10ms, ACTIVE = 10ms + }; + MessagingContext() : mInitialized(false), mAliceAddress(Transport::PeerAddress::UDP(GetAddress(), CHIP_PORT + 1)), mBobAddress(Transport::PeerAddress::UDP(GetAddress(), CHIP_PORT)) @@ -129,6 +140,8 @@ class MessagingContext : public PlatformMemoryUser void ExpireSessionAliceToBob(); void ExpireSessionBobToFriends(); + void SetMRPMode(MRPMode mode); + SessionHandle GetSessionBobToAlice(); SessionHandle GetSessionAliceToBob(); SessionHandle GetSessionCharlieToDavid(); diff --git a/src/transport/raw/tests/NetworkTestHelpers.h b/src/transport/raw/tests/NetworkTestHelpers.h index cde07460556eb5..de459cbeed9090 100644 --- a/src/transport/raw/tests/NetworkTestHelpers.h +++ b/src/transport/raw/tests/NetworkTestHelpers.h @@ -99,6 +99,8 @@ class LoopbackTransport : public Transport::Base } } + static constexpr uint32_t kUnlimitedMessageCount = std::numeric_limits::max(); + CHIP_ERROR SendMessage(const Transport::PeerAddress & address, System::PacketBufferHandle && msgBuf) override { ReturnErrorOnFailure(mMessageSendError); @@ -116,6 +118,7 @@ class LoopbackTransport : public Transport::Base if (dropMessage) { + ChipLogProgress(Test, "Dropping message..."); mDroppedMessageCount++; if (mDelegate != nullptr) {