Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ABBA deadlock in flow controller core [12394] #2175

Merged
merged 3 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/fastdds/rtps/messages/RTPSMessageGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class RTPSMessageGroup
RTPSMessageGroup(
RTPSParticipantImpl* participant,
Endpoint* endpoint,
const RTPSMessageSenderInterface* msg_sender,
RTPSMessageSenderInterface* msg_sender,
std::chrono::steady_clock::time_point max_blocking_time_point =
std::chrono::steady_clock::now() + std::chrono::hours(24));

Expand Down Expand Up @@ -217,7 +217,7 @@ class RTPSMessageGroup
*/
void sender(
Endpoint* endpoint,
const RTPSMessageSenderInterface* msg_sender)
RTPSMessageSenderInterface* msg_sender)
{
assert((endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr));
if (endpoint != endpoint_ || msg_sender != sender_)
Expand Down Expand Up @@ -293,7 +293,7 @@ class RTPSMessageGroup
const SequenceNumberSet_t& gap_bitmap,
const EntityId_t& reader_id);

const RTPSMessageSenderInterface* sender_ = nullptr;
RTPSMessageSenderInterface* sender_ = nullptr;

Endpoint* endpoint_ = nullptr;

Expand Down
12 changes: 12 additions & 0 deletions include/fastdds/rtps/messages/RTPSMessageSenderInterface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ class RTPSMessageSenderInterface
virtual bool send(
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const = 0;

/*!
* Lock the object.
*
* Paired with unlock(), so a sender can be guarded with std::lock_guard.
* Implementations decide whether any real locking is needed.
*/
virtual void lock() = 0;

/*!
* Unlock the object.
*
* Counterpart of lock().
*/
virtual void unlock() = 0;


};

} /* namespace rtps */
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/reader/RTPSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ class RTPSReader
*/
virtual void change_read_by_user(
CacheChange_t* change,
const WriterProxy* writer,
WriterProxy* writer,
bool mark_as_read = true) = 0;

/**
Expand Down
6 changes: 3 additions & 3 deletions include/fastdds/rtps/reader/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class StatefulReader : public RTPSReader
void send_acknack(
const WriterProxy* writer,
const SequenceNumberSet_t& sns,
const RTPSMessageSenderInterface* sender,
RTPSMessageSenderInterface* sender,
bool is_final);

/**
Expand All @@ -257,7 +257,7 @@ class StatefulReader : public RTPSReader
*/
void send_acknack(
const WriterProxy* writer,
const RTPSMessageSenderInterface* sender,
RTPSMessageSenderInterface* sender,
bool heartbeat_was_final);

/**
Expand Down Expand Up @@ -313,7 +313,7 @@ class StatefulReader : public RTPSReader
*/
void change_read_by_user(
CacheChange_t* change,
const WriterProxy* writer,
WriterProxy* writer,
bool mark_as_read = true) override;

private:
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/reader/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ class StatelessReader : public RTPSReader
*/
void change_read_by_user(
CacheChange_t* change,
const WriterProxy* writer,
WriterProxy* writer,
bool mark_as_read = true) override;

private:
Expand Down
28 changes: 25 additions & 3 deletions include/fastdds/rtps/writer/LocatorSelectorSender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface
RTPSWriter& writer,
ResourceLimitedContainerConfig matched_readers_allocation
)
: writer(writer)
, locator_selector(matched_readers_allocation)
: locator_selector(matched_readers_allocation)
, all_remote_readers(matched_readers_allocation)
, all_remote_participants(matched_readers_allocation)
, writer_(writer)
{
}

Expand Down Expand Up @@ -75,13 +75,35 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const override;

RTPSWriter& writer;
/*!
* Lock the object.
*
* This kind of object needs to be locked because it could be used outside the writer's mutex.
* Locking is delegated to the internal mutex_.
*/
void lock() override
{
mutex_.lock();
}

/*!
* Unlock the object.
*
* Releases the internal mutex_ acquired by lock().
*/
void unlock() override
{
mutex_.unlock();
}

fastrtps::rtps::LocatorSelector locator_selector;

ResourceLimitedVector<GUID_t> all_remote_readers;

ResourceLimitedVector<GuidPrefix_t> all_remote_participants;

private:

RTPSWriter& writer_;

