From 6cb5ebb681111a4984499f1f1a772d8c72db27dd Mon Sep 17 00:00:00 2001 From: Irene Bandera Moreno Date: Thu, 22 Feb 2024 18:32:55 +0100 Subject: [PATCH] Add missing TypeLookup listeners (#4335) * Add listener classes to handle cache change acknowledgments Signed-off-by: Irene Bandera * Uncrustify Signed-off-by: Irene Bandera * Fix memory leaks in TypeLookupManager.cpp * Apply changes Signed-off-by: Irene Bandera * Apply changes Signed-off-by: Irene Bandera * Refs #20311: Further refactor TypeLookupManager::create_endpoints Signed-off-by: EduPonz --------- Signed-off-by: Irene Bandera Signed-off-by: EduPonz Co-authored-by: EduPonz --- .../builtin/typelookup/TypeLookupManager.hpp | 6 + .../typelookup/TypeLookupReplyListener.hpp | 17 ++- .../typelookup/TypeLookupRequestListener.hpp | 17 ++- .../builtin/typelookup/TypeLookupManager.cpp | 105 ++++++++++++++---- .../typelookup/TypeLookupReplyListener.cpp | 7 ++ .../typelookup/TypeLookupRequestListener.cpp | 7 ++ 6 files changed, 131 insertions(+), 28 deletions(-) diff --git a/include/fastdds/dds/builtin/typelookup/TypeLookupManager.hpp b/include/fastdds/dds/builtin/typelookup/TypeLookupManager.hpp index 7c955fc95d0..c6465905c80 100644 --- a/include/fastdds/dds/builtin/typelookup/TypeLookupManager.hpp +++ b/include/fastdds/dds/builtin/typelookup/TypeLookupManager.hpp @@ -286,6 +286,12 @@ class TypeLookupManager bool create_secure_endpoints(); #endif */ + + void request_cache_change_acked( + fastrtps::rtps::CacheChange_t* change); + + void reply_cache_change_acked( + fastrtps::rtps::CacheChange_t* change); }; } /* namespace builtin */ diff --git a/include/fastdds/dds/builtin/typelookup/TypeLookupReplyListener.hpp b/include/fastdds/dds/builtin/typelookup/TypeLookupReplyListener.hpp index 3509f6fa1e4..adc4b6c6499 100644 --- a/include/fastdds/dds/builtin/typelookup/TypeLookupReplyListener.hpp +++ b/include/fastdds/dds/builtin/typelookup/TypeLookupReplyListener.hpp @@ -21,6 +21,8 @@ #define TYPELOOKUP_REPLY_LISTENER_HPP_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include +#include + namespace eprosima { namespace fastrtps { @@ -47,7 +49,7 @@ class TypeLookupManager; * Class TypeLookupReplyListener that receives the typelookup request messages of remote endpoints. * @ingroup TYPES_MODULE */ -class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener +class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener, public fastrtps::rtps::WriterListener { public: @@ -70,7 +72,16 @@ class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener */ void onNewCacheChangeAdded( fastrtps::rtps::RTPSReader* reader, - const fastrtps::rtps::CacheChange_t* const change) override; + const fastrtps::rtps::CacheChange_t* const change) override; + + /** + * @brief This method is called when all the readers matched with this Writer acknowledge that a cache + * change has been received. + * @param change The cache change + */ + void onWriterChangeReceivedByAll( + fastrtps::rtps::RTPSWriter*, + fastrtps::rtps::CacheChange_t* change) override; private: @@ -85,5 +96,5 @@ class TypeLookupReplyListener : public fastrtps::rtps::ReaderListener } /* namespace dds */ } /* namespace fastdds */ } /* namespace eprosima */ -#endif +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #endif /* TYPELOOKUP_REPLY_LISTENER_HPP_*/ diff --git a/include/fastdds/dds/builtin/typelookup/TypeLookupRequestListener.hpp b/include/fastdds/dds/builtin/typelookup/TypeLookupRequestListener.hpp index 23ffd24139b..87741fc84c5 100644 --- a/include/fastdds/dds/builtin/typelookup/TypeLookupRequestListener.hpp +++ b/include/fastdds/dds/builtin/typelookup/TypeLookupRequestListener.hpp @@ -21,6 +21,8 @@ #define TYPELOOKUP_REQUEST_LISTENER_HPP_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include +#include + namespace eprosima { namespace fastrtps { @@ -47,7 +49,7 @@ class TypeLookupManager; * Class TypeLookupRequestListener that receives the typelookup request messages of remote endpoints. * @ingroup TYPES_MODULE */ -class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener +class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener, public fastrtps::rtps::WriterListener { public: @@ -70,7 +72,16 @@ class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener */ void onNewCacheChangeAdded( fastrtps::rtps::RTPSReader* reader, - const fastrtps::rtps::CacheChange_t* const change) override; + const fastrtps::rtps::CacheChange_t* const change) override; + + /** + * @brief This method is called when all the readers matched with this Writer acknowledge that a cache + * change has been received. + * @param change The cache change + */ + void onWriterChangeReceivedByAll( + fastrtps::rtps::RTPSWriter*, + fastrtps::rtps::CacheChange_t* change) override; private: @@ -86,5 +97,5 @@ class TypeLookupRequestListener : public fastrtps::rtps::ReaderListener } /* namespace dds */ } /* namespace fastdds */ } /* namespace eprosima */ -#endif +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #endif /* TYPELOOKUP_REQUEST_LISTENER_HPP_*/ diff --git a/src/cpp/fastdds/builtin/typelookup/TypeLookupManager.cpp b/src/cpp/fastdds/builtin/typelookup/TypeLookupManager.cpp index 8ae2c618c54..40ffd150fb2 100644 --- a/src/cpp/fastdds/builtin/typelookup/TypeLookupManager.cpp +++ b/src/cpp/fastdds/builtin/typelookup/TypeLookupManager.cpp @@ -326,6 +326,8 @@ ReaderHistory* TypeLookupManager::get_builtin_reply_reader_history() */ bool TypeLookupManager::create_endpoints() { + bool ret = true; + const RTPSParticipantAttributes& pattr = participant_->getRTPSParticipantAttributes(); // Built-in history attributes. @@ -348,6 +350,7 @@ bool TypeLookupManager::create_endpoints() // Built-in request writer if (builtin_protocols_->m_att.typelookup_config.use_client) { + request_listener_ = new TypeLookupRequestListener(this); builtin_request_writer_history_ = new WriterHistory(hatt); RTPSWriter* req_writer; @@ -355,7 +358,7 @@ bool TypeLookupManager::create_endpoints() &req_writer, watt, builtin_request_writer_history_, - nullptr, + request_listener_, fastrtps::rtps::c_EntityId_TypeLookup_request_writer, true)) { @@ -365,15 +368,14 @@ bool TypeLookupManager::create_endpoints() else { EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE, "Typelookup request writer creation failed."); - delete builtin_request_writer_history_; - builtin_request_writer_history_ = nullptr; - return false; + ret = false; } } // Built-in reply writer - if (builtin_protocols_->m_att.typelookup_config.use_server) + if (ret && builtin_protocols_->m_att.typelookup_config.use_server) { + reply_listener_ = new TypeLookupReplyListener(this); builtin_reply_writer_history_ = new WriterHistory(hatt); RTPSWriter* rep_writer; @@ -381,7 +383,7 @@ bool TypeLookupManager::create_endpoints() &rep_writer, watt, builtin_reply_writer_history_, - nullptr, + reply_listener_, fastrtps::rtps::c_EntityId_TypeLookup_reply_writer, true)) { @@ -391,9 +393,7 @@ bool TypeLookupManager::create_endpoints() else { EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE, "Typelookup reply writer creation failed."); - delete builtin_reply_writer_history_; - builtin_reply_writer_history_ = nullptr; - return false; + ret = false; } } @@ -410,9 +410,12 @@ bool TypeLookupManager::create_endpoints() ratt.endpoint.durabilityKind = fastrtps::rtps::VOLATILE; // Built-in request reader - if (builtin_protocols_->m_att.typelookup_config.use_server) + if (ret && builtin_protocols_->m_att.typelookup_config.use_server) { - request_listener_ = new TypeLookupRequestListener(this); + if (nullptr == request_listener_) + { + request_listener_ = new TypeLookupRequestListener(this); + } builtin_request_reader_history_ = new ReaderHistory(hatt); RTPSReader* req_reader; @@ -430,18 +433,17 @@ bool TypeLookupManager::create_endpoints() else { EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE, "Typelookup request reader creation failed."); - delete builtin_request_reader_history_; - builtin_request_reader_history_ = nullptr; - delete request_listener_; - request_listener_ = nullptr; - return false; + ret = false; } } // Built-in reply reader - if (builtin_protocols_->m_att.typelookup_config.use_client) + if (ret && builtin_protocols_->m_att.typelookup_config.use_client) { - reply_listener_ = new TypeLookupReplyListener(this); + if (nullptr == reply_listener_) + { + reply_listener_ = new TypeLookupReplyListener(this); + } builtin_reply_reader_history_ = new ReaderHistory(hatt); RTPSReader* rep_reader; @@ -459,15 +461,50 @@ bool TypeLookupManager::create_endpoints() else { EPROSIMA_LOG_ERROR(TYPELOOKUP_SERVICE, "Typelookup reply reader creation failed."); + ret = false; + } + } + + // Clean up if something failed. + if (!ret) + { + if (nullptr != builtin_request_writer_history_) + { + delete builtin_request_writer_history_; + builtin_request_writer_history_ = nullptr; + } + + if (nullptr != builtin_reply_writer_history_) + { + delete builtin_reply_writer_history_; + builtin_reply_writer_history_ = nullptr; + } + + if (nullptr != builtin_request_reader_history_) + { + delete builtin_request_reader_history_; + builtin_request_reader_history_ = nullptr; + } + + if (nullptr != builtin_reply_reader_history_) + { delete builtin_reply_reader_history_; builtin_reply_reader_history_ = nullptr; + } + + if (nullptr != request_listener_) + { + delete request_listener_; + request_listener_ = nullptr; + } + if (nullptr != reply_listener_) + { delete reply_listener_; reply_listener_ = nullptr; - return false; } } - return true; + return ret; } /* TODO Implement if security is needed. @@ -567,7 +604,13 @@ bool TypeLookupManager::send_request( SerializedPayload_t payload; payload.max_size = change->serializedPayload.max_size - 4; payload.data = change->serializedPayload.data + 4; - if (valid && request_type_.serialize(&req, &payload, DataRepresentationId_t::XCDR2_DATA_REPRESENTATION)) + + bool serialize_ret = request_type_.serialize(&req, &payload, DataRepresentationId_t::XCDR2_DATA_REPRESENTATION); + if (!serialize_ret) + { + payload.data = nullptr; + } + else if (valid) { change->serializedPayload.length += payload.length; change->serializedPayload.pos += payload.pos; @@ -610,7 +653,13 @@ bool TypeLookupManager::send_reply( SerializedPayload_t payload; payload.max_size = change->serializedPayload.max_size - 4; payload.data = change->serializedPayload.data + 4; - if (valid && reply_type_.serialize(&rep, &payload, DataRepresentationId_t::XCDR2_DATA_REPRESENTATION)) + + bool serialize_ret = reply_type_.serialize(&rep, &payload, DataRepresentationId_t::XCDR2_DATA_REPRESENTATION); + if (!serialize_ret) + { + payload.data = nullptr; + } + else if (valid) { change->serializedPayload.length += payload.length; change->serializedPayload.pos += payload.pos; @@ -695,6 +744,18 @@ const fastrtps::rtps::GUID_t& TypeLookupManager::get_builtin_request_writer_guid return c_Guid_Unknown; } +void TypeLookupManager::request_cache_change_acked( + fastrtps::rtps::CacheChange_t* change) +{ + builtin_request_writer_history_->remove_change(change); +} + +void TypeLookupManager::reply_cache_change_acked( + fastrtps::rtps::CacheChange_t* change) +{ + builtin_reply_writer_history_->remove_change(change); +} + } // namespace builtin } // namespace dds } // namespace fastdds diff --git a/src/cpp/fastdds/builtin/typelookup/TypeLookupReplyListener.cpp b/src/cpp/fastdds/builtin/typelookup/TypeLookupReplyListener.cpp index 7afc3f194ce..3efe307b383 100644 --- a/src/cpp/fastdds/builtin/typelookup/TypeLookupReplyListener.cpp +++ b/src/cpp/fastdds/builtin/typelookup/TypeLookupReplyListener.cpp @@ -116,6 +116,13 @@ void TypeLookupReplyListener::onNewCacheChangeAdded( reader->getHistory()->remove_change(change); } +void TypeLookupReplyListener::onWriterChangeReceivedByAll( + fastrtps::rtps::RTPSWriter*, + fastrtps::rtps::CacheChange_t* change) +{ + tlm_->reply_cache_change_acked(change); +} + } // namespace builtin } // namespace dds } // namespace fastdds diff --git a/src/cpp/fastdds/builtin/typelookup/TypeLookupRequestListener.cpp b/src/cpp/fastdds/builtin/typelookup/TypeLookupRequestListener.cpp index 835eae0b350..f2ccfe37cc8 100644 --- a/src/cpp/fastdds/builtin/typelookup/TypeLookupRequestListener.cpp +++ b/src/cpp/fastdds/builtin/typelookup/TypeLookupRequestListener.cpp @@ -150,6 +150,13 @@ void TypeLookupRequestListener::onNewCacheChangeAdded( reader->getHistory()->remove_change(change); } +void TypeLookupRequestListener::onWriterChangeReceivedByAll( + fastrtps::rtps::RTPSWriter*, + fastrtps::rtps::CacheChange_t* change) +{ + tlm_->request_cache_change_acked(change); +} + } // namespace builtin } // namespace dds } // namespace fastdds