Skip to content

Commit

Permalink
Allow ReliableMessageMgr to perform address relookup on first message…
Browse files Browse the repository at this point in the history
… fail (#21768)

* OnFirstMessageDeliveryFailed is removed from SessionDelegate
* CASESessionManager will implement delegate to allow updating Session transport peer address.
*During CASESessionManager::Init it will register itself with ReliableMessageMgr, so that ReliableMessageMgr has a callback.
* CASESessionManager create a different OperationalSessionSetup to perform address lookup and update that is separate
  from OperationalSessionSetup used for establishing a connection.

Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>
  • Loading branch information
tehampson and bzbarsky-apple authored Aug 12, 2022
1 parent 0888818 commit b94e1a1
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 52 deletions.
29 changes: 25 additions & 4 deletions src/app/CASESessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ CHIP_ERROR CASESessionManager::Init(chip::System::Layer * systemLayer, const CAS
{
ReturnErrorOnFailure(params.sessionInitParams.Validate());
mConfig = params;
params.sessionInitParams.exchangeMgr->GetReliableMessageMgr()->RegisterSessionUpdateDelegate(this);
return AddressResolve::Resolver::Instance().Init(systemLayer);
}

Expand All @@ -34,7 +35,8 @@ void CASESessionManager::FindOrEstablishSession(const ScopedNodeId & peerId, Cal
ChipLogDetail(CASESessionManager, "FindOrEstablishSession: PeerId = [%d:" ChipLogFormatX64 "]", peerId.GetFabricIndex(),
ChipLogValueX64(peerId.GetNodeId()));

OperationalSessionSetup * session = FindExistingSessionSetup(peerId);
bool forAddressUpdate = false;
OperationalSessionSetup * session = FindExistingSessionSetup(peerId, forAddressUpdate);
if (session == nullptr)
{
ChipLogDetail(CASESessionManager, "FindOrEstablishSession: No existing OperationalSessionSetup instance found");
Expand Down Expand Up @@ -78,9 +80,28 @@ CHIP_ERROR CASESessionManager::GetPeerAddress(const ScopedNodeId & peerId, Trans
return CHIP_NO_ERROR;
}

OperationalSessionSetup * CASESessionManager::FindExistingSessionSetup(const ScopedNodeId & peerId) const
void CASESessionManager::UpdatePeerAddress(ScopedNodeId peerId)
{
return mConfig.sessionSetupPool->FindSessionSetup(peerId);
bool forAddressUpdate = true;
OperationalSessionSetup * session = FindExistingSessionSetup(peerId, forAddressUpdate);
if (session == nullptr)
{
ChipLogDetail(CASESessionManager, "UpdatePeerAddress: No existing OperationalSessionSetup instance found");

session = mConfig.sessionSetupPool->Allocate(mConfig.sessionInitParams, peerId, this);
if (session == nullptr)
{
ChipLogDetail(CASESessionManager, "UpdatePeerAddress: Failed to allocate OperationalSessionSetup instance");
return;
}
}

session->PerformAddressUpdate();
}

OperationalSessionSetup * CASESessionManager::FindExistingSessionSetup(const ScopedNodeId & peerId, bool forAddressUpdate) const
{
return mConfig.sessionSetupPool->FindSessionSetup(peerId, forAddressUpdate);
}

Optional<SessionHandle> CASESessionManager::FindExistingSession(const ScopedNodeId & peerId) const
Expand All @@ -89,7 +110,7 @@ Optional<SessionHandle> CASESessionManager::FindExistingSession(const ScopedNode
MakeOptional(Transport::SecureSession::Type::kCASE));
}

void CASESessionManager::ReleaseSession(OperationalSessionSetup * session) const
void CASESessionManager::ReleaseSession(OperationalSessionSetup * session)
{
if (session != nullptr)
{
Expand Down
23 changes: 17 additions & 6 deletions src/app/CASESessionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <lib/support/Pool.h>
#include <platform/CHIPDeviceLayer.h>
#include <transport/SessionDelegate.h>
#include <transport/SessionUpdateDelegate.h>

#include <lib/dnssd/ResolverProxy.h>

Expand All @@ -45,11 +46,17 @@ struct CASESessionManagerConfig
* 4. During session establishment, trigger node ID resolution (if needed), and update the DNS-SD cache (if resolution is
* successful)
*/
class CASESessionManager : public OperationalSessionReleaseDelegate
class CASESessionManager : public OperationalSessionReleaseDelegate, public SessionUpdateDelegate
{
public:
CASESessionManager() = default;
virtual ~CASESessionManager() {}
virtual ~CASESessionManager()
{
if (mConfig.sessionInitParams.Validate() == CHIP_NO_ERROR)
{
mConfig.sessionInitParams.exchangeMgr->GetReliableMessageMgr()->RegisterSessionUpdateDelegate(nullptr);
}
}

CHIP_ERROR Init(chip::System::Layer * systemLayer, const CASESessionManagerConfig & params);
void Shutdown() {}
Expand All @@ -71,9 +78,9 @@ class CASESessionManager : public OperationalSessionReleaseDelegate
void FindOrEstablishSession(const ScopedNodeId & peerId, Callback::Callback<OnDeviceConnected> * onConnection,
Callback::Callback<OnDeviceConnectionFailure> * onFailure);

OperationalSessionSetup * FindExistingSessionSetup(const ScopedNodeId & peerId) const;
OperationalSessionSetup * FindExistingSessionSetup(const ScopedNodeId & peerId, bool forAddressUpdate = false) const;

void ReleaseSession(const ScopedNodeId & peerId) override;
void ReleaseSession(const ScopedNodeId & peerId);

void ReleaseSessionsForFabric(FabricIndex fabricIndex);

Expand All @@ -89,11 +96,15 @@ class CASESessionManager : public OperationalSessionReleaseDelegate
*/
CHIP_ERROR GetPeerAddress(const ScopedNodeId & peerId, Transport::PeerAddress & addr);

//////////// OperationalSessionReleaseDelegate Implementation ///////////////
void ReleaseSession(OperationalSessionSetup * device) override;

//////////// SessionUpdateDelegate Implementation ///////////////
void UpdatePeerAddress(ScopedNodeId peerId) override;

private:
Optional<SessionHandle> FindExistingSession(const ScopedNodeId & peerId) const;

void ReleaseSession(OperationalSessionSetup * device) const;

CASESessionManagerConfig mConfig;
};

Expand Down
68 changes: 46 additions & 22 deletions src/app/OperationalSessionSetup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,27 +191,29 @@ void OperationalSessionSetup::UpdateDeviceData(const Transport::PeerAddress & ad
if (mState == State::ResolvingAddress)
{
MoveToState(State::HasAddress);
err = EstablishConnection();
if (err != CHIP_NO_ERROR)
{
DequeueConnectionCallbacks(err);
// Do not touch `this` instance anymore; it has been destroyed in DequeueConnectionCallbacks.
return;
}
}
else
{
if (!mSecureSession)
mInitParams.sessionManager->UpdateAllSessionsPeerAddress(mPeerId, addr);
if (!mPerformingAddressUpdate)
{
// Nothing needs to be done here. It's not an error to not have a
// secureSession. For one thing, we could have gotten a different
// UpdateAddress already and that caused connections to be torn down and
// whatnot.
err = EstablishConnection();
if (err != CHIP_NO_ERROR)
{
DequeueConnectionCallbacks(err);
// Do not touch `this` instance anymore; it has been destroyed in DequeueConnectionCallbacks.
return;
}
// We expect to get a callback via OnSessionEstablished or OnSessionEstablishmentError to continue
// the state machine forward.
return;
}

mSecureSession.Get().Value()->AsSecureSession()->SetPeerAddress(addr);
DequeueConnectionCallbacks(CHIP_NO_ERROR);
// Do not touch `this` instance anymore; it has been destroyed in DequeueConnectionCallbacks.
return;
}

ChipLogError(Controller, "Received UpdateDeviceData in incorrect state");
DequeueConnectionCallbacks(CHIP_ERROR_INCORRECT_STATE);
// Do not touch `this` instance anymore; it has been destroyed in DequeueConnectionCallbacks.
}

CHIP_ERROR OperationalSessionSetup::EstablishConnection()
Expand Down Expand Up @@ -297,7 +299,7 @@ void OperationalSessionSetup::DequeueConnectionCallbacks(CHIP_ERROR error)
}
}

releaseDelegate->ReleaseSession(peerId);
releaseDelegate->ReleaseSession(this);
}

void OperationalSessionSetup::OnSessionEstablishmentError(CHIP_ERROR error)
Expand Down Expand Up @@ -364,11 +366,6 @@ void OperationalSessionSetup::OnSessionReleased()
MoveToState(State::HasAddress);
}

void OperationalSessionSetup::OnFirstMessageDeliveryFailed()
{
LookupPeerAddress();
}

void OperationalSessionSetup::OnSessionHang()
{
Disconnect();
Expand Down Expand Up @@ -423,6 +420,32 @@ CHIP_ERROR OperationalSessionSetup::LookupPeerAddress()
return Resolver::Instance().LookupNode(request, mAddressLookupHandle);
}

void OperationalSessionSetup::PerformAddressUpdate()
{
if (mPerformingAddressUpdate)
{
// We are already in the middle of a lookup from a previous call to
// PerformAddressUpdate. In that case we will just exit right away as
// we are already looking to update the results from the previous lookup.
return;
}

// We must be newly-allocated to handle this address lookup, so must be in the NeedsAddress state.
VerifyOrDie(mState == State::NeedsAddress);

// We are doing an address lookup whether we have an active session for this peer or not.
mPerformingAddressUpdate = true;
MoveToState(State::ResolvingAddress);
CHIP_ERROR err = LookupPeerAddress();
if (err != CHIP_NO_ERROR)
{
ChipLogError(Controller, "PerformAddressUpdate could not perform lookup");
DequeueConnectionCallbacks(err);
// Do not touch `this` instance anymore; it has been destroyed in DequeueConnectionCallbacks.
return;
}
}

void OperationalSessionSetup::OnNodeAddressResolved(const PeerId & peerId, const ResolveResult & result)
{
UpdateDeviceData(result.address, result.mrpRemoteConfig);
Expand All @@ -438,6 +461,7 @@ void OperationalSessionSetup::OnNodeAddressResolutionFailed(const PeerId & peerI
MoveToState(State::NeedsAddress);
}

// No need to set mPerformingAddressUpdate to false since call below releases `this`.
DequeueConnectionCallbacks(reason);
// Do not touch `this` instance anymore; it has been destroyed in DequeueConnectionCallbacks.
}
Expand Down
16 changes: 10 additions & 6 deletions src/app/OperationalSessionSetup.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ struct DeviceProxyInitParams
}
};

