Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20628] Fix Discovery Server over TCP using logical port #4584

Merged
merged 12 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.hpp>
#include <rtps/builtin/discovery/participant/timedevent/DSClientEvent.h>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <fastdds/rtps/transport/TCPTransportDescriptor.h>
#include <utils/SystemInfo.hpp>
#include <vector>

Expand Down Expand Up @@ -443,6 +444,15 @@ bool PDPClient::create_ds_pdp_reliable_endpoints(
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

// TCP Clients need to handle logical ports
if (mp_RTPSParticipant->has_tcp_transports())
{
for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList);
}
}

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
Expand Down Expand Up @@ -841,8 +851,20 @@ void PDPClient::update_remote_servers_list()
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

// TCP Clients need to handle logical ports
bool set_logicals = mp_RTPSParticipant->has_tcp_transports();

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) ||
!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader()))
{
if (set_logicals)
{
mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList);
}
}

if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
Expand Down
25 changes: 25 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fastdds/rtps/participant/RTPSParticipantListener.h>
#include <fastdds/rtps/reader/StatefulReader.h>
#include <fastdds/rtps/writer/StatefulWriter.h>
#include <fastdds/rtps/transport/TCPTransportDescriptor.h>

#include <fastdds/rtps/history/WriterHistory.h>
#include <fastdds/rtps/history/ReaderHistory.h>
Expand Down Expand Up @@ -515,6 +516,15 @@ bool PDPServer::create_ds_pdp_reliable_endpoints(
{
eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

// TCP Clients need to handle logical ports
if (mp_RTPSParticipant->has_tcp_transports())
{
for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList);
}
}

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
mp_RTPSParticipant->createSenderResources(it.metatrafficMulticastLocatorList);
Expand Down Expand Up @@ -1186,8 +1196,20 @@ void PDPServer::update_remote_servers_list()

eprosima::shared_lock<eprosima::shared_mutex> disc_lock(mp_builtin->getDiscoveryMutex());

// TCP Clients need to handle logical ports
bool set_logicals = mp_RTPSParticipant->has_tcp_transports();

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()) ||
!endpoints->writer.writer_->matched_reader_is_matched(it.GetPDPReader()))
{
if (set_logicals)
{
mp_RTPSParticipant->create_tcp_connections(it.metatrafficUnicastLocatorList);
}
}

if (!endpoints->reader.reader_->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
Expand All @@ -1203,6 +1225,9 @@ void PDPServer::update_remote_servers_list()
{
discovery_db_.add_server(server.guidPrefix);
}

// Need to reactivate the server thread to send the DATA(p) to the new servers
awake_server_thread();
}

