From 64eba7b4b3c555b0d06283f34a3ce18d83decdd5 Mon Sep 17 00:00:00 2001
From: Miguel Company <miguelcompany@eprosima.com>
Date: Fri, 2 Jul 2021 16:22:09 +0200
Subject: [PATCH] Fix deadlock with LivelinessManager (#2037)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
---
 src/cpp/rtps/reader/StatefulReader.cpp  | 22 ++++++++++++++--------
 src/cpp/rtps/reader/StatelessReader.cpp | 10 +++++++---
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp
index 110717b38e3..3f45373fdb4 100644
--- a/src/cpp/rtps/reader/StatefulReader.cpp
+++ b/src/cpp/rtps/reader/StatefulReader.cpp
@@ -420,7 +420,7 @@ bool StatefulReader::processDataMsg(
 
     assert(change);
 
-    std::lock_guard<RecursiveTimedMutex> lock(mp_mutex);
+    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
     if (!is_alive_)
     {
         return false;
@@ -428,8 +428,6 @@ bool StatefulReader::processDataMsg(
 
     if (acceptMsgFrom(change->writerGUID, &pWP))
     {
-        assert_writer_liveliness(change->writerGUID);
-
         // Check if CacheChange was received or is framework data
         if (!pWP || !pWP->change_was_received(change->sequenceNumber))
         {
@@ -490,6 +488,10 @@ bool StatefulReader::processDataMsg(
                 return false;
             }
         }
+
+        lock.unlock(); // Avoid deadlock with LivelinessManager.
+        assert_writer_liveliness(change->writerGUID);
+
         return true;
     }
 
@@ -515,8 +517,6 @@ bool StatefulReader::processDataFragMsg(
     // TODO: see if we need manage framework fragmented DATA message
     if (acceptMsgFrom(incomingChange->writerGUID, &pWP) && pWP)
     {
-        assert_writer_liveliness(incomingChange->writerGUID);
-
         // Check if CacheChange was received.
         if (!pWP->change_was_received(incomingChange->sequenceNumber))
         {
@@ -575,6 +575,10 @@ bool StatefulReader::processDataFragMsg(
                 NotifyChanges(pWP);
             }
         }
+
+        lock.unlock(); // Avoid deadlock with LivelinessManager;
+        assert_writer_liveliness(incomingChange->writerGUID);
+
     }
 
     return true;
@@ -604,6 +608,9 @@ bool StatefulReader::processHeartbeatMsg(
         {
             mp_history->remove_fragmented_changes_until(firstSN, writerGUID);
 
+            // Maybe now we have to notify user from new CacheChanges.
+            NotifyChanges(writer);
+
             // Try to assert liveliness if requested by proxy's logic
             if (assert_liveliness)
             {
@@ -615,6 +622,7 @@ bool StatefulReader::processHeartbeatMsg(
                         auto wlp = this->mp_RTPSParticipant->wlp();
                         if ( wlp != nullptr)
                         {
+                            lock.unlock(); // Avoid deadlock with LivelinessManager.
                             wlp->sub_liveliness_manager_->assert_liveliness(
                                 writerGUID,
                                 liveliness_kind_,
@@ -627,10 +635,8 @@ bool StatefulReader::processHeartbeatMsg(
                     }
                 }
             }
-
-            // Maybe now we have to notify user from new CacheChanges.
-            NotifyChanges(writer);
         }
+
         return true;
     }
 
diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp
index a71b0173d6c..e3c4818f401 100644
--- a/src/cpp/rtps/reader/StatelessReader.cpp
+++ b/src/cpp/rtps/reader/StatelessReader.cpp
@@ -394,8 +394,6 @@ bool StatelessReader::processDataMsg(
     {
         logInfo(RTPS_MSG_IN, IDSTRING "Trying to add change " << change->sequenceNumber << " TO reader: " << m_guid);
 
-        assert_writer_liveliness(change->writerGUID);
-
         // Ask the pool for a cache change
         CacheChange_t* change_to_add = nullptr;
         if (!change_pool_->reserve_cache(change_to_add))
@@ -456,6 +454,9 @@ bool StatelessReader::processDataMsg(
             change_pool_->release_cache(change_to_add);
             return false;
         }
+
+        lock.unlock(); // Avoid deadlock with LivelinessManager.
+        assert_writer_liveliness(change->writerGUID);
     }
 
     return true;
@@ -478,7 +479,6 @@ bool StatelessReader::processDataFragMsg(
         {
             // Datasharing communication will never send fragments
             assert(!writer.is_datasharing);
-            assert_writer_liveliness(writer_guid);
 
             // Check if CacheChange was received.
             if (!thereIsUpperRecordOf(writer_guid, incomingChange->sequenceNumber))
@@ -564,6 +564,10 @@ bool StatelessReader::processDataFragMsg(
                     }
                 }
             }
+
+            lock.unlock(); // Avoid deadlock with LivelinessManager.
+            assert_writer_liveliness(writer_guid);
+
             return true;
         }
     }