From 221f79387bccedc814bb3b3144141e11decb55f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez?= Date: Tue, 14 Sep 2021 07:38:52 +0200 Subject: [PATCH] Fix ABBA deadlock in flow controller core (#2175) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refs #12393. Fix deadlock in flow controller core Signed-off-by: Ricardo González Moreno * Refs #12394. Fix compilation on tests Signed-off-by: Ricardo González * Apply suggestion Co-authored-by: Miguel Company Co-authored-by: Miguel Company Signed-off-by: Samuel Wilhelmsson --- .../fastdds/rtps/messages/RTPSMessageGroup.h | 6 ++-- .../messages/RTPSMessageSenderInterface.hpp | 12 ++++++++ include/fastdds/rtps/reader/RTPSReader.h | 2 +- include/fastdds/rtps/reader/StatefulReader.h | 6 ++-- include/fastdds/rtps/reader/StatelessReader.h | 2 +- .../rtps/writer/LocatorSelectorSender.hpp | 28 +++++++++++++++++-- include/fastdds/rtps/writer/ReaderLocator.h | 16 +++++++++++ include/fastdds/rtps/writer/ReaderProxy.h | 2 +- .../participant/DirectMessageSender.hpp | 14 ++++++++++ .../rtps/flowcontrol/FlowControllerImpl.hpp | 4 +++ src/cpp/rtps/messages/RTPSMessageGroup.cpp | 4 +-- src/cpp/rtps/reader/StatefulReader.cpp | 6 ++-- src/cpp/rtps/reader/StatelessReader.cpp | 2 +- src/cpp/rtps/reader/WriterProxy.cpp | 4 +-- src/cpp/rtps/reader/WriterProxy.h | 20 +++++++++++-- src/cpp/rtps/writer/LocatorSelectorSender.cpp | 2 +- src/cpp/rtps/writer/StatefulWriter.cpp | 10 +++++-- src/cpp/rtps/writer/StatelessWriter.cpp | 4 ++- .../fastdds/rtps/writer/RTPSWriter.h | 6 ++-- .../fastdds/rtps/writer/ReaderLocator.h | 8 ++++++ 20 files changed, 129 insertions(+), 29 deletions(-) diff --git a/include/fastdds/rtps/messages/RTPSMessageGroup.h b/include/fastdds/rtps/messages/RTPSMessageGroup.h index 982fb0ecded..6b7c97d5e96 100644 --- a/include/fastdds/rtps/messages/RTPSMessageGroup.h +++ b/include/fastdds/rtps/messages/RTPSMessageGroup.h @@ -100,7 +100,7 @@ class RTPSMessageGroup RTPSMessageGroup( RTPSParticipantImpl* participant, Endpoint* endpoint, - const RTPSMessageSenderInterface* msg_sender, + RTPSMessageSenderInterface* msg_sender, std::chrono::steady_clock::time_point max_blocking_time_point = std::chrono::steady_clock::now() + std::chrono::hours(24)); @@ -217,7 +217,7 @@ class RTPSMessageGroup */ void sender( Endpoint* endpoint, - const RTPSMessageSenderInterface* msg_sender) + RTPSMessageSenderInterface* msg_sender) { assert((endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr)); if (endpoint != endpoint_ || msg_sender != sender_) @@ -293,7 +293,7 @@ class RTPSMessageGroup const SequenceNumberSet_t& gap_bitmap, const EntityId_t& reader_id); - const RTPSMessageSenderInterface* sender_ = nullptr; + RTPSMessageSenderInterface* sender_ = nullptr; Endpoint* endpoint_ = nullptr; diff --git a/include/fastdds/rtps/messages/RTPSMessageSenderInterface.hpp b/include/fastdds/rtps/messages/RTPSMessageSenderInterface.hpp index ccf252380d7..219594bd037 100644 --- a/include/fastdds/rtps/messages/RTPSMessageSenderInterface.hpp +++ b/include/fastdds/rtps/messages/RTPSMessageSenderInterface.hpp @@ -80,6 +80,18 @@ class RTPSMessageSenderInterface virtual bool send( CDRMessage_t* message, std::chrono::steady_clock::time_point max_blocking_time_point) const = 0; + + /*! + * Lock the object. + */ + virtual void lock() = 0; + + /*! + * Lock the object. + */ + virtual void unlock() = 0; + + }; } /* namespace rtps */ diff --git a/include/fastdds/rtps/reader/RTPSReader.h b/include/fastdds/rtps/reader/RTPSReader.h index f56822e3acf..dbaffbb67c8 100644 --- a/include/fastdds/rtps/reader/RTPSReader.h +++ b/include/fastdds/rtps/reader/RTPSReader.h @@ -315,7 +315,7 @@ class RTPSReader */ virtual void change_read_by_user( CacheChange_t* change, - const WriterProxy* writer, + WriterProxy* writer, bool mark_as_read = true) = 0; /** diff --git a/include/fastdds/rtps/reader/StatefulReader.h b/include/fastdds/rtps/reader/StatefulReader.h index 500125be836..36e1a626988 100644 --- a/include/fastdds/rtps/reader/StatefulReader.h +++ b/include/fastdds/rtps/reader/StatefulReader.h @@ -246,7 +246,7 @@ class StatefulReader : public RTPSReader void send_acknack( const WriterProxy* writer, const SequenceNumberSet_t& sns, - const RTPSMessageSenderInterface* sender, + RTPSMessageSenderInterface* sender, bool is_final); /** @@ -257,7 +257,7 @@ class StatefulReader : public RTPSReader */ void send_acknack( const WriterProxy* writer, - const RTPSMessageSenderInterface* sender, + RTPSMessageSenderInterface* sender, bool heartbeat_was_final); /** @@ -313,7 +313,7 @@ class StatefulReader : public RTPSReader */ void change_read_by_user( CacheChange_t* change, - const WriterProxy* writer, + WriterProxy* writer, bool mark_as_read = true) override; private: diff --git a/include/fastdds/rtps/reader/StatelessReader.h b/include/fastdds/rtps/reader/StatelessReader.h index f6079d4c03a..b0a56a4ad53 100644 --- a/include/fastdds/rtps/reader/StatelessReader.h +++ b/include/fastdds/rtps/reader/StatelessReader.h @@ -247,7 +247,7 @@ class StatelessReader : public RTPSReader */ void change_read_by_user( CacheChange_t* change, - const WriterProxy* writer, + WriterProxy* writer, bool mark_as_read = true) override; private: diff --git a/include/fastdds/rtps/writer/LocatorSelectorSender.hpp b/include/fastdds/rtps/writer/LocatorSelectorSender.hpp index 83085b7c97e..63949a80d62 100644 --- a/include/fastdds/rtps/writer/LocatorSelectorSender.hpp +++ b/include/fastdds/rtps/writer/LocatorSelectorSender.hpp @@ -23,10 +23,10 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface RTPSWriter& writer, ResourceLimitedContainerConfig matched_readers_allocation ) - : writer(writer) - , locator_selector(matched_readers_allocation) + : locator_selector(matched_readers_allocation) , all_remote_readers(matched_readers_allocation) , all_remote_participants(matched_readers_allocation) + , writer_(writer) { } @@ -75,13 +75,35 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface CDRMessage_t* message, std::chrono::steady_clock::time_point max_blocking_time_point) const override; - RTPSWriter& writer; + /*! + * Lock the object. + * + * This kind of object needs to be locked because could be used outside the writer's mutex. + */ + void lock() override + { + mutex_.lock(); + } + + /*! + * Unlock the object. + */ + void unlock() override + { + mutex_.unlock(); + } fastrtps::rtps::LocatorSelector locator_selector; ResourceLimitedVector all_remote_readers; ResourceLimitedVector all_remote_participants; + +private: + + RTPSWriter& writer_; + + std::recursive_mutex mutex_; }; } // namespace rtps diff --git a/include/fastdds/rtps/writer/ReaderLocator.h b/include/fastdds/rtps/writer/ReaderLocator.h index c192d6f6ac1..bfead3e3e44 100644 --- a/include/fastdds/rtps/writer/ReaderLocator.h +++ b/include/fastdds/rtps/writer/ReaderLocator.h @@ -231,6 +231,22 @@ class ReaderLocator : public RTPSMessageSenderInterface } + /* + * Do nothing. + * This object always is protected by writer's mutex. + */ + void lock() override + { + } + + /* + * Do nothing. + * This object always is protected by writer's mutex. + */ + void unlock() override + { + } + private: RTPSWriter* owner_; diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index 509ca9d2896..ee32f7f5c94 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -351,7 +351,7 @@ class ReaderProxy return locator_info_.locator_selector_entry(); } - const RTPSMessageSenderInterface* message_sender() const + RTPSMessageSenderInterface* message_sender() { return &locator_info_; } diff --git a/src/cpp/rtps/builtin/discovery/participant/DirectMessageSender.hpp b/src/cpp/rtps/builtin/discovery/participant/DirectMessageSender.hpp index 7d1334f780f..16326afcd9a 100644 --- a/src/cpp/rtps/builtin/discovery/participant/DirectMessageSender.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/DirectMessageSender.hpp @@ -85,6 +85,20 @@ class DirectMessageSender : public RTPSMessageSenderInterface CDRMessage_t* message, std::chrono::steady_clock::time_point max_blocking_time_point) const override; + /* + * Do nothing. + */ + void lock() override + { + } + + /* + * Do nothing. + */ + void unlock() override + { + } + private: RTPSParticipantImpl* participant_; diff --git a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp index 05d07d09182..e92dee4adca 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp @@ -1141,6 +1141,7 @@ class FlowControllerImpl : public FlowController { // This call should be made with writer's mutex locked. fastrtps::rtps::LocatorSelectorSender& locator_selector = writer->get_general_locator_selector(); + std::lock_guard lock(locator_selector); fastrtps::rtps::RTPSMessageGroup group(participant_, writer, &locator_selector); if (fastrtps::rtps::DeliveryRetCode::DELIVERED != writer->deliver_sample_nts(change, group, locator_selector, max_blocking_time)) @@ -1324,6 +1325,7 @@ class FlowControllerImpl : public FlowController fastrtps::rtps::LocatorSelectorSender& locator_selector = current_writer->get_async_locator_selector(); + locator_selector.lock(); async_mode.group.sender(current_writer, &locator_selector); // Remove previously from queue, because deliver_sample_nts could call FlowController::remove_sample() @@ -1349,11 +1351,13 @@ class FlowControllerImpl : public FlowController async_mode.process_deliver_retcode(ret_delivery); + locator_selector.unlock(); current_writer->getMutex().unlock(); // Unlock mutex_ and try again. break; } + locator_selector.unlock(); current_writer->getMutex().unlock(); sched.work_done(); diff --git a/src/cpp/rtps/messages/RTPSMessageGroup.cpp b/src/cpp/rtps/messages/RTPSMessageGroup.cpp index bff24a829af..2c7542b3627 100644 --- a/src/cpp/rtps/messages/RTPSMessageGroup.cpp +++ b/src/cpp/rtps/messages/RTPSMessageGroup.cpp @@ -180,7 +180,7 @@ RTPSMessageGroup::RTPSMessageGroup( RTPSMessageGroup::RTPSMessageGroup( RTPSParticipantImpl* participant, Endpoint* endpoint, - const RTPSMessageSenderInterface* msg_sender, + RTPSMessageSenderInterface* msg_sender, std::chrono::steady_clock::time_point max_blocking_time_point) : RTPSMessageGroup(participant) { @@ -236,7 +236,7 @@ void RTPSMessageGroup::send() if (full_msg_->length > RTPSMESSAGE_HEADER_SIZE) { - std::unique_lock lock(endpoint_->getMutex()); + std::lock_guard lock(*sender_); #if HAVE_SECURITY // TODO(Ricardo) Control message size if it will be encrypted. diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index b345c756737..ffd6d9ed2c4 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -1123,7 +1123,7 @@ void StatefulReader::end_sample_access_nts( void StatefulReader::change_read_by_user( CacheChange_t* change, - const WriterProxy* writer, + WriterProxy* writer, bool mark_as_read) { assert(writer != nullptr); @@ -1177,7 +1177,7 @@ void StatefulReader::change_read_by_user( void StatefulReader::send_acknack( const WriterProxy* writer, const SequenceNumberSet_t& sns, - const RTPSMessageSenderInterface* sender, + RTPSMessageSenderInterface* sender, bool is_final) { @@ -1204,7 +1204,7 @@ void StatefulReader::send_acknack( void StatefulReader::send_acknack( const WriterProxy* writer, - const RTPSMessageSenderInterface* sender, + RTPSMessageSenderInterface* sender, bool heartbeat_was_final) { // Protect reader diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index d958e434ef9..5e74cc9abe6 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -372,7 +372,7 @@ void StatelessReader::end_sample_access_nts( void StatelessReader::change_read_by_user( CacheChange_t* change, - const WriterProxy* /*writer*/, + WriterProxy* /*writer*/, bool mark_as_read) { // Mark change as read diff --git a/src/cpp/rtps/reader/WriterProxy.cpp b/src/cpp/rtps/reader/WriterProxy.cpp index 499b59333c1..e500780c7a2 100644 --- a/src/cpp/rtps/reader/WriterProxy.cpp +++ b/src/cpp/rtps/reader/WriterProxy.cpp @@ -487,7 +487,7 @@ SequenceNumber_t WriterProxy::next_cache_change_to_be_notified() return SequenceNumber_t::unknown(); } -void WriterProxy::perform_initial_ack_nack() const +void WriterProxy::perform_initial_ack_nack() { // Send initial NACK. SequenceNumberSet_t sns(SequenceNumber_t(0, 0)); @@ -506,7 +506,7 @@ void WriterProxy::perform_initial_ack_nack() const } } -void WriterProxy::perform_heartbeat_response() const +void WriterProxy::perform_heartbeat_response() { reader_->send_acknack(this, this, heartbeat_final_flag_.load()); } diff --git a/src/cpp/rtps/reader/WriterProxy.h b/src/cpp/rtps/reader/WriterProxy.h index 0b43e339e84..c7374bf776b 100644 --- a/src/cpp/rtps/reader/WriterProxy.h +++ b/src/cpp/rtps/reader/WriterProxy.h @@ -241,12 +241,12 @@ class WriterProxy : public RTPSMessageSenderInterface /** * Sends a preemptive acknack to the writer represented by this proxy. */ - void perform_initial_ack_nack() const; + void perform_initial_ack_nack(); /** * Sends the necessary acknac and nackfrag messages to answer the last received heartbeat message. */ - void perform_heartbeat_response() const; + void perform_heartbeat_response(); /** * Process an incoming heartbeat from the writer represented by this proxy. @@ -337,6 +337,22 @@ class WriterProxy : public RTPSMessageSenderInterface return is_datasharing_writer_; } + /* + * Do nothing. + * This object always is protected by reader's mutex. + */ + void lock() override + { + } + + /* + * Do nothing. + * This object always is protected by reader's mutex. + */ + void unlock() override + { + } + private: /** diff --git a/src/cpp/rtps/writer/LocatorSelectorSender.cpp b/src/cpp/rtps/writer/LocatorSelectorSender.cpp index 246fec8f089..9ef1d310eac 100644 --- a/src/cpp/rtps/writer/LocatorSelectorSender.cpp +++ b/src/cpp/rtps/writer/LocatorSelectorSender.cpp @@ -9,7 +9,7 @@ bool LocatorSelectorSender::send( CDRMessage_t* message, std::chrono::steady_clock::time_point max_blocking_time_point) const { - return writer.send_nts(message, *this, max_blocking_time_point); + return writer_.send_nts(message, *this, max_blocking_time_point); } } // namespace rtps diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index c0eaa18f90c..c2f21639f76 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -692,7 +692,7 @@ void StatefulWriter::deliver_sample_to_datasharing( DeliveryRetCode StatefulWriter::deliver_sample_to_network( CacheChange_t* change, RTPSMessageGroup& group, - LocatorSelectorSender& locator_selector, + LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl const std::chrono::time_point& max_blocking_time) { NetworkFactory& network = mp_RTPSParticipant->network_factory(); @@ -980,6 +980,8 @@ bool StatefulWriter::matched_reader_add( } std::lock_guard guard(mp_mutex); + std::lock_guard guard_locator_selector_general(locator_selector_general_); + std::lock_guard guard_locator_selector_async(locator_selector_async_); // Check if it is already matched. if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, @@ -1153,6 +1155,8 @@ bool StatefulWriter::matched_reader_remove( { ReaderProxy* rproxy = nullptr; std::unique_lock lock(mp_mutex); + std::lock_guard guard_locator_selector_general(locator_selector_general_); + std::lock_guard guard_locator_selector_async(locator_selector_async_); for (ReaderProxyIterator it = matched_local_readers_.begin(); it != matched_local_readers_.end(); ++it) @@ -1211,7 +1215,6 @@ bool StatefulWriter::matched_reader_remove( rproxy->stop(); matched_readers_pool_.push_back(rproxy); - lock.unlock(); check_acked_status(); return true; @@ -1550,6 +1553,7 @@ bool StatefulWriter::send_periodic_heartbeat( bool liveliness) { std::lock_guard guardW(mp_mutex); + std::lock_guard guard_locator_selector_general(locator_selector_general_); bool unacked_changes = false; if (!liveliness) @@ -1985,7 +1989,7 @@ const fastdds::rtps::IReaderDataFilter* StatefulWriter::reader_data_filter() con DeliveryRetCode StatefulWriter::deliver_sample_nts( CacheChange_t* cache_change, RTPSMessageGroup& group, - LocatorSelectorSender& locator_selector, + LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl const std::chrono::time_point& max_blocking_time) { DeliveryRetCode ret_code = DeliveryRetCode::DELIVERED; diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 83bacdda18c..d7b787fbc14 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -414,6 +414,7 @@ bool StatelessWriter::matched_reader_add( const ReaderProxyData& data) { std::lock_guard guard(mp_mutex); + std::lock_guard locator_selector_guard(locator_selector_); assert(data.guid() != c_Guid_Unknown); @@ -522,6 +523,7 @@ bool StatelessWriter::matched_reader_remove( const GUID_t& reader_guid) { std::lock_guard guard(mp_mutex); + std::lock_guard locator_selector_guard(locator_selector_); if (locator_selector_.locator_selector.remove_entry(reader_guid)) { @@ -619,7 +621,7 @@ bool StatelessWriter::send_nts( DeliveryRetCode StatelessWriter::deliver_sample_nts( CacheChange_t* cache_change, RTPSMessageGroup& group, - LocatorSelectorSender& locator_selector, + LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl const std::chrono::time_point& /*TODO max_blocking_time*/) { size_t num_locators = locator_selector.locator_selector.selected_size() + fixed_locators_.size(); diff --git a/test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h b/test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h index 7cb78d525d7..58b02f01d58 100644 --- a/test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h +++ b/test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h @@ -42,6 +42,8 @@ class RTPSWriter : public Endpoint public: RTPSWriter() + : general_locator_selector_(*this, ResourceLimitedContainerConfig()) + , async_locator_selector_(*this, ResourceLimitedContainerConfig()) { static uint8_t entity_id = 0; // Generate a guid. @@ -247,9 +249,9 @@ class RTPSWriter : public Endpoint LivelinessLostStatus liveliness_lost_status_; - LocatorSelectorSender general_locator_selector_ = LocatorSelectorSender(*this, ResourceLimitedContainerConfig()); + LocatorSelectorSender general_locator_selector_; - LocatorSelectorSender async_locator_selector_ = LocatorSelectorSender(*this, ResourceLimitedContainerConfig()); + LocatorSelectorSender async_locator_selector_; }; } // namespace rtps diff --git a/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h b/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h index f99aa4acd33..544f756c246 100644 --- a/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h +++ b/test/mock/rtps/ReaderLocator/fastdds/rtps/writer/ReaderLocator.h @@ -233,6 +233,14 @@ class ReaderLocator : public RTPSMessageSenderInterface return 0; } + void lock() override + { + } + + void unlock() override + { + } + private: GUID_t remote_guid_;