diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index 6cf158eca18c7d..68ea7c7e986448 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -210,14 +210,14 @@ void InteractionModelEngine::OnDone(CommandHandler & apCommandObj) void InteractionModelEngine::OnDone(ReadHandler & apReadObj) { - mReadHandlers.ReleaseObject(&apReadObj); - // // Deleting an item can shift down the contents of the underlying pool storage, - // rendering any tracker using positional indexes invalid. Let's reset it and - // have it start from index 0. + // rendering any tracker using positional indexes invalid. Let's reset it, + // based on which readHandler we are getting rid of. // - mReportingEngine.ResetReadHandlerTracker(); + mReportingEngine.ResetReadHandlerTracker(&apReadObj); + + mReadHandlers.ReleaseObject(&apReadObj); } CHIP_ERROR InteractionModelEngine::OnInvokeCommandRequest(Messaging::ExchangeContext * apExchangeContext, diff --git a/src/app/reporting/Engine.cpp b/src/app/reporting/Engine.cpp index 667a5909647dd6..004a5222f458d7 100644 --- a/src/app/reporting/Engine.cpp +++ b/src/app/reporting/Engine.cpp @@ -528,14 +528,19 @@ void Engine::Run() mRunScheduled = false; - while ((mNumReportsInFlight < CHIP_IM_MAX_REPORTS_IN_FLIGHT) && (numReadHandled < imEngine->mReadHandlers.Allocated())) + // We may be deallocating read handlers as we go. Track how many we had + // initially, so we make sure to go through all of them. + size_t initialAllocated = imEngine->mReadHandlers.Allocated(); + while ((mNumReportsInFlight < CHIP_IM_MAX_REPORTS_IN_FLIGHT) && (numReadHandled < initialAllocated)) { ReadHandler * readHandler = imEngine->ActiveHandlerAt(mCurReadHandlerIdx % (uint32_t) imEngine->mReadHandlers.Allocated()); VerifyOrDie(readHandler != nullptr); if (readHandler->IsReportable()) { - CHIP_ERROR err = BuildAndSendSingleReportData(readHandler); + mRunningReadHandler = readHandler; + CHIP_ERROR err = BuildAndSendSingleReportData(readHandler); + mRunningReadHandler = nullptr; if (err != CHIP_NO_ERROR) { return; @@ -543,6 +548,9 @@ void Engine::Run() } numReadHandled++; + // If readHandler removed itself from our list, we also decremented + // mCurReadHandlerIdx to account for that removal, so it's safe to + // increment here. mCurReadHandlerIdx++; } @@ -678,6 +686,12 @@ void Engine::OnReportConfirm() { VerifyOrDie(mNumReportsInFlight > 0); + if (mNumReportsInFlight == CHIP_IM_MAX_REPORTS_IN_FLIGHT) + { + // We could have other things waiting to go now that this report is no + // longer in flight. + ScheduleRun(); + } mNumReportsInFlight--; ChipLogDetail(DataManagement, " OnReportConfirm: NumReports = %" PRIu32, mNumReportsInFlight); } diff --git a/src/app/reporting/Engine.h b/src/app/reporting/Engine.h index 759d6d50dc8f09..ff5bbf42dd2302 100644 --- a/src/app/reporting/Engine.h +++ b/src/app/reporting/Engine.h @@ -98,8 +98,25 @@ class Engine /* * Resets the tracker that tracks the currently serviced read handler. - */ - void ResetReadHandlerTracker() { mCurReadHandlerIdx = 0; } + * apReadHandler can be non-null to indicate that the reset is due to a + * specific ReadHandler being deallocated. + */ + void ResetReadHandlerTracker(ReadHandler * apReadHandlerBeingDeleted) + { + if (apReadHandlerBeingDeleted == mRunningReadHandler) + { + // Just decrement, so our increment after we finish running it will + // do the right thing. + --mCurReadHandlerIdx; + } + else + { + // No idea what to do here to make the indexing sane. Just start at + // the beginning. We need to do better here; see + // https://github.com/project-chip/connectedhomeip/issues/13809 + mCurReadHandlerIdx = 0; + } + } private: friend class TestReportingEngine; @@ -175,6 +192,11 @@ class Engine */ uint32_t mCurReadHandlerIdx = 0; + /** + * The read handler we're calling BuildAndSendSingleReportData on right now. + */ + ReadHandler * mRunningReadHandler = nullptr; + /** * mGlobalDirtySet is used to track the set of attribute/event paths marked dirty for reporting purposes. * diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 7799519a34001b..9e325a26a444e0 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -46,6 +46,10 @@ enum ResponseDirective ResponseDirective responseDirective; +// Number of reads of TestCluster::Attributes::Int16u that we have observed. +// Every read will increment this count by 1 and return the new value. +uint16_t totalReadCount = 0; + } // namespace namespace chip { @@ -73,6 +77,16 @@ CHIP_ERROR ReadSingleClusterData(const Access::SubjectDescriptor & aSubjectDescr return CHIP_NO_ERROR; }); } + else if (aPath.mClusterId == app::Clusters::TestCluster::Id && + aPath.mAttributeId == app::Clusters::TestCluster::Attributes::Int16u::Id) + { + AttributeValueEncoder::AttributeEncodeState state = + (apEncoderState == nullptr ? AttributeValueEncoder::AttributeEncodeState() : *apEncoderState); + AttributeValueEncoder valueEncoder(aAttributeReports, aSubjectDescriptor.fabricIndex, aPath, 0 /* data version */, + aIsFabricFiltered, state); + + return valueEncoder.Encode(++totalReadCount); + } else { AttributeReportIB::Builder & attributeReport = aAttributeReports.CreateAttributeReport(); @@ -155,11 +169,20 @@ class TestReadInteraction static void TestReadFabricScopedWithoutFabricFilter(nlTestSuite * apSuite, void * apContext); static void TestReadFabricScopedWithFabricFilter(nlTestSuite * apSuite, void * apContext); static void TestReadHandler_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext); + static void TestReadHandler_MultipleReads(nlTestSuite * apSuite, void * apContext); + static void TestReadHandler_OneSubscribeMultipleReads(nlTestSuite * apSuite, void * apContext); + static void TestReadHandler_TwoSubscribesMultipleReads(nlTestSuite * apSuite, void * apContext); static void TestReadHandler_MultipleSubscriptionsWithDataVersionFilter(nlTestSuite * apSuite, void * apContext); static void TestReadHandlerResourceExhaustion_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext); static void TestReadHandlerResourceExhaustion_MultipleReads(nlTestSuite * apSuite, void * apContext); private: + // Issue the given number of reads in parallel and wait for them all to + // succeed. + static void MultipleReadHelper(nlTestSuite * apSuite, TestContext & aCtx, size_t aReadCount); + // Establish the given number of subscriptions, then issue the given number + // of reads in parallel and wait for them all to succeed. + static void SubscribeThenReadHelper(nlTestSuite * apSuite, TestContext & aCtx, size_t aSubscribeCount, size_t aReadCount); }; void TestReadInteraction::TestReadAttributeResponse(nlTestSuite * apSuite, void * apContext) @@ -198,8 +221,6 @@ void TestReadInteraction::TestReadAttributeResponse(nlTestSuite * apSuite, void kTestEndpointId, onSuccessCb, onFailureCb); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, onSuccessCbInvoked && !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -243,8 +264,6 @@ void TestReadInteraction::TestReadDataVersionFilter(nlTestSuite * apSuite, void &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, true, dataVersion); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, !onSuccessCbInvoked && !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -276,8 +295,6 @@ void TestReadInteraction::TestReadEventResponse(nlTestSuite * apSuite, void * ap onSuccessCb, onFailureCb); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -310,8 +327,6 @@ void TestReadInteraction::TestReadAttributeError(nlTestSuite * apSuite, void * a kTestEndpointId, onSuccessCb, onFailureCb); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, !onSuccessCbInvoked && onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -343,9 +358,11 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * Controller::ReadAttribute(&ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb); + ctx.ExpireSessionAliceToBob(); + ctx.DrainAndServiceIO(); - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 2); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 1); ctx.ExpireSessionBobToAlice(); @@ -353,13 +370,7 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * NL_TEST_ASSERT(apSuite, !onSuccessCbInvoked && onFailureCbInvoked); - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 1); - - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - - ctx.ExpireSessionAliceToBob(); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == 0); @@ -370,10 +381,7 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * ctx.CreateSessionAliceToBob(); ctx.CreateSessionBobToAlice(); - // - // TODO: Figure out why I cannot enable this line below. - // - // NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext) @@ -415,16 +423,7 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * ap onSubscriptionEstablishedCb, false, true) == CHIP_NO_ERROR); } - // - // It may take a couple of service calls since we may hit the limit of CHIP_IM_MAX_REPORTS_IN_FLIGHT - // reports. - // - for (int i = 0; i < 10 && (numSubscriptionEstablishedCalls != (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); i++) - { - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - } + ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, numSuccessCalls == (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); @@ -434,6 +433,135 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * ap NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } +void TestReadInteraction::TestReadHandler_MultipleReads(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast(apContext); + + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT <= CHIP_IM_MAX_NUM_READ_HANDLER, + "How can we have more reports in flight than read handlers?"); + + MultipleReadHelper(apSuite, ctx, CHIP_IM_MAX_REPORTS_IN_FLIGHT); + + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); +} + +void TestReadInteraction::TestReadHandler_OneSubscribeMultipleReads(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast(apContext); + + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT <= CHIP_IM_MAX_NUM_READ_HANDLER, + "How can we have more reports in flight than read handlers?"); + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT > 1, "We won't do any reads"); + + SubscribeThenReadHelper(apSuite, ctx, 1, CHIP_IM_MAX_REPORTS_IN_FLIGHT - 1); + + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); +} + +void TestReadInteraction::TestReadHandler_TwoSubscribesMultipleReads(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast(apContext); + + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT <= CHIP_IM_MAX_NUM_READ_HANDLER, + "How can we have more reports in flight than read handlers?"); + static_assert(CHIP_IM_MAX_REPORTS_IN_FLIGHT > 2, "We won't do any reads"); + + SubscribeThenReadHelper(apSuite, ctx, 2, CHIP_IM_MAX_REPORTS_IN_FLIGHT - 2); + + NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); +} + +void TestReadInteraction::SubscribeThenReadHelper(nlTestSuite * apSuite, TestContext & aCtx, size_t aSubscribeCount, + size_t aReadCount) +{ + auto sessionHandle = aCtx.GetSessionBobToAlice(); + uint32_t numSuccessCalls = 0; + uint32_t numSubscriptionEstablishedCalls = 0; + + responseDirective = kSendDataResponse; + + // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's + // not safe to do so. + auto onSuccessCb = [&numSuccessCalls](const app::ConcreteDataAttributePath & attributePath, const auto & dataResponse) { + numSuccessCalls++; + }; + + // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's + // not safe to do so. + auto onFailureCb = [&apSuite](const app::ConcreteDataAttributePath * attributePath, CHIP_ERROR aError) { + // + // We shouldn't be encountering any failures in this test. + // + NL_TEST_ASSERT(apSuite, false); + }; + + auto onSubscriptionEstablishedCb = [&numSubscriptionEstablishedCalls, &apSuite, &aCtx, aSubscribeCount, aReadCount]() { + numSubscriptionEstablishedCalls++; + if (numSubscriptionEstablishedCalls == aSubscribeCount) + { + MultipleReadHelper(apSuite, aCtx, aReadCount); + } + }; + + for (size_t i = 0; i < aSubscribeCount; ++i) + { + NL_TEST_ASSERT(apSuite, + Controller::SubscribeAttribute( + &aCtx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, 0, 10, + onSubscriptionEstablishedCb, false, true) == CHIP_NO_ERROR); + } + + aCtx.DrainAndServiceIO(); + + NL_TEST_ASSERT(apSuite, numSuccessCalls == aSubscribeCount); + NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == aSubscribeCount); +} + +void TestReadInteraction::MultipleReadHelper(nlTestSuite * apSuite, TestContext & aCtx, size_t aReadCount) +{ + auto sessionHandle = aCtx.GetSessionBobToAlice(); + uint32_t numSuccessCalls = 0; + uint32_t numFailureCalls = 0; + + responseDirective = kSendDataResponse; + + uint16_t firstExpectedResponse = totalReadCount + 1; + + // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's + // not safe to do so. + auto onFailureCb = [&apSuite, &numFailureCalls](const app::ConcreteDataAttributePath * attributePath, CHIP_ERROR aError) { + numFailureCalls++; + + NL_TEST_ASSERT(apSuite, attributePath == nullptr); + }; + + for (size_t i = 0; i < aReadCount; ++i) + { + // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, + // it's not safe to do so. + auto onSuccessCb = [&numSuccessCalls, &apSuite, firstExpectedResponse, + i](const app::ConcreteDataAttributePath & attributePath, const auto & dataResponse) { + NL_TEST_ASSERT(apSuite, dataResponse == firstExpectedResponse + i); + numSuccessCalls++; + }; + + NL_TEST_ASSERT(apSuite, + Controller::ReadAttribute( + &aCtx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb) == CHIP_NO_ERROR); + } + + aCtx.DrainAndServiceIO(); + + NL_TEST_ASSERT(apSuite, numSuccessCalls == aReadCount); + NL_TEST_ASSERT(apSuite, numFailureCalls == 0); +} + void TestReadInteraction::TestReadHandler_MultipleSubscriptionsWithDataVersionFilter(nlTestSuite * apSuite, void * apContext) { TestContext & ctx = *static_cast(apContext); @@ -476,16 +604,7 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptionsWithDataVersionFi onSubscriptionEstablishedCb, false, true, dataVersion) == CHIP_NO_ERROR); } - // - // It may take a couple of service calls since we may hit the limit of CHIP_IM_MAX_REPORTS_IN_FLIGHT - // reports. - // - for (int i = 0; i < 10 && (numSubscriptionEstablishedCalls != (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); i++) - { - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - } + ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, numSuccessCalls == (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == (CHIP_IM_MAX_NUM_READ_HANDLER + 1)); @@ -537,16 +656,7 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscription &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, 0, 10, onSubscriptionEstablishedCb, false, true) == CHIP_NO_ERROR); - // - // It may take a couple of service calls since we may hit the limit of CHIP_IM_MAX_REPORTS_IN_FLIGHT - // reports. - // - for (int i = 0; i < 10; i++) - { - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - } + ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, numSuccessCalls == 1); NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == 1); @@ -589,16 +699,7 @@ void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads(nlTest Controller::ReadAttribute( &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb) == CHIP_NO_ERROR); - // - // It may take a couple of service calls since we may hit the limit of CHIP_IM_MAX_REPORTS_IN_FLIGHT - // reports. - // - for (int i = 0; i < 10; i++) - { - ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); - } + ctx.DrainAndServiceIO(); app::InteractionModelEngine::GetInstance()->SetHandlerCapacity(-1); app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); @@ -649,8 +750,6 @@ void TestReadInteraction::TestReadFabricScopedWithoutFabricFilter(nlTestSuite * &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, false /* fabric filtered */); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, onSuccessCbInvoked && !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -707,8 +806,6 @@ void TestReadInteraction::TestReadFabricScopedWithFabricFilter(nlTestSuite * apS &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, true /* fabric filtered */); ctx.DrainAndServiceIO(); - app::InteractionModelEngine::GetInstance()->GetReportingEngine().Run(); - ctx.DrainAndServiceIO(); NL_TEST_ASSERT(apSuite, onSuccessCbInvoked && !onFailureCbInvoked); NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadClients() == 0); @@ -727,6 +824,9 @@ const nlTest sTests[] = NL_TEST_DEF("TestReadFabricScopedWithFabricFilter", TestReadInteraction::TestReadFabricScopedWithFabricFilter), NL_TEST_DEF("TestReadHandler_MultipleSubscriptions", TestReadInteraction::TestReadHandler_MultipleSubscriptions), NL_TEST_DEF("TestReadHandler_MultipleSubscriptionsWithDataVersionFilter", TestReadInteraction::TestReadHandler_MultipleSubscriptionsWithDataVersionFilter), + NL_TEST_DEF("TestReadHandler_MultipleReads", TestReadInteraction::TestReadHandler_MultipleReads), + NL_TEST_DEF("TestReadHandler_OneSubscribeMultipleReads", TestReadInteraction::TestReadHandler_OneSubscribeMultipleReads), + NL_TEST_DEF("TestReadHandler_TwoSubscribesMultipleReads", TestReadInteraction::TestReadHandler_TwoSubscribesMultipleReads), NL_TEST_DEF("TestReadHandlerResourceExhaustion_MultipleSubscriptions", TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscriptions), NL_TEST_DEF("TestReadHandlerResourceExhaustion_MultipleReads", TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads), NL_TEST_DEF("TestReadAttributeTimeout", TestReadInteraction::TestReadAttributeTimeout), diff --git a/src/messaging/tests/MessagingContext.h b/src/messaging/tests/MessagingContext.h index 2f543db4191359..0b498c5ec0038c 100644 --- a/src/messaging/tests/MessagingContext.h +++ b/src/messaging/tests/MessagingContext.h @@ -259,21 +259,44 @@ class LoopbackMessagingContext : public MessagingContext * Consequently, this is guarded with a user-provided timeout to ensure we don't have unit-tests that stall * in CI due to bugs in the code that is being tested. * - * This DOES NOT ensure that all pending events are serviced to completion (i.e timers, any ScheduleWork calls). + * This DOES NOT ensure that all pending events are serviced to completion + * (i.e timers, any ScheduleWork calls), but does: * + * 1) Guarantee that every call will make some progress on ready-to-run + * things, by calling DriveIO at least once. + * 2) Try to ensure that any ScheduleWork calls that happend directly as a + * result of message reception, and any messages those async tasks send, + * get handled before DrainAndServiceIO returns. */ void DrainAndServiceIO(System::Clock::Timeout maxWait = chip::System::Clock::Seconds16(5)) { auto & impl = GetLoopback(); System::Clock::Timestamp startTime = System::SystemClock().GetMonotonicTimestamp(); - while (impl.HasPendingMessages()) + while (true) { + bool hadPendingMessages = impl.HasPendingMessages(); + while (impl.HasPendingMessages()) + { + mIOContext.DriveIO(); + if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= maxWait) + { + return; + } + } + // Processing those messages might have queued some run-ASAP async + // work. Make sure to process that too, in case it generates + // response messages. mIOContext.DriveIO(); - if ((System::SystemClock().GetMonotonicTimestamp() - startTime) >= maxWait) + if (!hadPendingMessages && !impl.HasPendingMessages()) { + // We're not making any progress on messages. Just stop. break; } + // No need to check our timer here: either impl.HasPendingMessages() + // is true and we will check it next iteration, or it's false and we + // will either stop on the next iteration or it will become true and + // we will check the timer then. } }