diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 110717b38e3..3f45373fdb4 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -420,7 +420,7 @@ bool StatefulReader::processDataMsg( assert(change); - std::lock_guard lock(mp_mutex); + std::unique_lock lock(mp_mutex); if (!is_alive_) { return false; @@ -428,8 +428,6 @@ bool StatefulReader::processDataMsg( if (acceptMsgFrom(change->writerGUID, &pWP)) { - assert_writer_liveliness(change->writerGUID); - // Check if CacheChange was received or is framework data if (!pWP || !pWP->change_was_received(change->sequenceNumber)) { @@ -490,6 +488,10 @@ bool StatefulReader::processDataMsg( return false; } } + + lock.unlock(); // Avoid deadlock with LivelinessManager. + assert_writer_liveliness(change->writerGUID); + return true; } @@ -515,8 +517,6 @@ bool StatefulReader::processDataFragMsg( // TODO: see if we need manage framework fragmented DATA message if (acceptMsgFrom(incomingChange->writerGUID, &pWP) && pWP) { - assert_writer_liveliness(incomingChange->writerGUID); - // Check if CacheChange was received. if (!pWP->change_was_received(incomingChange->sequenceNumber)) { @@ -575,6 +575,10 @@ bool StatefulReader::processDataFragMsg( NotifyChanges(pWP); } } + + lock.unlock(); // Avoid deadlock with LivelinessManager; + assert_writer_liveliness(incomingChange->writerGUID); + } return true; @@ -604,6 +608,9 @@ bool StatefulReader::processHeartbeatMsg( { mp_history->remove_fragmented_changes_until(firstSN, writerGUID); + // Maybe now we have to notify user from new CacheChanges. + NotifyChanges(writer); + // Try to assert liveliness if requested by proxy's logic if (assert_liveliness) { @@ -615,6 +622,7 @@ bool StatefulReader::processHeartbeatMsg( auto wlp = this->mp_RTPSParticipant->wlp(); if ( wlp != nullptr) { + lock.unlock(); // Avoid deadlock with LivelinessManager. wlp->sub_liveliness_manager_->assert_liveliness( writerGUID, liveliness_kind_, @@ -627,10 +635,8 @@ bool StatefulReader::processHeartbeatMsg( } } } - - // Maybe now we have to notify user from new CacheChanges. - NotifyChanges(writer); } + return true; } diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index a71b0173d6c..e3c4818f401 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -394,8 +394,6 @@ bool StatelessReader::processDataMsg( { logInfo(RTPS_MSG_IN, IDSTRING "Trying to add change " << change->sequenceNumber << " TO reader: " << m_guid); - assert_writer_liveliness(change->writerGUID); - // Ask the pool for a cache change CacheChange_t* change_to_add = nullptr; if (!change_pool_->reserve_cache(change_to_add)) @@ -456,6 +454,9 @@ bool StatelessReader::processDataMsg( change_pool_->release_cache(change_to_add); return false; } + + lock.unlock(); // Avoid deadlock with LivelinessManager. + assert_writer_liveliness(change->writerGUID); } return true; @@ -478,7 +479,6 @@ bool StatelessReader::processDataFragMsg( { // Datasharing communication will never send fragments assert(!writer.is_datasharing); - assert_writer_liveliness(writer_guid); // Check if CacheChange was received. if (!thereIsUpperRecordOf(writer_guid, incomingChange->sequenceNumber)) @@ -564,6 +564,10 @@ bool StatelessReader::processDataFragMsg( } } } + + lock.unlock(); // Avoid deadlock with LivelinessManager. + assert_writer_liveliness(writer_guid); + return true; } }