Skip to content

Commit

Permalink
IM: Create ReadHandler after Session Establishment for Subscription R…
Browse files Browse the repository at this point in the history
…esumption
  • Loading branch information
wqx6 committed Nov 15, 2023
1 parent 75cf38c commit 96c2553
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 86 deletions.
7 changes: 7 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ static_library("app") {
]
}

if (chip_persist_subscriptions) {
sources += [
"SubscriptionResumptionHelper.cpp",
"SubscriptionResumptionHelper.h",
]
}

if (chip_enable_read_client) {
sources += [
"BufferedReadCallback.cpp",
Expand Down
17 changes: 6 additions & 11 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1853,26 +1853,21 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
continue;
}

auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize();
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize();
if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount,
requestedEventPathCount))
auto subscriptionResumptionHelper = Platform::MakeUnique<SubscriptionResumptionHelper>();
if (subscriptionResumptionHelper == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
ChipLogProgress(InteractionModel, "Failed to create SubscriptionResumptionHelper");
iterator->Release();
return;
}

ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
if (handler == nullptr)
if (subscriptionResumptionHelper->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo) != CHIP_NO_ERROR)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
ChipLogProgress(InteractionModel, "Failed to ResumeSubscription 0x%" PRIx32, subscriptionInfo.mSubscriptionId);
iterator->Release();
return;
}

ChipLogProgress(InteractionModel, "Resuming subscriptionId %" PRIu32, subscriptionInfo.mSubscriptionId);
handler->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo);
subscriptionResumptionHelper.release();
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
resumedSubscriptions = true;
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
Expand Down
2 changes: 2 additions & 0 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <app/AppBuildConfig.h>
#include <app/MessageDef/AttributeReportIBs.h>
#include <app/MessageDef/ReportDataMessage.h>
#include <app/SubscriptionResumptionHelper.h>
#include <lib/core/CHIPCore.h>
#include <lib/support/CodeUtils.h>
#include <lib/support/DLLUtil.h>
Expand Down Expand Up @@ -377,6 +378,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
friend class reporting::Engine;
friend class TestCommandInteraction;
friend class TestInteractionModelEngine;
friend class SubscriptionResumptionHelper;
using Status = Protocols::InteractionModel::Status;

void OnDone(CommandHandler & apCommandObj) override;
Expand Down
89 changes: 30 additions & 59 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
InteractionType aInteractionType, Observer * observer) :
mExchangeCtx(*this),
mManagementCallback(apCallback)
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
,
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
#endif
{
VerifyOrDie(apExchangeContext != nullptr);

Expand All @@ -83,8 +79,7 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
mExchangeCtx(*this), mManagementCallback(apCallback)
{
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();
Expand All @@ -93,41 +88,55 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mObserver = observer;
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionHelper & helper)
{
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);
mSubscriptionId = helper.mSubscriptionId;
mMinIntervalFloorSeconds = helper.mMinIntervalFloorSeconds;
mMaxInterval = helper.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, helper.mFabricFiltered);

// Move dynamically allocated attributes and events from the SubscriptionInfo struct into
// the object pool managed by the IM engine
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
for (size_t i = 0; i < helper.mAttributePaths.AllocatedSize(); i++)
{
AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err =
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams);
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, helper.mAttributePaths[i]);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
for (size_t i = 0; i < helper.mEventPaths.AllocatedSize(); i++)
{
EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams);
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, helper.mEventPaths[i]);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}

// Ask IM engine to start CASE session with subscriber
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
mSessionHandle.Grab(sessionHandle);

SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*this);
}
// Notify the observer that a subscription has been resumed
mObserver->OnSubscriptionEstablished(this);

MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand Down Expand Up @@ -892,43 +901,5 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
SetStateFlag(aFlag, false);
}

void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);

_this->mSessionHandle.Grab(sessionHandle);

_this->SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = _this->mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*_this);
}
// Notify the observer that a subscription has been resumed
_this->mObserver->OnSubscriptionEstablished(_this);

_this->MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);
VerifyOrDie(_this != nullptr);