std::recursive_mutex mutex_;
};

} // namespace rtps
Expand Down
16 changes: 16 additions & 0 deletions include/fastdds/rtps/writer/ReaderLocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ class ReaderLocator : public RTPSMessageSenderInterface

}

/*
* Do nothing.
* This object is always protected by the writer's mutex, so no additional locking is performed here.
*/
void lock() override
{
}

/*
* Do nothing.
* This object is always protected by the writer's mutex, so there is nothing to release here.
*/
void unlock() override
{
}

private:

RTPSWriter* owner_;
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class ReaderProxy
return locator_info_.locator_selector_entry();
}

const RTPSMessageSenderInterface* message_sender() const
// Returns the address of locator_info_ as a mutable RTPSMessageSenderInterface pointer.
RTPSMessageSenderInterface* message_sender()
{
return &locator_info_;
}
Expand Down
14 changes: 14 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/DirectMessageSender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ class DirectMessageSender : public RTPSMessageSenderInterface
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const override;

/*
* Do nothing.
* This sender performs no locking of its own.
*/
void lock() override
{
}

/*
* Do nothing.
* This sender performs no locking of its own, so there is nothing to release.
*/
void unlock() override
{
}

private:

RTPSParticipantImpl* participant_;
Expand Down
4 changes: 4 additions & 0 deletions src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,7 @@ class FlowControllerImpl : public FlowController
{
// This call should be made with writer's mutex locked.
fastrtps::rtps::LocatorSelectorSender& locator_selector = writer->get_general_locator_selector();
std::lock_guard<fastrtps::rtps::LocatorSelectorSender> lock(locator_selector);
fastrtps::rtps::RTPSMessageGroup group(participant_, writer, &locator_selector);
if (fastrtps::rtps::DeliveryRetCode::DELIVERED !=
writer->deliver_sample_nts(change, group, locator_selector, max_blocking_time))
Expand Down Expand Up @@ -1324,6 +1325,7 @@ class FlowControllerImpl : public FlowController

fastrtps::rtps::LocatorSelectorSender& locator_selector =
current_writer->get_async_locator_selector();
locator_selector.lock();
async_mode.group.sender(current_writer, &locator_selector);

// Remove previously from queue, because deliver_sample_nts could call FlowController::remove_sample()
Expand All @@ -1349,11 +1351,13 @@ class FlowControllerImpl : public FlowController

async_mode.process_deliver_retcode(ret_delivery);

locator_selector.unlock();
current_writer->getMutex().unlock();
// Unlock mutex_ and try again.
break;
}

locator_selector.unlock();
current_writer->getMutex().unlock();

