Skip to content

Commit

Permalink
TCP deadlock on channel reuse (#4099)
Browse files Browse the repository at this point in the history
* Refs #19939: Channel disabling relocated and OnDataReceived() callback protected

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #19939: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #19939: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
  • Loading branch information
jepemi authored Dec 13, 2023
1 parent f249561 commit dd4c434
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 11 deletions.
33 changes: 26 additions & 7 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,21 @@ void TCPTransportInterface::bind_socket(
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
assert(it_remove != unbound_channel_resources_.end());
unbound_channel_resources_.erase(it_remove);

unbound_lock.unlock();

// Look for an existing channel that matches this physical locator
auto existing_channel = channel_resources_.find(channel->locator());
// If the channel exists, check if the channel reference wait until it finishes its tasks
if (existing_channel != channel_resources_.end())
{
// Disconnect the old channel
existing_channel->second->disconnect();
scopedLock.unlock();
existing_channel->second->clear();
scopedLock.lock();
}

channel_resources_[channel->locator()] = channel;

}
Expand Down Expand Up @@ -678,9 +693,6 @@ bool TCPTransportInterface::OpenOutputChannel(
if (existing_channel != channel_resources_.end() &&
existing_channel->second != tcp_sender_resource->channel())
{
// Disconnect the old channel
tcp_sender_resource->channel()->disconnect();
tcp_sender_resource->channel()->clear();
// Update sender resource with new channel
tcp_sender_resource->channel() = existing_channel->second;
}
Expand Down Expand Up @@ -917,10 +929,17 @@ void TCPTransportInterface::perform_listen_operation(
{
TransportReceiverInterface* receiver = it->second.first;
ReceiverInUseCV* receiver_in_use = it->second.second;
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
receiver_in_use->cv.wait(scopedLock, [&]()
{
return receiver_in_use->in_use == false;
});
if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status())
{
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
}
receiver_in_use->in_use = false;
receiver_in_use->cv.notify_one();
}
Expand Down
6 changes: 4 additions & 2 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,16 +615,18 @@ TEST_P(TransportTCP, TCPv6_copy)
EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport);
}

// Test connection is successfully restablished after dropping and relaunching a TCP client (requester)
// Test connection is successfully restablished after dropping and relaunching a TCP client (requester),
// when the server's listening thread for the old client hasn't processed all its messages.
// Issue -> https://github.com/eProsima/Fast-DDS/issues/2409
// Issue -> https://github.com/eProsima/Fast-DDS/issues/4026
TEST(TransportTCP, Client_reconnection)
{
TCPReqRepHelloWorldReplier* replier;
TCPReqRepHelloWorldRequester* requester;
const uint16_t nmsgs = 5;

replier = new TCPReqRepHelloWorldReplier;
replier->init(1, 0, global_port);
replier->init(1, 0, global_port, 0, nullptr, true);

ASSERT_TRUE(replier->isInitialized());

Expand Down
8 changes: 7 additions & 1 deletion test/blackbox/common/TCPReqRepHelloWorldReplier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ void TCPReqRepHelloWorldReplier::init(
int domainId,
uint16_t listeningPort,
uint32_t maxInitialPeer,
const char* certs_path)
const char* certs_path,
bool use_busy_listener)
{
ParticipantAttributes pattr;
pattr.domainId = domainId;
Expand Down Expand Up @@ -132,9 +133,14 @@ void TCPReqRepHelloWorldReplier::init(
puattr.topic.topicDataType = type_.getName();
puattr.topic.topicName = "HelloWorldTopicReply";
configPublisher("Reply");
if (use_busy_listener)
{
reply_listener_.use_busy_listener(true);
}
reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_);
ASSERT_NE(reply_publisher_, nullptr);


initialized_ = true;
}

Expand Down
16 changes: 15 additions & 1 deletion test/blackbox/common/TCPReqRepHelloWorldReplier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fastrtps/publisher/PublisherListener.h>
#include <fastrtps/attributes/PublisherAttributes.h>

#include <thread>
#include <list>
#include <condition_variable>
#include <asio.hpp>
Expand Down Expand Up @@ -91,6 +92,7 @@ class TCPReqRepHelloWorldReplier
RequestListener(
TCPReqRepHelloWorldReplier& replier)
: replier_(replier)
, use_busy_listener_(false)
{
}

Expand All @@ -106,6 +108,16 @@ class TCPReqRepHelloWorldReplier
{
replier_.matched();
}
else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING && use_busy_listener_)
{
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}

void use_busy_listener(
const bool& value)
{
use_busy_listener_ = value;
}

private:
Expand All @@ -114,6 +126,7 @@ class TCPReqRepHelloWorldReplier
const RequestListener&) = delete;

TCPReqRepHelloWorldReplier& replier_;
bool use_busy_listener_;

}
reply_listener_;
Expand All @@ -125,7 +138,8 @@ class TCPReqRepHelloWorldReplier
int domainId,
uint16_t listeningPort,
uint32_t maxInitialPeer = 0,
const char* certs_path = nullptr);
const char* certs_path = nullptr,
bool use_busy_listener = false);
bool isInitialized() const
{
return initialized_;
Expand Down

0 comments on commit dd4c434

Please sign in to comment.