diff --git a/ddspipe_core/include/ddspipe_core/types/dds/CustomTransport.hpp b/ddspipe_core/include/ddspipe_core/types/dds/CustomTransport.hpp new file mode 100644 index 00000000..f705f374 --- /dev/null +++ b/ddspipe_core/include/ddspipe_core/types/dds/CustomTransport.hpp @@ -0,0 +1,43 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// limitations under the License. + +#pragma once + +namespace eprosima { +namespace ddspipe { +namespace core { +namespace types { + +//! Different options for transport configuration +enum class TransportDescriptors +{ + builtin, + udp_only, + shm_only +}; + +//! Possible values for Ignore Participant Flags +enum class IgnoreParticipantFlags +{ + no_filter, + filter_different_host, + filter_different_process, + filter_same_process, + filter_different_and_same_process +}; + +} /* namespace types */ +} /* namespace core */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp b/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp index af259e50..37fe7def 100644 --- a/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp +++ b/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp @@ -14,9 +14,11 @@ #pragma once +#include +#include #include #include -#include +#include namespace eprosima { namespace ddspipe { @@ -48,6 +50,12 @@ struct SimpleParticipantConfiguration : public ParticipantConfiguration ///////////////////////// core::types::DomainId domain {0u}; + + std::set whitelist {}; + + core::types::TransportDescriptors transport {core::types::TransportDescriptors::builtin}; + + core::types::IgnoreParticipantFlags ignore_participant_flags {core::types::IgnoreParticipantFlags::no_filter}; }; } /* namespace participants */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp new file mode 100644 index 00000000..a03fea8b --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp @@ -0,0 +1,77 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { +namespace dds { + +/** + * TODO comment + */ +class DdsCommonParticipant : public core::IParticipant, public eprosima::fastdds::dds::DomainParticipantListener +{ +public: + + DDSPIPE_PARTICIPANTS_DllAPI + virtual ~DdsCommonParticipant(); + + DDSPIPE_PARTICIPANTS_DllAPI + virtual core::types::ParticipantId id() const noexcept override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual bool is_rtps_kind() const noexcept override; + + // NOTE: This should not be initialized here, but it is just for simplicity and less code in childs + DDSPIPE_PARTICIPANTS_DllAPI + virtual bool is_repeater() const noexcept override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual void init(); + +protected: + + // TODO + DDSPIPE_PARTICIPANTS_DllAPI + DdsCommonParticipant( + const std::shared_ptr& participant_configuration); + + eprosima::fastdds::dds::DomainParticipant* dds_participant_; + eprosima::fastdds::dds::Publisher* dds_publisher_; + eprosima::fastdds::dds::Subscriber* dds_subscriber_; + + core::types::ParticipantId id_; + + std::shared_ptr participant_configuration_; +}; + +} /* namespace dds */ +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp new file mode 100644 index 00000000..a086cb1c --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp @@ -0,0 +1,104 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +/** + * TODO comment + */ +class DynTypesPublicationParticipant : public dds::DdsCommonParticipant +{ +public: + + // TODO + DDSPIPE_PARTICIPANTS_DllAPI + DynTypesPublicationParticipant( + const std::shared_ptr& participant_configuration, + const std::shared_ptr& discovery_database); + + DDSPIPE_PARTICIPANTS_DllAPI + virtual ~DynTypesPublicationParticipant(); + + DDSPIPE_PARTICIPANTS_DllAPI + std::shared_ptr create_writer( + const core::ITopic& topic) override; + + DDSPIPE_PARTICIPANTS_DllAPI + std::shared_ptr create_reader( + const core::ITopic& topic) override; + +protected: + + DDSPIPE_PARTICIPANTS_DllAPI + utils::ReturnCode receive_type_object_( + eprosima::fastrtps::types::DynamicType_ptr dynamic_type); + + DDSPIPE_PARTICIPANTS_DllAPI + utils::ReturnCode receive_type_object_( + ddspipe::core::IRoutingData& data); + + DDSPIPE_PARTICIPANTS_DllAPI + void create_empty_datawriter_nts_( + const core::types::DdsTopic& topic); + + static + DDSPIPE_PARTICIPANTS_DllAPI + fastdds::dds::DataWriterQos + default_empty_datawriter_qos_( + const core::types::DdsTopic& topic) noexcept; + + static + DDSPIPE_PARTICIPANTS_DllAPI + fastdds::dds::TopicQos + default_topic_qos_( + const core::types::DdsTopic& topic) noexcept; + + std::map< + core::types::DdsTopic, + std::pair< + fastdds::dds::Topic*, + fastdds::dds::DataWriter*>> writers_; + + std::map types_discovered_; + + //! Type Object Internal Writer + std::shared_ptr type_object_writer_; + + std::mutex mutex_; +}; + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesSubscriptionParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesSubscriptionParticipant.hpp new file mode 100644 index 00000000..93517263 --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesSubscriptionParticipant.hpp @@ -0,0 +1,87 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DynTypesSubscriptionParticipant.hpp + */ + +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +/** + * TODO comment + */ +class DynTypesSubscriptionParticipant : public dds::DdsCommonParticipant +{ +public: + + // TODO + DDSPIPE_PARTICIPANTS_DllAPI + DynTypesSubscriptionParticipant( + const std::shared_ptr& participant_configuration, + const std::shared_ptr& discovery_database); + + DDSPIPE_PARTICIPANTS_DllAPI + virtual ~DynTypesSubscriptionParticipant() = default; + + DDSPIPE_PARTICIPANTS_DllAPI + std::shared_ptr create_writer( + const core::ITopic& topic) override; + + DDSPIPE_PARTICIPANTS_DllAPI + std::shared_ptr create_reader( + const core::ITopic& topic) override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_type_discovery( + eprosima::fastdds::dds::DomainParticipant* participant, + const eprosima::fastrtps::rtps::SampleIdentity& request_sample_id, + const eprosima::fastrtps::string_255& topic, + const eprosima::fastrtps::types::TypeIdentifier* identifier, + const eprosima::fastrtps::types::TypeObject* object, + eprosima::fastrtps::types::DynamicType_ptr dyn_type) override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_type_information_received( + eprosima::fastdds::dds::DomainParticipant* participant, + const eprosima::fastrtps::string_255 topic_name, + const eprosima::fastrtps::string_255 type_name, + const eprosima::fastrtps::types::TypeInformation& type_information) override; + +protected: + + DDSPIPE_PARTICIPANTS_DllAPI + void internal_notify_type_object_( + eprosima::fastrtps::types::DynamicType_ptr dynamic_type); + + //! Type Object Internal Reader + std::shared_ptr type_object_reader_; +}; + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp index 6fd71382..30129cf3 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp @@ -19,16 +19,18 @@ #include #include #include -#include #include +#include + -#include -#include #include #include +#include +#include -#include #include +#include +#include namespace eprosima { namespace ddspipe { @@ -156,6 +158,15 @@ class CommonParticipant const core::types::DdsTopic& topic, const core::types::ParticipantId& discoverer_id); + /** + * @brief Create a transport descriptor with whitelist and type given by specialization. + * + */ + template + DDSPIPE_PARTICIPANTS_DllAPI + static std::shared_ptr create_descriptor_( + std::set whitelist = {}); + protected: /** diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp index 668848a7..ee18060e 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp @@ -46,6 +46,14 @@ class SimpleParticipant : public CommonParticipant const std::shared_ptr& participant_configuration, const std::shared_ptr& payload_pool, const std::shared_ptr& discovery_database); + +protected: + + /** + * @brief Static method that gives the attributes for a Simple Participant. + */ + static fastrtps::rtps::RTPSParticipantAttributes reckon_participant_attributes_( + const SimpleParticipantConfiguration* configuration); }; } /* namespace rtps */ diff --git a/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/InternalWriter.hpp b/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/InternalWriter.hpp new file mode 100644 index 00000000..30ef3172 --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/InternalWriter.hpp @@ -0,0 +1,46 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +/** + * Writer implementation that allow to register a callback to be executed with each data received. + */ +class InternalWriter : public ddspipe::participants::BaseWriter +{ +public: + + InternalWriter( + const ddspipe::core::types::ParticipantId& participant_id, + const std::function& callback); + +protected: + + virtual utils::ReturnCode write_nts_( + ddspipe::core::IRoutingData& data) noexcept override; + + const std::function callback_; +}; + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/configuration/SimpleParticipantConfiguration.cpp b/ddspipe_participants/src/cpp/configuration/SimpleParticipantConfiguration.cpp index 8c5f34e8..b7ac65ae 100644 --- a/ddspipe_participants/src/cpp/configuration/SimpleParticipantConfiguration.cpp +++ b/ddspipe_participants/src/cpp/configuration/SimpleParticipantConfiguration.cpp @@ -15,6 +15,7 @@ #include #include +#include namespace eprosima { namespace ddspipe { @@ -34,6 +35,16 @@ bool SimpleParticipantConfiguration::is_valid( return false; } + // Check whitelist interfaces + for (types::IpType ip : whitelist) + { + if (!types::Address::is_ipv4_correct(ip)) + { + error_msg << "Incorrect IPv4 address " << ip << " in whitelist interfaces. "; + return false; + } + } + return true; } diff --git a/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp new file mode 100644 index 00000000..f3efd4a2 --- /dev/null +++ b/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp @@ -0,0 +1,194 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { +namespace dds { + +using namespace eprosima::ddspipe::core; +using namespace eprosima::ddspipe::core::types; +using namespace eprosima::fastrtps::types; + +DdsCommonParticipant::DdsCommonParticipant( + const std::shared_ptr& participant_configuration) + : participant_configuration_(participant_configuration) +{ + // Do nothing +} + +DdsCommonParticipant::~DdsCommonParticipant() +{ + dds_participant_->set_listener(nullptr); + + dds_participant_->delete_publisher(dds_publisher_); + dds_participant_->delete_subscriber(dds_subscriber_); + + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(dds_participant_); +} + +core::types::ParticipantId DdsCommonParticipant::id() const noexcept +{ + return participant_configuration_->id; +} + +bool DdsCommonParticipant::is_rtps_kind() const noexcept +{ + return false; +} + +bool DdsCommonParticipant::is_repeater() const noexcept +{ + return false; +} + +void DdsCommonParticipant::init() +{ + eprosima::fastdds::dds::DomainParticipantQos pqos; + pqos.name(this->id()); + + // Set Type LookUp to ON + // TODO this should not be true in all cases, but lets keep it for now + pqos.wire_protocol().builtin.typelookup_config.use_server = true; + pqos.wire_protocol().builtin.typelookup_config.use_client = true; + + // Configure Participant transports + if (participant_configuration_->transport == core::types::TransportDescriptors::builtin) + { + if (!participant_configuration_->whitelist.empty()) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + pqos.transport().user_transports.push_back(shm_transport); + + std::shared_ptr udp_transport = + rtps::CommonParticipant::create_descriptor_( + participant_configuration_->whitelist); + pqos.transport().user_transports.push_back(udp_transport); + } + } + else if (participant_configuration_->transport == core::types::TransportDescriptors::shm_only) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + pqos.transport().user_transports.push_back(shm_transport); + } + else if (participant_configuration_->transport == core::types::TransportDescriptors::udp_only) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr udp_transport = + rtps::CommonParticipant::create_descriptor_( + participant_configuration_->whitelist); + pqos.transport().user_transports.push_back(udp_transport); + pqos.transport().user_transports.push_back(udp_transport); + } + + // Participant discovery filter participant_configuration_ + switch (participant_configuration_->ignore_participant_flags) + { + case core::types::IgnoreParticipantFlags::no_filter: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + break; + case core::types::IgnoreParticipantFlags::filter_different_host: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + break; + case core::types::IgnoreParticipantFlags::filter_different_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_same_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_different_and_same_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + static_cast( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS | + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS); + break; + default: + break; + } + + // Force DDS entities to be created disabled + // NOTE: this is very dangerous because we are modifying a global variable (and a not thread safe one) in a + // local function. + // However, this is required, otherwise we could fail in two points: + // - receive in this object, maybe in same thread a discovery callback, which could use this variable + // (e.g to check if the Participant called is this one) + // - lose a discovery callback + fastdds::dds::DomainParticipantFactoryQos original_fact_qos; + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->get_qos( + original_fact_qos); + + fastdds::dds::DomainParticipantFactoryQos fact_qos; + fact_qos.entity_factory().autoenable_created_entities = false; + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_qos( + fact_qos); + + // CREATE THE PARTICIPANT + dds_participant_ = eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( + participant_configuration_->domain, + pqos, + this); + + dds_participant_->enable(); + + // Restore default DomainParticipantQoS (create enabled entities) after creating and enabling this participant + // WARNING: not thread safe at the moment of this writing, see note above. + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_qos( + original_fact_qos); + + if (dds_participant_ == nullptr) + { + throw utils::InitializationException("Error creating DDS Participant."); + } + + dds_publisher_ = dds_participant_->create_publisher(fastdds::dds::PublisherQos()); + dds_subscriber_ = dds_participant_->create_subscriber(fastdds::dds::SubscriberQos()); +} + +} /* namespace dds */ +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index a1caebd1..a1e3b945 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -26,7 +26,9 @@ #include #include #include - +#include +#include +#include #include #include @@ -179,6 +181,10 @@ void DynTypesParticipant::internal_notify_type_object_( void DynTypesParticipant::initialize_internal_dds_participant_() { + + std::shared_ptr configuration = + std::dynamic_pointer_cast(this->configuration_); + eprosima::fastdds::dds::DomainParticipantQos pqos; pqos.name(this->id()); @@ -186,6 +192,69 @@ void DynTypesParticipant::initialize_internal_dds_participant_() pqos.wire_protocol().builtin.typelookup_config.use_server = false; pqos.wire_protocol().builtin.typelookup_config.use_client = true; + // Configure Participant transports + if (configuration->transport == core::types::TransportDescriptors::builtin) + { + if (!configuration->whitelist.empty()) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + pqos.transport().user_transports.push_back(shm_transport); + + std::shared_ptr udp_transport = + create_descriptor_(configuration->whitelist); + pqos.transport().user_transports.push_back(udp_transport); + } + } + else if (configuration->transport == core::types::TransportDescriptors::shm_only) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + pqos.transport().user_transports.push_back(shm_transport); + } + else if (configuration->transport == core::types::TransportDescriptors::udp_only) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr udp_transport = + create_descriptor_(configuration->whitelist); + pqos.transport().user_transports.push_back(udp_transport); + pqos.transport().user_transports.push_back(udp_transport); + } + + // Participant discovery filter configuration + switch (configuration->ignore_participant_flags) + { + case core::types::IgnoreParticipantFlags::no_filter: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + break; + case core::types::IgnoreParticipantFlags::filter_different_host: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + break; + case core::types::IgnoreParticipantFlags::filter_different_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_same_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_different_and_same_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + static_cast( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS | + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS); + break; + default: + break; + } + // Force DDS entities to be created disabled // NOTE: this is very dangerous because we are modifying a global variable (and a not thread safe one) in a // local function. @@ -200,7 +269,7 @@ void DynTypesParticipant::initialize_internal_dds_participant_() // CREATE THE PARTICIPANT dds_participant_ = eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( - std::dynamic_pointer_cast(this->configuration_)->domain, + configuration->domain, pqos); dds_participant_->set_listener(this); diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp new file mode 100644 index 00000000..b87e18c3 --- /dev/null +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp @@ -0,0 +1,248 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +using namespace eprosima::ddspipe::core; +using namespace eprosima::ddspipe::core::types; +using namespace eprosima::fastrtps::types; + +DynTypesPublicationParticipant::DynTypesPublicationParticipant( + const std::shared_ptr& participant_configuration, + const std::shared_ptr& discovery_database) + : dds::DdsCommonParticipant(participant_configuration) +{ + // Create Internal Writer to receive Type Objects + // TODO: study why couldn't do with bind and try it, better than create a lambda + auto participant_callback = [this](ddspipe::core::IRoutingData& data) + { + return this->receive_type_object_(data); + }; + + type_object_writer_ = std::make_shared( + participant_configuration->id, + participant_callback); + + discovery_database->add_endpoint( + rtps::CommonParticipant::simulate_endpoint(type_object_topic(), this->id()) + ); +} + +DynTypesPublicationParticipant::~DynTypesPublicationParticipant() +{ + for (auto& writer : writers_) + { + if (nullptr != writer.second.second) + { + dds_publisher_->delete_datawriter(writer.second.second); + dds_participant_->delete_topic(writer.second.first); + } + } +} + +std::shared_ptr DynTypesPublicationParticipant::create_writer( + const ITopic& topic) +{ + if (is_type_object_topic(topic)) + { + return this->type_object_writer_; + } + + // This participant does not write anything + return std::make_shared(); +} + +std::shared_ptr DynTypesPublicationParticipant::create_reader( + const ITopic& topic) +{ + // In case of a DDS topic, check if datawriter must be created, or already exist + if (topic.internal_type_discriminator() == INTERNAL_TOPIC_TYPE_RTPS) + { + std::lock_guard _(mutex_); + create_empty_datawriter_nts_(dynamic_cast(topic)); + } + + // This participant does not read anything + return std::make_shared(); +} + +utils::ReturnCode DynTypesPublicationParticipant::receive_type_object_( + eprosima::fastrtps::types::DynamicType_ptr dynamic_type) +{ + // All this function is protected + std::lock_guard _(mutex_); + + auto type_name = dynamic_type->get_name(); + + logInfo(DDSPIPE_DYNTYPES_DDSPARTICIPANT, "Received type object for type " << type_name); + + // First, check if the type already exist. If so, nothing to do + auto it = types_discovered_.find(type_name); + if (it != types_discovered_.end()) + { + return utils::ReturnCode::RETCODE_OK; + } + + // It is a new type, so add it to the types + types_discovered_[type_name] = dynamic_type; + + // Register Participant + eprosima::fastdds::dds::TypeSupport type(new eprosima::fastrtps::types::DynamicPubSubType(dynamic_type)); + type->auto_fill_type_information(true); + type->auto_fill_type_object(true); + dds_participant_->register_type(type); + + // Check if any topic already discovered uses this new type. If so create data writer + std::vector topics_to_create_writers; + for (const auto& writer : writers_) + { + if (writer.first.type_name == type_name) + { + // This should always be nullptr, but just in case + if (nullptr == writer.second.second) + { + topics_to_create_writers.push_back(writer.first); + } + } + } + + for (const auto& topic : topics_to_create_writers) + { + create_empty_datawriter_nts_(topic); + } + + return utils::ReturnCode::RETCODE_OK; +} + +void DynTypesPublicationParticipant::create_empty_datawriter_nts_( + const core::types::DdsTopic& topic) +{ + auto topic_name = topic.topic_name(); + auto type_name = topic.type_name; + + // Check if topic already exist + auto it_topic = writers_.find(topic); + + // If not, create it empty + if (it_topic == writers_.end()) + { + writers_[topic] = {nullptr, nullptr}; + it_topic = writers_.find(topic); + } + + // If writer already created, finish function + if (nullptr != it_topic->second.first) + { + return; + } + + // Writer is not created + // Check if type exists. If so, create writer, if not leave + auto it_type = types_discovered_.find(type_name); + if (it_type == types_discovered_.end()) + { + return; + } + + // Create topic + fastdds::dds::Topic* dds_topic = dds_participant_->create_topic( + topic.topic_name(), + topic.type_name, + default_topic_qos_(topic) + ); + + // Create writer + writers_[topic] = + { + dds_topic, + dds_publisher_->create_datawriter( + dds_topic, + default_empty_datawriter_qos_(topic)) + }; + + logInfo(DDSPIPE_DYNTYPES_DDSPARTICIPANT, "Created writer for topic " << topic); +} + +fastdds::dds::DataWriterQos +DynTypesPublicationParticipant::default_empty_datawriter_qos_( + const core::types::DdsTopic& topic) noexcept +{ + // TODO decide which qos to use. Using less restrictive + auto qos = fastdds::dds::DataWriterQos(); + qos.durability().kind = + ( topic.topic_qos.is_transient_local() ? + fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS : + fastdds::dds::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS + ); + qos.reliability().kind = + ( topic.topic_qos.is_reliable() ? + fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS : + fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS + ); + qos.ownership().kind = + ( topic.topic_qos.has_ownership() ? + fastdds::dds::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS : + fastdds::dds::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS + ); + + return qos; +} + +fastdds::dds::TopicQos +DynTypesPublicationParticipant::default_topic_qos_( + const core::types::DdsTopic& topic) noexcept +{ + // TODO decide which qos to use + return fastdds::dds::TopicQos(); +} + +utils::ReturnCode DynTypesPublicationParticipant::receive_type_object_( + ddspipe::core::IRoutingData& data) +{ + // Assuming that data is of type required + auto& dynamic_type_data = dynamic_cast(data); + return receive_type_object_(dynamic_type_data.dynamic_type); +} + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp new file mode 100644 index 00000000..bc09c903 --- /dev/null +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp @@ -0,0 +1,169 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + + +#include +#include +#include +#include +#include + +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +using namespace eprosima::ddspipe::core; +using namespace eprosima::ddspipe::core::types; +using namespace eprosima::fastrtps::types; + +DynTypesSubscriptionParticipant::DynTypesSubscriptionParticipant( + const std::shared_ptr& participant_configuration, + const std::shared_ptr& discovery_database) + : dds::DdsCommonParticipant(participant_configuration) + , type_object_reader_(std::make_shared( + participant_configuration->id)) +{ + discovery_database->add_endpoint( + rtps::CommonParticipant::simulate_endpoint(type_object_topic(), this->id()) + ); +} + +std::shared_ptr DynTypesSubscriptionParticipant::create_writer( + const ITopic& topic) +{ + // This participant does not write anything + return std::make_shared(); +} + +std::shared_ptr DynTypesSubscriptionParticipant::create_reader( + const ITopic& topic) +{ + if (is_type_object_topic(topic)) + { + return this->type_object_reader_; + } + + // This participant does not read anything + return std::make_shared(); +} + +void DynTypesSubscriptionParticipant::on_type_discovery( + eprosima::fastdds::dds::DomainParticipant* /* participant */, + const fastrtps::rtps::SampleIdentity& /* request_sample_id */, + const fastrtps::string_255& /* topic */, + const fastrtps::types::TypeIdentifier* identifier, + const fastrtps::types::TypeObject* object, + fastrtps::types::DynamicType_ptr dyn_type) +{ + if (nullptr != dyn_type) + { + // Register type obj in singleton factory + TypeObjectFactory::get_instance()->add_type_object( + dyn_type->get_name(), identifier, object); + internal_notify_type_object_(dyn_type); + } +} + +void DynTypesSubscriptionParticipant::on_type_information_received( + eprosima::fastdds::dds::DomainParticipant* participant, + const fastrtps::string_255 /* topic_name */, + const fastrtps::string_255 type_name, + const fastrtps::types::TypeInformation& type_information) +{ + std::string type_name_ = type_name.to_string(); + const TypeIdentifier* type_identifier = nullptr; + const TypeObject* type_object = nullptr; + DynamicType_ptr dynamic_type(nullptr); + + // Check if complete identifier already present in factory + type_identifier = TypeObjectFactory::get_instance()->get_type_identifier(type_name_, true); + if (type_identifier) + { + type_object = TypeObjectFactory::get_instance()->get_type_object(type_name_, true); + } + + // If complete not found, try with minimal + if (!type_object) + { + type_identifier = TypeObjectFactory::get_instance()->get_type_identifier(type_name_, false); + if (type_identifier) + { + type_object = TypeObjectFactory::get_instance()->get_type_object(type_name_, false); + } + } + + // Build dynamic type if type identifier and object found in factory + if (type_identifier && type_object) + { + dynamic_type = TypeObjectFactory::get_instance()->build_dynamic_type(type_name_, type_identifier, type_object); + } + + // Request type object through TypeLookup if not present in factory, or if type building failed + if (!dynamic_type) + { + std::function callback( + [this] + (const std::string& /* type_name */, const DynamicType_ptr type) + { + this->internal_notify_type_object_(type); + }); + // Registering type and creating reader + participant->register_remote_type( + type_information, + type_name_, + callback); + } + else + { + internal_notify_type_object_(dynamic_type); + } +} + +void DynTypesSubscriptionParticipant::internal_notify_type_object_( + eprosima::fastrtps::types::DynamicType_ptr dynamic_type) +{ + logInfo(DDSPIPE_DYNTYPES_PARTICIPANT, + "Participant " << this->id() << " discovered type object " << dynamic_type->get_name()); + + // Create data containing Dynamic Type + auto data = std::make_unique(); + data->dynamic_type = dynamic_type; // TODO: add constructor with param + + // Insert new data in internal reader queue + type_object_reader_->simulate_data_reception(std::move(data)); +} + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index b2b297f6..709fa2a3 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -15,6 +15,10 @@ #include +#include +#include +#include +#include #include #include @@ -276,6 +280,114 @@ core::types::Endpoint CommonParticipant::simulate_endpoint( return endpoint; } +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr udp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to UDP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv4. Discarding UDP whitelist interface " << ip << "."); + } + } + + return udp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr udp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv6_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to UDP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv6. Discarding UDP whitelist interface " << ip << "."); + } + } + + return udp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr tcp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + tcp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to TCP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv4. Discarding TCP whitelist interface " << ip << "."); + } + } + + return tcp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr tcp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv6_correct(ip)) + { + tcp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to TCP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv6. Discarding TCP whitelist interface " << ip << "."); + } + } + + return tcp_transport; +} + bool CommonParticipant::is_repeater() const noexcept { return configuration_->is_repeater; diff --git a/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp index 2892886d..09a05d3b 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp @@ -120,7 +120,8 @@ DiscoveryServerParticipant::reckon_participant_attributes_( } else { - descriptor = std::make_shared(); + descriptor = create_descriptor_( + configuration->whitelist); descriptor->add_listener_port(address.port()); descriptor->set_WAN_address(address.ip()); @@ -139,7 +140,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( has_listening_tcp_ipv6 = true; std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); descriptor->add_listener_port(address.port()); @@ -299,7 +300,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( if (has_connection_tcp_ipv4 && !has_listening_tcp_ipv4) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); // Enable TLS if (tls_config.is_active()) @@ -315,7 +316,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( if (has_connection_tcp_ipv6 && !has_listening_tcp_ipv6) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); // Enable TLS if (tls_config.is_active()) @@ -333,7 +334,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( if (has_udp_ipv4) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(descriptor); logDebug(DDSPIPE_DISCOVERYSERVER_PARTICIPANT, @@ -342,7 +343,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( if (has_udp_ipv6) { std::shared_ptr descriptor_v6 = - std::make_shared(); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(descriptor_v6); logDebug(DDSPIPE_DISCOVERYSERVER_PARTICIPANT, diff --git a/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp index 1f99e49e..7e19a06a 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp @@ -115,7 +115,8 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic } else { - descriptor = std::make_shared(); + descriptor = create_descriptor_( + configuration->whitelist); descriptor->add_listener_port(address.port()); descriptor->set_WAN_address(address.ip()); @@ -134,7 +135,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic has_listening_tcp_ipv6 = true; std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); descriptor->add_listener_port(address.port()); @@ -254,7 +255,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic if (has_connection_tcp_ipv4 && !has_listening_tcp_ipv4) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); // Enable TLS if (tls_config.is_active()) @@ -271,7 +272,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic if (has_connection_tcp_ipv6 && !has_listening_tcp_ipv6) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); // Enable TLS if (tls_config.is_active()) @@ -289,7 +290,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic if (has_udp_ipv4) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(descriptor); logDebug(DDSPIPE_INITIALPEERS_PARTICIPANT, @@ -299,7 +300,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic if (has_udp_ipv6) { std::shared_ptr descriptor_v6 = - std::make_shared(); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(descriptor_v6); logDebug(DDSPIPE_INITIALPEERS_PARTICIPANT, diff --git a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp index e8745d7e..09b43dce 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp @@ -16,6 +16,9 @@ #include #include +#include +#include +#include #include @@ -33,10 +36,82 @@ SimpleParticipant::SimpleParticipant( payload_pool, discovery_database, participant_configuration->domain, - CommonParticipant::reckon_participant_attributes_(participant_configuration.get())) + reckon_participant_attributes_(participant_configuration.get())) { } +fastrtps::rtps::RTPSParticipantAttributes +SimpleParticipant::reckon_participant_attributes_( + const SimpleParticipantConfiguration* configuration) +{ + // Use default as base attributes + fastrtps::rtps::RTPSParticipantAttributes params = CommonParticipant::reckon_participant_attributes_(configuration); + + // Configure Participant transports + if (configuration->transport == core::types::TransportDescriptors::builtin) + { + if (!configuration->whitelist.empty()) + { + params.useBuiltinTransports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + params.userTransports.push_back(shm_transport); + + std::shared_ptr udp_transport = + create_descriptor_(configuration->whitelist); + params.userTransports.push_back(udp_transport); + } + } + else if (configuration->transport == core::types::TransportDescriptors::shm_only) + { + params.useBuiltinTransports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + params.userTransports.push_back(shm_transport); + } + else if (configuration->transport == core::types::TransportDescriptors::udp_only) + { + params.useBuiltinTransports = false; + + std::shared_ptr udp_transport = + create_descriptor_(configuration->whitelist); + params.userTransports.push_back(udp_transport); + } + + // Participant discovery filter configuration + switch (configuration->ignore_participant_flags) + { + case core::types::IgnoreParticipantFlags::no_filter: + params.builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + break; + case core::types::IgnoreParticipantFlags::filter_different_host: + params.builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + break; + case core::types::IgnoreParticipantFlags::filter_different_process: + params.builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_same_process: + params.builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_different_and_same_process: + params.builtin.discovery_config.ignoreParticipantFlags = + static_cast( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS | + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS); + break; + default: + break; + } + + return params; +} + } /* namespace rtps */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp index b74bb87a..db6e23d4 100644 --- a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp @@ -365,6 +365,10 @@ fastrtps::TopicAttributes CommonReader::reckon_topic_attributes_( att.historyQos.kind = eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_LAST_HISTORY_QOS; att.historyQos.depth = topic.topic_qos.history_depth; + // Disable type object/information filling + att.auto_fill_type_information = false; + att.auto_fill_type_object = false; + return att; } diff --git a/ddspipe_participants/src/cpp/writer/auxiliar/InternalWriter.cpp b/ddspipe_participants/src/cpp/writer/auxiliar/InternalWriter.cpp new file mode 100644 index 00000000..e26813e3 --- /dev/null +++ b/ddspipe_participants/src/cpp/writer/auxiliar/InternalWriter.cpp @@ -0,0 +1,38 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +InternalWriter::InternalWriter( + const ddspipe::core::types::ParticipantId& participant_id, + const std::function& callback) + : ddspipe::participants::BaseWriter(participant_id) + , callback_(callback) +{ + // Do nothing +} + +utils::ReturnCode InternalWriter::write_nts_( + ddspipe::core::IRoutingData& data) noexcept +{ + return callback_(data); +} + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp b/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp index ba4eac76..9cce51ad 100644 --- a/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp @@ -378,6 +378,10 @@ fastrtps::TopicAttributes CommonWriter::reckon_topic_attributes_( att.topicName = topic.m_topic_name; att.topicDataType = topic.type_name; + // Disable type object/information filling + att.auto_fill_type_information = false; + att.auto_fill_type_object = false; + return att; } diff --git a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp index e0eb9e80..6c1c78fb 100644 --- a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp +++ b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp @@ -57,6 +57,24 @@ constexpr const char* ECHO_DISCOVERY_TAG("discovery"); //! Echo Discovery recei constexpr const char* ECHO_VERBOSE_TAG("verbose"); //! Echo in verbose mode // RTPS related tags + +// Transport related tags +constexpr const char* WHITELIST_INTERFACES_TAG("whitelist-interfaces"); //! TODO: add comment + +// Custom transport descriptors tags +constexpr const char* TRANSPORT_DESCRIPTORS_TRANSPORT_TAG("transport"); //! TODO: add comment +constexpr const char* TRANSPORT_DESCRIPTORS_BUILTIN_TAG("builtin"); //! TODO: add comment +constexpr const char* TRANSPORT_DESCRIPTORS_UDP_TAG("udp"); //! TODO: add comment +constexpr const char* TRANSPORT_DESCRIPTORS_SHM_TAG("shm"); //! TODO: add comment + +// Participant discovery settings +constexpr const char* IGNORE_PARTICIPANT_FLAGS_TAG("ignore-participant-flags"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_NO_FILTER_TAG("no_filter"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_HOST_TAG("filter_different_host"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_PROCESS_TAG("filter_different_process"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_SAME_PROCESS_TAG("filter_same_process"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_AND_SAME_PROCESS_TAG("filter_different_and_same_process"); //! TODO: add comment + // Simple RTPS related tags constexpr const char* DOMAIN_ID_TAG("domain"); //! Domain Id of the participant diff --git a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp index c9188e82..a8aa358c 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp @@ -132,6 +132,33 @@ void YamlReader::fill( { object.domain = get(yml, DOMAIN_ID_TAG, version); } + + // Optional whitelist interfaces + if (YamlReader::is_tag_present(yml, WHITELIST_INTERFACES_TAG)) + { + object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); + } + + // Optional get Transport descriptors + if (YamlReader::is_tag_present(yml, TRANSPORT_DESCRIPTORS_TRANSPORT_TAG)) + { + object.transport = get(yml, TRANSPORT_DESCRIPTORS_TRANSPORT_TAG, version); + } + else + { + object.transport = core::types::TransportDescriptors::builtin; + } + + // Optional get ignore participant flags + if (YamlReader::is_tag_present(yml, IGNORE_PARTICIPANT_FLAGS_TAG)) + { + object.ignore_participant_flags = get(yml, IGNORE_PARTICIPANT_FLAGS_TAG, + version); + } + else + { + object.ignore_participant_flags = core::types::IgnoreParticipantFlags::no_filter; + } } template <> @@ -157,6 +184,12 @@ void YamlReader::fill( // Parent class fill fill(object, yml, version); + // Optional whitelist interfaces + if (YamlReader::is_tag_present(yml, WHITELIST_INTERFACES_TAG)) + { + object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); + } + // Optional listening addresses if (YamlReader::is_tag_present(yml, LISTENING_ADDRESSES_TAG)) { @@ -219,6 +252,12 @@ void YamlReader::fill( // Parent class fill fill(object, yml, version); + // Optional whitelist interfaces + if (YamlReader::is_tag_present(yml, WHITELIST_INTERFACES_TAG)) + { + object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); + } + // Optional listening addresses if (YamlReader::is_tag_present(yml, LISTENING_ADDRESSES_TAG)) { diff --git a/ddspipe_yaml/src/cpp/YamlReader_types.cpp b/ddspipe_yaml/src/cpp/YamlReader_types.cpp index 74283a6b..5cecaede 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_types.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_types.cpp @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -48,6 +49,39 @@ namespace yaml { using namespace eprosima::ddspipe::core::types; using namespace eprosima::ddspipe::participants::types; +template <> +DDSPIPE_YAML_DllAPI +TransportDescriptors YamlReader::get( + const Yaml& yml, + const YamlReaderVersion /* version */) +{ + return get_enumeration( + yml, + { + {TRANSPORT_DESCRIPTORS_BUILTIN_TAG, TransportDescriptors::builtin}, + {TRANSPORT_DESCRIPTORS_UDP_TAG, TransportDescriptors::udp_only}, + {TRANSPORT_DESCRIPTORS_SHM_TAG, TransportDescriptors::shm_only} + }); +} + +template <> +DDSPIPE_YAML_DllAPI +IgnoreParticipantFlags YamlReader::get( + const Yaml& yml, + const YamlReaderVersion /* version */) +{ + return get_enumeration( + yml, + { + {IGNORE_PARTICIPANT_FLAGS_NO_FILTER_TAG, IgnoreParticipantFlags::no_filter}, + {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_HOST_TAG, IgnoreParticipantFlags::filter_different_host}, + {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_PROCESS_TAG, IgnoreParticipantFlags::filter_different_process}, + {IGNORE_PARTICIPANT_FLAGS_SAME_PROCESS_TAG, IgnoreParticipantFlags::filter_same_process}, + {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_AND_SAME_PROCESS_TAG, + IgnoreParticipantFlags::filter_different_and_same_process}, + }); +} + template <> DDSPIPE_YAML_DllAPI YamlReaderVersion YamlReader::get( @@ -73,7 +107,7 @@ TransportProtocol YamlReader::get( yml, { {ADDRESS_TRANSPORT_TCP_TAG, TransportProtocol::tcp}, - {ADDRESS_TRANSPORT_UDP_TAG, TransportProtocol::udp}, + {ADDRESS_TRANSPORT_UDP_TAG, TransportProtocol::udp} }); }