class OperationalSessionSetup;

/**
* @brief Delegate provided when creating OperationalSessionSetup.
*
Expand All @@ -80,10 +82,8 @@ struct DeviceProxyInitParams
class OperationalSessionReleaseDelegate
{
public:
virtual ~OperationalSessionReleaseDelegate() = default;
// TODO Issue #20452: Once cleanup from #20452 takes place we can provide OperationalSessionSetup *
// instead of ScopedNodeId here.
virtual void ReleaseSession(const ScopedNodeId & peerId) = 0;
virtual ~OperationalSessionReleaseDelegate() = default;
virtual void ReleaseSession(OperationalSessionSetup * sessionSetup) = 0;
};

/**
Expand Down Expand Up @@ -202,6 +202,8 @@ class DLL_EXPORT OperationalSessionSetup : public SessionDelegate,

bool IsConnecting() const { return mState == State::Connecting; }

bool IsForAddressUpdate() const { return mPerformingAddressUpdate; }

/**
* IsResolvingAddress returns true if we are doing an address resolution
* that needs to happen before we can establish CASE. We can be in the
Expand All @@ -219,8 +221,6 @@ class DLL_EXPORT OperationalSessionSetup : public SessionDelegate,

// Called when a connection is closing. The object releases all resources associated with the connection.
void OnSessionReleased() override;
// Called when a message is not acked within first retrans timer, try to refresh the peer address
void OnFirstMessageDeliveryFailed() override;
// Called when a connection is hanging. Try to re-establish another session, and shift to the new session when done, the
// original session won't be touched during the period.
void OnSessionHang() override;
Expand Down Expand Up @@ -261,6 +261,8 @@ class DLL_EXPORT OperationalSessionSetup : public SessionDelegate,
*/
CHIP_ERROR LookupPeerAddress();

