Skip to content

Commit

Permalink
Add missing TypeLookup listeners (#4335) (#4438)
Browse files Browse the repository at this point in the history
* Add listener classes to handle cache change acknowledgments



* Uncrustify



* Fix memory leaks in TypeLookupManager.cpp

* Apply changes



* Apply changes



* Refs #20311: Further refactor TypeLookupManager::create_endpoints



---------

Signed-off-by: Irene Bandera <irenebandera@eprosima.com>
Signed-off-by: EduPonz <eduardoponz@eprosima.com>
Co-authored-by: Irene Bandera Moreno <irenebandera@eprosima.com>
Co-authored-by: EduPonz <eduardoponz@eprosima.com>
  • Loading branch information
3 people authored Apr 4, 2024
1 parent 39b3d58 commit 6a45117
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 28 deletions.
6 changes: 6 additions & 0 deletions include/fastdds/dds/builtin/typelookup/TypeLookupManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class TypeLookupManager
bool create_secure_endpoints();
#endif
*/

void request_cache_change_acked(
fastrtps::rtps::CacheChange_t* change);

void reply_cache_change_acked(
fastrtps::rtps::CacheChange_t* change);
};

} /* namespace builtin */
Expand Down
17 changes: 14 additions & 3 deletions include/fastdds/dds/builtin/typelookup/TypeLookupReplyListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define TYPELOOKUP_REPLY_LISTENER_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastrtps/rtps/reader/ReaderListener.h>
#include <fastrtps/rtps/writer/WriterListener.h>


namespace eprosima {
namespace fastrtps {
Expand All @@ -47,7 +49,7 @@ class TypeLookupManager;
* Class TypeLookupReplyListener that receives the typelookup request messages of remote endpoints.
* @ingroup TYPES_MODULE
*/
class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener, public fastrtps::rtps::WriterListener
{
public:

Expand All @@ -70,7 +72,16 @@ class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
*/
void onNewCacheChangeAdded(
fastrtps::rtps::RTPSReader* reader,
const fastrtps::rtps::CacheChange_t* const change) override;
const fastrtps::rtps::CacheChange_t* const change) override;

/**
* @brief This method is called when all the readers matched with this Writer acknowledge that a cache
* change has been received.
* @param change The cache change
*/
void onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change) override;

private:

Expand All @@ -85,5 +96,5 @@ class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener
} /* namespace dds */
} /* namespace fastdds */
} /* namespace eprosima */
#endif
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* TYPELOOKUP_REPLY_LISTENER_HPP_*/
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#define TYPELOOKUP_REQUEST_LISTENER_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastrtps/rtps/reader/ReaderListener.h>
#include <fastrtps/rtps/writer/WriterListener.h>


