Skip to content

Commit

Permalink
Discovery Server fix reconnection (#2246)
Browse files Browse the repository at this point in the history
* Refs #12522: Fix minor ds errors

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: fix typo

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: apply suggestions

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: Client reconnection fix

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: apply suggestions to fix comment

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: uncrustify

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: Apply new fix

Signed-off-by: jparisu <javierparis@eprosima.com>
(cherry picked from commit 4368260)

# Conflicts:
#	src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
#	src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp
  • Loading branch information
jparisu authored and mergify-bot committed Mar 22, 2022
1 parent 0670fb9 commit b63e46e
Show file tree
Hide file tree
Showing 4 changed files with 512 additions and 29 deletions.
10 changes: 10 additions & 0 deletions include/fastdds/rtps/builtin/discovery/participant/PDPClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ class PDPClient : public PDP
void notifyAboveRemoteEndpoints(
const ParticipantProxyData& pdata) override;

/**
* This method removes a remote RTPSParticipant and all its writers and readers.
* @param participant_guid GUID_t of the remote RTPSParticipant.
* @param reason Why the participant is being removed (dropped vs removed)
* @return true if correct.
*/
bool remove_remote_participant(
const GUID_t& participant_guid,
ParticipantDiscoveryInfo::DISCOVERY_STATUS reason) override;

/**
* Matching server EDP endpoints
* @return true if all servers have been discovered
Expand Down
53 changes: 24 additions & 29 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ void DiscoveryDataBase::create_writers_from_change_(
}
else
{
logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " as no associated participant. Skipping");
logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " has no associated participant. Skipping");
return;
}

Expand Down Expand Up @@ -972,7 +972,7 @@ void DiscoveryDataBase::create_readers_from_change_(
}
else
{
logError(DISCOVERY_DATABASE, "Writer " << reader_guid << " as no associated participant. Skipping");
logError(DISCOVERY_DATABASE, "Reader " << reader_guid << " has no associated participant. Skipping");
return;
}

Expand Down Expand Up @@ -1704,17 +1704,24 @@ void DiscoveryDataBase::AckedFunctor::operator () (
{
if (reader_proxy->guid().guidPrefix == *it)
{
// If the participant is already in the DB it means it has answered to the pinging
// or that is pinging us and we have already received its DATA(p)
// If neither of both has happenned we should not wait for it to ack this data, so we
// skip it and leave it as acked
/*
* If the participant is already in the DB it means it has answered to the pinging
* or that is pinging us and we have already received its DATA(p)
* If neither has happenned (participant is not in DB)
* we should not wait for it to ack this data, or it could get stucked in an endless loop
* (this Remote Server could not exist and/or never be discovered)
* Nevertheless, the ack is still pending for this participant and once it is discovered this
* data will be sent again
*/
auto remote_server_it = db_->participants_.find(*it);
if (remote_server_it == db_->participants_.end())
{
logInfo(DISCOVERY_DATABASE, "Change " << change_->instanceHandle <<
"check as acked for " << reader_proxy->guid() << " as it has not answered pinging yet");
return;
}

break;
}
}

Expand All @@ -1731,32 +1738,20 @@ void DiscoveryDataBase::unmatch_participant_(
{
logInfo(DISCOVERY_DATABASE, "unmatching participant: " << guid_prefix);

auto pit = participants_.find(guid_prefix);
if (pit == participants_.end())
// For each participant remove it
// IMPORTANT: This is not for every relevant participant, as participant A could be in other participant's B info
// and B not be relevant for A. So it must be done for every Participant.
for (auto& participant_it : participants_)
{
logWarning(DISCOVERY_DATABASE,
"Attempting to unmatch an unexisting participant: " << guid_prefix);
participant_it.second.remove_participant(guid_prefix);
}

// For each relevant participant make not relevant
for (eprosima::fastrtps::rtps::GuidPrefix_t relevant_participant : pit->second.relevant_participants())
for (auto& writer_it : writers_)
{
if (relevant_participant != guid_prefix)
{
auto rpit = participants_.find(relevant_participant);
if (rpit == participants_.end())
{
// This is not an error. Remote participants will try to unmatch with participants even
// when the match is not reciprocal
logInfo(DISCOVERY_DATABASE,
"Participant " << relevant_participant << " matched with an unexisting participant: " <<
guid_prefix);
}
else
{
rpit->second.remove_participant(guid_prefix);
}
}
writer_it.second.remove_participant(guid_prefix);
}
for (auto& reader_it : readers_)
{
reader_it.second.remove_participant(guid_prefix);
}
}

Expand Down
265 changes: 265 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,271 @@ bool PDPClient::match_servers_EDP_endpoints()
return all;
}