void PerformAddressUpdate();

// AddressResolve::NodeListener - notifications when dnssd finds a node IP address
void OnNodeAddressResolved(const PeerId & peerId, const AddressResolve::ResolveResult & result) override;
void OnNodeAddressResolutionFailed(const PeerId & peerId, CHIP_ERROR reason) override;
Expand Down Expand Up @@ -304,6 +306,8 @@ class DLL_EXPORT OperationalSessionSetup : public SessionDelegate,

ReliableMessageProtocolConfig mRemoteMRPConfig = GetDefaultMRPConfig();

bool mPerformingAddressUpdate = false;

CHIP_ERROR EstablishConnection();

/*
Expand Down
6 changes: 3 additions & 3 deletions src/app/OperationalSessionSetupPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class OperationalSessionSetupPoolDelegate

virtual void Release(OperationalSessionSetup * device) = 0;

virtual OperationalSessionSetup * FindSessionSetup(ScopedNodeId peerId) = 0;
virtual OperationalSessionSetup * FindSessionSetup(ScopedNodeId peerId, bool forAddressUpdate) = 0;

virtual void ReleaseAllSessionSetupsForFabric(FabricIndex fabricIndex) = 0;

Expand All @@ -55,11 +55,11 @@ class OperationalSessionSetupPool : public OperationalSessionSetupPoolDelegate

void Release(OperationalSessionSetup * device) override { mSessionSetupPool.ReleaseObject(device); }

OperationalSessionSetup * FindSessionSetup(ScopedNodeId peerId) override
OperationalSessionSetup * FindSessionSetup(ScopedNodeId peerId, bool forAddressUpdate) override
{
OperationalSessionSetup * foundDevice = nullptr;
mSessionSetupPool.ForEachActiveObject([&](auto * activeSetup) {
if (activeSetup->GetPeerId() == peerId)
if (activeSetup->GetPeerId() == peerId && activeSetup->IsForAddressUpdate() == forAddressUpdate)
{
foundDevice = activeSetup;
return Loop::Break;
Expand Down
9 changes: 7 additions & 2 deletions src/messaging/ReliableMessageMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,9 @@ CHIP_ERROR ReliableMessageMgr::SendFromRetransTable(RetransTableEntry * entry)
if (exchangeMgr)
{
// After the first failure notify session manager to refresh device data
if (entry->sendCount == 1)
if (entry->sendCount == 1 && mSessionUpdateDelegate != nullptr)
{
entry->ec->GetSessionHandle()->DispatchSessionEvent(&SessionDelegate::OnFirstMessageDeliveryFailed);
mSessionUpdateDelegate->UpdatePeerAddress(entry->ec->GetSessionHandle()->GetPeer());
}
}
}
Expand Down Expand Up @@ -409,6 +409,11 @@ void ReliableMessageMgr::StopTimer()
mSystemLayer->CancelTimer(Timeout, this);
}

void ReliableMessageMgr::RegisterSessionUpdateDelegate(SessionUpdateDelegate * sessionUpdateDelegate)
{
mSessionUpdateDelegate = sessionUpdateDelegate;
}

#if CHIP_CONFIG_TEST
int ReliableMessageMgr::TestGetCountRetransTable()
{
Expand Down
13 changes: 13 additions & 0 deletions src/messaging/ReliableMessageMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <messaging/ReliableMessageProtocolConfig.h>
#include <system/SystemLayer.h>
#include <system/SystemPacketBuffer.h>
#include <transport/SessionUpdateDelegate.h>
#include <transport/raw/MessageHeader.h>

namespace chip {
Expand Down Expand Up @@ -171,6 +172,16 @@ class ReliableMessageMgr
*/
void StopTimer();

