Skip to content

Commit

Permalink
Fix ABBA deadlock in flow controller core (#2175)
Browse files Browse the repository at this point in the history
* Refs #12393. Fix deadlock in flow controller core

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>

* Refs #12394. Fix compilation on tests

Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Apply suggestion

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
richiware and MiguelCompany authored Sep 14, 2021
1 parent c97053e commit c2c3486
Show file tree
Hide file tree
Showing 20 changed files with 129 additions and 29 deletions.
6 changes: 3 additions & 3 deletions include/fastdds/rtps/messages/RTPSMessageGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class RTPSMessageGroup
RTPSMessageGroup(
RTPSParticipantImpl* participant,
Endpoint* endpoint,
const RTPSMessageSenderInterface* msg_sender,
RTPSMessageSenderInterface* msg_sender,
std::chrono::steady_clock::time_point max_blocking_time_point =
std::chrono::steady_clock::now() + std::chrono::hours(24));

Expand Down Expand Up @@ -217,7 +217,7 @@ class RTPSMessageGroup
*/
void sender(
Endpoint* endpoint,
const RTPSMessageSenderInterface* msg_sender)
RTPSMessageSenderInterface* msg_sender)
{
assert((endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr));
if (endpoint != endpoint_ || msg_sender != sender_)
Expand Down Expand Up @@ -293,7 +293,7 @@ class RTPSMessageGroup
const SequenceNumberSet_t& gap_bitmap,
const EntityId_t& reader_id);

const RTPSMessageSenderInterface* sender_ = nullptr;
RTPSMessageSenderInterface* sender_ = nullptr;

Endpoint* endpoint_ = nullptr;

Expand Down
12 changes: 12 additions & 0 deletions include/fastdds/rtps/messages/RTPSMessageSenderInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ class RTPSMessageSenderInterface
virtual bool send(
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const = 0;

/*!
* Lock the object.
*
* Together with unlock(), this lets a sender be guarded with std::lock_guard,
* as RTPSMessageGroup::send() does before sending a message.
* Implementations that are already protected by another mutex may leave it empty.
*/
virtual void lock() = 0;

/*!
* Unlock the object.
*
* Counterpart of lock().
*/
virtual void unlock() = 0;


};

} /* namespace rtps */
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/reader/RTPSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ class RTPSReader
*/
virtual void change_read_by_user(
CacheChange_t* change,
const WriterProxy* writer,
WriterProxy* writer,
bool mark_as_read = true) = 0;

/**
Expand Down
6 changes: 3 additions & 3 deletions include/fastdds/rtps/reader/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class StatefulReader : public RTPSReader
void send_acknack(
const WriterProxy* writer,
const SequenceNumberSet_t& sns,
const RTPSMessageSenderInterface* sender,
RTPSMessageSenderInterface* sender,
bool is_final);

/**
Expand All @@ -257,7 +257,7 @@ class StatefulReader : public RTPSReader
*/
void send_acknack(
const WriterProxy* writer,
const RTPSMessageSenderInterface* sender,
RTPSMessageSenderInterface* sender,
bool heartbeat_was_final);

/**
Expand Down Expand Up @@ -313,7 +313,7 @@ class StatefulReader : public RTPSReader
*/
void change_read_by_user(
CacheChange_t* change,
const WriterProxy* writer,
WriterProxy* writer,
bool mark_as_read = true) override;

private:
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/reader/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class StatelessReader : public RTPSReader
*/
void change_read_by_user(
CacheChange_t* change,
const WriterProxy* writer,
WriterProxy* writer,
bool mark_as_read = true) override;

private:
Expand Down
28 changes: 25 additions & 3 deletions include/fastdds/rtps/writer/LocatorSelectorSender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface
RTPSWriter& writer,
ResourceLimitedContainerConfig matched_readers_allocation
)
: writer(writer)
, locator_selector(matched_readers_allocation)
: locator_selector(matched_readers_allocation)
, all_remote_readers(matched_readers_allocation)
, all_remote_participants(matched_readers_allocation)
, writer_(writer)
{
}

Expand Down Expand Up @@ -75,13 +75,35 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const override;

RTPSWriter& writer;
/*!
* Lock the object.
*
* This kind of object needs its own lock because it can be used outside the writer's mutex.
* The underlying mutex is a std::recursive_mutex, so the thread that already owns it
* may lock it again.
*/
void lock() override
{
mutex_.lock();
}

