From dd4c434ccd3a029e1a4aa207443e79b05db5eb61 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jes=C3=BAs=20P=C3=A9rez?=
 <78275223+jepemi@users.noreply.github.com>
Date: Wed, 13 Dec 2023 07:49:41 +0100
Subject: [PATCH] TCP deadlock on channel reuse (#4099)

* Refs #19939: Channel disabling relocated and OnDataReceived() callback protected

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #19939: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #19939: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
---
 .../rtps/transport/TCPTransportInterface.cpp  | 33 +++++++++++++++----
 .../common/BlackboxTestsTransportTCP.cpp      |  6 ++--
 .../common/TCPReqRepHelloWorldReplier.cpp     |  8 ++++-
 .../common/TCPReqRepHelloWorldReplier.hpp     | 16 ++++++++-
 4 files changed, 52 insertions(+), 11 deletions(-)

diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp
index 14c9ccde6f1..2fd9a2d60e2 100644
--- a/src/cpp/rtps/transport/TCPTransportInterface.cpp
+++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp
@@ -258,6 +258,21 @@ void TCPTransportInterface::bind_socket(
     auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
     assert(it_remove != unbound_channel_resources_.end());
     unbound_channel_resources_.erase(it_remove);
+
+    unbound_lock.unlock();
+
+    // Look for an existing channel that matches this physical locator
+    auto existing_channel = channel_resources_.find(channel->locator());
+    // If such a channel exists, disconnect it and clear it before it is replaced by the new one
+    if (existing_channel != channel_resources_.end())
+    {
+        // Disconnect the old channel
+        existing_channel->second->disconnect();
+        scopedLock.unlock();
+        existing_channel->second->clear();
+        scopedLock.lock();
+    }
+
     channel_resources_[channel->locator()] = channel;
 
 }
@@ -678,9 +693,6 @@ bool TCPTransportInterface::OpenOutputChannel(
                 if (existing_channel != channel_resources_.end() &&
                         existing_channel->second != tcp_sender_resource->channel())
                 {
-                    // Disconnect the old channel
-                    tcp_sender_resource->channel()->disconnect();
-                    tcp_sender_resource->channel()->clear();
                     // Update sender resource with new channel
                     tcp_sender_resource->channel() = existing_channel->second;
                 }
@@ -917,10 +929,17 @@ void TCPTransportInterface::perform_listen_operation(
             {
                 TransportReceiverInterface* receiver = it->second.first;
                 ReceiverInUseCV* receiver_in_use = it->second.second;
-                receiver_in_use->in_use = true;
-                scopedLock.unlock();
-                receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
-                scopedLock.lock();
+                receiver_in_use->cv.wait(scopedLock, [&]()
+                        {
+                            return receiver_in_use->in_use == false;
+                        });
+                if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status())
+                {
+                    receiver_in_use->in_use = true;
+                    scopedLock.unlock();
+                    receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
+                    scopedLock.lock();
+                }
                 receiver_in_use->in_use = false;
                 receiver_in_use->cv.notify_one();
             }
diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp
index d22dc6639af..1751654532a 100644
--- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp
+++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp
@@ -615,8 +615,10 @@ TEST_P(TransportTCP, TCPv6_copy)
     EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport);
 }
 
-// Test connection is successfully restablished after dropping and relaunching a TCP client (requester)
+// Test connection is successfully reestablished after dropping and relaunching a TCP client (requester),
+// when the server's listening thread for the old client hasn't processed all its messages.
 // Issue -> https://github.com/eProsima/Fast-DDS/issues/2409
+// Issue -> https://github.com/eProsima/Fast-DDS/issues/4026
 TEST(TransportTCP, Client_reconnection)
 {
     TCPReqRepHelloWorldReplier* replier;
@@ -624,7 +626,7 @@ TEST(TransportTCP, Client_reconnection)
     const uint16_t nmsgs = 5;
 
     replier = new TCPReqRepHelloWorldReplier;
-    replier->init(1, 0, global_port);
+    replier->init(1, 0, global_port, 0, nullptr, true);
 
     ASSERT_TRUE(replier->isInitialized());
 
diff --git a/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp b/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp
index 4910c44bbe6..6bd5f6b7633 100644
--- a/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp
+++ b/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp
@@ -65,7 +65,8 @@ void TCPReqRepHelloWorldReplier::init(
         int domainId,
         uint16_t listeningPort,
         uint32_t maxInitialPeer,
-        const char* certs_path)
+        const char* certs_path,
+        bool use_busy_listener)
 {
     ParticipantAttributes pattr;
     pattr.domainId = domainId;
@@ -132,9 +133,14 @@ void TCPReqRepHelloWorldReplier::init(
     puattr.topic.topicDataType = type_.getName();
     puattr.topic.topicName = "HelloWorldTopicReply";
     configPublisher("Reply");
+    if (use_busy_listener)
+    {
+        reply_listener_.use_busy_listener(true);
+    }
     reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_);
     ASSERT_NE(reply_publisher_, nullptr);
 
+
     initialized_ = true;
 }
 
diff --git a/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp b/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp
index c08100b0faf..fff5d0e74ba 100644
--- a/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp
+++ b/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp
@@ -28,6 +28,7 @@
 #include <fastrtps/publisher/PublisherListener.h>
 #include <fastrtps/attributes/PublisherAttributes.h>
 
+#include <thread>
 #include <list>
 #include <condition_variable>
 #include <asio.hpp>
@@ -91,6 +92,7 @@ class TCPReqRepHelloWorldReplier
         RequestListener(
                 TCPReqRepHelloWorldReplier& replier)
             : replier_(replier)
+            , use_busy_listener_(false)
         {
         }
 
@@ -106,6 +108,16 @@ class TCPReqRepHelloWorldReplier
             {
                 replier_.matched();
             }
+            else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING && use_busy_listener_)
+            {
+                std::this_thread::sleep_for(std::chrono::milliseconds(500));
+            }
+        }
+
+        void use_busy_listener(
+                const bool& value)
+        {
+            use_busy_listener_ = value;
         }
 
     private:
@@ -114,6 +126,7 @@ class TCPReqRepHelloWorldReplier
                 const RequestListener&) = delete;
 
         TCPReqRepHelloWorldReplier& replier_;
+        bool use_busy_listener_;
 
     }
     reply_listener_;
@@ -125,7 +138,8 @@ class TCPReqRepHelloWorldReplier
             int domainId,
             uint16_t listeningPort,
             uint32_t maxInitialPeer = 0,
-            const char* certs_path = nullptr);
+            const char* certs_path = nullptr,
+            bool use_busy_listener = false);
     bool isInitialized() const
     {
         return initialized_;