sched.work_done();
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/messages/RTPSMessageGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ RTPSMessageGroup::RTPSMessageGroup(
RTPSMessageGroup::RTPSMessageGroup(
RTPSParticipantImpl* participant,
Endpoint* endpoint,
const RTPSMessageSenderInterface* msg_sender,
RTPSMessageSenderInterface* msg_sender,
std::chrono::steady_clock::time_point max_blocking_time_point)
: RTPSMessageGroup(participant)
{
Expand Down Expand Up @@ -236,7 +236,7 @@ void RTPSMessageGroup::send()

if (full_msg_->length > RTPSMESSAGE_HEADER_SIZE)
{
std::unique_lock<RecursiveTimedMutex> lock(endpoint_->getMutex());
std::lock_guard<RTPSMessageSenderInterface> lock(*sender_);

#if HAVE_SECURITY
// TODO(Ricardo) Control message size if it will be encrypted.
Expand Down
6 changes: 3 additions & 3 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ void StatefulReader::end_sample_access_nts(

void StatefulReader::change_read_by_user(
CacheChange_t* change,
const WriterProxy* writer,
WriterProxy* writer,
bool mark_as_read)
{
assert(writer != nullptr);
Expand Down Expand Up @@ -1166,7 +1166,7 @@ void StatefulReader::change_read_by_user(
void StatefulReader::send_acknack(
const WriterProxy* writer,
const SequenceNumberSet_t& sns,
const RTPSMessageSenderInterface* sender,
RTPSMessageSenderInterface* sender,
bool is_final)
{

Expand All @@ -1193,7 +1193,7 @@ void StatefulReader::send_acknack(

void StatefulReader::send_acknack(
const WriterProxy* writer,
const RTPSMessageSenderInterface* sender,
RTPSMessageSenderInterface* sender,
bool heartbeat_was_final)
{
// Protect reader
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ void StatelessReader::end_sample_access_nts(

void StatelessReader::change_read_by_user(
CacheChange_t* change,
const WriterProxy* /*writer*/,
WriterProxy* /*writer*/,
bool mark_as_read)
{
// Mark change as read
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/reader/WriterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ SequenceNumber_t WriterProxy::next_cache_change_to_be_notified()
return SequenceNumber_t::unknown();
}

void WriterProxy::perform_initial_ack_nack() const
void WriterProxy::perform_initial_ack_nack()
{
// Send initial NACK.
SequenceNumberSet_t sns(SequenceNumber_t(0, 0));
Expand All @@ -506,7 +506,7 @@ void WriterProxy::perform_initial_ack_nack() const
}
}

void WriterProxy::perform_heartbeat_response() const
// Asks the owning reader to send an acknack, passing this proxy both as the
// writer being answered and as the message sender, together with the current
// value of heartbeat_final_flag_.
void WriterProxy::perform_heartbeat_response()
{
reader_->send_acknack(this, this, heartbeat_final_flag_.load());
}
Expand Down
20 changes: 18 additions & 2 deletions src/cpp/rtps/reader/WriterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ class WriterProxy : public RTPSMessageSenderInterface
/**
* Sends a preemptive acknack to the writer represented by this proxy.
*/
void perform_initial_ack_nack() const;
void perform_initial_ack_nack();

/**
* Sends the necessary acknack and nackfrag messages to answer the last received heartbeat message.
*/
void perform_heartbeat_response() const;
void perform_heartbeat_response();

/**
* Process an incoming heartbeat from the writer represented by this proxy.
Expand Down Expand Up @@ -337,6 +337,22 @@ class WriterProxy : public RTPSMessageSenderInterface
return is_datasharing_writer_;
}

/*
* Do nothing.
* This object is always protected by the reader's mutex, so no additional locking is performed here.
*/
void lock() override
{
}

/*
* Do nothing.
* This object is always protected by the reader's mutex, so there is nothing to release here.
*/
void unlock() override
{
}

private:

/**
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/writer/LocatorSelectorSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ bool LocatorSelectorSender::send(
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const
{
return writer.send_nts(message, *this, max_blocking_time_point);
return writer_.send_nts(message, *this, max_blocking_time_point);
}

} // namespace rtps
Expand Down
10 changes: 7 additions & 3 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ void StatefulWriter::deliver_sample_to_datasharing(
DeliveryRetCode StatefulWriter::deliver_sample_to_network(
CacheChange_t* change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector,
LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
NetworkFactory& network = mp_RTPSParticipant->network_factory();
Expand Down Expand Up @@ -980,6 +980,8 @@ bool StatefulWriter::matched_reader_add(
}

std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

// Check if it is already matched.
if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
Expand Down Expand Up @@ -1153,6 +1155,8 @@ bool StatefulWriter::matched_reader_remove(
{
ReaderProxy* rproxy = nullptr;
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);

for (ReaderProxyIterator it = matched_local_readers_.begin();
it != matched_local_readers_.end(); ++it)
Expand Down Expand Up @@ -1211,7 +1215,6 @@ bool StatefulWriter::matched_reader_remove(
rproxy->stop();
matched_readers_pool_.push_back(rproxy);

lock.unlock();
check_acked_status();

return true;
Expand Down Expand Up @@ -1550,6 +1553,7 @@ bool StatefulWriter::send_periodic_heartbeat(
bool liveliness)
{
std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);

bool unacked_changes = false;
if (!liveliness)
Expand Down Expand Up @@ -1985,7 +1989,7 @@ const fastdds::rtps::IReaderDataFilter* StatefulWriter::reader_data_filter() con
DeliveryRetCode StatefulWriter::deliver_sample_nts(
CacheChange_t* cache_change,
RTPSMessageGroup& group,
LocatorSelectorSender& locator_selector,
LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
DeliveryRetCode ret_code = DeliveryRetCode::DELIVERED;
Expand Down
Loading