Skip to content

Commit

Permalink
Fix deadlock with LivelinessManager (#2037)
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
MiguelCompany committed Jul 2, 2021
1 parent 36b059c commit 64eba7b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
22 changes: 14 additions & 8 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,16 +420,14 @@ bool StatefulReader::processDataMsg(

assert(change);

std::lock_guard<RecursiveTimedMutex> lock(mp_mutex);
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
if (!is_alive_)
{
return false;
}

if (acceptMsgFrom(change->writerGUID, &pWP))
{
assert_writer_liveliness(change->writerGUID);

// Check if CacheChange was received or is framework data
if (!pWP || !pWP->change_was_received(change->sequenceNumber))
{
Expand Down Expand Up @@ -490,6 +488,10 @@ bool StatefulReader::processDataMsg(
return false;
}
}

lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(change->writerGUID);

return true;
}

Expand All @@ -515,8 +517,6 @@ bool StatefulReader::processDataFragMsg(
// TODO: see if we need manage framework fragmented DATA message
if (acceptMsgFrom(incomingChange->writerGUID, &pWP) && pWP)
{
assert_writer_liveliness(incomingChange->writerGUID);

// Check if CacheChange was received.
if (!pWP->change_was_received(incomingChange->sequenceNumber))
{
Expand Down Expand Up @@ -575,6 +575,10 @@ bool StatefulReader::processDataFragMsg(
NotifyChanges(pWP);
}
}

lock.unlock(); // Avoid deadlock with LivelinessManager;
assert_writer_liveliness(incomingChange->writerGUID);

}

return true;
Expand Down Expand Up @@ -604,6 +608,9 @@ bool StatefulReader::processHeartbeatMsg(
{
mp_history->remove_fragmented_changes_until(firstSN, writerGUID);

// Maybe now we have to notify user from new CacheChanges.
NotifyChanges(writer);

// Try to assert liveliness if requested by proxy's logic
if (assert_liveliness)
{
Expand All @@ -615,6 +622,7 @@ bool StatefulReader::processHeartbeatMsg(
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
{
lock.unlock(); // Avoid deadlock with LivelinessManager.
wlp->sub_liveliness_manager_->assert_liveliness(
writerGUID,
liveliness_kind_,
Expand All @@ -627,10 +635,8 @@ bool StatefulReader::processHeartbeatMsg(
}
}
}

// Maybe now we have to notify user from new CacheChanges.
NotifyChanges(writer);
}

return true;
}

Expand Down
10 changes: 7 additions & 3 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,6 @@ bool StatelessReader::processDataMsg(
{
logInfo(RTPS_MSG_IN, IDSTRING "Trying to add change " << change->sequenceNumber << " TO reader: " << m_guid);

assert_writer_liveliness(change->writerGUID);

// Ask the pool for a cache change
CacheChange_t* change_to_add = nullptr;
if (!change_pool_->reserve_cache(change_to_add))
Expand Down Expand Up @@ -456,6 +454,9 @@ bool StatelessReader::processDataMsg(
change_pool_->release_cache(change_to_add);
return false;
}

lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(change->writerGUID);
}

return true;
Expand All @@ -478,7 +479,6 @@ bool StatelessReader::processDataFragMsg(
{
// Datasharing communication will never send fragments
assert(!writer.is_datasharing);
assert_writer_liveliness(writer_guid);

// Check if CacheChange was received.
if (!thereIsUpperRecordOf(writer_guid, incomingChange->sequenceNumber))
Expand Down Expand Up @@ -564,6 +564,10 @@ bool StatelessReader::processDataFragMsg(
}
}
}

lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(writer_guid);

return true;
}
}
Expand Down

0 comments on commit 64eba7b

Please sign in to comment.