From b63e46e686fd8e55883331b189dd40c94f84a6d3 Mon Sep 17 00:00:00 2001
From: jparisu <69341543+jparisu@users.noreply.github.com>
Date: Mon, 22 Nov 2021 12:34:12 +0100
Subject: [PATCH] Discovery Server fix reconnection (#2246)

* Refs #12522: Fix minor ds errors

Signed-off-by: jparisu

* Refs #12522: fix typo

Signed-off-by: jparisu

* Refs #12522: apply suggestions

Signed-off-by: jparisu

* Refs #12522: Client reconnection fix

Signed-off-by: jparisu

* Refs #12522: apply suggestions to fix comment

Signed-off-by: jparisu

* Refs #12522: uncrustify

Signed-off-by: jparisu

* Refs #12522: Apply new fix

Signed-off-by: jparisu
(cherry picked from commit 436826000cfa272fa2098b1f400439fedefc0801)

# Conflicts:
#	src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
#	src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp
---
 .../builtin/discovery/participant/PDPClient.h |  10 +
 .../discovery/database/DiscoveryDataBase.cpp  |  53 ++--
 .../discovery/participant/PDPClient.cpp       | 265 ++++++++++++++++++
 .../participant/PDPServerListener.cpp         | 213 ++++++++++++++
 4 files changed, 512 insertions(+), 29 deletions(-)

diff --git a/include/fastdds/rtps/builtin/discovery/participant/PDPClient.h b/include/fastdds/rtps/builtin/discovery/participant/PDPClient.h
index 83f16005c51..24e1f52fc95 100644
--- a/include/fastdds/rtps/builtin/discovery/participant/PDPClient.h
+++ b/include/fastdds/rtps/builtin/discovery/participant/PDPClient.h
@@ -119,6 +119,16 @@ class PDPClient : public PDP
     void notifyAboveRemoteEndpoints(
             const ParticipantProxyData& pdata) override;
 
+    /**
+     * This method removes a remote RTPSParticipant and all its writers and readers.
+     * @param participant_guid GUID_t of the remote RTPSParticipant.
+     * @param reason Why the participant is being removed (dropped vs removed)
+     * @return true if correct.
+     */
+    bool remove_remote_participant(
+            const GUID_t& participant_guid,
+            ParticipantDiscoveryInfo::DISCOVERY_STATUS reason) override;
+
     /**
      * Matching server EDP endpoints
      * @return true if all servers have been discovered
diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
index ef4f1a7c87b..4c83cc7280b 100644
--- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
+++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
@@ -855,7 +855,7 @@ void DiscoveryDataBase::create_writers_from_change_(
     }
     else
     {
-        logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " as no associated participant. Skipping");
+        logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " has no associated participant. Skipping");
         return;
     }
 
@@ -972,7 +972,7 @@ void DiscoveryDataBase::create_readers_from_change_(
     }
     else
     {
-        logError(DISCOVERY_DATABASE, "Writer " << reader_guid << " as no associated participant. Skipping");
+        logError(DISCOVERY_DATABASE, "Reader " << reader_guid << " has no associated participant. Skipping");
         return;
     }
 
@@ -1704,10 +1704,15 @@ void DiscoveryDataBase::AckedFunctor::operator () (
     {
         if (reader_proxy->guid().guidPrefix == *it)
         {
-            // If the participant is already in the DB it means it has answered to the pinging
-            // or that is pinging us and we have already received its DATA(p)
-            // If neither of both has happenned we should not wait for it to ack this data, so we
-            // skip it and leave it as acked
+            /*
+             * If the participant is already in the DB, it means it has answered the pinging,
+             * or that it is pinging us and we have already received its DATA(p).
+             * If neither has happened (the participant is not in the DB), we should not wait for it
+             * to ack this data, or we could get stuck in an endless loop
+             * (this remote server might not exist and/or might never be discovered).
+             * Nevertheless, the ack is still pending for this participant, and once it is discovered
+             * this data will be sent again.
+             */
             auto remote_server_it = db_->participants_.find(*it);
             if (remote_server_it == db_->participants_.end())
             {
@@ -1715,6 +1720,8 @@ void DiscoveryDataBase::AckedFunctor::operator () (
                         "check as acked for " << reader_proxy->guid() << " as it has not answered pinging yet");
                 return;
             }
+
+            break;
         }
     }
 
@@ -1731,32 +1738,20 @@ void DiscoveryDataBase::unmatch_participant_(
 {
     logInfo(DISCOVERY_DATABASE, "unmatching participant: " << guid_prefix);
 
-    auto pit = participants_.find(guid_prefix);
-    if (pit == participants_.end())
+    // Remove the unmatched participant from every participant's info.
+    // IMPORTANT: Iterating only over the relevant participants is not enough, as participant A could be in
+    // another participant B's info while B is not relevant for A. So it must be done for every participant.
+    for (auto& participant_it : participants_)
     {
-        logWarning(DISCOVERY_DATABASE,
-                "Attempting to unmatch an unexisting participant: " << guid_prefix);
+        participant_it.second.remove_participant(guid_prefix);
     }
-
-    // For each relevant participant make not relevant
-    for (eprosima::fastrtps::rtps::GuidPrefix_t relevant_participant : pit->second.relevant_participants())
+    for (auto& writer_it : writers_)
     {
-        if (relevant_participant != guid_prefix)
-        {
-            auto rpit = participants_.find(relevant_participant);
-            if (rpit == participants_.end())
-            {
-                // This is not an error. Remote participants will try to unmatch with participants even
-                // when the match is not reciprocal
-                logInfo(DISCOVERY_DATABASE,
-                        "Participant " << relevant_participant << " matched with an unexisting participant: " <<
-                        guid_prefix);
-            }
-            else
-            {
-                rpit->second.remove_participant(guid_prefix);
-            }
-        }
+        writer_it.second.remove_participant(guid_prefix);
+    }
+    for (auto& reader_it : readers_)
+    {
+        reader_it.second.remove_participant(guid_prefix);
     }
 }
 
diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
index e7d78c6a36b..b550e074c2a 100644
--- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
+++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
@@ -589,6 +589,271 @@ bool PDPClient::match_servers_EDP_endpoints()
     return all;
 }
 
+<<<<<<< HEAD
+=======
+void PDPClient::update_remote_servers_list()
+{
+    if (!mp_PDPReader || !mp_PDPWriter)
+    {
+        logError(SERVER_CLIENT_DISCOVERY, "Cannot update server list within an uninitialized Client");
+        return;
+    }
+
+    std::lock_guard<std::recursive_mutex> lock(*getMutex());
+
+    for (const eprosima::fastdds::rtps::RemoteServerAttributes& it : mp_builtin->m_DiscoveryServers)
+    {
+        if (!mp_PDPReader->matched_writer_is_matched(it.GetPDPWriter()))
+        {
+            match_pdp_writer_nts_(it);
+        }
+
+        if (!mp_PDPWriter->matched_reader_is_matched(it.GetPDPReader()))
+        {
+            match_pdp_reader_nts_(it);
+        }
+    }
+    mp_sync->restart_timer();
+}
+
+void PDPClient::match_pdp_writer_nts_(
+        const eprosima::fastdds::rtps::RemoteServerAttributes& server_att)
+{
+    std::lock_guard<std::mutex> data_guard(temp_data_lock_);
+    const NetworkFactory& network = mp_RTPSParticipant->network_factory();
+    temp_writer_data_.clear();
+    temp_writer_data_.guid(server_att.GetPDPWriter());
+    temp_writer_data_.set_multicast_locators(server_att.metatrafficMulticastLocatorList, network);
+    temp_writer_data_.set_remote_unicast_locators(server_att.metatrafficUnicastLocatorList, network);
+    temp_writer_data_.m_qos.m_durability.kind = TRANSIENT_DURABILITY_QOS;
+    temp_writer_data_.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
+    mp_PDPReader->matched_writer_add(temp_writer_data_);
+}
+
+void PDPClient::match_pdp_reader_nts_(
+        const eprosima::fastdds::rtps::RemoteServerAttributes& server_att)
+{
+    std::lock_guard<std::mutex> data_guard(temp_data_lock_);
+    const NetworkFactory& network = mp_RTPSParticipant->network_factory();
+    temp_reader_data_.clear();
+    temp_reader_data_.guid(server_att.GetPDPReader());
+    temp_reader_data_.set_multicast_locators(server_att.metatrafficMulticastLocatorList, network);
+    temp_reader_data_.set_remote_unicast_locators(server_att.metatrafficUnicastLocatorList, network);
+    temp_reader_data_.m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
+    temp_reader_data_.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
+    mp_PDPWriter->matched_reader_add(temp_reader_data_);
+}
+
+const std::string& ros_discovery_server_env()
+{
+    static std::string servers;
+    SystemInfo::get_env(DEFAULT_ROS2_MASTER_URI, servers);
+    return servers;
+}
+
+bool load_environment_server_info(
+        RemoteServerList_t& attributes)
+{
+    return load_environment_server_info(ros_discovery_server_env(), attributes);
+}
+
+bool load_environment_server_info(
+        std::string list,
+        RemoteServerList_t& attributes)
+{
+    attributes.clear();
+    if (list.empty())
+    {
+        return true;
+    }
+
+    /* Parsing ancillary regex */
+    // Address should be <address>:<port>. We do not need to verify that the first part
+    // is an IPv4 address, as that is done later.
+    const std::regex ROS2_ADDRESS_PATTERN(R"(^([A-Za-z0-9-.]+)?:?(?:(\d+))?$)");
+    const std::regex ROS2_SERVER_LIST_PATTERN(R"(([^;]*);?)");
+
+    try
+    {
+        // Do the parsing and populate the list
+        RemoteServerAttributes server_att;
+        Locator_t server_locator(LOCATOR_KIND_UDPv4, DEFAULT_ROS2_SERVER_PORT);
+        int server_id = 0;
+
+        std::sregex_iterator server_it(
+            list.begin(),
+            list.end(),
+            ROS2_SERVER_LIST_PATTERN,
+            std::regex_constants::match_not_null);
+
+        while (server_it != std::sregex_iterator())
+        {
+            const std::smatch::value_type sm = *++(server_it->cbegin());
+
+            if (sm.matched)
+            {
+                // Now we must parse the inner expression
+                std::smatch mr;
+                std::string locator(sm);
+                if (std::regex_match(locator, mr, ROS2_ADDRESS_PATTERN, std::regex_constants::match_not_null))
+                {
+                    std::smatch::iterator it = mr.cbegin();
+
+                    while (++it != mr.cend())
+                    {
+                        std::string address = it->str();
+
+                        // Check whether the address is IPv4
+                        if (!IPLocator::isIPv4(address))
+                        {
+                            auto response = rtps::IPLocator::resolveNameDNS(address);
+
+                            // Add the first valid IPv4 address that we can find
+                            if (response.first.size() > 0)
+                            {
+                                address = response.first.begin()->data();
+                            }
+                        }
+
+                        if (!IPLocator::setIPv4(server_locator, address))
+                        {
+                            std::stringstream ss;
+                            ss << "Wrong ipv4 address passed into the server's list " << address;
+                            throw std::invalid_argument(ss.str());
+                        }
+
+                        if (IPLocator::isAny(server_locator))
+                        {
+                            // A server cannot be reached on all interfaces; it's clearly a localhost call
+                            IPLocator::setIPv4(server_locator, "127.0.0.1");
+                        }
+
+                        if (++it != mr.cend())
+                        {
+                            // Reset the locator to default
+                            IPLocator::setPhysicalPort(server_locator, DEFAULT_ROS2_SERVER_PORT);
+
+                            if (it->matched)
+                            {
+                                // Note that stoi also throws an invalid_argument
+                                int port = stoi(it->str());
+
+                                if (port > std::numeric_limits<uint16_t>::max())
+                                {
+                                    throw std::out_of_range("Too large udp port passed into the server's list");
+                                }
+
+                                if (!IPLocator::setPhysicalPort(server_locator, static_cast<uint16_t>(port)))
+                                {
+                                    std::stringstream ss;
+                                    ss << "Wrong udp port passed into the server's list " << it->str();
+                                    throw std::invalid_argument(ss.str());
+                                }
+                            }
+                        }
+                    }
+
+                    // Add the server to the list
+                    if (!get_server_client_default_guidPrefix(server_id, server_att.guidPrefix))
+                    {
+                        throw std::invalid_argument("The maximum number of default discovery servers has been reached");
+                    }
+
+                    server_att.metatrafficUnicastLocatorList.clear();
+                    server_att.metatrafficUnicastLocatorList.push_back(server_locator);
+                    attributes.push_back(server_att);
+                }
+                else
+                {
+                    if (!locator.empty())
+                    {
+                        std::stringstream ss;
+                        ss << "Wrong locator passed into the server's list " << locator;
+                        throw std::invalid_argument(ss.str());
+                    }
+                    // else: it is intentionally empty, hinting us to ignore this server
+                }
+            }
+            // Advance to the next server, if any
+            ++server_it;
+            ++server_id;
+        }
+
+        // Check for server info
+        if (attributes.empty())
+        {
+            throw std::invalid_argument("No default server locators were provided.");
+        }
+    }
+    catch (std::exception& e)
+    {
+        logError(SERVER_CLIENT_DISCOVERY, e.what());
+        attributes.clear();
+        return false;
+    }
+
+    return true;
+}
+
+GUID_t RemoteServerAttributes::GetParticipant() const
+{
+    return GUID_t(guidPrefix, c_EntityId_RTPSParticipant);
+}
+
+GUID_t RemoteServerAttributes::GetPDPReader() const
+{
+    return GUID_t(guidPrefix, c_EntityId_SPDPReader);
+}
+
+GUID_t RemoteServerAttributes::GetPDPWriter() const
+{
+    return GUID_t(guidPrefix, c_EntityId_SPDPWriter);
+}
+
+bool get_server_client_default_guidPrefix(
+        int id,
+        GuidPrefix_t& guid)
+{
+    if ( id >= 0
+            && id < 256
+            && std::istringstream(DEFAULT_ROS2_SERVER_GUIDPREFIX) >> guid)
+    {
+        // The third octet denotes the server id
+        guid.value[2] = static_cast<octet>(id);
+
+        return true;
+    }
+
+    return false;
+}
+
+bool PDPClient::remove_remote_participant(
+        const GUID_t& partGUID,
+        ParticipantDiscoveryInfo::DISCOVERY_STATUS reason)
+{
+    if (PDP::remove_remote_participant(partGUID, reason))
+    {
+        // If it works fine, return
+        return true;
+    }
+
+    // Erase proxies created before having the participant
+    GUID_t wguid;
+    wguid.guidPrefix = partGUID.guidPrefix;
+    wguid.entityId = c_EntityId_SPDPWriter;
+    mp_PDPReader->matched_writer_remove(wguid);
+
+    GUID_t rguid;
+    rguid.guidPrefix = partGUID.guidPrefix;
+    rguid.entityId = c_EntityId_SPDPReader;
+    mp_PDPWriter->matched_reader_remove(rguid);
+
+    update_remote_servers_list();
+
+    return false;
+}
+
+>>>>>>> 436826000 (Discovery Server fix reconnection (#2246))
 } /* namespace rtps */
 } /* namespace fastrtps */
 } /* namespace eprosima */
diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp
index cf939930d8f..c3a30ac6b04 100644
--- a/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp
+++ b/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp
@@ -53,7 +53,36 @@ void PDPServerListener::onNewCacheChangeAdded(
         RTPSReader* reader,
         const CacheChange_t* const change_in)
 {
+<<<<<<< HEAD
     CacheChange_t* change = (CacheChange_t*)(change_in);
+=======
+    logInfo(RTPS_PDP_LISTENER, "");
+    logInfo(RTPS_PDP_LISTENER, "------------------ PDP SERVER LISTENER START ------------------");
+    logInfo(RTPS_PDP_LISTENER,
+            "-------------------- " << pdp_server()->mp_RTPSParticipant->getGuid() <<
+            " --------------------");
+    logInfo(RTPS_PDP_LISTENER, "PDP Server Message received: " << change_in->instanceHandle);
+
+    // Get the PDP reader history
+    auto pdp_history = pdp_server()->mp_PDPReaderHistory;
+    // Get the PDP reader to release the change
+    auto pdp_reader = pdp_server()->mp_PDPReader;
+
+    bool routine_should_be_awake = false;
+
+    // Create a delete function to clear the data associated with the unique pointer in case the change is not passed
+    // to the database.
+    auto deleter = [pdp_history](CacheChange_t* p)
+            {
+                // Remove the change from the reader history, returning it to the pool
+                pdp_history->remove_change(p);
+            };
+
+    // Unique pointer to the change
+    std::unique_ptr<CacheChange_t, decltype(deleter)> change((CacheChange_t*)(change_in), deleter);
+
+    // Get the GUID of the writer that sent the change
+>>>>>>> 436826000 (Discovery Server fix reconnection (#2246))
     GUID_t writer_guid = change->writerGUID;
     logInfo(RTPS_PDP, "SPDP Message received");
 
@@ -98,8 +127,155 @@ void PDPServerListener::onNewCacheChangeAdded(
                 parent_pdp_->getRTPSParticipant()->network_factory(),
                 parent_pdp_->getRTPSParticipant()->has_shm_transport()))
         {
+<<<<<<< HEAD
             change->instanceHandle = local_data.m_key;
             guid = local_data.m_guid;
+=======
+            /* Check PID_VENDOR_ID */
+            if (participant_data.m_VendorId != fastrtps::rtps::c_VendorId_eProsima)
+            {
+                logInfo(RTPS_PDP_LISTENER,
+                        "DATA(p|Up) from different vendor is not supported for Discovery Server operation");
+                return;
+            }
+
+            fastrtps::ParameterPropertyList_t properties = participant_data.m_properties;
+
+            /* Check DS_VERSION */
+            auto ds_version = std::find_if(
+                properties.begin(),
+                properties.end(),
+                [](const dds::ParameterProperty_t& property)
+                {
+                    return property.first() == dds::parameter_property_ds_version;
+                });
+
+            if (ds_version != properties.end())
+            {
+                if (std::stof(ds_version->second()) < 1.0)
+                {
+                    logError(RTPS_PDP_LISTENER, "Minimum " << dds::parameter_property_ds_version
+                            << " is 1.0, found: " << ds_version->second());
+                    return;
+                }
+                logInfo(RTPS_PDP_LISTENER, "Participant " << dds::parameter_property_ds_version << ": "
+                        << ds_version->second());
+            }
+            else
+            {
+                logInfo(RTPS_PDP_LISTENER, dds::parameter_property_ds_version << " is not set. Assuming 1.0");
+            }
+
+            /* Check PARTICIPANT_TYPE */
+            bool is_client = true;
+            auto participant_type = std::find_if(
+                properties.begin(),
+                properties.end(),
+                [](const dds::ParameterProperty_t& property)
+                {
+                    return property.first() == dds::parameter_property_participant_type;
+                });
+
+            if (participant_type != properties.end())
+            {
+                if (participant_type->second() == ParticipantType::SERVER ||
+                        participant_type->second() == ParticipantType::BACKUP ||
+                        participant_type->second() == ParticipantType::SUPER_CLIENT)
+                {
+                    is_client = false;
+                }
+                else if (participant_type->second() == ParticipantType::SIMPLE)
+                {
+                    logInfo(RTPS_PDP_LISTENER, "Ignoring " << dds::parameter_property_participant_type << ": "
+                            << participant_type->second());
+                    return;
+                }
+                else if (participant_type->second() != ParticipantType::CLIENT)
+                {
+                    logError(RTPS_PDP_LISTENER, "Wrong " << dds::parameter_property_participant_type << ": "
+                            << participant_type->second());
+                    return;
+                }
+                logInfo(RTPS_PDP_LISTENER, "Participant type " << participant_type->second());
+            }
+            else
+            {
+                logInfo(RTPS_PDP_LISTENER, dds::parameter_property_participant_type << " is not set");
+                // Fall back to checking whether the participant is a SERVER by looking for the persistence GUID
+                auto persistence_guid = std::find_if(
+                    properties.begin(),
+                    properties.end(),
+                    [](const dds::ParameterProperty_t& property)
+                    {
+                        return property.first() == dds::parameter_property_persistence_guid;
+                    });
+                // The presence of the persistence GUID property suggests a SERVER. This assumption is made to
+                // keep backwards compatibility with Discovery Server v1.0. However, any participant that has
+                // been configured as persistent will have this property.
+                if (persistence_guid != properties.end())
+                {
+                    is_client = false;
+                }
+                logInfo(RTPS_PDP_LISTENER, "Participant is client: " << std::boolalpha << is_client);
+            }
+
+            // Check whether the participant is a client/server of this server or if it has been forwarded from
+            // another entity (server).
+            // is_local means that the server is connected (or will be) with this entity directly
+            bool is_local = true;
+
+            // In case a new change arrives from a local entity, but the ParticipantProxyData already exists
+            // because we know it from another server
+            bool was_local = true;
+
+            // If the instance handle is different from the writer GUID, then the change has been relayed
+            if (iHandle2GUID(change->instanceHandle).guidPrefix != change->writerGUID.guidPrefix)
+            {
+                is_local = false;
+            }
+            else
+            {
+                // We already know that the writer and the entity are the same, so we can use writerGUID
+                was_local = pdp_server()->discovery_db().is_participant_local(change->writerGUID.guidPrefix);
+            }
+
+            if (!pdp_server()->discovery_db().backup_in_progress())
+            {
+                // Notify the DiscoveryDataBase
+                if (pdp_server()->discovery_db().update(
+                            change.get(),
+                            ddb::DiscoveryParticipantChangeData(
+                                participant_data.metatraffic_locators,
+                                is_client,
+                                is_local)))
+                {
+                    // Remove the change from the PDP reader history, but do not return it to the pool. From here
+                    // on, the discovery database takes ownership of the CacheChange_t. Henceforth there are no
+                    // references to the change. Take change ownership away from the unique pointer, so that its
+                    // destruction does not destroy the data.
+                    pdp_history->remove_change(pdp_history->find_change(change.release()), false);
+
+                    // Ensure processing time for the cache by triggering the server thread (which processes the
+                    // updates). The server does not have to postpone the execution of the routine if a change is
+                    // received, i.e. the server routine is triggered instantly, as the default value of the
+                    // interval that the server has to wait is 0.
+                    routine_should_be_awake = true;
+
+                    // TODO: when the DiscoveryDataBase allows updating capabilities, we can dismiss old PDP
+                    // processing
+                }
+                else
+                {
+                    // If the database does not take the ownership, then return the CacheChange_t to the pool.
+                    pdp_reader->releaseCache(change.release());
+                }
+
+            }
+            else
+            {
+                // Release the unique pointer, not the change in the pool
+                change.release();
+            }
+
+>>>>>>> 436826000 (Discovery Server fix reconnection (#2246))
 
             // At this point we can release reader lock.
             reader->getMutex().unlock();
@@ -183,10 +359,47 @@ void PDPServerListener::onNewCacheChangeAdded(
 
             if (!parent_pdp_->lookup_participant_key(guid, key))
             {
+<<<<<<< HEAD
                 logWarning(RTPS_PDP, "PDPServerListener received DATA(p) NOT_ALIVE_DISPOSED from unknown participant");
                 parent_pdp_->mp_PDPReaderHistory->remove_change(change);
                 return;
             }
+=======
+                // Ensure processing time for the cache by triggering the server thread (which processes the
+                // updates). The server does not have to postpone the execution of the routine if a change is
+                // received, i.e. the server routine is triggered instantly, as the default value of the
+                // interval that the server has to wait is 0.
+                routine_should_be_awake = true;
+
+                // From here on, the discovery database takes ownership of the CacheChange_t. Henceforth there
+                // are no references to the change. Take change ownership away from the unique pointer, so that
+                // its destruction does not destroy the data.
+                change.release();
+            }
+
+            // Remove the participant from the proxies
+            reader->getMutex().unlock();
+            pdp_server()->remove_remote_participant(guid, ParticipantDiscoveryInfo::REMOVED_PARTICIPANT);
+            reader->getMutex().lock();
+        }
+
+        /*
+         * Awake the routine thread if needed.
+         * The thread is awakened at the end of the listener because the proxies must have been created before
+         * the data is processed and the new messages are added to the history.
+         * Otherwise, a message could be added to the history in order to be sent to a relevant participant that
+         * does not yet have an associated ReaderProxy; that participant would miss the message, and the message
+         * would not be sent again (when there are no changes, PDP data is not resent).
+         */
+        if (routine_should_be_awake)
+        {
+            pdp_server()->awake_routine_thread();
+        }
+
+        // The cache is removed from the history (if it is still there) and returned to the pool on leaving the
+        // scope, since the destruction of the unique pointer guarantees it. If ownership has been taken away
+        // from the unique pointer, then nothing happens at this point.
+>>>>>>> 436826000 (Discovery Server fix reconnection (#2246))
 
             if (parent_pdp_->remove_remote_participant(guid, ParticipantDiscoveryInfo::REMOVED_PARTICIPANT))
             {
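
Note on the list format parsed by load_environment_server_info() above: the input is a semicolon-separated list of <address>:<port> slots, where the position of each slot selects the default server id, and an empty slot is ignored while still consuming its id. The following is a minimal, illustrative sketch of the expected usage; it is not part of the patch, and it assumes the declarations exercised above are reachable through fastdds/rtps/attributes/ServerAttributes.h:

    // Illustrative sketch only; not part of the patch.
    #include <iostream>

    #include <fastdds/rtps/attributes/ServerAttributes.h>

    int main()
    {
        using namespace eprosima::fastdds::rtps;

        RemoteServerList_t servers;

        // Slot 0 -> 192.168.2.23 with the default port, slot 1 -> empty
        // (ignored, but its id is still consumed), slot 2 -> 192.168.2.24:56542
        if (load_environment_server_info("192.168.2.23;;192.168.2.24:56542", servers))
        {
            for (const RemoteServerAttributes& server : servers)
            {
                // The third octet of the prefix encodes the server id (0 and 2 here)
                std::cout << server.guidPrefix << std::endl;
            }
        }

        return 0;
    }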