Skip to content

Commit

Permalink
Refs 13120. Always assert liveliness on scope exit.
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany committed Dec 2, 2021
1 parent e17467f commit 761b391
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
23 changes: 16 additions & 7 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,14 @@ bool StatefulReader::processDataMsg(
// Check if CacheChange was received or is framework data
if (!pWP || !pWP->change_was_received(change->sequenceNumber))
{
// Always assert liveliness on scope exit
auto assert_liveliness_lambda = [&lock, this, change](void*)
{
lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(change->writerGUID);
};
std::unique_ptr<void, decltype(assert_liveliness_lambda)> p{ this, assert_liveliness_lambda };

logInfo(RTPS_MSG_IN,
IDSTRING "Trying to add change " << change->sequenceNumber << " TO reader: " << getGuid().entityId);

Expand Down Expand Up @@ -508,9 +516,6 @@ bool StatefulReader::processDataMsg(
}
}

lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(change->writerGUID);

return true;
}

Expand All @@ -536,6 +541,14 @@ bool StatefulReader::processDataFragMsg(
// TODO: see if we need manage framework fragmented DATA message
if (acceptMsgFrom(incomingChange->writerGUID, &pWP) && pWP)
{
// Always assert liveliness on scope exit
auto assert_liveliness_lambda = [&lock, this, incomingChange](void*)
{
lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(incomingChange->writerGUID);
};
std::unique_ptr<void, decltype(assert_liveliness_lambda)> p{ this, assert_liveliness_lambda };

// Check if CacheChange was received.
if (!pWP->change_was_received(incomingChange->sequenceNumber))
{
Expand Down Expand Up @@ -596,10 +609,6 @@ bool StatefulReader::processDataFragMsg(
NotifyChanges(pWP);
}
}

lock.unlock(); // Avoid deadlock with LivelinessManager;
assert_writer_liveliness(incomingChange->writerGUID);

}

return true;
Expand Down
22 changes: 16 additions & 6 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ bool StatelessReader::processDataMsg(

if (acceptMsgFrom(change->writerGUID, change->kind))
{
// Always assert liveliness on scope exit
auto assert_liveliness_lambda = [&lock, this, change](void*)
{
lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(change->writerGUID);
};
std::unique_ptr<void, decltype(assert_liveliness_lambda)> p{ this, assert_liveliness_lambda };

logInfo(RTPS_MSG_IN, IDSTRING "Trying to add change " << change->sequenceNumber << " TO reader: " << m_guid);

// Check rejection by history
Expand Down Expand Up @@ -467,9 +475,6 @@ bool StatelessReader::processDataMsg(
return false;
}
}

lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(change->writerGUID);
}

return true;
Expand All @@ -490,6 +495,14 @@ bool StatelessReader::processDataFragMsg(
{
if (writer.guid == writer_guid)
{
// Always assert liveliness on scope exit
auto assert_liveliness_lambda = [&lock, this, &writer_guid](void*)
{
lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(writer_guid);
};
std::unique_ptr<void, decltype(assert_liveliness_lambda)> p{ this, assert_liveliness_lambda };

// Datasharing communication will never send fragments
assert(!writer.is_datasharing);

Expand Down Expand Up @@ -578,9 +591,6 @@ bool StatelessReader::processDataFragMsg(
}
}

lock.unlock(); // Avoid deadlock with LivelinessManager.
assert_writer_liveliness(writer_guid);

return true;
}
}
Expand Down

0 comments on commit 761b391

Please sign in to comment.