<<<<<<< HEAD
=======
void PDPClient::update_remote_servers_list()
{
if (!mp_PDPReader || !mp_PDPWriter)
{
logError(SERVER_CLIENT_DISCOVERY, "Cannot update server list within an uninitialized Client");
return;
}

std::lock_guard<std::recursive_mutex> lock(*getMutex());

for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
{
if (!mp_PDPReader->matched_writer_is_matched(it.GetPDPWriter()))
{
match_pdp_writer_nts_(it);
}

if (!mp_PDPWriter->matched_reader_is_matched(it.GetPDPReader()))
{
match_pdp_reader_nts_(it);
}
}
mp_sync->restart_timer();
}

void PDPClient::match_pdp_writer_nts_(
const eprosima::fastdds::rtps::RemoteServerAttributes& server_att)
{
std::lock_guard<std::mutex> data_guard(temp_data_lock_);
const NetworkFactory& network = mp_RTPSParticipant->network_factory();
temp_writer_data_.clear();
temp_writer_data_.guid(server_att.GetPDPWriter());
temp_writer_data_.set_multicast_locators(server_att.metatrafficMulticastLocatorList, network);
temp_writer_data_.set_remote_unicast_locators(server_att.metatrafficUnicastLocatorList, network);
temp_writer_data_.m_qos.m_durability.kind = TRANSIENT_DURABILITY_QOS;
temp_writer_data_.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
mp_PDPReader->matched_writer_add(temp_writer_data_);
}

void PDPClient::match_pdp_reader_nts_(
const eprosima::fastdds::rtps::RemoteServerAttributes& server_att)
{
std::lock_guard<std::mutex> data_guard(temp_data_lock_);
const NetworkFactory& network = mp_RTPSParticipant->network_factory();
temp_reader_data_.clear();
temp_reader_data_.guid(server_att.GetPDPReader());
temp_reader_data_.set_multicast_locators(server_att.metatrafficMulticastLocatorList, network);
temp_reader_data_.set_remote_unicast_locators(server_att.metatrafficUnicastLocatorList, network);
temp_reader_data_.m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
temp_reader_data_.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
mp_PDPWriter->matched_reader_add(temp_reader_data_);
}

const std::string& ros_discovery_server_env()
{
static std::string servers;
SystemInfo::get_env(DEFAULT_ROS2_MASTER_URI, servers);
return servers;
}

bool load_environment_server_info(
RemoteServerList_t& attributes)
{
return load_environment_server_info(ros_discovery_server_env(), attributes);
}

