diff --git a/include/fastdds/rtps/resources/ResourceEvent.h b/include/fastdds/rtps/resources/ResourceEvent.h index e39f8b147be..3709f3ff120 100644 --- a/include/fastdds/rtps/resources/ResourceEvent.h +++ b/include/fastdds/rtps/resources/ResourceEvent.h @@ -25,8 +25,9 @@ #include #include -#include #include +#include +#include #include namespace eprosima { @@ -49,8 +50,12 @@ class ResourceEvent /*! * @brief Method to initialize the internal thread. + * + * @param[in] configure_cb Function to be called in the context of the started thread + * before calling the internal service routine. */ - void init_thread(); + void init_thread( + std::function configure_cb = {}); void stop_thread(); diff --git a/src/cpp/fastdds/log/Log.cpp b/src/cpp/fastdds/log/Log.cpp index fa568b4de37..27e37ff53b4 100644 --- a/src/cpp/fastdds/log/Log.cpp +++ b/src/cpp/fastdds/log/Log.cpp @@ -26,6 +26,7 @@ #include #include #include +#include namespace eprosima { namespace fastdds { @@ -258,6 +259,8 @@ struct LogResources void run() { + set_name_to_current_thread("dds.log"); + std::unique_lock guard(cv_mutex_); while (logging_) diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index 0e4f0dbc955..a358767d317 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -49,6 +50,8 @@ DataSharingListener::~DataSharingListener() void DataSharingListener::run() { + set_name_to_current_thread("dds.dsha.%u", reader_->getGuid().entityId.to_uint32() & 0x0000FFFF); + std::unique_lock lock(notification_->notification_->notification_mutex, std::defer_lock); while (is_running_.load()) { diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 1bd25b69c33..6e524dd8e7b 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -54,6 +54,8 @@ #include +#include + namespace eprosima { namespace fastdds { namespace rtps { @@ -160,7 +162,11 @@ bool PDPServer::init( getRTPSParticipant()->enableReader(edp->publications_reader_.first); // Initialize server dedicated thread. - resource_event_thread_.init_thread(); + uint32_t id_for_thread = static_cast(getRTPSParticipant()->getRTPSParticipantAttributes().participantID); + resource_event_thread_.init_thread([id_for_thread]() + { + set_name_to_current_thread("dds.ds_ev.%u", id_for_thread); + }); /* Given the fact that a participant is either a client or a server the diff --git a/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp b/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp index 31bd130cb2e..e1e3b9f5464 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp @@ -25,26 +25,26 @@ void FlowControllerFactory::init( pure_sync_flow_controller_name, std::unique_ptr( new FlowControllerImpl(participant_, nullptr)))); + FlowControllerFifoSchedule>(participant_, nullptr, 0)))); // SyncFlowController -> used by rest of besteffort writers. flow_controllers_.insert(decltype(flow_controllers_)::value_type( sync_flow_controller_name, std::unique_ptr( new FlowControllerImpl(participant_, nullptr)))); + FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++)))); // AsyncFlowController flow_controllers_.insert(decltype(flow_controllers_)::value_type( async_flow_controller_name, std::unique_ptr( new FlowControllerImpl(participant_, nullptr)))); + FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++)))); #ifdef FASTDDS_STATISTICS flow_controllers_.insert(decltype(flow_controllers_)::value_type( async_statistics_flow_controller_name, std::unique_ptr( new FlowControllerImpl(participant_, nullptr)))); + FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++)))); #endif // ifndef FASTDDS_STATISTICS } @@ -67,7 +67,8 @@ void FlowControllerFactory::register_flow_controller ( flow_controller_descr.name, std::unique_ptr( new FlowControllerImpl(participant_, &flow_controller_descr)))); + FlowControllerFifoSchedule>(participant_, + &flow_controller_descr, async_controller_index_++)))); break; case FlowControllerSchedulerPolicy::ROUND_ROBIN: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -75,7 +76,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr)))); + &flow_controller_descr, async_controller_index_++)))); break; case FlowControllerSchedulerPolicy::HIGH_PRIORITY: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -83,7 +84,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr)))); + &flow_controller_descr, async_controller_index_++)))); break; case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -91,7 +92,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr)))); + &flow_controller_descr, async_controller_index_++)))); break; default: assert(false); @@ -106,7 +107,8 @@ void FlowControllerFactory::register_flow_controller ( flow_controller_descr.name, std::unique_ptr( new FlowControllerImpl(participant_, &flow_controller_descr)))); + FlowControllerFifoSchedule>(participant_, + &flow_controller_descr, async_controller_index_++)))); break; case FlowControllerSchedulerPolicy::ROUND_ROBIN: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -114,7 +116,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr)))); + &flow_controller_descr, async_controller_index_++)))); break; case FlowControllerSchedulerPolicy::HIGH_PRIORITY: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -122,7 +124,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr)))); + &flow_controller_descr, async_controller_index_++)))); break; case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -130,7 +132,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr)))); + &flow_controller_descr, async_controller_index_++)))); break; default: assert(false); diff --git a/src/cpp/rtps/flowcontrol/FlowControllerFactory.hpp b/src/cpp/rtps/flowcontrol/FlowControllerFactory.hpp index bb9dcd7e633..009ba79cb06 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerFactory.hpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerFactory.hpp @@ -66,6 +66,9 @@ class FlowControllerFactory //! Stores the created flow controllers. std::map> flow_controllers_; + //! Counter used for thread identification + uint32_t async_controller_index_ = 0; + }; } // namespace rtps diff --git a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp index 57d8a06cff5..4fffbb74d44 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp @@ -4,6 +4,8 @@ #include "FlowController.hpp" #include #include +#include +#include #include #include @@ -926,11 +928,19 @@ class FlowControllerImpl : public FlowController FlowControllerImpl( fastrtps::rtps::RTPSParticipantImpl* participant, - const FlowControllerDescriptor* descriptor + const FlowControllerDescriptor* descriptor, + uint32_t async_index ) : participant_(participant) , async_mode(participant, descriptor) + , participant_id_(0) + , async_index_(async_index) { + if (nullptr != participant) + { + participant_id_ = static_cast(participant->getRTPSParticipantAttributes().participantID); + } + uint32_t limitation = get_max_payload(); if (std::numeric_limits::max() != limitation) @@ -1257,6 +1267,8 @@ class FlowControllerImpl : public FlowController */ void run() { + set_name_to_current_thread("dds.asyn.%u.%u", participant_id_, async_index_); + while (async_mode.running) { // There are writers interested in removing a sample. @@ -1399,6 +1411,9 @@ class FlowControllerImpl : public FlowController // async_mode must be destroyed before sched. publish_mode async_mode; + + uint32_t participant_id_ = 0; + uint32_t async_index_ = 0; }; } // namespace rtps diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 48f00d6a449..59059fc0118 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -61,6 +61,11 @@ #include #include #include +#include + +#if HAVE_SECURITY +#include +#endif // HAVE_SECURITY namespace eprosima { namespace fastrtps { @@ -137,7 +142,7 @@ RTPSParticipantImpl::RTPSParticipantImpl( , internal_metatraffic_locators_(false) , internal_default_locators_(false) #if HAVE_SECURITY - , m_security_manager(this) + , m_security_manager(this, *this) #endif // if HAVE_SECURITY , mp_participantListener(plisten) , mp_userParticipant(par) @@ -239,7 +244,11 @@ RTPSParticipantImpl::RTPSParticipantImpl( } mp_userParticipant->mp_impl = this; - mp_event_thr.init_thread(); + uint32_t id_for_thread = static_cast(m_att.participantID); + mp_event_thr.init_thread([id_for_thread]() + { + set_name_to_current_thread("dds.ev.%u", id_for_thread); + }); if (!networkFactoryHasRegisteredTransports()) { @@ -2186,6 +2195,15 @@ bool RTPSParticipantImpl::is_security_enabled_for_reader( return false; } +security::Logging* RTPSParticipantImpl::create_builtin_logging_plugin() +{ + return new security::LogTopic([this]() + { + uint32_t participant_id = static_cast(m_att.participantID); + set_name_to_current_thread("dds.slog.%u", participant_id); + }); +} + #endif // if HAVE_SECURITY PDP* RTPSParticipantImpl::pdp() diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 03b68851747..815b55ed35f 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -59,6 +59,7 @@ #include #include #include +#include #endif // if HAVE_SECURITY namespace eprosima { @@ -106,6 +107,9 @@ class WLP; */ class RTPSParticipantImpl : public fastdds::statistics::StatisticsParticipantImpl +#if HAVE_SECURITY + , private security::SecurityPluginFactory +#endif // if HAVE_SECURITY { /* Receiver Control block is a struct we use to encapsulate the resources that take part in message reception. @@ -398,6 +402,8 @@ class RTPSParticipantImpl bool is_security_enabled_for_reader( const ReaderAttributes& reader_attributes); + security::Logging* create_builtin_logging_plugin() override; + #endif // if HAVE_SECURITY PDPSimple* pdpsimple(); diff --git a/src/cpp/rtps/resources/ResourceEvent.cpp b/src/cpp/rtps/resources/ResourceEvent.cpp index 82acd562dcb..d5f58eb0a81 100644 --- a/src/cpp/rtps/resources/ResourceEvent.cpp +++ b/src/cpp/rtps/resources/ResourceEvent.cpp @@ -303,7 +303,8 @@ void ResourceEvent::do_timer_actions() } } -void ResourceEvent::init_thread() +void ResourceEvent::init_thread( + std::function configure_cb) { std::lock_guard lock(mutex_); @@ -311,7 +312,14 @@ void ResourceEvent::init_thread() stop_.store(false); resize_collections(); - thread_ = std::thread(&ResourceEvent::event_service, this); + thread_ = std::thread([this, configure_cb]() + { + if (configure_cb) + { + configure_cb(); + } + event_service(); + }); } } /* namespace rtps */ diff --git a/src/cpp/rtps/security/ISecurityPluginFactory.h b/src/cpp/rtps/security/ISecurityPluginFactory.h new file mode 100644 index 00000000000..8dd30280616 --- /dev/null +++ b/src/cpp/rtps/security/ISecurityPluginFactory.h @@ -0,0 +1,78 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*! + * @file ISecurityPluginFactory.h + */ +#ifndef _RTPS_SECURITY_ISECURITYPLUGINFACTORY_H_ +#define _RTPS_SECURITY_ISECURITYPLUGINFACTORY_H_ + +#include +#include +#include +#include +#include + +namespace eprosima { +namespace fastrtps { +namespace rtps { +namespace security { + +class ISecurityPluginFactory +{ +public: + + /*! + * @brief Create an Authentication plugin described in the PropertyPolicy. + * @param property_policy PropertyPolicy containing the definition of the Authentication + * plugin that has to be created. + * @param Pointer to the new Authentication plugin. In case of error nullptr will be returned. + */ + virtual Authentication* create_authentication_plugin( + const PropertyPolicy& property_policy) = 0; + + /*! + * @brief Create an AccessControl plugin described in the PropertyPolicy. + * @param property_policy PropertyPolicy containing the definition of the AccessControl + * plugin that has to be created. + * @param Pointer to the new AccessControl plugin. In case of error nullptr will be returned. + */ + virtual AccessControl* create_access_control_plugin( + const PropertyPolicy& property_policy) = 0; + + /*! + * @brief Create an Cryptographic plugin described in the PropertyPolicy. + * @param property_policy PropertyPolicy containing the definition of the Cryptographic + * plugin that has to be created. + * @param Pointer to the new Cryptographic plugin. In case of error nullptr will be returned. + */ + virtual Cryptography* create_cryptography_plugin( + const PropertyPolicy& property_policy) = 0; + + /** + * @brief Create a Logging plugin described in the PropertyPolicy. + * @param property_policy PropertyPolicy containing the definition of the Logging + * plugin that has to be created. + * @return Pointer to the new Logging plugin. In case of error nullptr will be returned. + */ + virtual Logging* create_logging_plugin( + const PropertyPolicy& property_policy) = 0; +}; + +} //namespace security +} //namespace rtps +} //namespace fastrtps +} //namespace eprosima + +#endif // _RTPS_SECURITY_SECURITYPLUGINFACTORY_H_ diff --git a/src/cpp/rtps/security/SecurityManager.cpp b/src/cpp/rtps/security/SecurityManager.cpp index 937890b9a3c..ccdc571efef 100644 --- a/src/cpp/rtps/security/SecurityManager.cpp +++ b/src/cpp/rtps/security/SecurityManager.cpp @@ -74,25 +74,13 @@ inline bool usleep_bool() } SecurityManager::SecurityManager( - RTPSParticipantImpl* participant) + RTPSParticipantImpl* participant, + ISecurityPluginFactory& plugin_factory) : participant_stateless_message_listener_(*this) , participant_volatile_message_secure_listener_(*this) , participant_(participant) - , participant_stateless_message_writer_(nullptr) - , participant_stateless_message_writer_history_(nullptr) - , participant_stateless_message_reader_(nullptr) - , participant_stateless_message_reader_history_(nullptr) - , participant_volatile_message_secure_writer_(nullptr) - , participant_volatile_message_secure_writer_history_(nullptr) - , participant_volatile_message_secure_reader_(nullptr) - , participant_volatile_message_secure_reader_history_(nullptr) - , logging_plugin_(nullptr) - , authentication_plugin_(nullptr) - , access_plugin_(nullptr) - , crypto_plugin_(nullptr) + , factory_(plugin_factory) , domain_id_(0) - , local_identity_handle_(nullptr) - , local_permissions_handle_(nullptr) , auth_last_sequence_number_(1) , crypto_last_sequence_number_(1) , temp_reader_proxies_({ diff --git a/src/cpp/rtps/security/SecurityManager.h b/src/cpp/rtps/security/SecurityManager.h index 26eeb984c98..dbd6410cd7f 100644 --- a/src/cpp/rtps/security/SecurityManager.h +++ b/src/cpp/rtps/security/SecurityManager.h @@ -18,7 +18,7 @@ #ifndef _RTPS_SECURITY_SECURITYMANAGER_H_ #define _RTPS_SECURITY_SECURITYMANAGER_H_ -#include +#include #include #include @@ -75,7 +75,8 @@ class SecurityManager * @param participant RTPSParticipantImpl* references the associated participant */ SecurityManager( - RTPSParticipantImpl* participant); + RTPSParticipantImpl* participant, + ISecurityPluginFactory& plugin_factory); // @brief Destructor ~SecurityManager(); @@ -769,30 +770,27 @@ class SecurityManager const ParticipantProxyData& participant_data, const SecurityException& exception) const; - RTPSParticipantImpl* participant_; - StatelessWriter* participant_stateless_message_writer_; - WriterHistory* participant_stateless_message_writer_history_; - StatelessReader* participant_stateless_message_reader_; - ReaderHistory* participant_stateless_message_reader_history_; - StatefulWriter* participant_volatile_message_secure_writer_; - WriterHistory* participant_volatile_message_secure_writer_history_; - StatefulReader* participant_volatile_message_secure_reader_; - ReaderHistory* participant_volatile_message_secure_reader_history_; - SecurityPluginFactory factory_; + RTPSParticipantImpl* participant_ = nullptr; + StatelessWriter* participant_stateless_message_writer_ = nullptr; + WriterHistory* participant_stateless_message_writer_history_ = nullptr; + StatelessReader* participant_stateless_message_reader_ = nullptr; + ReaderHistory* participant_stateless_message_reader_history_ = nullptr; + StatefulWriter* participant_volatile_message_secure_writer_ = nullptr; + WriterHistory* participant_volatile_message_secure_writer_history_ = nullptr; + StatefulReader* participant_volatile_message_secure_reader_ = nullptr; + ReaderHistory* participant_volatile_message_secure_reader_history_ = nullptr; + ISecurityPluginFactory& factory_; - Logging* logging_plugin_; + Logging* logging_plugin_ = nullptr; + Authentication* authentication_plugin_ = nullptr; + AccessControl* access_plugin_ = nullptr; + Cryptography* crypto_plugin_ = nullptr; - Authentication* authentication_plugin_; + uint32_t domain_id_ = 0; - AccessControl* access_plugin_; + IdentityHandle* local_identity_handle_ = nullptr; - Cryptography* crypto_plugin_; - - uint32_t domain_id_; - - IdentityHandle* local_identity_handle_; - - PermissionsHandle* local_permissions_handle_; + PermissionsHandle* local_permissions_handle_ = nullptr; std::shared_ptr local_participant_crypto_handle_; diff --git a/src/cpp/rtps/security/SecurityPluginFactory.cpp b/src/cpp/rtps/security/SecurityPluginFactory.cpp index 8ac864bd9c4..c5f07d83878 100644 --- a/src/cpp/rtps/security/SecurityPluginFactory.cpp +++ b/src/cpp/rtps/security/SecurityPluginFactory.cpp @@ -25,71 +25,95 @@ using namespace eprosima::fastrtps::rtps; using namespace eprosima::fastrtps::rtps::security; -Authentication* SecurityPluginFactory::create_authentication_plugin(const PropertyPolicy& property_policy) +Authentication* SecurityPluginFactory::create_authentication_plugin( + const PropertyPolicy& property_policy) { Authentication* plugin = nullptr; const std::string* auth_plugin_property = PropertyPolicyHelper::find_property(property_policy, - "dds.sec.auth.plugin"); + "dds.sec.auth.plugin"); - if(auth_plugin_property != nullptr) + if (auth_plugin_property != nullptr) { - if(auth_plugin_property->compare("builtin.PKI-DH") == 0) + if (auth_plugin_property->compare("builtin.PKI-DH") == 0) { - plugin = new PKIDH(); + plugin = create_builtin_authentication_plugin(); } } return plugin; } -AccessControl* SecurityPluginFactory::create_access_control_plugin(const PropertyPolicy& property_policy) +AccessControl* SecurityPluginFactory::create_access_control_plugin( + const PropertyPolicy& property_policy) { AccessControl* plugin = nullptr; const std::string* access_plugin_property = PropertyPolicyHelper::find_property(property_policy, - "dds.sec.access.plugin"); + "dds.sec.access.plugin"); - if(access_plugin_property != nullptr) + if (access_plugin_property != nullptr) { - if(access_plugin_property->compare("builtin.Access-Permissions") == 0) + if (access_plugin_property->compare("builtin.Access-Permissions") == 0) { - plugin = new Permissions(); + plugin = create_builtin_access_control_plugin(); } } return plugin; } -Cryptography* SecurityPluginFactory::create_cryptography_plugin(const PropertyPolicy& property_policy) +Cryptography* SecurityPluginFactory::create_cryptography_plugin( + const PropertyPolicy& property_policy) { Cryptography* plugin = nullptr; const std::string* crypto_plugin_property = PropertyPolicyHelper::find_property(property_policy, - "dds.sec.crypto.plugin"); + "dds.sec.crypto.plugin"); - if(crypto_plugin_property != nullptr) + if (crypto_plugin_property != nullptr) { // Check it is builtin DDS:Auth:PKI-DH. - if(crypto_plugin_property->compare("builtin.AES-GCM-GMAC") == 0) + if (crypto_plugin_property->compare("builtin.AES-GCM-GMAC") == 0) { - plugin = new AESGCMGMAC(); + plugin = create_builtin_cryptography_plugin(); } } return plugin; } -Logging* SecurityPluginFactory::create_logging_plugin(const PropertyPolicy& property_policy) +Logging* SecurityPluginFactory::create_logging_plugin( + const PropertyPolicy& property_policy) { Logging* plugin = nullptr; const std::string* logging_plugin_property = PropertyPolicyHelper::find_property(property_policy, "dds.sec.log.plugin"); - if(logging_plugin_property != nullptr) + if (logging_plugin_property != nullptr) { - if(logging_plugin_property->compare("builtin.DDS_LogTopic") == 0) + if (logging_plugin_property->compare("builtin.DDS_LogTopic") == 0) { - plugin = new LogTopic(); + plugin = create_builtin_logging_plugin(); } } return plugin; } + +Authentication* SecurityPluginFactory::create_builtin_authentication_plugin() +{ + return new PKIDH(); +} + +AccessControl* SecurityPluginFactory::create_builtin_access_control_plugin() +{ + return new Permissions(); +} + +Cryptography* SecurityPluginFactory::create_builtin_cryptography_plugin() +{ + return new AESGCMGMAC(); +} + +Logging* SecurityPluginFactory::create_builtin_logging_plugin() +{ + return new LogTopic(); +} diff --git a/src/cpp/rtps/security/SecurityPluginFactory.h b/src/cpp/rtps/security/SecurityPluginFactory.h index 41cb02f4ec3..772f867c8d4 100644 --- a/src/cpp/rtps/security/SecurityPluginFactory.h +++ b/src/cpp/rtps/security/SecurityPluginFactory.h @@ -24,40 +24,38 @@ #include #include +#include + namespace eprosima { namespace fastrtps { namespace rtps { namespace security { -class SecurityPluginFactory +class SecurityPluginFactory : public ISecurityPluginFactory { - public: - - /*! - * @brief Create an Authentication plugin described in the PropertyPolicy. - * @param property_policy PropertyPolicy containing the definition of the Authentication - * plugin that has to be created. - * @param Pointer to the new Authentication plugin. In case of error nullptr will be returned. - */ - Authentication* create_authentication_plugin(const PropertyPolicy& property_policy); - - AccessControl* create_access_control_plugin(const PropertyPolicy& property_policy); - - /*! - * @brief Create an Cryptographic plugin described in the PropertyPolicy. - * @param property_policy PropertyPolicy containing the definition of the Cryptographic - * plugin that has to be created. - * @param Pointer to the new Cryptographic plugin. In case of error nullptr will be returned. - */ - Cryptography* create_cryptography_plugin(const PropertyPolicy& property_policy); - - /** - * @brief Create a loggin plugin described in the PropertyPolicy. - * @param property_policy PropertyPolicy containing the definition of the Logging - * plugin that has to be created. - * @return Pointer to the new Logging plugin. In case of error nullptr will be returned. - */ - Logging* create_logging_plugin(const PropertyPolicy& property_policy); +public: + + Authentication* create_authentication_plugin( + const PropertyPolicy& property_policy) override; + + AccessControl* create_access_control_plugin( + const PropertyPolicy& property_policy) override; + + Cryptography* create_cryptography_plugin( + const PropertyPolicy& property_policy) override; + + Logging* create_logging_plugin( + const PropertyPolicy& property_policy) override; + +protected: + + virtual Authentication* create_builtin_authentication_plugin(); + + virtual AccessControl* create_builtin_access_control_plugin(); + + virtual Cryptography* create_builtin_cryptography_plugin(); + + virtual Logging* create_builtin_logging_plugin(); }; } //namespace security diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 334cff4c4dc..ae5735cf153 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -34,6 +34,7 @@ #endif // if TLS_FOUND #include #include +#include using namespace std; using namespace asio; @@ -418,6 +419,7 @@ bool TCPTransportInterface::init( auto ioServiceFunction = [&]() { + set_name_to_current_thread("dds.tcp_accept"); #if ASIO_VERSION >= 101200 asio::executor_work_guard work(io_service_.get_executor()); #else @@ -431,7 +433,7 @@ bool TCPTransportInterface::init( { io_service_timers_thread_ = std::make_shared([&]() { - + set_name_to_current_thread("dds.tcp_keep"); #if ASIO_VERSION >= 101200 asio::executor_work_guard work(io_service_timers_. get_executor()); @@ -805,6 +807,9 @@ void TCPTransportInterface::perform_listen_operation( if (channel) { + uint32_t port = channel->local_endpoint().port(); + set_name_to_current_thread("dds.tcp.%u", port); + if (channel->tcp_connection_type() == TCPChannelResource::TCPConnectionType::TCP_CONNECT_TYPE) { rtcp_message_manager->sendConnectionRequest(channel); diff --git a/src/cpp/rtps/transport/UDPChannelResource.cpp b/src/cpp/rtps/transport/UDPChannelResource.cpp index b68594909e1..62904444c7c 100644 --- a/src/cpp/rtps/transport/UDPChannelResource.cpp +++ b/src/cpp/rtps/transport/UDPChannelResource.cpp @@ -17,6 +17,7 @@ #include #include #include +#include namespace eprosima { namespace fastdds { @@ -53,6 +54,8 @@ UDPChannelResource::~UDPChannelResource() void UDPChannelResource::perform_listen_operation( Locator input_locator) { + set_name_to_current_thread("dds.udp.%u", input_locator.port); + Locator remote_locator; while (alive()) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp index bc84b022d53..139e5284a95 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp @@ -22,6 +22,8 @@ #include #include +#include + namespace eprosima { namespace fastdds { namespace rtps { @@ -49,7 +51,7 @@ class SharedMemChannelResource : public ChannelResource auto packets_file_consumer = std::unique_ptr( new SHMPacketFileConsumer(dump_file)); - packet_logger_ = std::make_shared>(); + packet_logger_ = std::make_shared>(locator.port); packet_logger_->RegisterConsumer(std::move(packets_file_consumer)); } @@ -123,6 +125,8 @@ class SharedMemChannelResource : public ChannelResource void perform_listen_operation( Locator input_locator) { + set_name_to_current_thread("dds.shm.%u", input_locator.port); + Locator remote_locator; while (alive()) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp index 90a6af29297..893fef62e0e 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp @@ -199,6 +199,12 @@ class PacketsLog { public: + PacketsLog( + uint32_t thread_id) + : thread_id_(thread_id) + { + } + ~PacketsLog() { Flush(); @@ -349,9 +355,11 @@ class PacketsLog }; Resources resources_; + uint32_t thread_id_; void run() { + set_name_to_current_thread("dds.shmd.%u", thread_id_); std::unique_lock guard(resources_.cv_mutex); while (resources_.logging) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp index ea627dd1d2b..e9c6404d4e5 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp @@ -285,7 +285,7 @@ bool SharedMemTransport::init( auto packets_file_consumer = std::unique_ptr( new SHMPacketFileConsumer(configuration_.rtps_dump_file())); - packet_logger_ = std::make_shared>(); + packet_logger_ = std::make_shared>(0); packet_logger_->RegisterConsumer(std::move(packets_file_consumer)); } } diff --git a/src/cpp/security/logging/LogTopic.cpp b/src/cpp/security/logging/LogTopic.cpp index 88214bc6a12..b461cc9fc7e 100644 --- a/src/cpp/security/logging/LogTopic.cpp +++ b/src/cpp/security/logging/LogTopic.cpp @@ -3,32 +3,41 @@ #include #include +#include + namespace eprosima { namespace fastrtps { namespace rtps { namespace security { -LogTopic::LogTopic() +LogTopic::LogTopic( + std::function thread_init_cb) : stop_(false) - , thread_([this]() { - for (;;) - { - // Put the thread asleep until there is - // something to process - auto p = queue_.wait_pop(); + , thread_([this, thread_init_cb]() + { + if (thread_init_cb) + { + thread_init_cb(); + } + + while (true) + { + // Put the thread asleep until there is + // something to process + auto p = queue_.wait_pop(); - if (!p) + if (!p) + { + if (stop_) { - if (stop_) - { - return; - } - continue; + return; } - - publish(*p); + continue; } - }) + + publish(*p); + } + }) { // } @@ -68,7 +77,7 @@ bool LogTopic::enable_logging_impl( { file_stream_.open(options.log_file, std::ios::out | std::ios::app); - if ( (file_stream_.rdstate() & std::ofstream::failbit ) != 0 ) + if ((file_stream_.rdstate() & std::ofstream::failbit ) != 0 ) { exception = SecurityException("Error opening file: " + options.log_file); return false; diff --git a/src/cpp/security/logging/LogTopic.h b/src/cpp/security/logging/LogTopic.h index bd75ce735da..47ebcd9be7e 100644 --- a/src/cpp/security/logging/LogTopic.h +++ b/src/cpp/security/logging/LogTopic.h @@ -25,6 +25,7 @@ #include #include +#include #include namespace eprosima { @@ -41,7 +42,8 @@ class LogTopic final : public Logging public: - LogTopic(); + LogTopic( + std::function thread_init_cb = {}); ~LogTopic(); private: diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp index 08c550438a7..b4bceda6686 100644 --- a/src/cpp/utils/SystemInfo.cpp +++ b/src/cpp/utils/SystemInfo.cpp @@ -34,6 +34,7 @@ #include #include +#include namespace eprosima { @@ -274,3 +275,6 @@ std::string SystemInfo::get_timestamp( std::string SystemInfo::environment_file_; } // eprosima + +// threading.hpp implementations +#include "threading/threading_empty.ipp" diff --git a/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp b/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp index 347d0c5dad3..e77c3131fad 100644 --- a/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp +++ b/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp @@ -22,6 +22,8 @@ #include #include +#include + namespace eprosima { namespace fastdds { namespace rtps { @@ -121,6 +123,8 @@ class SharedMemWatchdog void run() { + set_name_to_current_thread("dds.shm.wdog"); + while (!exit_thread_) { { diff --git a/src/cpp/utils/threading.hpp b/src/cpp/utils/threading.hpp new file mode 100644 index 00000000000..974fff639ad --- /dev/null +++ b/src/cpp/utils/threading.hpp @@ -0,0 +1,34 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef UTILS__THREADING_HPP_ +#define UTILS__THREADING_HPP_ + +namespace eprosima { + +void set_name_to_current_thread( + const char* name); + +void set_name_to_current_thread( + const char* fmt, + uint32_t arg); + +void set_name_to_current_thread( + const char* fmt, + uint32_t arg1, + uint32_t arg2); + +} // eprosima + +#endif // UTILS__THREADING_HPP_ diff --git a/src/cpp/utils/threading/threading_empty.ipp b/src/cpp/utils/threading/threading_empty.ipp new file mode 100644 index 00000000000..30d97890bca --- /dev/null +++ b/src/cpp/utils/threading/threading_empty.ipp @@ -0,0 +1,35 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace eprosima { + +void set_name_to_current_thread( + const char* /* name */) +{ +} + +void set_name_to_current_thread( + const char* /* fmt */, + uint32_t /* arg */) +{ +} + +void set_name_to_current_thread( + const char* /* fmt */, + uint32_t /* arg1 */, + uint32_t /* arg2 */) +{ +} + +} // namespace eprosima diff --git a/test/mock/rtps/SecurityPluginFactory/rtps/security/SecurityPluginFactory.h b/test/mock/rtps/SecurityPluginFactory/rtps/security/SecurityPluginFactory.h index e175d8d5ccd..74fe9e1b46d 100644 --- a/test/mock/rtps/SecurityPluginFactory/rtps/security/SecurityPluginFactory.h +++ b/test/mock/rtps/SecurityPluginFactory/rtps/security/SecurityPluginFactory.h @@ -24,60 +24,58 @@ #include #include +#include + namespace eprosima { namespace fastrtps { namespace rtps { namespace security { -class SecurityPluginFactory +class SecurityPluginFactory : public ISecurityPluginFactory { - public: +public: - /*! - * @brief Create an Authentication plugin described in the PropertyPolicy. - * @param property_policy PropertyPolicy containing the definition of the Authentication - * plugin that has to be created. - * @param Pointer to the new Authentication plugin. In case of error nullptr will be returned. - */ - Authentication* create_authentication_plugin(const PropertyPolicy& property_policy); + Authentication* create_authentication_plugin( + const PropertyPolicy& property_policy) override; - AccessControl* create_access_control_plugin(const PropertyPolicy& property_policy); + AccessControl* create_access_control_plugin( + const PropertyPolicy& property_policy) override; - /*! - * @brief Create an Cryptography plugin described in the PropertyPolicy. - * @param property_policy PropertyPolicy containing the definition of the Cryptography - * plugin that has to be created. - * @param Pointer to the new Cryptography plugin. In case of error nullptr will be returned. - */ - Cryptography* create_cryptography_plugin(const PropertyPolicy& property_policy); + Cryptography* create_cryptography_plugin( + const PropertyPolicy& property_policy) override; - Logging* create_logging_plugin(const PropertyPolicy& property_policy); + Logging* create_logging_plugin( + const PropertyPolicy& property_policy) override; - static void set_auth_plugin(Authentication* plugin); + static void set_auth_plugin( + Authentication* plugin); - static void release_auth_plugin(); + static void release_auth_plugin(); - static void set_access_control_plugin(AccessControl* plugin); + static void set_access_control_plugin( + AccessControl* plugin); - static void release_access_control_plugin(); + static void release_access_control_plugin(); - static void set_crypto_plugin(Cryptography* plugin); + static void set_crypto_plugin( + Cryptography* plugin); - static void release_crypto_plugin(); + static void release_crypto_plugin(); - static void set_logging_plugin(Logging* plugin); + static void set_logging_plugin( + Logging* plugin); - static void release_logging_plugin(); + static void release_logging_plugin(); - private: +private: - static Authentication* auth_plugin_; + static Authentication* auth_plugin_; - static AccessControl* access_plugin_; + static AccessControl* access_plugin_; - static Cryptography* crypto_plugin_; + static Cryptography* crypto_plugin_; - static Logging* logging_plugin_; + static Logging* logging_plugin_; }; } //namespace security diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp index 4050360c81c..42dc3e261c6 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp @@ -7,7 +7,7 @@ TYPED_TEST(FlowControllerPublishModes, async_publish_mode) { FlowControllerDescriptor flow_controller_descr; FlowControllerImpl async(nullptr, - &flow_controller_descr); + &flow_controller_descr, 0); async.init(); // Instantiate writers. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp index 8508d952d84..04dc216ef7f 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp @@ -28,7 +28,7 @@ TYPED_TEST(FlowControllerPublishModes, limited_async_publish_mode) flow_controller_descr.max_bytes_per_period = 10200; flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr); + &flow_controller_descr, 0); async.init(); // Instantiate writers. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnPureSyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnPureSyncTests.cpp index 73a08987d95..b06ff64d6a1 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnPureSyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnPureSyncTests.cpp @@ -7,7 +7,7 @@ TYPED_TEST(FlowControllerPublishModes, pure_sync_publish_mode) { FlowControllerDescriptor flow_controller_descr; FlowControllerImpl pure_sync(nullptr, - &flow_controller_descr); + &flow_controller_descr, 0); pure_sync.init(); // Initialize callback to get info. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp index d8438a25ae6..6d836977ad1 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp @@ -7,7 +7,7 @@ TYPED_TEST(FlowControllerPublishModes, sync_publish_mode) { FlowControllerDescriptor flow_controller_descr; FlowControllerImpl sync(nullptr, - &flow_controller_descr); + &flow_controller_descr, 0); sync.init(); // Instantiate writers. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerSchedulersTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerSchedulersTests.cpp index 7ffe0ba2bab..2b83a4cd886 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerSchedulersTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerSchedulersTests.cpp @@ -90,7 +90,7 @@ TEST_F(FlowControllerSchedulers, Fifo) flow_controller_descr.max_bytes_per_period = 10200; flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr); + &flow_controller_descr, 0); async.init(); // Instantiate writers. @@ -691,7 +691,7 @@ TEST_F(FlowControllerSchedulers, RoundRobin) flow_controller_descr.max_bytes_per_period = 10200; flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr); + &flow_controller_descr, 0); async.init(); // Instantiate writers. @@ -1292,7 +1292,7 @@ TEST_F(FlowControllerSchedulers, HighPriority) flow_controller_descr.max_bytes_per_period = 10200; flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr); + &flow_controller_descr, 0); async.init(); // Instantiate writers. @@ -1916,7 +1916,7 @@ TEST_F(FlowControllerSchedulers, PriorityWithReservation) flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr); + &flow_controller_descr, 0); async.init(); // Instantiate writers. diff --git a/test/unittest/rtps/security/SecurityTests.hpp b/test/unittest/rtps/security/SecurityTests.hpp index 2a9910a2c9c..3ac17a5076a 100644 --- a/test/unittest/rtps/security/SecurityTests.hpp +++ b/test/unittest/rtps/security/SecurityTests.hpp @@ -137,7 +137,7 @@ class SecurityTest : public ::testing::Test , stateless_reader_(nullptr) , volatile_writer_(nullptr) , volatile_reader_(nullptr) - , manager_(&participant_) + , manager_(&participant_, plugin_factory_) , participant_data_(c_default_RTPSParticipantAllocationAttributes) , default_cdr_message(RTPSMESSAGE_DEFAULT_SIZE) { @@ -162,6 +162,7 @@ class SecurityTest : public ::testing::Test ::testing::NiceMock* volatile_writer_; ::testing::NiceMock* volatile_reader_; PDP pdp_; + SecurityPluginFactory plugin_factory_; SecurityManager manager_; // handles diff --git a/thirdparty/filewatch/FileWatch.hpp b/thirdparty/filewatch/FileWatch.hpp index b2909226f0e..a714d284953 100644 --- a/thirdparty/filewatch/FileWatch.hpp +++ b/thirdparty/filewatch/FileWatch.hpp @@ -23,6 +23,8 @@ #ifndef FILEWATCHER_H #define FILEWATCHER_H +#include + #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN #define stat _stat @@ -209,6 +211,7 @@ namespace filewatch { #endif // WIN32 _callback_thread = std::move(std::thread([this]() { try { + eprosima::set_name_to_current_thread("dds.fwatch.cb"); callback_thread(); } catch (...) { try { @@ -219,6 +222,7 @@ namespace filewatch { })); _watch_thread = std::move(std::thread([this]() { try { + eprosima::set_name_to_current_thread("dds.fwatch"); monitor_directory(); } catch (...) { try {