bool PDPServer::process_writers_acknowledgements()
Expand Down
146 changes: 134 additions & 12 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,68 @@ RTPSParticipantImpl::RTPSParticipantImpl(
switch (m_att.builtin.discovery_config.discoveryProtocol)
{
case DiscoveryProtocol::BACKUP:
case DiscoveryProtocol::CLIENT:
case DiscoveryProtocol::SERVER:
// Verify if listening ports are provided
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
if (pT->listening_ports.empty())
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to create a TCP server for discovery server without providing a proper listening port.");
break;
}
if (!m_att.builtin.metatrafficUnicastLocatorList.empty())
{
std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(),
m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
break;
case DiscoveryProtocol::CLIENT:
case DiscoveryProtocol::SUPER_CLIENT:
// Verify if listening ports are provided
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT && pT->listening_ports.empty())
if (pT)
{
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to use discovery server over TCP without providing a proper listening port.");
if (pT->listening_ports.empty())
{
EPROSIMA_LOG_INFO(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid <<
" tries to create a TCP client for discovery server without providing a proper listening port." <<
" No TCP participants will be able to connect to this participant, but it will be able make connections.");
}
for (fastdds::rtps::RemoteServerAttributes& it : m_att.builtin.discovery_config.m_DiscoveryServers)
{
std::for_each(it.metatrafficUnicastLocatorList.begin(),
it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
default:
Expand Down Expand Up @@ -1318,8 +1368,37 @@ void RTPSParticipantImpl::update_attributes(
auto pdp = mp_builtinProtocols->mp_PDP;
bool update_pdp = false;

// Check if discovery servers need to be updated
eprosima::fastdds::rtps::RemoteServerList_t converted_discovery_servers =
patt.builtin.discovery_config.m_DiscoveryServers;
if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers)
{
for (auto& transportDescriptor : m_att.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
for (fastdds::rtps::RemoteServerAttributes& it : converted_discovery_servers)
{
std::for_each(it.metatrafficUnicastLocatorList.begin(),
it.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
{
// TCP DS default logical port is the same as the physical one
if (locator.kind == LOCATOR_KIND_TCPv4 || locator.kind == LOCATOR_KIND_TCPv6)
{
if (IPLocator::getLogicalPort(locator) == 0)
{
IPLocator::setLogicalPort(locator, IPLocator::getPhysicalPort(locator));
}
}
});
}
}
}
}

// Check if there are changes
if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers
if (converted_discovery_servers != m_att.builtin.discovery_config.m_DiscoveryServers
|| patt.userData != m_att.userData
|| local_interfaces_changed)
{
Expand Down Expand Up @@ -1356,7 +1435,7 @@ void RTPSParticipantImpl::update_attributes(
for (auto existing_server : m_att.builtin.discovery_config.m_DiscoveryServers)
{
bool contained = false;
for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers)
for (auto incoming_server : converted_discovery_servers)
{
if (existing_server.guidPrefix == incoming_server.guidPrefix)
{
Expand Down Expand Up @@ -1415,9 +1494,12 @@ void RTPSParticipantImpl::update_attributes(
local_participant_proxy_data->default_locators.add_unicast_locator(locator);
}

createSenderResources(m_att.builtin.metatrafficMulticastLocatorList);
createSenderResources(m_att.builtin.metatrafficUnicastLocatorList);
createSenderResources(m_att.defaultUnicastLocatorList);
if (local_interfaces_changed)
{
createSenderResources(m_att.builtin.metatrafficMulticastLocatorList);
createSenderResources(m_att.builtin.metatrafficUnicastLocatorList);
createSenderResources(m_att.defaultUnicastLocatorList);
}
if (!modified_locators.empty())
{
createSenderResources(modified_locators);
Expand All @@ -1429,8 +1511,8 @@ void RTPSParticipantImpl::update_attributes(
m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER ||
m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP)
{
// Add incoming servers iff we don't know about them already or the listening locator has been modified
for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers)
// Add incoming servers if we don't know about them already or the listening locator has been modified
for (auto incoming_server : converted_discovery_servers)
{
eprosima::fastdds::rtps::RemoteServerList_t::iterator server_it;
for (server_it = m_att.builtin.discovery_config.m_DiscoveryServers.begin();
Expand Down Expand Up @@ -2251,6 +2333,38 @@ fastdds::dds::builtin::TypeLookupManager* RTPSParticipantImpl::typelookup_manage
return mp_builtinProtocols->tlm_;
}

bool RTPSParticipantImpl::has_tcp_transports()
{
const RTPSParticipantAttributes& pattr = getRTPSParticipantAttributes();
bool has_tcp_transports = false;
for (auto& transportDescriptor : pattr.userTransports)
{
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
has_tcp_transports = true;
break;
}
}

return has_tcp_transports;
}

void RTPSParticipantImpl::create_tcp_connections(
const LocatorList_t& locators)
{
for (const Locator_t& loc : locators)
{
if (loc.kind == LOCATOR_KIND_TCPv4 || loc.kind == LOCATOR_KIND_TCPv6)
{
// Set logical port to 0 and call createSenderResources to allow opening a TCP CONNECT channel in the transport
Locator_t loc_with_logical_zero = loc;
IPLocator::setLogicalPort(loc_with_logical_zero, 0);
createSenderResources(loc_with_logical_zero);
}
}
}

IPersistenceService* RTPSParticipantImpl::get_persistence_service(
const EndpointAttributes& param)
{
Expand Down Expand Up @@ -2453,9 +2567,17 @@ bool RTPSParticipantImpl::did_mutation_took_place_on_meta(
case LOCATOR_KIND_TCPv4:
set_wan_address(ret);
IPLocator::setPhysicalPort(ret, Tcp4ListeningPort());
if (IPLocator::getLogicalPort(ret) == 0)
{
IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret));
}
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
break;
case LOCATOR_KIND_TCPv6:
IPLocator::setPhysicalPort(ret, Tcp6ListeningPort());
if (IPLocator::getLogicalPort(ret) == 0)
{
IPLocator::setLogicalPort(ret, IPLocator::getPhysicalPort(ret));
}
break;
}
return ret;
Expand Down
13 changes: 13 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,19 @@ class RTPSParticipantImpl
return has_shm_transport_;
}

//! Check if the participant has at least one TCP transport
bool has_tcp_transports();

/**
* This method creates the needed sender resources for a locator list, but forces
* each logical port to be zero. It is used to enforce the proper creation of a
* CONNECT channel in TCP scenarios.
*
* @param locators List of unicast locators.
*/
void create_tcp_connections(
const LocatorList_t& locators);

uint32_t get_min_network_send_buffer_size()
{
return m_network_Factory.get_min_send_buffer_size();
Expand Down
33 changes: 23 additions & 10 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,10 +728,14 @@ bool TCPTransportInterface::OpenOutputChannel(
return false;
}

bool always_connect = false;
uint16_t logical_port = IPLocator::getLogicalPort(locator);
if (0 == logical_port)
{
return false;
// During builtin endpoints setup, a logical port equal to 0 indicates that the locator belongs
// to discovery server remote server. A connect channel is always needed.
// Should only be called once to avoid adding a logical port equal to 0.
always_connect = true;
}

Locator physical_locator = IPLocator::toPhysicalLocator(locator);
Expand Down Expand Up @@ -760,14 +764,17 @@ bool TCPTransportInterface::OpenOutputChannel(
channel_resource = channel_resources_.find(IPLocator::toPhysicalLocator(wan_locator));
}

if (channel_resource != channel_resources_.end())
{
channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get());
}
else
if (logical_port != 0)
{
std::lock_guard<std::mutex> channelPendingLock(channel_pending_logical_ports_mutex_);
channel_pending_logical_ports_[physical_locator].insert(logical_port);
if (channel_resource != channel_resources_.end())
{
channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get());
}
else
{
std::lock_guard<std::mutex> channelPendingLock(channel_pending_logical_ports_mutex_);
channel_pending_logical_ports_[physical_locator].insert(logical_port);
}
}

statistics_info_.add_entry(locator);
Expand Down Expand Up @@ -841,7 +848,9 @@ bool TCPTransportInterface::OpenOutputChannel(
// If the remote physical port is higher than our listening port, a new CONNECT channel needs to be created and connected
// and the locator added to the send_resource_list.
// If the remote physical port is lower than our listening port, only the locator needs to be added to the send_resource_list.
if (IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface)
// If the ports are equal, the CONNECT channel is created if the local interface is lower.
// If this locator belong to a DS server, a CONNECT channel is always needed.
if (always_connect || IPLocator::getPhysicalPort(physical_locator) > listening_port || local_lower_interface)
{
// Client side (either Server-Client or LARGE_DATA)
EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [CONNECT] (physical: "
Expand All @@ -863,7 +872,11 @@ bool TCPTransportInterface::OpenOutputChannel(

channel_resources_[physical_locator] = channel;
channel->connect(channel_resources_[physical_locator]);
channel->add_logical_port(logical_port, rtcp_message_manager_.get());
// Add logical port only if it's not 0
if (!always_connect)
{
channel->add_logical_port(logical_port, rtcp_message_manager_.get());
}
}
else
{
Expand Down
Loading
Loading