From 01b592e34deb04fcb67253294dea5e7e8bb88b56 Mon Sep 17 00:00:00 2001 From: tempate Date: Wed, 17 Jan 2024 08:43:19 +0100 Subject: [PATCH 1/5] Make remove unused entities compatible with the discovery trigger Signed-off-by: tempate --- .../communication/dds/DdsBridge.hpp | 112 +++--- .../ddspipe_core/communication/dds/Track.hpp | 7 +- .../configuration/RoutesConfiguration.hpp | 12 +- .../include/ddspipe_core/core/DdsPipe.hpp | 10 +- .../src/cpp/communication/dds/DdsBridge.cpp | 328 +++++++++++------- .../src/cpp/communication/dds/Track.cpp | 17 +- .../configuration/DdsPipeConfiguration.cpp | 6 - ddspipe_core/src/cpp/core/DdsPipe.cpp | 25 +- .../cpp/writer/dynamic_types/SchemaWriter.cpp | 2 +- 9 files changed, 311 insertions(+), 208 deletions(-) diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp index e1af73ec..835cd9d1 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp @@ -18,11 +18,14 @@ #include #include +#include #include +#include #include #include #include + namespace eprosima { namespace ddspipe { namespace core { @@ -61,7 +64,8 @@ class DdsBridge : public Bridge const std::shared_ptr& thread_pool, const RoutesConfiguration& routes_config, const bool remove_unused_entities, - const std::vector& manual_topics); + const std::vector& manual_topics, + const types::EndpointKind& endpoint_kind); DDSPIPE_CORE_DllAPI ~DdsBridge(); @@ -84,77 +88,89 @@ class DdsBridge : public Bridge DDSPIPE_CORE_DllAPI void disable() noexcept override; - /** - * Build the IReaders and IWriters inside the bridge for the new participant, - * and add them to the Tracks. - * - * Thread safe - * - * @param participant_id: The id of the participant who is creating the writer. - * - * @throw InitializationException in case \c IWriters or \c IReaders creation fails. - */ + // TODO DDSPIPE_CORE_DllAPI - void create_writer( - const types::ParticipantId& participant_id); + void create_endpoint( + const types::ParticipantId& participant_id, + const types::EndpointKind& discovered_endpoint_kind); - /** - * Remove the IWriter from all the Tracks in the bridge. - * Remove the IReaders and Tracks that don't have any IWriters. - * - * Thread safe - * - * @param participant_id: The id of the participant who is removing the writer. - */ + // TODO DDSPIPE_CORE_DllAPI - void remove_writer( - const types::ParticipantId& participant_id) noexcept; + void remove_endpoint( + const types::ParticipantId& participant_id, + const types::EndpointKind& removed_endpoint_kind); protected: /** - * Create the readers, writers, and tracks that are required by the routes. + * Create an IWriter for the new participant. + * Create the IReaders in the IWriter's route. + * Create the Tracks of the IReaderes with the IWriter. * - * Thread safe + * @param participant_id: The id of the participant who is creating the writer. * * @throw InitializationException in case \c IWriters or \c IReaders creation fails. */ - DDSPIPE_CORE_DllAPI - void create_all_tracks_(); + void create_writer_and_its_tracks_nts_( + const types::ParticipantId& participant_id); /** - * Add each Participant's IWriters to its Track. - * If the Participant's IReader doesn't exist, create it. - * If the Participant's Track doesn't exist, create it. + * Create an IReader for the new participant. + * Create the IWriters in the IReader's route. + * Create the Track with the IReader and IWriters. * - * @param writers: The map of ids to writers that are required for the tracks. + * @param participant_id: The id of the participant who is creating the reader. * - * @throw InitializationException in case \c IReaders creation fails. + * @throw InitializationException in case \c IWriters or \c IReaders creation fails. */ - DDSPIPE_CORE_DllAPI - void add_writer_to_tracks_nts_( - const types::ParticipantId& participant_id, - std::shared_ptr& writer); + void create_reader_and_its_track_nts_( + const types::ParticipantId& participant_id); /** - * Add each Participant's IWriters to its Track. - * If the Participant's IReader doesn't exist, create it. - * If the Participant's Track doesn't exist, create it. + * Remove the IWriter from all the Tracks in the bridge. + * Remove the IReaders and Tracks that don't have any IWriters. * - * @param writers: The map of ids to writers that are required for the tracks. + * @param participant_id: The id of the participant who is removing the writer. + */ + void remove_writer_and_its_tracks_nts_( + const types::ParticipantId& participant_id) noexcept; + + /** + * Remove the IReader and its Track from the bridge. + * Remove the IWriters that don't belong to a Track. * - * @throw InitializationException in case \c IReaders creation fails. + * @param participant_id: The id of the participant who is removing the reader. */ - DDSPIPE_CORE_DllAPI - void add_writers_to_tracks_nts_( + void remove_reader_and_its_track_nts_( + const types::ParticipantId& participant_id) noexcept; + + // TODO + void create_track_nts_( + const types::ParticipantId& id, + const std::shared_ptr& reader, std::map>& writers); + // TODO + std::shared_ptr create_writer_nts_( + const types::ParticipantId& participant_id); + + // TODO + std::shared_ptr create_reader_nts_( + const types::ParticipantId& participant_id); + + // TODO + std::set readers_in_writers_route_nts_( + const types::ParticipantId& writer_id); + + // TODO + std::set writers_in_readers_route_nts_( + const types::ParticipantId& reader_id); + /** * @brief Impose the Topic QoS that have been pre-configured for a participant. * * First, it imposes the Topic QoS configured at \c manual_topics and then the ones configured at \c participants. */ - DDSPIPE_CORE_DllAPI utils::Heritable create_topic_for_participant_nts_( const std::shared_ptr& participant) noexcept; @@ -171,12 +187,12 @@ class DdsBridge : public Bridge //! Topics that explicitally set a QoS attribute for this participant. std::vector manual_topics_; - /** - * Inside \c Tracks - * They are indexed by the Id of the participant that is source - */ + //! The tracks of the bridge indexed by the participant_id of their reader. std::map> tracks_; + //! The writers of the bridge index by their participant_id. + std::map> writers_; + //! Mutex to prevent simultaneous calls to enable and/or disable std::mutex mutex_; diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp index fcbd1d2c..b1b15e66 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/Track.hpp @@ -124,7 +124,7 @@ class Track */ DDSPIPE_CORE_DllAPI bool has_writer( - const types::ParticipantId& id) noexcept; + const types::ParticipantId& id) const noexcept; /** * Check if a track has at least one writer. @@ -132,7 +132,7 @@ class Track * Tread safe */ DDSPIPE_CORE_DllAPI - bool has_writers() noexcept; + bool has_writers() const noexcept; protected: @@ -209,8 +209,9 @@ class Track /** * Mutex to prevent simultaneous calls to \c enable and/or \c disable . * It manages access to variable \c enabled_ . + * It is mutable so it can be used in const methods. */ - std::mutex track_mutex_; + mutable std::mutex track_mutex_; ///// // Transmit thread part diff --git a/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp b/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp index e9e5ff01..a4ec74db 100644 --- a/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp +++ b/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp @@ -40,16 +40,19 @@ struct RoutesConfiguration : public IConfiguration // CONSTRUCTORS ///////////////////////// - DDSPIPE_CORE_DllAPI RoutesConfiguration() = default; + DDSPIPE_CORE_DllAPI + RoutesConfiguration() = default; ///////////////////////// // METHODS ///////////////////////// - DDSPIPE_CORE_DllAPI virtual bool is_valid( + DDSPIPE_CORE_DllAPI + virtual bool is_valid( utils::Formatter& error_msg) const noexcept override; - DDSPIPE_CORE_DllAPI bool is_valid( + DDSPIPE_CORE_DllAPI + bool is_valid( utils::Formatter& error_msg, const std::map& participants) const noexcept; @@ -57,7 +60,8 @@ struct RoutesConfiguration : public IConfiguration // OPERATORS ///////////////////////// - DDSPIPE_CORE_DllAPI RoutesMap operator () () const; + DDSPIPE_CORE_DllAPI + RoutesMap operator () () const; ///////////////////////// // VARIABLES diff --git a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp index 57687b03..d6471f82 100644 --- a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp +++ b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp @@ -26,6 +26,7 @@ #include #include #include +#include #include @@ -262,7 +263,8 @@ class DdsPipe * @param [in] topic : topic discovered */ void discovered_topic_nts_( - const utils::Heritable& topic) noexcept; + const utils::Heritable& topic, + const types::EndpointKind& endpoint_kind = types::EndpointKind::reader) noexcept; /** * @brief Method called every time a new endpoint (corresponding to a server) has been discovered/updated @@ -304,7 +306,8 @@ class DdsPipe */ void create_new_bridge_nts_( const utils::Heritable& topic, - bool enabled = false) noexcept; + bool enabled = false, + const types::EndpointKind& endpoint_kind = types::EndpointKind::reader) noexcept; /** * @brief Create a new \c RpcBridge object @@ -324,7 +327,8 @@ class DdsPipe * @param [in] topic : Topic to be enabled */ void activate_topic_nts_( - const utils::Heritable& topic) noexcept; + const utils::Heritable& topic, + const types::EndpointKind& endpoint_kind = types::EndpointKind::reader) noexcept; /** * @brief Disable a specific topic. diff --git a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp index d1247c95..db404198 100644 --- a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp +++ b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp @@ -30,7 +30,8 @@ DdsBridge::DdsBridge( const std::shared_ptr& thread_pool, const RoutesConfiguration& routes_config, const bool remove_unused_entities, - const std::vector& manual_topics) + const std::vector& manual_topics, + const EndpointKind& endpoint_kind) : Bridge(participants_database, payload_pool, thread_pool) , topic_(topic) , manual_topics_(manual_topics) @@ -41,11 +42,15 @@ DdsBridge::DdsBridge( if (remove_unused_entities && topic->topic_discoverer() != DEFAULT_PARTICIPANT_ID) { - create_writer(topic->topic_discoverer()); + create_endpoint(topic->topic_discoverer(), endpoint_kind); } else { - create_all_tracks_(); + for (const ParticipantId& id : participants_->get_participants_ids()) + { + std::lock_guard lock(mutex_); + create_reader_and_its_track_nts_(id); + } } logDebug(DDSPIPE_DDSBRIDGE, "DdsBridge " << *this << " created."); @@ -69,7 +74,6 @@ void DdsBridge::enable() noexcept { EPROSIMA_LOG_INFO(DDSPIPE_DDSBRIDGE, "Enabling DdsBridge for topic " << topic_ << "."); - // ATTENTION: reference needed or it would copy Track for (auto& track_it : tracks_) { track_it.second->enable(); @@ -87,7 +91,6 @@ void DdsBridge::disable() noexcept { EPROSIMA_LOG_INFO(DDSPIPE_DDSBRIDGE, "Disabling DdsBridge for topic " << topic_ << "."); - // ATTENTION: reference needed or it would copy Track for (auto& track_it : tracks_) { track_it.second->disable(); @@ -97,187 +100,266 @@ void DdsBridge::disable() noexcept } } -void DdsBridge::create_all_tracks_() +void DdsBridge::create_endpoint( + const ParticipantId& participant_id, + const EndpointKind& discovered_endpoint_kind) { std::lock_guard lock(mutex_); - const auto& ids = participants_->get_participants_ids(); + if (discovered_endpoint_kind == EndpointKind::reader) + { + create_writer_and_its_tracks_nts_(participant_id); + } + else + { + create_reader_and_its_track_nts_(participant_id); + } +} - // Figure out what writers need to be created - std::set writers_to_create; +void DdsBridge::remove_endpoint( + const ParticipantId& participant_id, + const EndpointKind& removed_endpoint_kind) +{ + std::lock_guard lock(mutex_); - for (const ParticipantId& id : ids) + if (removed_endpoint_kind == EndpointKind::reader) { - const auto& routes_it = routes_.find(id); + remove_writer_and_its_tracks_nts_(participant_id); + } + else + { + remove_reader_and_its_track_nts_(participant_id); + } +} - if (routes_it != routes_.end()) - { - // The reader has a route. Create only the writers in the route. +void DdsBridge::create_writer_and_its_tracks_nts_( + const ParticipantId& participant_id) +{ + assert(participant_id != DEFAULT_PARTICIPANT_ID); + + // 1. Create the writer + auto writer = create_writer_nts_(participant_id); - // We are not going to modify the writers_ids in this route. We can get the writers_ids by reference. - const auto& writers_ids = routes_it->second; - writers_to_create.insert(writers_ids.begin(), writers_ids.end()); + // Save the writer + writers_[participant_id] = writer; + + // 2. Find the readers in the writer's route + const auto& readers_in_route = readers_in_writers_route_nts_(participant_id); + + // 3. Find or create the tracks in the writer's route + for (const auto& id : readers_in_route) + { + if (tracks_.count(id)) + { + // The track already exists. Add the writer. + tracks_[id]->add_writer(participant_id, writer); } else { - // The reader doesn't have a route. Create every writer (+ itself if repeater) - auto writers_ids = ids; + // The track doesn't exist - if (!participants_->get_participant(id)->is_repeater()) - { - // The participant is not a repeater. Do not add its writer. - writers_ids.erase(id); - } + // 3.1. Create the reader + auto reader = create_reader_nts_(id); - writers_to_create.insert(writers_ids.begin(), writers_ids.end()); + // 3.2. Create a writers set from the writer + std::map> writers; + writers[participant_id] = writer; + + // 3.3. Create the track + create_track_nts_(id, reader, writers); } } +} + +void DdsBridge::create_reader_and_its_track_nts_( + const ParticipantId& participant_id) +{ + assert(participant_id != DEFAULT_PARTICIPANT_ID); + + // 1. Create the reader + auto reader = create_reader_nts_(participant_id); + + // 2. Find the writers in the reader's route + const auto& writers_in_route = writers_in_readers_route_nts_(participant_id); - // Create the writers. + // 3. Find or create the writers in the reader's route std::map> writers; - for (const auto& id : writers_to_create) + for (const auto& id : writers_in_route) { - std::shared_ptr participant = participants_->get_participant(id); - const auto topic = create_topic_for_participant_nts_(participant); - writers[id] = participant->create_writer(*topic); + if (writers_.count(id)) + { + // The writer already exists. Add it to the reader's track. + writers[id] = writers_[id]; + } + else + { + // The writer doesn't exist + + // 3.1. Create the writer + auto writer = create_writer_nts_(id); + + // 3.2. Save the writer + writers_[id] = writer; + + // 3.3. Add the writer to the reader's track + writers[id] = writer; + } } - // Add the writers to the tracks they have routes for. - add_writers_to_tracks_nts_(writers); + // 4. Create the track + create_track_nts_(participant_id, reader, writers); } -void DdsBridge::create_writer( - const ParticipantId& participant_id) +void DdsBridge::remove_writer_and_its_tracks_nts_( + const ParticipantId& participant_id) noexcept { assert(participant_id != DEFAULT_PARTICIPANT_ID); - std::lock_guard lock(mutex_); + // 1. Remove the writer from the tracks and remove the tracks without writers + for (const auto& track_it : tracks_) + { + auto& track = track_it.second; - // Create the writer. - std::shared_ptr participant = participants_->get_participant(participant_id); - const auto topic = create_topic_for_participant_nts_(participant); - auto writer = participant->create_writer(*topic); + track->remove_writer(participant_id); - // Add the writer to the tracks it has routes for. - add_writer_to_tracks_nts_(participant_id, writer); + if (!track->has_writers()) + { + tracks_.erase(track_it.first); + } + } + + // 2. Remove the writer + writers_.erase(participant_id); } -void DdsBridge::remove_writer( +void DdsBridge::remove_reader_and_its_track_nts_( const ParticipantId& participant_id) noexcept { assert(participant_id != DEFAULT_PARTICIPANT_ID); - std::lock_guard lock(mutex_); + // 1. Find the writers in the reader's route + const auto& writers_in_route = writers_in_readers_route_nts_(participant_id); - for (auto it = tracks_.cbegin(), next_it = it; it != tracks_.cend(); it = next_it) + // 2. Remove the writers that don't belong to another track + for (const auto& writer_id : writers_in_route) { - ++next_it; + bool is_writer_in_another_track = false; - const auto& track = it->second; + for (const auto& track_it : tracks_) + { + if (track_it.first == participant_id) + { + continue; + } - // If the writer is in the track, remove it. - track->remove_writer(participant_id); + if (track_it.second->has_writer(writer_id)) + { + is_writer_in_another_track = true; + break; + } + } - if (!track->has_writers()) + if (!is_writer_in_another_track) { - // The track doesn't have any writers. Remove it. - tracks_.erase(it); + writers_.erase(writer_id); } } + + // 3. Remove the track + tracks_.erase(participant_id); } -void DdsBridge::add_writer_to_tracks_nts_( - const ParticipantId& participant_id, - std::shared_ptr& writer) +void DdsBridge::create_track_nts_( + const ParticipantId& id, + const std::shared_ptr& reader, + std::map>& writers) { - // Create the writer. - std::map> writers; - writers[participant_id] = writer; + tracks_[id] = std::make_unique( + topic_, + id, + std::move(reader), + std::move(writers), + payload_pool_, + thread_pool_); - // Add the writer to the tracks it has routes for. - add_writers_to_tracks_nts_(writers); + if (enabled_) + { + tracks_[id]->enable(); + } } -void DdsBridge::add_writers_to_tracks_nts_( - std::map>& writers) +std::shared_ptr DdsBridge::create_writer_nts_( + const ParticipantId& participant_id) { - // Add writers to the tracks of the readers in their route. - // If the readers in their route don't exist, create them with their tracks. - for (const ParticipantId& id : participants_->get_participants_ids()) - { - // Select the necessary writers - std::map> writers_of_track; + // Find the participant and the topic + std::shared_ptr participant = participants_->get_participant(participant_id); + const auto topic = create_topic_for_participant_nts_(participant); - const auto& routes_it = routes_.find(id); + // Create the writer + return participant->create_writer(*topic); +} - if (routes_it != routes_.end()) - { - // The reader has a route. Add only the writers in the route. - const auto& writers_in_route = routes_it->second; +std::shared_ptr DdsBridge::create_reader_nts_( + const ParticipantId& participant_id) +{ + // Find the participant and the topic + std::shared_ptr participant = participants_->get_participant(participant_id); + const auto topic = create_topic_for_participant_nts_(participant); - for (const auto& writer_id : writers_in_route) - { - if (writers.count(writer_id) >= 1) - { - writers_of_track[writer_id] = writers[writer_id]; - } - } - } - else - { - // The reader doesn't have a route. Add every writer (+ itself if repeater) - writers_of_track = writers; + // Create the reader + return participant->create_reader(*topic); +} - if (!participants_->get_participant(id)->is_repeater()) - { - // The participant is not a repeater. Do not add its writer. - writers_of_track.erase(id); - } - } +std::set DdsBridge::readers_in_writers_route_nts_(const ParticipantId& writer_id) +{ + std::set readers_in_route; - if (writers_of_track.size() == 0) + for (const ParticipantId& id : participants_->get_participants_ids()) + { + if (writer_id == id && !participants_->get_participant(id)->is_repeater()) { - // There are no writers in the route. There is nothing to do. Skip participant. continue; } - if (tracks_.count(id)) + const auto& routes_it = routes_.find(id); + + if (routes_it == routes_.end() || routes_it->second.count(writer_id)) { - // The track already exists. Add the writers to it. - for (const auto& writers_of_track_it : writers_of_track) - { - const auto& writer_id = writers_of_track_it.first; - const auto& writer = writers_of_track_it.second; - - if (!tracks_[id]->has_writer(writer_id)) - { - // Add the writer to the track - tracks_[id]->add_writer(writer_id, writer); - } - } + // Only add the reader if: + // 1. The reader doesn't have a route. + // 2. The writer is in the reader's route. + readers_in_route.insert(id); } - else + } + + return readers_in_route; +} + +std::set DdsBridge::writers_in_readers_route_nts_(const ParticipantId& reader_id) +{ + const auto& routes_it = routes_.find(reader_id); + + std::set writers_in_route; + + if (routes_it != routes_.end()) + { + // The reader has a route. Add only the writers in the route. + writers_in_route = routes_it->second; + } + else + { + // The reader doesn't have a route. Add every writer (+ itself if repeater). + writers_in_route = participants_->get_participants_ids(); + + if (!participants_->get_participant(reader_id)->is_repeater()) { - // The track doesn't exist. Create it. - std::shared_ptr participant = participants_->get_participant(id); - const auto topic = create_topic_for_participant_nts_(participant); - auto reader = participant->create_reader(*topic); - - tracks_[id] = std::make_unique( - topic_, - id, - std::move(reader), - std::move(writers_of_track), - payload_pool_, - thread_pool_); - - if (enabled_) - { - tracks_[id]->enable(); - } + // The participant is not a repeater. Do not add its writer. + writers_in_route.erase(reader_id); } } + + return writers_in_route; } utils::Heritable DdsBridge::create_topic_for_participant_nts_( diff --git a/ddspipe_core/src/cpp/communication/dds/Track.cpp b/ddspipe_core/src/cpp/communication/dds/Track.cpp index 1f964a03..cff433f4 100644 --- a/ddspipe_core/src/cpp/communication/dds/Track.cpp +++ b/ddspipe_core/src/cpp/communication/dds/Track.cpp @@ -68,7 +68,7 @@ Track::~Track() { logDebug(DDSPIPE_TRACK, "Destroying Track " << *this << "."); - // Disable reader and writers + // Disable the track disable(); // Unset callback on the Reader (this is needed as Reader will live longer than Track) @@ -131,11 +131,7 @@ void Track::disable() noexcept // Disabling Reader reader_->disable(); - // Disabling Writers - for (auto& writer_it : writers_) - { - writer_it.second->disable(); - } + // Don't disable writers, as they may be used by other tracks } } @@ -163,16 +159,16 @@ void Track::remove_writer( } bool Track::has_writer( - const ParticipantId& id) noexcept + const ParticipantId& id) const noexcept { std::lock_guard lock(track_mutex_); return writers_.count(id) != 0; } -bool Track::has_writers() noexcept +bool Track::has_writers() const noexcept { std::lock_guard lock(track_mutex_); - return writers_.size() > 0; + return !writers_.empty(); } bool Track::should_transmit_() noexcept @@ -265,8 +261,9 @@ void Track::transmit_() noexcept { EPROSIMA_LOG_WARNING( DDSPIPE_TRACK, - "Error writting data in Track " << topic_->serialize() + "Error writing data in Track " << *this << " for writer " << writer_it.second.get() + << " in participant " << writer_it.first << ". Error code " << ret << ". Skipping data for this writer and continue."); continue; diff --git a/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp b/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp index 5e3f8561..39d30008 100644 --- a/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp +++ b/ddspipe_core/src/cpp/configuration/DdsPipeConfiguration.cpp @@ -29,12 +29,6 @@ namespace core { bool DdsPipeConfiguration::is_valid( utils::Formatter& error_msg) const noexcept { - if (remove_unused_entities && discovery_trigger != DiscoveryTrigger::READER) - { - error_msg << "A discovery-trigger different from reader is incompatible with remove-unused-entities."; - return false; - } - return routes.is_valid(error_msg) && topic_routes.is_valid(error_msg); } diff --git a/ddspipe_core/src/cpp/core/DdsPipe.cpp b/ddspipe_core/src/cpp/core/DdsPipe.cpp index 5a4eb898..a0a6398e 100644 --- a/ddspipe_core/src/cpp/core/DdsPipe.cpp +++ b/ddspipe_core/src/cpp/core/DdsPipe.cpp @@ -292,7 +292,9 @@ void DdsPipe::discovered_endpoint_nts_( } else if (is_endpoint_relevant_(endpoint)) { - discovered_topic_nts_(utils::Heritable::make_heritable(endpoint.topic)); + discovered_topic_nts_( + utils::Heritable::make_heritable(endpoint.topic), + endpoint.kind); } } @@ -320,7 +322,7 @@ void DdsPipe::removed_endpoint_nts_( if (it_bridge != bridges_.end() && endpoint.discoverer_participant_id != DEFAULT_PARTICIPANT_ID) { - it_bridge->second->remove_writer(endpoint.discoverer_participant_id); + it_bridge->second->remove_endpoint(endpoint.discoverer_participant_id, endpoint.kind); } } } @@ -412,7 +414,8 @@ void DdsPipe::init_bridges_nts_( } void DdsPipe::discovered_topic_nts_( - const utils::Heritable& topic) noexcept + const utils::Heritable& topic, + const EndpointKind& endpoint_kind /*= EndpointKind::reader*/) noexcept { EPROSIMA_LOG_INFO(DDSPIPE, "Discovered topic: " << topic << " by: " << topic->topic_discoverer() << "."); @@ -427,13 +430,12 @@ void DdsPipe::discovered_topic_nts_( // If Pipe is enabled and topic allowed, activate it if (enabled_ && allowed_topics_->is_topic_allowed(*topic)) { - activate_topic_nts_(topic); + activate_topic_nts_(topic, endpoint_kind); } } else if (configuration_.remove_unused_entities && topic->topic_discoverer() != DEFAULT_PARTICIPANT_ID) { - // The bridge already exists. Create a writer in the participant who discovered it. - it_bridge->second->create_writer(topic->topic_discoverer()); + it_bridge->second->create_endpoint(topic->topic_discoverer(), endpoint_kind); } } @@ -485,7 +487,8 @@ void DdsPipe::removed_service_nts_( void DdsPipe::create_new_bridge_nts_( const utils::Heritable& topic, - bool enabled /*= false*/) noexcept + bool enabled, /*= false*/ + const EndpointKind& endpoint_kind /*= EndpointKind::reader*/) noexcept { EPROSIMA_LOG_INFO(DDSPIPE, "Creating Bridge for topic: " << topic << "."); @@ -501,7 +504,8 @@ void DdsPipe::create_new_bridge_nts_( thread_pool_, routes_config, configuration_.remove_unused_entities, - manual_topics); + manual_topics, + endpoint_kind); if (enabled) { @@ -528,7 +532,8 @@ void DdsPipe::create_new_service_nts_( } void DdsPipe::activate_topic_nts_( - const utils::Heritable& topic) noexcept + const utils::Heritable& topic, + const EndpointKind& endpoint_kind /*= EndpointKind::reader*/) noexcept { EPROSIMA_LOG_INFO(DDSPIPE, "Activating topic: " << topic << "."); @@ -541,7 +546,7 @@ void DdsPipe::activate_topic_nts_( if (it_bridge == bridges_.end()) { // The Bridge did not exist - create_new_bridge_nts_(topic, true); + create_new_bridge_nts_(topic, true, endpoint_kind); } else { diff --git a/ddspipe_participants/src/cpp/writer/dynamic_types/SchemaWriter.cpp b/ddspipe_participants/src/cpp/writer/dynamic_types/SchemaWriter.cpp index a678610d..f1e5a025 100644 --- a/ddspipe_participants/src/cpp/writer/dynamic_types/SchemaWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/dynamic_types/SchemaWriter.cpp @@ -64,7 +64,7 @@ utils::ReturnCode SchemaWriter::write_nts_( { EPROSIMA_LOG_WARNING( DDSPIPE_SCHEMA_WRITER, - "Error writting data in topic " << topic_ << " : <" << e.what() << ">."); + "Error writing data in topic " << topic_ << " : <" << e.what() << ">."); return utils::ReturnCode::RETCODE_ERROR; } From ddae53bc5cad1b16346538308317d241b9b60335 Mon Sep 17 00:00:00 2001 From: tempate Date: Wed, 17 Jan 2024 10:06:47 +0100 Subject: [PATCH 2/5] Generate the routes outside of the DdsBridge Signed-off-by: tempate --- .../communication/dds/DdsBridge.hpp | 3 +- .../configuration/RoutesConfiguration.hpp | 8 ++ .../src/cpp/communication/dds/DdsBridge.cpp | 60 ++----------- .../cpp/configuration/RoutesConfiguration.cpp | 85 ++++++++++++++++++- 4 files changed, 99 insertions(+), 57 deletions(-) diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp index 835cd9d1..68f0a4b1 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp @@ -182,7 +182,8 @@ class DdsBridge : public Bridge utils::Heritable topic_; //! Routes associated to the Topic. - RoutesConfiguration::RoutesMap routes_; + RoutesConfiguration::RoutesMap routes_of_readers_; + RoutesConfiguration::RoutesMap routes_of_writers_; //! Topics that explicitally set a QoS attribute for this participant. std::vector manual_topics_; diff --git a/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp b/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp index a4ec74db..dbf6851c 100644 --- a/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp +++ b/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp @@ -56,6 +56,14 @@ struct RoutesConfiguration : public IConfiguration utils::Formatter& error_msg, const std::map& participants) const noexcept; + DDSPIPE_CORE_DllAPI + RoutesMap routes_of_readers( + const std::map& participant_ids) const noexcept; + + DDSPIPE_CORE_DllAPI + RoutesMap routes_of_writers( + const std::map& participant_ids) const noexcept; + ///////////////////////// // OPERATORS ///////////////////////// diff --git a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp index db404198..af85c0d4 100644 --- a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp +++ b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp @@ -38,7 +38,8 @@ DdsBridge::DdsBridge( { logDebug(DDSPIPE_DDSBRIDGE, "Creating DdsBridge " << *this << "."); - routes_ = routes_config(); + routes_of_readers_ = routes_config.routes_of_readers(participants_database->get_participants_repeater_map()); + routes_of_writers_ = routes_config.routes_of_writers(participants_database->get_participants_repeater_map()); if (remove_unused_entities && topic->topic_discoverer() != DEFAULT_PARTICIPANT_ID) { @@ -144,7 +145,7 @@ void DdsBridge::create_writer_and_its_tracks_nts_( writers_[participant_id] = writer; // 2. Find the readers in the writer's route - const auto& readers_in_route = readers_in_writers_route_nts_(participant_id); + const auto& readers_in_route = routes_of_writers_[participant_id]; // 3. Find or create the tracks in the writer's route for (const auto& id : readers_in_route) @@ -180,7 +181,7 @@ void DdsBridge::create_reader_and_its_track_nts_( auto reader = create_reader_nts_(participant_id); // 2. Find the writers in the reader's route - const auto& writers_in_route = writers_in_readers_route_nts_(participant_id); + const auto& writers_in_route = routes_of_readers_[participant_id]; // 3. Find or create the writers in the reader's route std::map> writers; @@ -239,7 +240,7 @@ void DdsBridge::remove_reader_and_its_track_nts_( assert(participant_id != DEFAULT_PARTICIPANT_ID); // 1. Find the writers in the reader's route - const auto& writers_in_route = writers_in_readers_route_nts_(participant_id); + const auto& writers_in_route = routes_of_readers_[participant_id]; // 2. Remove the writers that don't belong to another track for (const auto& writer_id : writers_in_route) @@ -311,57 +312,6 @@ std::shared_ptr DdsBridge::create_reader_nts_( return participant->create_reader(*topic); } -std::set DdsBridge::readers_in_writers_route_nts_(const ParticipantId& writer_id) -{ - std::set readers_in_route; - - for (const ParticipantId& id : participants_->get_participants_ids()) - { - if (writer_id == id && !participants_->get_participant(id)->is_repeater()) - { - continue; - } - - const auto& routes_it = routes_.find(id); - - if (routes_it == routes_.end() || routes_it->second.count(writer_id)) - { - // Only add the reader if: - // 1. The reader doesn't have a route. - // 2. The writer is in the reader's route. - readers_in_route.insert(id); - } - } - - return readers_in_route; -} - -std::set DdsBridge::writers_in_readers_route_nts_(const ParticipantId& reader_id) -{ - const auto& routes_it = routes_.find(reader_id); - - std::set writers_in_route; - - if (routes_it != routes_.end()) - { - // The reader has a route. Add only the writers in the route. - writers_in_route = routes_it->second; - } - else - { - // The reader doesn't have a route. Add every writer (+ itself if repeater). - writers_in_route = participants_->get_participants_ids(); - - if (!participants_->get_participant(reader_id)->is_repeater()) - { - // The participant is not a repeater. Do not add its writer. - writers_in_route.erase(reader_id); - } - } - - return writers_in_route; -} - utils::Heritable DdsBridge::create_topic_for_participant_nts_( const std::shared_ptr& participant) noexcept { diff --git a/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp b/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp index 59dae06e..c8e0117f 100644 --- a/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp +++ b/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp @@ -40,7 +40,7 @@ bool RoutesConfiguration::is_valid( bool RoutesConfiguration::is_valid( utils::Formatter& error_msg, - const std::map& participant_ids) const noexcept + const std::map& participant_ids) const noexcept { if (!is_valid(error_msg)) { @@ -101,6 +101,89 @@ RoutesConfiguration::RoutesMap RoutesConfiguration::operator ()() const return routes; } +RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_readers( + const std::map& participant_ids) const noexcept +{ + static RoutesConfiguration::RoutesMap readers_routes; + + if (!readers_routes.empty()) + { + return readers_routes; + } + + for (const auto& it : participant_ids) + { + const auto& reader_id = it.first; + const auto& is_repeater = it.second; + + const auto& routes_it = routes.find(reader_id); + + if (routes_it != routes.end()) + { + // The reader has a route. Add only the writers in the route. + readers_routes[reader_id] = routes_it->second; + } + else + { + // The reader doesn't have a route. Add every writer (+ itself if repeater). + for (const auto& it : participant_ids) + { + const auto& writer_id = it.first; + + if (reader_id != writer_id || is_repeater) + { + readers_routes[reader_id].insert(writer_id); + } + } + } + } + + return readers_routes; +} + +RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_writers( + const std::map& participant_ids) const noexcept +{ + static RoutesConfiguration::RoutesMap writers_routes; + + if (!writers_routes.empty()) + { + return writers_routes; + } + + for (const auto& it : participant_ids) + { + const auto& reader_id = it.first; + const auto& is_repeater = it.second; + + const auto& routes_it = routes.find(reader_id); + + if (routes_it != routes.end()) + { + // The reader has a route. Add only the writers in the route. + for (const auto& writer_id : routes_it->second) + { + writers_routes[writer_id].insert(reader_id); + } + } + else + { + // The reader doesn't have a route. Add every writer (+ itself if repeater). + for (const auto& it : participant_ids) + { + const auto& writer_id = it.first; + + if (reader_id != writer_id || is_repeater) + { + writers_routes[writer_id].insert(reader_id); + } + } + } + } + + return writers_routes; +} + } /* namespace core */ } /* namespace ddspipe */ } /* namespace eprosima */ From e77bab79445db3b1009646e8501c08327f1cff63 Mon Sep 17 00:00:00 2001 From: tempate Date: Wed, 17 Jan 2024 10:07:34 +0100 Subject: [PATCH 3/5] Uncrustify Signed-off-by: tempate --- .../ddspipe_core/communication/dds/DdsBridge.hpp | 4 ++-- ddspipe_core/src/cpp/communication/dds/Track.cpp | 10 +++++----- .../src/cpp/configuration/RoutesConfiguration.cpp | 4 ++-- ddspipe_core/src/cpp/core/DdsPipe.cpp | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp index 68f0a4b1..8556ad4b 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp @@ -152,11 +152,11 @@ class DdsBridge : public Bridge // TODO std::shared_ptr create_writer_nts_( - const types::ParticipantId& participant_id); + const types::ParticipantId& participant_id); // TODO std::shared_ptr create_reader_nts_( - const types::ParticipantId& participant_id); + const types::ParticipantId& participant_id); // TODO std::set readers_in_writers_route_nts_( diff --git a/ddspipe_core/src/cpp/communication/dds/Track.cpp b/ddspipe_core/src/cpp/communication/dds/Track.cpp index cff433f4..e44e531f 100644 --- a/ddspipe_core/src/cpp/communication/dds/Track.cpp +++ b/ddspipe_core/src/cpp/communication/dds/Track.cpp @@ -261,11 +261,11 @@ void Track::transmit_() noexcept { EPROSIMA_LOG_WARNING( DDSPIPE_TRACK, - "Error writing data in Track " << *this - << " for writer " << writer_it.second.get() - << " in participant " << writer_it.first - << ". Error code " << ret - << ". Skipping data for this writer and continue."); + "Error writing data in Track " << *this + << " for writer " << writer_it.second.get() + << " in participant " << writer_it.first + << ". Error code " << ret + << ". Skipping data for this writer and continue."); continue; } } diff --git a/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp b/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp index c8e0117f..0a366851 100644 --- a/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp +++ b/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp @@ -102,7 +102,7 @@ RoutesConfiguration::RoutesMap RoutesConfiguration::operator ()() const } RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_readers( - const std::map& participant_ids) const noexcept + const std::map& participant_ids) const noexcept { static RoutesConfiguration::RoutesMap readers_routes; @@ -142,7 +142,7 @@ RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_readers( } RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_writers( - const std::map& participant_ids) const noexcept + const std::map& participant_ids) const noexcept { static RoutesConfiguration::RoutesMap writers_routes; diff --git a/ddspipe_core/src/cpp/core/DdsPipe.cpp b/ddspipe_core/src/cpp/core/DdsPipe.cpp index a0a6398e..46d491a4 100644 --- a/ddspipe_core/src/cpp/core/DdsPipe.cpp +++ b/ddspipe_core/src/cpp/core/DdsPipe.cpp @@ -293,8 +293,8 @@ void DdsPipe::discovered_endpoint_nts_( else if (is_endpoint_relevant_(endpoint)) { discovered_topic_nts_( - utils::Heritable::make_heritable(endpoint.topic), - endpoint.kind); + utils::Heritable::make_heritable(endpoint.topic), + endpoint.kind); } } From e00db76bb1e31b8415e5d3894f045a349cbb4ddb Mon Sep 17 00:00:00 2001 From: tempate Date: Wed, 10 Apr 2024 16:03:25 +0200 Subject: [PATCH 4/5] Apply suggestions Signed-off-by: tempate --- .../communication/dds/DdsBridge.hpp | 70 ++++++++++---- .../configuration/RoutesConfiguration.hpp | 18 +++- .../src/cpp/communication/dds/DdsBridge.cpp | 92 ++++++------------- .../cpp/configuration/RoutesConfiguration.cpp | 4 +- 4 files changed, 97 insertions(+), 87 deletions(-) diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp index 8556ad4b..474d4ef8 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp @@ -88,13 +88,33 @@ class DdsBridge : public Bridge DDSPIPE_CORE_DllAPI void disable() noexcept override; - // TODO + /** + * Create a new endpoint in the bridge. + * + * It will call either \c create_writer_and_its_tracks_nts_ or \c create_reader_and_its_track_nts_ depending on the + * \c discovered_endpoint_kind. + * + * @param participant_id: The id of the participant who is creating the endpoint. + * @param discovered_endpoint_kind: The kind of endpoint that has been discovered. + * + * Thread safe. + */ DDSPIPE_CORE_DllAPI void create_endpoint( const types::ParticipantId& participant_id, const types::EndpointKind& discovered_endpoint_kind); - // TODO + /** + * Remove an endpoint from the bridge. + * + * It will call either \c remove_writer_and_its_tracks_nts_ or \c remove_reader_and_its_track_nts_ depending on the + * \c removed_endpoint_kind. + * + * @param participant_id: The id of the participant who is removing the endpoint. + * @param removed_endpoint_kind: The kind of endpoint that has been removed. + * + * Thread safe. + */ DDSPIPE_CORE_DllAPI void remove_endpoint( const types::ParticipantId& participant_id, @@ -105,7 +125,7 @@ class DdsBridge : public Bridge /** * Create an IWriter for the new participant. * Create the IReaders in the IWriter's route. - * Create the Tracks of the IReaderes with the IWriter. + * Create the Tracks of the IReaders with the IWriter. * * @param participant_id: The id of the participant who is creating the writer. * @@ -127,8 +147,8 @@ class DdsBridge : public Bridge const types::ParticipantId& participant_id); /** - * Remove the IWriter from all the Tracks in the bridge. - * Remove the IReaders and Tracks that don't have any IWriters. + * Remove the IWriter from all the Tracks. + * Remove the IReaders and Tracks without IWriters. * * @param participant_id: The id of the participant who is removing the writer. */ @@ -136,36 +156,46 @@ class DdsBridge : public Bridge const types::ParticipantId& participant_id) noexcept; /** - * Remove the IReader and its Track from the bridge. - * Remove the IWriters that don't belong to a Track. + * Remove the IReader and its Track. + * Remove the IWriters that no longer belong to a Track. * * @param participant_id: The id of the participant who is removing the reader. */ void remove_reader_and_its_track_nts_( const types::ParticipantId& participant_id) noexcept; - // TODO + /** + * @brief Create a Track for an IReader and its IWriters. + * + * @param id: The id of the participant who is creating the track. + * @param reader: The IReader of the track. + * @param writers: The IWriters of the track. + */ void create_track_nts_( const types::ParticipantId& id, const std::shared_ptr& reader, std::map>& writers); - // TODO + /** + * @brief Create an IWriter for a participant in the topic. + * + * A tailored Topic is created for the participant, depending on the QoS configured for it. + * + * @param participant_id: The id of the participant who is creating the writer. + */ std::shared_ptr create_writer_nts_( const types::ParticipantId& participant_id); - // TODO + /** + * @brief Create an IReader for a participant in the topic. + * + * A tailored Topic is created for the participant, depending on the QoS configured for it. + * + * @param participant_id: The id of the participant who is creating the reader. + */ std::shared_ptr create_reader_nts_( const types::ParticipantId& participant_id); - // TODO - std::set readers_in_writers_route_nts_( - const types::ParticipantId& writer_id); - - // TODO - std::set writers_in_readers_route_nts_( - const types::ParticipantId& reader_id); - /** * @brief Impose the Topic QoS that have been pre-configured for a participant. * @@ -182,8 +212,8 @@ class DdsBridge : public Bridge utils::Heritable topic_; //! Routes associated to the Topic. - RoutesConfiguration::RoutesMap routes_of_readers_; - RoutesConfiguration::RoutesMap routes_of_writers_; + RoutesConfiguration::RoutesMap writers_in_route_; + RoutesConfiguration::RoutesMap readers_in_route_; //! Topics that explicitally set a QoS attribute for this participant. std::vector manual_topics_; diff --git a/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp b/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp index dbf6851c..f6f44cab 100644 --- a/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp +++ b/ddspipe_core/include/ddspipe_core/configuration/RoutesConfiguration.hpp @@ -48,7 +48,7 @@ struct RoutesConfiguration : public IConfiguration ///////////////////////// DDSPIPE_CORE_DllAPI - virtual bool is_valid( + bool is_valid( utils::Formatter& error_msg) const noexcept override; DDSPIPE_CORE_DllAPI @@ -56,10 +56,26 @@ struct RoutesConfiguration : public IConfiguration utils::Formatter& error_msg, const std::map& participants) const noexcept; + /** + * @brief Returns the writers in each reader's route. + * + * It returns a map with the readers as keys and the writers in each reader's route as values. + * + * The first time this method is called, it will calculate the routes and store them in the object. + * Subsequent calls will return the stored routes. + */ DDSPIPE_CORE_DllAPI RoutesMap routes_of_readers( const std::map& participant_ids) const noexcept; + /** + * @brief Returns the readers in each writer's route. + * + * It returns a map with the writers as keys and the readers in each writer's route as values. + * + * The first time this method is called, it will calculate the routes and store them in the object. + * Subsequent calls will return the stored routes. + */ DDSPIPE_CORE_DllAPI RoutesMap routes_of_writers( const std::map& participant_ids) const noexcept; diff --git a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp index af85c0d4..c01d54bd 100644 --- a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp +++ b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include #include @@ -38,8 +40,10 @@ DdsBridge::DdsBridge( { logDebug(DDSPIPE_DDSBRIDGE, "Creating DdsBridge " << *this << "."); - routes_of_readers_ = routes_config.routes_of_readers(participants_database->get_participants_repeater_map()); - routes_of_writers_ = routes_config.routes_of_writers(participants_database->get_participants_repeater_map()); + const auto& participants_repeater_map = participants_database->get_participants_repeater_map(); + + writers_in_route_ = routes_config.routes_of_readers(participants_repeater_map); + readers_in_route_ = routes_config.routes_of_writers(participants_repeater_map); if (remove_unused_entities && topic->topic_discoverer() != DEFAULT_PARTICIPANT_ID) { @@ -49,7 +53,6 @@ DdsBridge::DdsBridge( { for (const ParticipantId& id : participants_->get_participants_ids()) { - std::lock_guard lock(mutex_); create_reader_and_its_track_nts_(id); } } @@ -138,17 +141,14 @@ void DdsBridge::create_writer_and_its_tracks_nts_( { assert(participant_id != DEFAULT_PARTICIPANT_ID); - // 1. Create the writer + // Create the writer auto writer = create_writer_nts_(participant_id); // Save the writer writers_[participant_id] = writer; - // 2. Find the readers in the writer's route - const auto& readers_in_route = routes_of_writers_[participant_id]; - - // 3. Find or create the tracks in the writer's route - for (const auto& id : readers_in_route) + // Find or create the tracks in the writer's route + for (const auto& id : readers_in_route_[participant_id]) { if (tracks_.count(id)) { @@ -157,16 +157,12 @@ void DdsBridge::create_writer_and_its_tracks_nts_( } else { - // The track doesn't exist - - // 3.1. Create the reader + // The track doesn't exist. Create it. auto reader = create_reader_nts_(id); - // 3.2. Create a writers set from the writer std::map> writers; writers[participant_id] = writer; - // 3.3. Create the track create_track_nts_(id, reader, writers); } } @@ -177,39 +173,21 @@ void DdsBridge::create_reader_and_its_track_nts_( { assert(participant_id != DEFAULT_PARTICIPANT_ID); - // 1. Create the reader + // Create the reader auto reader = create_reader_nts_(participant_id); - // 2. Find the writers in the reader's route - const auto& writers_in_route = routes_of_readers_[participant_id]; - - // 3. Find or create the writers in the reader's route - std::map> writers; - - for (const auto& id : writers_in_route) + // Create the missing writers in the reader's route + for (const auto& id : writers_in_route_[participant_id]) { - if (writers_.count(id)) - { - // The writer already exists. Add it to the reader's track. - writers[id] = writers_[id]; - } - else + if (writers_.count(id) == 0) { - // The writer doesn't exist - - // 3.1. Create the writer - auto writer = create_writer_nts_(id); - - // 3.2. Save the writer - writers_[id] = writer; - - // 3.3. Add the writer to the reader's track - writers[id] = writer; + // The writer doesn't exist. Create it. + writers_[id] = create_writer_nts_(id); } } - // 4. Create the track - create_track_nts_(participant_id, reader, writers); + // Create the track + create_track_nts_(participant_id, reader, writers_); } void DdsBridge::remove_writer_and_its_tracks_nts_( @@ -217,7 +195,7 @@ void DdsBridge::remove_writer_and_its_tracks_nts_( { assert(participant_id != DEFAULT_PARTICIPANT_ID); - // 1. Remove the writer from the tracks and remove the tracks without writers + // Remove the writer from the tracks and remove the tracks without writers for (const auto& track_it : tracks_) { auto& track = track_it.second; @@ -230,7 +208,7 @@ void DdsBridge::remove_writer_and_its_tracks_nts_( } } - // 2. Remove the writer + // Remove the writer writers_.erase(participant_id); } @@ -239,35 +217,21 @@ void DdsBridge::remove_reader_and_its_track_nts_( { assert(participant_id != DEFAULT_PARTICIPANT_ID); - // 1. Find the writers in the reader's route - const auto& writers_in_route = routes_of_readers_[participant_id]; - - // 2. Remove the writers that don't belong to another track - for (const auto& writer_id : writers_in_route) + // Remove the writers that don't belong to another track + for (const auto& writer_id : writers_in_route_[participant_id]) { - bool is_writer_in_another_track = false; - - for (const auto& track_it : tracks_) - { - if (track_it.first == participant_id) - { - continue; - } - - if (track_it.second->has_writer(writer_id)) - { - is_writer_in_another_track = true; - break; - } - } + const auto& different_track_doesnt_contain_writer = [&](const auto& track_it) + { + return track_it.first == participant_id || !track_it.second->has_writer(writer_id); + }; - if (!is_writer_in_another_track) + if (std::all_of(tracks_.begin(), tracks_.end(), different_track_doesnt_contain_writer)) { writers_.erase(writer_id); } } - // 3. Remove the track + // Remove the track tracks_.erase(participant_id); } diff --git a/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp b/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp index 0a366851..ecba2383 100644 --- a/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp +++ b/ddspipe_core/src/cpp/configuration/RoutesConfiguration.cpp @@ -160,7 +160,7 @@ RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_writers( if (routes_it != routes.end()) { - // The reader has a route. Add only the writers in the route. + // The reader has a route. Add the reader to the route of the writers in its route. for (const auto& writer_id : routes_it->second) { writers_routes[writer_id].insert(reader_id); @@ -168,7 +168,7 @@ RoutesConfiguration::RoutesMap RoutesConfiguration::routes_of_writers( } else { - // The reader doesn't have a route. Add every writer (+ itself if repeater). + // The reader doesn't have a route. Add the reader to the route of every writer (+ itself if repeater). for (const auto& it : participant_ids) { const auto& writer_id = it.first; From 0789d9586da41299470db4036204a284d6c636f0 Mon Sep 17 00:00:00 2001 From: tempate Date: Mon, 15 Jul 2024 16:27:35 +0200 Subject: [PATCH 5/5] Apply suggestions Signed-off-by: tempate --- .../communication/dds/DdsBridge.hpp | 8 +++- .../include/ddspipe_core/core/DdsPipe.hpp | 7 +++- .../src/cpp/communication/dds/DdsBridge.cpp | 39 ++++++++++++------- ddspipe_core/src/cpp/core/DdsPipe.cpp | 12 +++--- 4 files changed, 41 insertions(+), 25 deletions(-) diff --git a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp index 474d4ef8..1fadfb64 100644 --- a/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp +++ b/ddspipe_core/include/ddspipe_core/communication/dds/DdsBridge.hpp @@ -18,7 +18,6 @@ #include #include -#include #include #include #include @@ -52,7 +51,10 @@ class DdsBridge : public Bridge * @param participant_database: Collection of Participants to manage communication * @param payload_pool: Payload Pool that handles the reservation/release of payloads throughout the DDS Router * @param thread_pool: Shared pool of threads in charge of data transmission. - * @param enable: Whether the Bridge should be initialized as enabled + * @param routes_config: Configuration of the routes of the Bridge + * @param remove_unused_entities: Whether the Bridge should remove unused entities + * @param manual_topics: Topics that explicitally set a QoS attribute for this participant + * @param endpoint_kind: Kind of the endpoint that discovered the topic * * @throw InitializationException in case \c IWriters or \c IReaders creation fails. */ @@ -200,6 +202,8 @@ class DdsBridge : public Bridge * @brief Impose the Topic QoS that have been pre-configured for a participant. * * First, it imposes the Topic QoS configured at \c manual_topics and then the ones configured at \c participants. + * + * @param participant: The participant to impose the QoS on. */ utils::Heritable create_topic_for_participant_nts_( const std::shared_ptr& participant) noexcept; diff --git a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp index d6471f82..36322db3 100644 --- a/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp +++ b/ddspipe_core/include/ddspipe_core/core/DdsPipe.hpp @@ -261,6 +261,7 @@ class DdsPipe * @note This is the only method that adds topics to \c current_topics_ * * @param [in] topic : topic discovered + * @param [in] endpoint_kind : kind of the endpoint */ void discovered_topic_nts_( const utils::Heritable& topic, @@ -303,11 +304,12 @@ class DdsPipe * It is created enabled if the DdsPipe is enabled. * * @param [in] topic : new topic + * @param [in] enabled : whether to enable the bridge on creation or not */ void create_new_bridge_nts_( const utils::Heritable& topic, - bool enabled = false, - const types::EndpointKind& endpoint_kind = types::EndpointKind::reader) noexcept; + const types::EndpointKind endpoint_kind = types::EndpointKind::reader, + bool enabled = false) noexcept; /** * @brief Create a new \c RpcBridge object @@ -325,6 +327,7 @@ class DdsPipe * If the topic did not exist before, the Bridge is created. * * @param [in] topic : Topic to be enabled + * @param [in] endpoint_kind : Kind of endpoint who discovered the topic */ void activate_topic_nts_( const utils::Heritable& topic, diff --git a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp index c01d54bd..46b81cdc 100644 --- a/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp +++ b/ddspipe_core/src/cpp/communication/dds/DdsBridge.cpp @@ -110,13 +110,17 @@ void DdsBridge::create_endpoint( { std::lock_guard lock(mutex_); - if (discovered_endpoint_kind == EndpointKind::reader) + switch (discovered_endpoint_kind) { - create_writer_and_its_tracks_nts_(participant_id); - } - else - { - create_reader_and_its_track_nts_(participant_id); + case EndpointKind::reader: + create_writer_and_its_tracks_nts_(participant_id); + break; + case EndpointKind::writer: + create_reader_and_its_track_nts_(participant_id); + break; + default: + logError(DDSPIPE_DDSBRIDGE, "Invalid kind " << discovered_endpoint_kind << " to create an endpoint."); + break; } } @@ -126,13 +130,17 @@ void DdsBridge::remove_endpoint( { std::lock_guard lock(mutex_); - if (removed_endpoint_kind == EndpointKind::reader) - { - remove_writer_and_its_tracks_nts_(participant_id); - } - else + switch (removed_endpoint_kind) { - remove_reader_and_its_track_nts_(participant_id); + case EndpointKind::reader: + remove_writer_and_its_tracks_nts_(participant_id); + break; + case EndpointKind::writer: + remove_reader_and_its_track_nts_(participant_id); + break; + default: + logError(DDSPIPE_DDSBRIDGE, "Invalid kind " << removed_endpoint_kind << " to remove an endpoint."); + break; } } @@ -196,15 +204,16 @@ void DdsBridge::remove_writer_and_its_tracks_nts_( assert(participant_id != DEFAULT_PARTICIPANT_ID); // Remove the writer from the tracks and remove the tracks without writers - for (const auto& track_it : tracks_) + for (auto it = tracks_.cbegin(), next_it = it; it != tracks_.cend(); it = next_it) { - auto& track = track_it.second; + ++next_it; + const auto& track = it->second; track->remove_writer(participant_id); if (!track->has_writers()) { - tracks_.erase(track_it.first); + tracks_.erase(it); } } diff --git a/ddspipe_core/src/cpp/core/DdsPipe.cpp b/ddspipe_core/src/cpp/core/DdsPipe.cpp index 46d491a4..5eb292b9 100644 --- a/ddspipe_core/src/cpp/core/DdsPipe.cpp +++ b/ddspipe_core/src/cpp/core/DdsPipe.cpp @@ -391,13 +391,13 @@ bool DdsPipe::is_endpoint_relevant_( if (endpoint.active) { - // An active reader is relevant when it is the only active reader in a topic + // An active endpoint is relevant when it is the only active endpoint in a topic // with a discoverer participant id. return relevant_endpoints.size() == 1 && relevant_endpoints.count(endpoint.guid); } else { - // An inactive reader is relevant when there aren't any active readers in a topic + // An inactive endpoint is relevant when there aren't any active endpoints in a topic // with a discoverer participant id. return relevant_endpoints.size() == 0; } @@ -409,7 +409,7 @@ void DdsPipe::init_bridges_nts_( for (const auto& topic : builtin_topics) { discovered_topic_nts_(topic); - create_new_bridge_nts_(topic, false); + create_new_bridge_nts_(topic, EndpointKind::reader, false); } } @@ -487,8 +487,8 @@ void DdsPipe::removed_service_nts_( void DdsPipe::create_new_bridge_nts_( const utils::Heritable& topic, - bool enabled, /*= false*/ - const EndpointKind& endpoint_kind /*= EndpointKind::reader*/) noexcept + const EndpointKind endpoint_kind /*= EndpointKind::reader*/, + bool enabled /*= false*/) noexcept { EPROSIMA_LOG_INFO(DDSPIPE, "Creating Bridge for topic: " << topic << "."); @@ -546,7 +546,7 @@ void DdsPipe::activate_topic_nts_( if (it_bridge == bridges_.end()) { // The Bridge did not exist - create_new_bridge_nts_(topic, true, endpoint_kind); + create_new_bridge_nts_(topic, endpoint_kind, true); } else {