Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow DomainParticipantQos updates for UserDataQosPolicy and adding new servers to the remote server list #2142

Merged
merged 5 commits into from
Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/fastdds/dds/core/policy/QosPolicies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class QosPolicy
{
public:

//! Boolean that indicates if the Qos has been changed
//! Boolean that indicates if the Qos has been changed with respect to the default Qos.
bool hasChanged;

/**
Expand Down
8 changes: 8 additions & 0 deletions include/fastdds/rtps/participant/RTPSParticipant.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ class RTPS_DllAPI RTPSParticipant
const TopicAttributes& topicAtt,
const ReaderQos& rqos);

/**
* Update participant attributes.
* @param patt New participant attributes.
* @return True on success, false otherwise.
*/
void update_attributes(
const RTPSParticipantAttributes& patt);

/**
* Update writer QOS
* @param Writer to update
Expand Down
116 changes: 111 additions & 5 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>

#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastdds/rtps/RTPSDomain.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.h>
Expand Down Expand Up @@ -343,7 +344,15 @@ ReturnCode_t DomainParticipantImpl::set_qos(
{
return ReturnCode_t::RETCODE_IMMUTABLE_POLICY;
}
set_qos(qos_, qos_to_set, !enabled);

if (set_qos(qos_, qos_to_set, !enabled) && enabled)
{
// Notify the participant that there is a QoS update
fastrtps::rtps::RTPSParticipantAttributes patt;
set_attributes_from_qos(patt, qos_);
rtps_participant_->update_attributes(patt);
}

return ReturnCode_t::RETCODE_OK;
}

Expand Down Expand Up @@ -1733,11 +1742,13 @@ bool DomainParticipantImpl::has_active_entities()
return false;
}

void DomainParticipantImpl::set_qos(
bool DomainParticipantImpl::set_qos(
DomainParticipantQos& to,
const DomainParticipantQos& from,
bool first_time)
{
bool qos_should_be_updated = false;

if (!(to.entity_factory() == from.entity_factory()))
{
to.entity_factory() = from.entity_factory();
Expand All @@ -1746,6 +1757,10 @@ void DomainParticipantImpl::set_qos(
{
to.user_data() = from.user_data();
to.user_data().hasChanged = true;
if (!first_time)
{
qos_should_be_updated = true;
}
}
if (first_time && !(to.allocation() == from.allocation()))
{
Expand All @@ -1755,9 +1770,14 @@ void DomainParticipantImpl::set_qos(
{
to.properties() = from.properties();
}
if (first_time && !(to.wire_protocol() == from.wire_protocol()))
if (!(to.wire_protocol() == from.wire_protocol()))
{
to.wire_protocol() = from.wire_protocol();
to.wire_protocol().hasChanged = true;
if (!first_time)
{
qos_should_be_updated = true;
}
}
if (first_time && !(to.transport() == from.transport()))
{
Expand All @@ -1767,6 +1787,8 @@ void DomainParticipantImpl::set_qos(
{
to.name() = from.name();
}

return qos_should_be_updated;
}

fastrtps::types::ReturnCode_t DomainParticipantImpl::check_qos(
Expand Down Expand Up @@ -1797,8 +1819,92 @@ bool DomainParticipantImpl::can_qos_be_updated(
}
if (!(to.wire_protocol() == from.wire_protocol()))
{
updatable = false;
logWarning(RTPS_QOS_CHECK, "WireProtocolConfigQos cannot be changed after the participant is enabled");
// Check that the only modification was in wire_protocol().discovery_config.m_DiscoveryServers
if ((to.wire_protocol().builtin.discovery_config.m_DiscoveryServers ==
from.wire_protocol().builtin.discovery_config.m_DiscoveryServers) ||
(!(to.wire_protocol().builtin.discovery_config.m_DiscoveryServers ==
from.wire_protocol().builtin.discovery_config.m_DiscoveryServers) &&
(!(to.wire_protocol().prefix == from.wire_protocol().prefix) ||
!(to.wire_protocol().participant_id == from.wire_protocol().participant_id) ||
!(to.wire_protocol().port == from.wire_protocol().port) ||
!(to.wire_protocol().throughput_controller == from.wire_protocol().throughput_controller) ||
!(to.wire_protocol().default_unicast_locator_list ==
from.wire_protocol().default_unicast_locator_list) ||
!(to.wire_protocol().default_multicast_locator_list ==
from.wire_protocol().default_multicast_locator_list) ||
!(to.wire_protocol().builtin.use_WriterLivelinessProtocol ==
from.wire_protocol().builtin.use_WriterLivelinessProtocol) ||
!(to.wire_protocol().builtin.typelookup_config.use_client ==
from.wire_protocol().builtin.typelookup_config.use_client) ||
!(to.wire_protocol().builtin.typelookup_config.use_server ==
from.wire_protocol().builtin.typelookup_config.use_server) ||
!(to.wire_protocol().builtin.metatrafficUnicastLocatorList ==
from.wire_protocol().builtin.metatrafficUnicastLocatorList) ||
!(to.wire_protocol().builtin.metatrafficMulticastLocatorList ==
from.wire_protocol().builtin.metatrafficMulticastLocatorList) ||
!(to.wire_protocol().builtin.initialPeersList == from.wire_protocol().builtin.initialPeersList) ||
!(to.wire_protocol().builtin.readerHistoryMemoryPolicy ==
from.wire_protocol().builtin.readerHistoryMemoryPolicy) ||
!(to.wire_protocol().builtin.readerPayloadSize == from.wire_protocol().builtin.readerPayloadSize) ||
!(to.wire_protocol().builtin.writerHistoryMemoryPolicy ==
from.wire_protocol().builtin.writerHistoryMemoryPolicy) ||
!(to.wire_protocol().builtin.writerPayloadSize == from.wire_protocol().builtin.writerPayloadSize) ||
!(to.wire_protocol().builtin.mutation_tries == from.wire_protocol().builtin.mutation_tries) ||
!(to.wire_protocol().builtin.avoid_builtin_multicast ==
from.wire_protocol().builtin.avoid_builtin_multicast) ||
!(to.wire_protocol().builtin.discovery_config.discoveryProtocol ==
from.wire_protocol().builtin.discovery_config.discoveryProtocol) ||
!(to.wire_protocol().builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol ==
from.wire_protocol().builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol) ||
!(to.wire_protocol().builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol ==
from.wire_protocol().builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol) ||
!(to.wire_protocol().builtin.discovery_config.discoveryServer_client_syncperiod ==
from.wire_protocol().builtin.discovery_config.discoveryServer_client_syncperiod) ||
!(to.wire_protocol().builtin.discovery_config.m_PDPfactory ==
from.wire_protocol().builtin.discovery_config.m_PDPfactory) ||
!(to.wire_protocol().builtin.discovery_config.leaseDuration ==
from.wire_protocol().builtin.discovery_config.leaseDuration) ||
!(to.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod ==
from.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod) ||
!(to.wire_protocol().builtin.discovery_config.initial_announcements ==
from.wire_protocol().builtin.discovery_config.initial_announcements) ||
!(to.wire_protocol().builtin.discovery_config.m_simpleEDP ==
from.wire_protocol().builtin.discovery_config.m_simpleEDP) ||
!(strcmp(to.wire_protocol().builtin.discovery_config.static_edp_xml_config(),
from.wire_protocol().builtin.discovery_config.static_edp_xml_config()) == 0) ||
!(to.wire_protocol().builtin.discovery_config.ignoreParticipantFlags ==
from.wire_protocol().builtin.discovery_config.ignoreParticipantFlags))))
{
updatable = false;
logWarning(RTPS_QOS_CHECK, "WireProtocolConfigQos cannot be changed after the participant is enabled, "
<< "with the exception of builtin.discovery_config.m_DiscoveryServers");
}
else
{
// This means that the only change is in wire_protocol().builtin.discovery_config.m_DiscoveryServers
// In that case, we need to ensure that the current list (to) is strictly contained in the incoming
// list (from). For that, we check that every server in the current list (to) is also in the incoming one
// (from)
for (auto existing_server : to.wire_protocol().builtin.discovery_config.m_DiscoveryServers)
{
bool contained = false;
for (auto incoming_server : from.wire_protocol().builtin.discovery_config.m_DiscoveryServers)
{
if (existing_server.guidPrefix == incoming_server.guidPrefix)
{
contained = true;
break;
}
}
if (!contained)
{
updatable = false;
logWarning(RTPS_QOS_CHECK,
"Discovery Servers cannot be removed from the list; they can only be added");
break;
}
}
}
}
if (!(to.transport() == from.transport()))
{
Expand Down
12 changes: 11 additions & 1 deletion src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,17 @@ class DomainParticipantImpl
std::string get_inner_type_name(
const fastrtps::rtps::SampleIdentity& id) const;

static void set_qos(
/**
* Set the DomainParticipantQos checking if the Qos can be updated or not
*
* @param to DomainParticipantQos to be updated
* @param from DomainParticipantQos desired
* @param first_time Whether the DomainParticipant has been already initialized or not
*
* @return true if there has been a changed in one of the attributes that can be updated.
* false otherwise.
*/
static bool set_qos(
DomainParticipantQos& to,
const DomainParticipantQos& from,
bool first_time);
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ RTPSParticipant* RTPSDomain::clientServerEnvironmentCreationOverride(
RTPSParticipantAttributes client_att(att);

// Retrieve the info from the environment variable
// TODO(jlbueno) This should be protected with the PDP mutex.
if (!load_environment_server_info(client_att.builtin.discovery_config.m_DiscoveryServers))
{
// it's not an error, the environment variable may not be set. Any issue with environment
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/rtps/builtin/BuiltinProtocols.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ bool BuiltinProtocols::initBuiltinProtocols(
m_metatrafficUnicastLocatorList = m_att.metatrafficUnicastLocatorList;
m_metatrafficMulticastLocatorList = m_att.metatrafficMulticastLocatorList;
m_initialPeersList = m_att.initialPeersList;

// TODO(jlbueno) The access to the list should be protected with the PDP mutex but requires a refactor on PDPClient
// and PDPServer: read the remote servers list after the initialization of PDP.
m_DiscoveryServers = m_att.discovery_config.m_DiscoveryServers;

transform_server_remote_locators(p_part->network_factory());
Expand Down Expand Up @@ -160,6 +163,8 @@ bool BuiltinProtocols::updateMetatrafficLocators(
void BuiltinProtocols::transform_server_remote_locators(
NetworkFactory& nf)
{
// TODO(jlbueno) The access to the list should be protected with the PDP mutex but requires a refactor on PDPClient
// and PDPServer: read the remote servers list after the initialization of PDP.
for (eprosima::fastdds::rtps::RemoteServerAttributes& rs : m_DiscoveryServers)
{
for (Locator_t& loc : rs.metatrafficUnicastLocatorList)
Expand Down
7 changes: 4 additions & 3 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

#include <mutex>
#include <set>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/common/EntityId_t.hpp>
Expand All @@ -37,7 +38,7 @@ namespace ddb {

DiscoveryDataBase::DiscoveryDataBase(
fastrtps::rtps::GuidPrefix_t server_guid_prefix,
std::vector<fastrtps::rtps::GuidPrefix_t> servers)
std::set<fastrtps::rtps::GuidPrefix_t> servers)
: server_guid_prefix_(server_guid_prefix)
, server_acked_by_all_(servers.size() == 0)
, servers_(servers)
Expand Down Expand Up @@ -65,7 +66,7 @@ void DiscoveryDataBase::add_server(
fastrtps::rtps::GuidPrefix_t server)
{
logInfo(DISCOVERY_DATABASE, "Server " << server << " added");
servers_.push_back(server);
servers_.insert(server);
}

std::vector<fastrtps::rtps::CacheChange_t*> DiscoveryDataBase::clear()
Expand Down Expand Up @@ -1716,7 +1717,7 @@ void DiscoveryDataBase::AckedFunctor::operator () (
{
// If the reader proxy is from a server that we are pinging, we may not want to wait
// for it to be acked as the routine will not stop
for (auto it = db_->servers_.begin(); it < db_->servers_.end(); ++it)
for (auto it = db_->servers_.begin(); it != db_->servers_.end(); ++it)
{
if (reader_proxy->guid().guidPrefix == *it)
{
Expand Down
11 changes: 6 additions & 5 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
#ifndef _FASTDDS_RTPS_DISCOVERY_DATABASE_H_
#define _FASTDDS_RTPS_DISCOVERY_DATABASE_H_

#include <vector>
#include <fstream>
#include <iostream>
#include <map>
#include <mutex>
#include <iostream>
#include <fstream>
#include <set>
#include <vector>

#include <fastrtps/utils/fixed_size_string.hpp>
#include <fastdds/rtps/writer/ReaderProxy.h>
Expand Down Expand Up @@ -117,7 +118,7 @@ class DiscoveryDataBase

DiscoveryDataBase(
fastrtps::rtps::GuidPrefix_t server_guid_prefix,
std::vector<fastrtps::rtps::GuidPrefix_t> servers);
std::set<fastrtps::rtps::GuidPrefix_t> servers);

~DiscoveryDataBase();

Expand Down Expand Up @@ -562,7 +563,7 @@ class DiscoveryDataBase
std::atomic<bool> server_acked_by_all_;

//! List of GUID prefixes of the remote servers
std::vector<fastrtps::rtps::GuidPrefix_t> servers_;
std::set<fastrtps::rtps::GuidPrefix_t> servers_;

// The virtual topic associated with virtual writers and readers
const std::string virtual_topic_ = "eprosima_server_virtual_topic";
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,7 @@ ParticipantProxyData* PDP::get_participant_proxy_data(

std::list<eprosima::fastdds::rtps::RemoteServerAttributes>& PDP::remote_server_attributes()
{
std::unique_lock<std::recursive_mutex> lock(*getMutex());
return mp_builtin->m_DiscoveryServers;
}

Expand Down
Loading