/*!
* Unlock the object.
*
* Releases one level of ownership of the recursive mutex acquired through lock().
*/
void unlock() override
{
mutex_.unlock();
}

fastrtps::rtps::LocatorSelector locator_selector;

ResourceLimitedVector<GUID_t> all_remote_readers;

ResourceLimitedVector<GuidPrefix_t> all_remote_participants;

private:

RTPSWriter& writer_;

std::recursive_mutex mutex_;
};

} // namespace rtps
Expand Down
16 changes: 16 additions & 0 deletions include/fastdds/rtps/writer/ReaderLocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ class ReaderLocator : public RTPSMessageSenderInterface

}

/*
* Do nothing (no-op implementation of RTPSMessageSenderInterface::lock()).
* This object is always protected by the writer's mutex.
*/
void lock() override
{
}

/*
* Do nothing (no-op implementation of RTPSMessageSenderInterface::unlock()).
* This object is always protected by the writer's mutex.
*/
void unlock() override
{
}

private:

RTPSWriter* owner_;
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class ReaderProxy
return locator_info_.locator_selector_entry();
}

const RTPSMessageSenderInterface* message_sender() const
RTPSMessageSenderInterface* message_sender()
{
return &locator_info_;
}
Expand Down
14 changes: 14 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/DirectMessageSender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ class DirectMessageSender : public RTPSMessageSenderInterface
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const override;

/*
* Do nothing.
* No-op implementation of RTPSMessageSenderInterface::lock().
*/
void lock() override
{
}

/*
* Do nothing.
* No-op implementation of RTPSMessageSenderInterface::unlock().
*/
void unlock() override
{
}

private:

RTPSParticipantImpl* participant_;
Expand Down
4 changes: 4 additions & 0 deletions src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,7 @@ class FlowControllerImpl : public FlowController
{
// This call should be made with writer's mutex locked.
fastrtps::rtps::LocatorSelectorSender& locator_selector = writer->get_general_locator_selector();
std::lock_guard<fastrtps::rtps::LocatorSelectorSender> lock(locator_selector);
fastrtps::rtps::RTPSMessageGroup group(participant_, writer, &locator_selector);
if (fastrtps::rtps::DeliveryRetCode::DELIVERED !=
writer->deliver_sample_nts(change, group, locator_selector, max_blocking_time))
Expand Down Expand Up @@ -1324,6 +1325,7 @@ class FlowControllerImpl : public FlowController

fastrtps::rtps::LocatorSelectorSender& locator_selector =
current_writer->get_async_locator_selector();
locator_selector.lock();
async_mode.group.sender(current_writer, &locator_selector);

// Remove previously from queue, because deliver_sample_nts could call FlowController::remove_sample()
Expand All @@ -1349,11 +1351,13 @@ class FlowControllerImpl : public FlowController

async_mode.process_deliver_retcode(ret_delivery);

locator_selector.unlock();
current_writer->getMutex().unlock();
// Unlock mutex_ and try again.
break;
}

locator_selector.unlock();
current_writer->getMutex().unlock();