// TODO: Have a retry mechanism tied to wake interval for IC devices
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
err.Format());
_this->Close();
}

} // namespace app
} // namespace chip
21 changes: 5 additions & 16 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <app/MessageDef/EventPathIBs.h>
#include <app/ObjectList.h>
#include <app/OperationalSessionSetup.h>
#include <app/SubscriptionResumptionHelper.h>
#include <app/SubscriptionResumptionStorage.h>
#include <lib/core/CHIPCallback.h>
#include <lib/core/CHIPCore.h>
Expand Down Expand Up @@ -305,13 +306,11 @@ class ReadHandler : public Messaging::ExchangeDelegate
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Resume a persisted subscription
* @brief Initialize a ReadHandler for a resumed subsciption
*
* Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session
* with the subscriber if one doesn't already exist, and send full priming report when connected.
* Used after the SubscriptionResumptionHelper establishs the CASE session
*/
void ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
void OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionHelper & helper);
#endif

/**
Expand Down Expand Up @@ -429,6 +428,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
//
friend class chip::app::reporting::Engine;
friend class chip::app::InteractionModelEngine;
friend class chip::app::SubscriptionResumptionHelper;

// The report scheduler needs to be able to access StateFlag private functions ShouldStartReporting(), CanStartReporting(),
// ForceDirtyState() and IsDirty() to know when to schedule a run so it is declared as a friend class.
Expand Down Expand Up @@ -485,11 +485,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
/// @param aFlag Flag to clear
void ClearStateFlag(ReadHandlerFlags aFlag);

// Helpers for continuing the subscription resumption
static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle);
static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error);

AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);

// The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was
Expand Down Expand Up @@ -571,12 +566,6 @@ class ReadHandler : public Messaging::ExchangeDelegate

// TODO (#27675): Merge all observers into one and that one will dispatch the callbacks to the right place.
Observer * mObserver = nullptr;

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// Callbacks to handle server-initiated session success/failure
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
#endif
};
} // namespace app
} // namespace chip
94 changes: 94 additions & 0 deletions src/app/SubscriptionResumptionHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
*
* Copyright (c) 2023 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <app/SubscriptionResumptionHelper.h>
#include <app/InteractionModelEngine.h>

namespace chip {
namespace app {
SubscriptionResumptionHelper::SubscriptionResumptionHelper() :
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
{}

CHIP_ERROR SubscriptionResumptionHelper::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
{
mNodeId = subscriptionInfo.mNodeId;
mFabricIndex = subscriptionInfo.mFabricIndex;
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
mFabricFiltered = subscriptionInfo.mFabricFiltered;

if (subscriptionInfo.mAttributePaths.AllocatedSize() > 0)
{
mAttributePaths.Alloc(subscriptionInfo.mAttributePaths.AllocatedSize());
ReturnErrorCodeIf(mAttributePaths.Get() == nullptr, CHIP_ERROR_NO_MEMORY);
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
{
mAttributePaths[i] = subscriptionInfo.mAttributePaths[i].GetParams();
}
}

if (subscriptionInfo.mEventPaths.AllocatedSize() > 0)
{
mEventPaths.Alloc(subscriptionInfo.mEventPaths.AllocatedSize());
ReturnErrorCodeIf(mEventPaths.Get() == nullptr, CHIP_ERROR_NO_MEMORY);
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
{
mEventPaths[i] = subscriptionInfo.mEventPaths[i].GetParams();
}
}
ScopedNodeId peerNode = ScopedNodeId(mNodeId, mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
return CHIP_NO_ERROR;
}

void SubscriptionResumptionHelper::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
Platform::UniquePtr<SubscriptionResumptionHelper> _this(static_cast<SubscriptionResumptionHelper *>(context));
InteractionModelEngine *imEngine = InteractionModelEngine::GetInstance();
if (!imEngine->EnsureResourceForSubscription(_this->mFabricIndex, _this->mAttributePaths.AllocatedSize(),
_this->mEventPaths.AllocatedSize()))
{
ChipLogProgress(InteractionModel, "no resource for subscription resumption");
return;
}
ReadHandler *readHandler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
if (readHandler == nullptr) {
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
return;
}
readHandler->OnSubscriptionResumed(sessionHandle, *_this);
}

void SubscriptionResumptionHelper::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error)
{
Platform::UniquePtr<SubscriptionResumptionHelper> _this(static_cast<SubscriptionResumptionHelper *>(context));
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
error.Format());
// Delete the persistent subscription information
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
{
subscriptionResumptionStorage->Delete(_this->mNodeId, _this->mFabricIndex, _this->mSubscriptionId);
}
}

} // namespace app
} // namespace chip
Loading

0 comments on commit 96c2553

Please sign in to comment.