/**
* Registers a delegate to perform an address lookup and update all active sessions.
*
* @param[in] sessionUpdateDelegate - Pointer to delegate to perform address lookup
* that will update all active session. A null pointer is allowed if you
* no longer have a valid delegate.
*
*/
void RegisterSessionUpdateDelegate(SessionUpdateDelegate * sessionUpdateDelegate);

#if CHIP_CONFIG_TEST
// Functions for testing
int TestGetCountRetransTable();
Expand All @@ -194,6 +205,8 @@ class ReliableMessageMgr

// ReliableMessageProtocol Global tables for timer context
ObjectPool<RetransTableEntry, CHIP_CONFIG_RMP_RETRANS_TABLE_SIZE> mRetransTable;

SessionUpdateDelegate * mSessionUpdateDelegate = nullptr;
};

} // namespace Messaging
Expand Down
8 changes: 0 additions & 8 deletions src/transport/SessionDelegate.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,6 @@ class DLL_EXPORT SessionDelegate
*/
virtual void OnSessionReleased() = 0;

/**
* @brief
* Called when the first message delivery in an exchange fails, so actions aiming to recover connection can be performed.
*
* Note: the implementation must not do anything that will destroy the session or change the SessionHolder.
*/
virtual void OnFirstMessageDeliveryFailed() {}

/**
* @brief
* Called when a session is unresponsive for a while (detected by MRP)
Expand Down
Loading

0 comments on commit b94e1a1

Please sign in to comment.