sched.work_done();
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/messages/RTPSMessageGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ RTPSMessageGroup::RTPSMessageGroup(
RTPSMessageGroup::RTPSMessageGroup(
RTPSParticipantImpl* participant,
Endpoint* endpoint,
const RTPSMessageSenderInterface* msg_sender,
RTPSMessageSenderInterface* msg_sender,
std::chrono::steady_clock::time_point max_blocking_time_point)
: RTPSMessageGroup(participant)
{
Expand Down Expand Up @@ -236,7 +236,7 @@ void RTPSMessageGroup::send()

if (full_msg_->length > RTPSMESSAGE_HEADER_SIZE)
{
std::unique_lock<RecursiveTimedMutex> lock(endpoint_->getMutex());
std::lock_guard<RTPSMessageSenderInterface> lock(*sender_);

#if HAVE_SECURITY
// TODO(Ricardo) Control message size if it will be encrypted.
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ void StatefulReader::end_sample_access_nts(

void StatefulReader::change_read_by_user(
CacheChange_t* change,
const WriterProxy* writer,
WriterProxy* writer,
bool mark_as_read)
{
assert(writer != nullptr);
Expand Down Expand Up @@ -1177,7 +1177,7 @@ void StatefulReader::change_read_by_user(
void StatefulReader::send_acknack(
const WriterProxy* writer,
const SequenceNumberSet_t& sns,
const RTPSMessageSenderInterface* sender,
RTPSMessageSenderInterface* sender,
bool is_final)
{

Expand All @@ -1204,7 +1204,7 @@ void StatefulReader::send_acknack(

void StatefulReader::send_acknack(
const WriterProxy* writer,
const RTPSMessageSenderInterface* sender,
RTPSMessageSenderInterface* sender,
bool heartbeat_was_final)
{
// Protect reader
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ void StatelessReader::end_sample_access_nts(

void StatelessReader::change_read_by_user(
CacheChange_t* change,
const WriterProxy* /*writer*/,
WriterProxy* /*writer*/,
bool mark_as_read)
{
// Mark change as read
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/reader/WriterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ SequenceNumber_t WriterProxy::next_cache_change_to_be_notified()
return SequenceNumber_t::unknown();
}

void WriterProxy::perform_initial_ack_nack() const
void WriterProxy::perform_initial_ack_nack()
{
// Send initial NACK.
SequenceNumberSet_t sns(SequenceNumber_t(0, 0));
Expand All @@ -506,7 +506,7 @@ void WriterProxy::perform_initial_ack_nack() const
}
}

void WriterProxy::perform_heartbeat_response() const
void WriterProxy::perform_heartbeat_response()
{
    // Snapshot the final flag, then ask the reader to answer the heartbeat.
    // This proxy is passed twice: once as the writer being acknowledged and
    // once as the RTPSMessageSenderInterface used to send the message.
    const bool heartbeat_was_final = heartbeat_final_flag_.load();
    reader_->send_acknack(this, this, heartbeat_was_final);
}
Expand Down
20 changes: 18 additions & 2 deletions src/cpp/rtps/reader/WriterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ class WriterProxy : public RTPSMessageSenderInterface
/**
* Sends a preemptive acknack to the writer represented by this proxy.
*/
void perform_initial_ack_nack() const;
void perform_initial_ack_nack();

/**
* Sends the necessary acknack and nackfrag messages to answer the last received heartbeat message.
*/
void perform_heartbeat_response() const;
void perform_heartbeat_response();

/**
* Process an incoming heartbeat from the writer represented by this proxy.
Expand Down Expand Up @@ -337,6 +337,22 @@ class WriterProxy : public RTPSMessageSenderInterface
return is_datasharing_writer_;
}

/*
* Do nothing (no-op implementation of RTPSMessageSenderInterface::lock()).
* This object is always protected by the reader's mutex.
*/
void lock() override
{
}

/*
* Do nothing (no-op implementation of RTPSMessageSenderInterface::unlock()).
* This object is always protected by the reader's mutex.
*/
void unlock() override
{
}

private:

/**
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/writer/LocatorSelectorSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ bool LocatorSelectorSender::send(
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const
{
return writer.send_nts(message, *this, max_blocking_time_point);
return writer_.send_nts(message, *this, max_blocking_time_point);
}

} // namespace rtps
Expand Down
10 changes: 7 additions & 3 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ void StatefulWriter::deliver_sample_to_datasharing(
DeliveryRetCode StatefulWriter::deliver_sample_to_network(
CacheChange_t* change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector,
LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
NetworkFactory& network = mp_RTPSParticipant->network_factory();
Expand Down Expand Up @@ -980,6 +980,8 @@ bool StatefulWriter::matched_reader_add(
}

std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

// Check if it is already matched.
if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
Expand Down Expand Up @@ -1153,6 +1155,8 @@ bool StatefulWriter::matched_reader_remove(
{
ReaderProxy* rproxy = nullptr;
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

for (ReaderProxyIterator it = matched_local_readers_.begin();
it != matched_local_readers_.end(); ++it)
Expand Down Expand Up @@ -1211,7 +1215,6 @@ bool StatefulWriter::matched_reader_remove(
rproxy->stop();
matched_readers_pool_.push_back(rproxy);

lock.unlock();
check_acked_status();

return true;
Expand Down Expand Up @@ -1550,6 +1553,7 @@ bool StatefulWriter::send_periodic_heartbeat(
bool liveliness)
{
std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);

bool unacked_changes = false;
if (!liveliness)
Expand Down Expand Up @@ -1985,7 +1989,7 @@ const fastdds::rtps::IReaderDataFilter* StatefulWriter::reader_data_filter() con
DeliveryRetCode StatefulWriter::deliver_sample_nts(
CacheChange_t* cache_change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector,
LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
DeliveryRetCode ret_code = DeliveryRetCode::DELIVERED;
Expand Down
Loading

0 comments on commit c2c3486

Please sign in to comment.