Skip to content

Commit

Permalink
Revert "TCP deadlock on channel reuse (#4099)" (#4181)
Browse files Browse the repository at this point in the history
* Revert "TCP deadlock on channel reuse (#4099)"

This reverts commit dd4c434.

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20055: Separate builtin transports tests into individual cases

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20055: Mark large_data tests as flaky due to TCP

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

---------

Signed-off-by: EduPonz <eduardoponz@eprosima.com>
  • Loading branch information
EduPonz authored Dec 22, 2023
1 parent 6898d18 commit 5e87eb3
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 210 deletions.
33 changes: 7 additions & 26 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,21 +258,6 @@ void TCPTransportInterface::bind_socket(
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
assert(it_remove != unbound_channel_resources_.end());
unbound_channel_resources_.erase(it_remove);

unbound_lock.unlock();

// Look for an existing channel that matches this physical locator
auto existing_channel = channel_resources_.find(channel->locator());
// If the channel exists, check if the channel reference wait until it finishes its tasks
if (existing_channel != channel_resources_.end())
{
// Disconnect the old channel
existing_channel->second->disconnect();
scopedLock.unlock();
existing_channel->second->clear();
scopedLock.lock();
}

channel_resources_[channel->locator()] = channel;

}
Expand Down Expand Up @@ -693,6 +678,9 @@ bool TCPTransportInterface::OpenOutputChannel(
if (existing_channel != channel_resources_.end() &&
existing_channel->second != tcp_sender_resource->channel())
{
// Disconnect the old channel
tcp_sender_resource->channel()->disconnect();
tcp_sender_resource->channel()->clear();
// Update sender resource with new channel
tcp_sender_resource->channel() = existing_channel->second;
}
Expand Down Expand Up @@ -929,17 +917,10 @@ void TCPTransportInterface::perform_listen_operation(
{
TransportReceiverInterface* receiver = it->second.first;
ReceiverInUseCV* receiver_in_use = it->second.second;
receiver_in_use->cv.wait(scopedLock, [&]()
{
return receiver_in_use->in_use == false;
});
if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status())
{
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
}
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
receiver_in_use->in_use = false;
receiver_in_use->cv.notify_one();
}
Expand Down
Loading

0 comments on commit 5e87eb3

Please sign in to comment.