namespace eprosima {
namespace fastrtps {
Expand All @@ -47,7 +49,7 @@ class TypeLookupManager;
* Class TypeLookupRequestListener that receives the typelookup request messages of remote endpoints.
* @ingroup TYPES_MODULE
*/
class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener, public fastrtps::rtps::WriterListener
{
public:

Expand All @@ -70,7 +72,16 @@ class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
*/
void onNewCacheChangeAdded(
fastrtps::rtps::RTPSReader* reader,
const fastrtps::rtps::CacheChange_t* const change) override;
const fastrtps::rtps::CacheChange_t* const change) override;

/**
* @brief This method is called when all the readers matched with this Writer acknowledge that a cache
* change has been received.
* @param change The cache change
*/
void onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change) override;

private:

Expand All @@ -86,5 +97,5 @@ class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener
} /* namespace dds */
} /* namespace fastdds */
} /* namespace eprosima */
#endif
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* TYPELOOKUP_REQUEST_LISTENER_HPP_*/
105 changes: 83 additions & 22 deletions src/cpp/fastdds/builtin/typelookup/TypeLookupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ ReaderHistory* TypeLookupManager::get_builtin_reply_reader_history()
*/
bool TypeLookupManager::create_endpoints()
{
bool ret = true;

// Built-in history attributes.
HistoryAttributes hatt;
hatt.initialReservedCaches = 20;
Expand All @@ -343,14 +345,15 @@ bool TypeLookupManager::create_endpoints()
// Built-in request writer
if (builtin_protocols_->m_att.typelookup_config.use_client)
{
request_listener_ = new TypeLookupRequestListener(this);
builtin_request_writer_history_ = new WriterHistory(hatt);

RTPSWriter* req_writer;
if (participant_->createWriter(
&req_writer,
watt,
builtin_request_writer_history_,
nullptr,
request_listener_,
fastrtps::rtps::c_EntityId_TypeLookup_request_writer,
true))
{
Expand All @@ -360,23 +363,22 @@ bool TypeLookupManager::create_endpoints()
else
{
logError(TYPELOOKUP_SERVICE, "Typelookup request writer creation failed.");
delete builtin_request_writer_history_;
builtin_request_writer_history_ = nullptr;
return false;
ret = false;
}
}

// Built-in reply writer
if (builtin_protocols_->m_att.typelookup_config.use_server)
if (ret && builtin_protocols_->m_att.typelookup_config.use_server)
{
reply_listener_ = new TypeLookupReplyListener(this);
builtin_reply_writer_history_ = new WriterHistory(hatt);

RTPSWriter* rep_writer;
if (participant_->createWriter(
&rep_writer,
watt,
builtin_reply_writer_history_,
nullptr,
reply_listener_,
fastrtps::rtps::c_EntityId_TypeLookup_reply_writer,
true))
{
Expand All @@ -386,9 +388,7 @@ bool TypeLookupManager::create_endpoints()
else
{
logError(TYPELOOKUP_SERVICE, "Typelookup reply writer creation failed.");
delete builtin_reply_writer_history_;
builtin_reply_writer_history_ = nullptr;
return false;
ret = false;
}
}

Expand All @@ -403,9 +403,12 @@ bool TypeLookupManager::create_endpoints()
ratt.endpoint.durabilityKind = fastrtps::rtps::VOLATILE;

// Built-in request reader
if (builtin_protocols_->m_att.typelookup_config.use_server)
if (ret && builtin_protocols_->m_att.typelookup_config.use_server)
{
request_listener_ = new TypeLookupRequestListener(this);
if (nullptr == request_listener_)
{
request_listener_ = new TypeLookupRequestListener(this);
}
builtin_request_reader_history_ = new ReaderHistory(hatt);

RTPSReader* req_reader;
Expand All @@ -423,18 +426,17 @@ bool TypeLookupManager::create_endpoints()
else
{
logError(TYPELOOKUP_SERVICE, "Typelookup request reader creation failed.");
delete builtin_request_reader_history_;
builtin_request_reader_history_ = nullptr;
delete request_listener_;
request_listener_ = nullptr;
return false;
ret = false;
}
}

// Built-in reply reader
if (builtin_protocols_->m_att.typelookup_config.use_client)
if (ret && builtin_protocols_->m_att.typelookup_config.use_client)
{
reply_listener_ = new TypeLookupReplyListener(this);
if (nullptr == reply_listener_)
{
reply_listener_ = new TypeLookupReplyListener(this);
}
builtin_reply_reader_history_ = new ReaderHistory(hatt);

RTPSReader* rep_reader;
Expand All @@ -452,15 +454,50 @@ bool TypeLookupManager::create_endpoints()
else
{
logError(TYPELOOKUP_SERVICE, "Typelookup reply reader creation failed.");
ret = false;
}
}

// Clean up if something failed.
if (!ret)
{
if (nullptr != builtin_request_writer_history_)
{
delete builtin_request_writer_history_;
builtin_request_writer_history_ = nullptr;
}

if (nullptr != builtin_reply_writer_history_)
{
delete builtin_reply_writer_history_;
builtin_reply_writer_history_ = nullptr;
}

if (nullptr != builtin_request_reader_history_)
{
delete builtin_request_reader_history_;
builtin_request_reader_history_ = nullptr;
}

if (nullptr != builtin_reply_reader_history_)
{
delete builtin_reply_reader_history_;
builtin_reply_reader_history_ = nullptr;
}

if (nullptr != request_listener_)
{
delete request_listener_;
request_listener_ = nullptr;
}
if (nullptr != reply_listener_)
{
delete reply_listener_;
reply_listener_ = nullptr;
return false;
}
}

return true;
return ret;
}

/* TODO Implement if security is needed.
Expand Down Expand Up @@ -558,7 +595,13 @@ bool TypeLookupManager::send_request(
SerializedPayload_t payload;
payload.max_size = change->serializedPayload.max_size - 4;
payload.data = change->serializedPayload.data + 4;
if (valid && request_type_.serialize(&req, &payload))

bool serialize_ret = request_type_.serialize(&req, &payload);
if (!serialize_ret)
{
payload.data = nullptr;
}
else if (valid)
{
change->serializedPayload.length += payload.length;
change->serializedPayload.pos += payload.pos;
Expand Down Expand Up @@ -599,7 +642,13 @@ bool TypeLookupManager::send_reply(
SerializedPayload_t payload;
payload.max_size = change->serializedPayload.max_size - 4;
payload.data = change->serializedPayload.data + 4;
if (valid && reply_type_.serialize(&rep, &payload))

bool serialize_ret = reply_type_.serialize(&rep, &payload);
if (!serialize_ret)
{
payload.data = nullptr;
}
else if (valid)
{
change->serializedPayload.length += payload.length;
change->serializedPayload.pos += payload.pos;
Expand Down Expand Up @@ -684,6 +733,18 @@ const fastrtps::rtps::GUID_t& TypeLookupManager::get_builtin_request_writer_guid
return c_Guid_Unknown;
}

void TypeLookupManager::request_cache_change_acked(
fastrtps::rtps::CacheChange_t* change)
{
builtin_request_writer_history_->remove_change(change);
}

void TypeLookupManager::reply_cache_change_acked(
fastrtps::rtps::CacheChange_t* change)
{
builtin_reply_writer_history_->remove_change(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ void TypeLookupReplyListener::onNewCacheChangeAdded(
reader->getHistory()->remove_change(change);
}

void TypeLookupReplyListener::onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change)
{
tlm_->reply_cache_change_acked(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ void TypeLookupRequestListener::onNewCacheChangeAdded(
reader->getHistory()->remove_change(change);
}

void TypeLookupRequestListener::onWriterChangeReceivedByAll(
fastrtps::rtps::RTPSWriter*,
fastrtps::rtps::CacheChange_t* change)
{
tlm_->request_cache_change_acked(change);
}

} // namespace builtin
} // namespace dds
} // namespace fastdds
Expand Down

0 comments on commit 6a45117

Please sign in to comment.