bool load_environment_server_info(
std::string list,
RemoteServerList_t& attributes)
{
attributes.clear();
if (list.empty())
{
return true;
}

/* Parsing ancillary regex */
// Address should be <letter,numbers,dots>:<number>. We do not need to verify that the first part
// is an IPv4 address, as it is done latter.
const std::regex ROS2_ADDRESS_PATTERN(R"(^([A-Za-z0-9-.]+)?:?(?:(\d+))?$)");
const std::regex ROS2_SERVER_LIST_PATTERN(R"(([^;]*);?)");

try
{
// Do the parsing and populate the list
RemoteServerAttributes server_att;
Locator_t server_locator(LOCATOR_KIND_UDPv4, DEFAULT_ROS2_SERVER_PORT);
int server_id = 0;

std::sregex_iterator server_it(
list.begin(),
list.end(),
ROS2_SERVER_LIST_PATTERN,
std::regex_constants::match_not_null);

while (server_it != std::sregex_iterator())
{
const std::smatch::value_type sm = *++(server_it->cbegin());

if (sm.matched)
{
// now we must parse the inner expression
std::smatch mr;
std::string locator(sm);
if (std::regex_match(locator, mr, ROS2_ADDRESS_PATTERN, std::regex_constants::match_not_null))
{
std::smatch::iterator it = mr.cbegin();

while (++it != mr.cend())
{
std::string address = it->str();

// Check whether the address is IPv4
if (!IPLocator::isIPv4(address))
{
auto response = rtps::IPLocator::resolveNameDNS(address);

// Add the first valid IPv4 address that we can find
if (response.first.size() > 0)
{
address = response.first.begin()->data();
}
}

if (!IPLocator::setIPv4(server_locator, address))
{
std::stringstream ss;
ss << "Wrong ipv4 address passed into the server's list " << address;
throw std::invalid_argument(ss.str());
}

if (IPLocator::isAny(server_locator))
{
// A server cannot be reach in all interfaces, it's clearly a localhost call
IPLocator::setIPv4(server_locator, "127.0.0.1");
}

if (++it != mr.cend())
{
// reset the locator to default
IPLocator::setPhysicalPort(server_locator, DEFAULT_ROS2_SERVER_PORT);

if (it->matched)
{
// note stoi throws also an invalid_argument
int port = stoi(it->str());

if (port > std::numeric_limits<uint16_t>::max())
{
throw std::out_of_range("Too large udp port passed into the server's list");
}

if (!IPLocator::setPhysicalPort(server_locator, static_cast<uint16_t>(port)))
{
std::stringstream ss;
ss << "Wrong udp port passed into the server's list " << it->str();
throw std::invalid_argument(ss.str());
}
}
}
}

// add the server to the list
if (!get_server_client_default_guidPrefix(server_id, server_att.guidPrefix))
{
throw std::invalid_argument("The maximum number of default discovery servers has been reached");
}

server_att.metatrafficUnicastLocatorList.clear();
server_att.metatrafficUnicastLocatorList.push_back(server_locator);
attributes.push_back(server_att);
}
else
{
if (!locator.empty())
{
std::stringstream ss;
ss << "Wrong locator passed into the server's list " << locator;
throw std::invalid_argument(ss.str());
}
// else: it's intencionally empty to hint us to ignore this server
}
}
// advance to the next server if any
++server_it;
++server_id;
}

// Check for server info
if (attributes.empty())
{
throw std::invalid_argument("No default server locators were provided.");
}
}
catch (std::exception& e)
{
logError(SERVER_CLIENT_DISCOVERY, e.what());
attributes.clear();
return false;
}

return true;
}

GUID_t RemoteServerAttributes::GetParticipant() const
{
return GUID_t(guidPrefix, c_EntityId_RTPSParticipant);
}

GUID_t RemoteServerAttributes::GetPDPReader() const
{
return GUID_t(guidPrefix, c_EntityId_SPDPReader);
}

GUID_t RemoteServerAttributes::GetPDPWriter() const
{
return GUID_t(guidPrefix, c_EntityId_SPDPWriter);
}

bool get_server_client_default_guidPrefix(
int id,
GuidPrefix_t& guid)
{
if ( id >= 0
&& id < 256
&& std::istringstream(DEFAULT_ROS2_SERVER_GUIDPREFIX) >> guid)
{
// Third octet denotes the server id
guid.value[2] = static_cast<octet>(id);

return true;
}

return false;
}

bool PDPClient::remove_remote_participant(
const GUID_t& partGUID,
ParticipantDiscoveryInfo::DISCOVERY_STATUS reason)
{
if (PDP::remove_remote_participant(partGUID, reason))
{
// If it works fine, return
return true;
}

// Erase Proxies created before having the Participant
GUID_t wguid;
wguid.guidPrefix = partGUID.guidPrefix;
wguid.entityId = c_EntityId_SPDPWriter;
mp_PDPReader->matched_writer_remove(wguid);

GUID_t rguid;
rguid.guidPrefix = partGUID.guidPrefix;
rguid.entityId = c_EntityId_SPDPReader;
mp_PDPWriter->matched_reader_remove(rguid);

update_remote_servers_list();

return false;
}

>>>>>>> 436826000 (Discovery Server fix reconnection (#2246))
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
Loading

0 comments on commit b63e46e

Please sign in to comment.