From 77f2c8a0476e587ad616e688277424156be14c8a Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 23 Oct 2023 12:19:43 +0200 Subject: [PATCH] Apply thread settings (#3874) * Refs #19436. Added thread creation wrapper infrastructure. Signed-off-by: Miguel Company * Refs #19436. Added empty implementation for apply_thread_settings_to_current_thread. Signed-off-by: Miguel Company * Refs #19436. Refactor on Log.cpp Signed-off-by: Miguel Company * Refs #19436. Add implementation for setting scheduler and priority. Signed-off-by: Miguel Company * Refs #19436. Add implementation for setting cpu affinity. Signed-off-by: Miguel Company * Refs #19436. Add test setting config for Log thread. Signed-off-by: Miguel Company * Refs #19436. Fix SystemInfoTests link issue. Signed-off-by: Miguel Company * Refs #19436. Changes on ResourceEvent. Signed-off-by: Miguel Company * Refs #19436. Changes on DataSharingListener. Signed-off-by: Miguel Company * Refs #19436. Changes on FlowControllerImpl. Signed-off-by: Miguel Company * Refs #19436. Changes on security LogTopic. Signed-off-by: Miguel Company * Refs #19436. Apply settings on SharedMemWatchdog. Signed-off-by: Miguel Company * Refs #19436. Apply settings on SharedMem reception threads. Signed-off-by: Miguel Company * Refs #19436. Apply settings on SharedMem packet dump threads. Signed-off-by: Miguel Company * Refs #19436. Apply settings on UDP reception threads. Signed-off-by: Miguel Company * Refs #19436. Apply settings on TCP accept and keep_alive threads. Signed-off-by: Miguel Company * Refs #19436. Apply settings on TCP reception threads. Signed-off-by: Miguel Company * Refs #19436. Include what you use. Signed-off-by: Miguel Company * Refs #19436. Add MacOS implementation for setting scheduler and priority. Signed-off-by: Miguel Company * Refs #19436. Add MacOS implementation for setting thread affinity. Signed-off-by: Miguel Company * Refs #19437. Member cpu_mask changed to affinity and made it 64 bits. Signed-off-by: Miguel Company * Refs #19437. Windows implementation for thread affinity. Signed-off-by: Miguel Company * Refs #19437. Windows implementation for thread priority. Signed-off-by: Miguel Company * Refs #19436. Made `get_thread_config_for_port` a const method. Signed-off-by: Miguel Company * Refs #19436. Apply suggestions from code review. Signed-off-by: Miguel Company Co-authored-by: Eduardo Ponz Segrelles * Refs #19435. Some refactors on FileWatch: - Namespace moved to eprosima::filewatch - Constructor receives thread settings - Copy constructors deleted Signed-off-by: Miguel Company * Refs #19435. SystemInfo::watch_file receives thread settings. Signed-off-by: Miguel Company * Refs #19435. Added RTPSDomain::set_filewatch_thread_config Signed-off-by: Miguel Company * Refs #19435. Call RTPSDomain::set_filewatch_thread_config inside DomainParticipantFactory::create_participant Signed-off-by: Miguel Company * Refs #19435. Change priority default value. Signed-off-by: Miguel Company * Refs #19435. Account for default values in threading_pthread Signed-off-by: Miguel Company * Refs #19435. Account for default values in threading_osx Signed-off-by: Miguel Company * Refs #19435. Account for default values in threading_win32 Signed-off-by: Miguel Company * Refs #19435. Linters. Signed-off-by: Miguel Company * Refs #19435. Use C++ headers. Signed-off-by: Miguel Company * Refs #19435. Documentation updates. Signed-off-by: Miguel Company * Refs #19435. Suggestions on Log test. Signed-off-by: Miguel Company * Refs #19435. Removed unused overload of create_thread. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company Co-authored-by: Eduardo Ponz Segrelles --- include/fastdds/rtps/RTPSDomain.h | 13 ++ .../rtps/attributes/ThreadSettings.hpp | 14 +- .../fastdds/rtps/resources/ResourceEvent.h | 18 ++- .../PortBasedTransportDescriptor.hpp | 2 +- .../domain/DomainParticipantFactory.cpp | 5 + src/cpp/fastdds/log/Log.cpp | 37 +++-- .../rtps/DataSharing/DataSharingListener.cpp | 18 ++- .../rtps/DataSharing/DataSharingListener.hpp | 16 ++- src/cpp/rtps/RTPSDomain.cpp | 21 ++- src/cpp/rtps/RTPSDomainImpl.hpp | 15 ++ src/cpp/rtps/attributes/ThreadSettings.cpp | 2 +- .../discovery/participant/PDPServer.cpp | 9 +- .../rtps/flowcontrol/FlowControllerImpl.hpp | 7 +- .../rtps/participant/RTPSParticipantImpl.cpp | 13 +- src/cpp/rtps/reader/RTPSReader.cpp | 1 + src/cpp/rtps/resources/ResourceEvent.cpp | 19 ++- .../PortBasedTransportDescriptor.cpp | 2 +- .../rtps/transport/TCPTransportInterface.cpp | 57 ++++---- .../rtps/transport/TCPTransportInterface.h | 30 ++-- src/cpp/rtps/transport/UDPChannelResource.cpp | 14 +- src/cpp/rtps/transport/UDPChannelResource.h | 6 +- .../rtps/transport/UDPTransportInterface.cpp | 2 +- .../shared_mem/SharedMemChannelResource.hpp | 20 ++- .../transport/shared_mem/SharedMemLog.hpp | 34 ++--- .../shared_mem/SharedMemTransport.cpp | 7 +- .../test_SharedMemChannelResource.hpp | 7 +- src/cpp/security/logging/LogTopic.cpp | 20 ++- src/cpp/security/logging/LogTopic.h | 4 +- src/cpp/utils/SystemInfo.cpp | 8 +- src/cpp/utils/SystemInfo.hpp | 8 +- .../utils/shared_memory/SharedMemWatchdog.hpp | 22 ++- src/cpp/utils/threading.hpp | 45 ++++++ src/cpp/utils/threading/threading_empty.ipp | 5 + src/cpp/utils/threading/threading_osx.ipp | 86 +++++++++++- src/cpp/utils/threading/threading_pthread.ipp | 129 +++++++++++++++++- src/cpp/utils/threading/threading_win32.ipp | 34 +++++ .../rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h | 7 + .../dds/participant/ParticipantTests.cpp | 8 +- test/unittest/logging/LogTests.cpp | 31 +++++ .../rtps/attributes/ThreadSettingsTests.cpp | 34 ++--- .../PortBasedTransportDescriptorTests.cpp | 24 ++-- test/unittest/utils/CMakeLists.txt | 4 + test/unittest/utils/SystemInfoTests.cpp | 2 +- thirdparty/filewatch/FileWatch.hpp | 60 ++++---- 44 files changed, 685 insertions(+), 235 deletions(-) diff --git a/include/fastdds/rtps/RTPSDomain.h b/include/fastdds/rtps/RTPSDomain.h index 287ad1f792d..401a1d1a6b3 100644 --- a/include/fastdds/rtps/RTPSDomain.h +++ b/include/fastdds/rtps/RTPSDomain.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -54,6 +55,18 @@ class RTPSDomain { public: + /** + * Method to set the configuration of the threads created by the file watcher for the environment file. + * In order for these settings to take effect, this method must be called before the first call + * to @ref createParticipant. + * + * @param watch_thread Settings for the thread watching the environment file. + * @param callback_thread Settings for the thread executing the callback when the environment file changed. + */ + RTPS_DllAPI static void set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings& watch_thread, + const fastdds::rtps::ThreadSettings& callback_thread); + /** * Method to shut down all RTPSParticipants, readers, writers, etc. * It must be called at the end of the process to avoid memory leaks. diff --git a/include/fastdds/rtps/attributes/ThreadSettings.hpp b/include/fastdds/rtps/attributes/ThreadSettings.hpp index bd6c9ad34ae..ed2a85f0b9b 100644 --- a/include/fastdds/rtps/attributes/ThreadSettings.hpp +++ b/include/fastdds/rtps/attributes/ThreadSettings.hpp @@ -17,6 +17,7 @@ */ #include +#include #include @@ -41,6 +42,7 @@ struct RTPS_DllAPI ThreadSettings * A value of -1 indicates system default. * * This value is platform specific and it is used as-is to configure the specific platform thread. + * It is ignored on Windows platforms. * Setting this value to something other than the default one may require different privileges * on different platforms. */ @@ -50,25 +52,27 @@ struct RTPS_DllAPI ThreadSettings * @brief The thread's priority. * * Configures the thread's priority. - * A value of 0 indicates system default. + * A value of -2^31 indicates system default. * * This value is platform specific and it is used as-is to configure the specific platform thread. * Setting this value to something other than the default one may require different privileges * on different platforms. */ - int32_t priority = 0; + int32_t priority = std::numeric_limits::min(); /** - * @brief The thread's core affinity. + * @brief The thread's affinity. * - * cpu_mask is a bit mask for setting the threads affinity to each core individually. + * On some systems (Windows, Linux), this is a bit mask for setting the threads affinity to each core individually. + * On MacOS, this sets the affinity tag for the thread, and the OS tries to share the L2 cache between threads + * with the same affinity. * A value of 0 indicates no particular affinity. * * This value is platform specific and it is used as-is to configure the specific platform thread. * Setting this value to something other than the default one may require different privileges * on different platforms. */ - uint32_t cpu_mask = 0; + uint64_t affinity = 0; /** * @brief The thread's stack size in bytes. diff --git a/include/fastdds/rtps/resources/ResourceEvent.h b/include/fastdds/rtps/resources/ResourceEvent.h index 3709f3ff120..5cb68311720 100644 --- a/include/fastdds/rtps/resources/ResourceEvent.h +++ b/include/fastdds/rtps/resources/ResourceEvent.h @@ -22,14 +22,15 @@ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include -#include - #include #include #include #include +#include +#include +#include + namespace eprosima { namespace fastrtps { namespace rtps { @@ -51,11 +52,16 @@ class ResourceEvent /*! * @brief Method to initialize the internal thread. * - * @param[in] configure_cb Function to be called in the context of the started thread - * before calling the internal service routine. + * @param[in] thread_cfg Settings to apply to the created thread. + * @param[in] name_fmt A null-terminated string to be used as the format argument of + * a `snprintf` like function, taking `thread_id` as additional + * argument, and used to give a name to the created thread. + * @param[in] thread_id Single variadic argument passed to the formatting function. */ void init_thread( - std::function configure_cb = {}); + const fastdds::rtps::ThreadSettings& thread_cfg = {}, + const char* name_fmt = "event %u", + uint32_t thread_id = 0); void stop_thread(); diff --git a/include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp b/include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp index e34002ac0a5..e35ad1dfa33 100644 --- a/include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp +++ b/include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp @@ -77,7 +77,7 @@ class PortBasedTransportDescriptor : public TransportDescriptorInterface * @return The ThreadSettings for the given port. */ virtual RTPS_DllAPI const ThreadSettings& get_thread_config_for_port( - uint32_t port); + uint32_t port) const; virtual RTPS_DllAPI bool set_thread_config_for_port( const uint32_t& port, diff --git a/src/cpp/fastdds/domain/DomainParticipantFactory.cpp b/src/cpp/fastdds/domain/DomainParticipantFactory.cpp index 12049b35d0f..ed27c18d7c5 100644 --- a/src/cpp/fastdds/domain/DomainParticipantFactory.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantFactory.cpp @@ -35,6 +35,7 @@ #include #include #include +#include using namespace eprosima::fastrtps::xmlparser; @@ -156,6 +157,8 @@ DomainParticipant* DomainParticipantFactory::create_participant( { load_profiles(); + RTPSDomain::set_filewatch_thread_config(factory_qos_.file_watch_threads(), factory_qos_.file_watch_threads()); + const DomainParticipantQos& pqos = (&qos == &PARTICIPANT_QOS_DEFAULT) ? default_participant_qos_ : qos; DomainParticipant* dom_part = new DomainParticipant(mask); @@ -422,6 +425,8 @@ void DomainParticipantFactory::set_qos( (void) first_time; //As all the Qos can always be updated and none of them need to be sent to = from; + + rtps::SharedMemWatchdog::set_thread_settings(to.shm_watchdog_thread()); } ReturnCode_t DomainParticipantFactory::check_qos( diff --git a/src/cpp/fastdds/log/Log.cpp b/src/cpp/fastdds/log/Log.cpp index ff33c5f5b96..0652d1145c0 100644 --- a/src/cpp/fastdds/log/Log.cpp +++ b/src/cpp/fastdds/log/Log.cpp @@ -134,14 +134,17 @@ struct LogResources void SetThreadConfig( const rtps::ThreadSettings& config) { - static_cast(config); - return; + std::lock_guard guard(cv_mutex_); + thread_settings_ = config; } //! Returns the logging_ engine to configuration defaults. void Reset() { - std::unique_lock configGuard(config_mutex_); + rtps::ThreadSettings thr_config{}; + SetThreadConfig(thr_config); + + std::lock_guard configGuard(config_mutex_); category_filter_.reset(); filename_filter_.reset(); error_string_filter_.reset(); @@ -162,7 +165,7 @@ struct LogResources { std::unique_lock guard(cv_mutex_); - if (!logging_ && !logging_thread_) + if (!logging_ && !logging_thread_.joinable()) { // already killed return; @@ -237,19 +240,13 @@ struct LogResources work_ = false; } - if (logging_thread_) + if (logging_thread_.joinable()) { cv_.notify_all(); - // The #ifdef workaround here is due to an unsolved MSVC bug, which Microsoft has announced - // they have no intention of solving: https://connect.microsoft.com/VisualStudio/feedback/details/747145 - // Each VS version deals with post-main deallocation of threads in a very different way. -#if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800 - if (logging_thread_->joinable() && logging_thread_->get_id() != std::this_thread::get_id()) + if (logging_thread_.get_id() != std::this_thread::get_id()) { - logging_thread_->join(); + logging_thread_.join(); } -#endif // if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800 - logging_thread_.reset(); } } @@ -258,17 +255,19 @@ struct LogResources void StartThread() { std::unique_lock guard(cv_mutex_); - if (!logging_ && !logging_thread_) + if (!logging_ && !logging_thread_.joinable()) { logging_ = true; - logging_thread_.reset(new std::thread(&LogResources::run, this)); + auto thread_fn = [this]() + { + run(); + }; + logging_thread_ = eprosima::create_thread(thread_fn, thread_settings_, "dds.log"); } } void run() { - set_name_to_current_thread("dds.log"); - std::unique_lock guard(cv_mutex_); while (logging_) @@ -343,7 +342,7 @@ struct LogResources fastrtps::DBQueue logs_; std::vector> consumers_; - std::unique_ptr logging_thread_; + std::thread logging_thread_; // Condition variable segment. std::condition_variable cv_; @@ -361,7 +360,7 @@ struct LogResources std::unique_ptr error_string_filter_; std::atomic verbosity_; - + rtps::ThreadSettings thread_settings_; }; std::shared_ptr get_log_resources() diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index a358767d317..e4aed898a11 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -31,6 +31,7 @@ namespace rtps { DataSharingListener::DataSharingListener( std::shared_ptr notification, const std::string& datasharing_pools_directory, + const fastdds::rtps::ThreadSettings& thr_config, ResourceLimitedContainerConfig limits, RTPSReader* reader) : notification_(notification) @@ -39,6 +40,7 @@ DataSharingListener::DataSharingListener( , writer_pools_(limits) , writer_pools_changed_(false) , datasharing_pools_directory_(datasharing_pools_directory) + , thread_config_(thr_config) { } @@ -50,8 +52,6 @@ DataSharingListener::~DataSharingListener() void DataSharingListener::run() { - set_name_to_current_thread("dds.dsha.%u", reader_->getGuid().entityId.to_uint32() & 0x0000FFFF); - std::unique_lock lock(notification_->notification_->notification_mutex, std::defer_lock); while (is_running_.load()) { @@ -100,13 +100,15 @@ void DataSharingListener::start() } // Initialize the thread - listening_thread_ = new std::thread(&DataSharingListener::run, this); + uint32_t thread_id = reader_->getGuid().entityId.to_uint32() & 0x0000FFFF; + listening_thread_ = create_thread([this]() + { + run(); + }, thread_config_, "dds.dsha.%u", thread_id); } void DataSharingListener::stop() { - std::thread* thr = nullptr; - { std::lock_guard guard(mutex_); @@ -116,15 +118,11 @@ void DataSharingListener::stop() { return; } - - thr = listening_thread_; - listening_thread_ = nullptr; } // Notify the thread and wait for it to finish notification_->notify(); - thr->join(); - delete thr; + listening_thread_.join(); } void DataSharingListener::process_new_data () diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.hpp b/src/cpp/rtps/DataSharing/DataSharingListener.hpp index 41d27b4029c..3afc0bce6f5 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.hpp @@ -19,15 +19,17 @@ #ifndef RTPS_DATASHARING_DATASHARINGLISTENER_HPP #define RTPS_DATASHARING_DATASHARINGLISTENER_HPP +#include +#include +#include + #include +#include +#include + #include #include #include -#include - -#include -#include -#include namespace eprosima { namespace fastrtps { @@ -46,6 +48,7 @@ class DataSharingListener : public IDataSharingListener DataSharingListener( std::shared_ptr notification, const std::string& datasharing_pools_directory, + const fastdds::rtps::ThreadSettings& thr_config, ResourceLimitedContainerConfig limits, RTPSReader* reader); @@ -111,10 +114,11 @@ class DataSharingListener : public IDataSharingListener std::shared_ptr notification_; std::atomic is_running_; RTPSReader* reader_; - std::thread* listening_thread_; + std::thread listening_thread_; ResourceLimitedVector writer_pools_; std::atomic writer_pools_changed_; std::string datasharing_pools_directory_; + fastdds::rtps::ThreadSettings thread_config_; mutable std::mutex mutex_; }; diff --git a/src/cpp/rtps/RTPSDomain.cpp b/src/cpp/rtps/RTPSDomain.cpp index d89595b498f..0b385ea4359 100644 --- a/src/cpp/rtps/RTPSDomain.cpp +++ b/src/cpp/rtps/RTPSDomain.cpp @@ -66,6 +66,13 @@ std::shared_ptr RTPSDomainImpl::get_instance() return instance; } +void RTPSDomain::set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings& watch_thread, + const fastdds::rtps::ThreadSettings& callback_thread) +{ + RTPSDomainImpl::set_filewatch_thread_config(watch_thread, callback_thread); +} + void RTPSDomain::stopAll() { RTPSDomainImpl::stopAll(); @@ -143,8 +150,10 @@ RTPSParticipant* RTPSDomainImpl::createParticipant( std::string filename = SystemInfo::get_environment_file(); if (!filename.empty() && SystemInfo::file_exists(filename)) { + std::lock_guard guard(instance->m_mutex); // Create filewatch - instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback); + instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback, + instance->watch_thread_config_, instance->callback_thread_config_); } else if (!filename.empty()) { @@ -778,6 +787,16 @@ void RTPSDomainImpl::file_watch_callback() } } +void RTPSDomainImpl::set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings& watch_thread, + const fastdds::rtps::ThreadSettings& callback_thread) +{ + auto instance = get_instance(); + std::lock_guard guard(instance->m_mutex); + instance->watch_thread_config_ = watch_thread; + instance->callback_thread_config_ = callback_thread; +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/RTPSDomainImpl.hpp b/src/cpp/rtps/RTPSDomainImpl.hpp index fefa90b6001..88025e52404 100644 --- a/src/cpp/rtps/RTPSDomainImpl.hpp +++ b/src/cpp/rtps/RTPSDomainImpl.hpp @@ -25,6 +25,7 @@ #include #endif // defined(_WIN32) || defined(__unix__) +#include #include #include #include @@ -203,6 +204,18 @@ class RTPSDomainImpl */ static void file_watch_callback(); + /** + * Method to set the configuration of the threads created by the file watcher for the environment file. + * In order for these settings to take effect, this method must be called before the first call + * to @ref createParticipant. + * + * @param watch_thread Settings for the thread watching the environment file. + * @param callback_thread Settings for the thread executing the callback when the environment file changed. + */ + static void set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings& watch_thread, + const fastdds::rtps::ThreadSettings& callback_thread); + private: /** @@ -252,6 +265,8 @@ class RTPSDomainImpl std::unordered_map m_RTPSParticipantIDs; FileWatchHandle file_watch_handle_; + fastdds::rtps::ThreadSettings watch_thread_config_; + fastdds::rtps::ThreadSettings callback_thread_config_; }; } // namespace rtps diff --git a/src/cpp/rtps/attributes/ThreadSettings.cpp b/src/cpp/rtps/attributes/ThreadSettings.cpp index 8ee925711d1..0c2761539f6 100644 --- a/src/cpp/rtps/attributes/ThreadSettings.cpp +++ b/src/cpp/rtps/attributes/ThreadSettings.cpp @@ -21,7 +21,7 @@ bool ThreadSettings::operator ==( { return (scheduling_policy == rhs.scheduling_policy && priority == rhs.priority && - cpu_mask == rhs.cpu_mask && + affinity == rhs.affinity && stack_size == rhs.stack_size); } diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 6e524dd8e7b..cb39e120049 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -162,11 +162,10 @@ bool PDPServer::init( getRTPSParticipant()->enableReader(edp->publications_reader_.first); // Initialize server dedicated thread. - uint32_t id_for_thread = static_cast(getRTPSParticipant()->getRTPSParticipantAttributes().participantID); - resource_event_thread_.init_thread([id_for_thread]() - { - set_name_to_current_thread("dds.ds_ev.%u", id_for_thread); - }); + const RTPSParticipantAttributes& part_attr = getRTPSParticipant()->getRTPSParticipantAttributes(); + uint32_t id_for_thread = static_cast(part_attr.participantID); + const fastdds::rtps::ThreadSettings& thr_config = part_attr.discovery_server_thread; + resource_event_thread_.init_thread(thr_config, "dds.ds_ev.%u", id_for_thread); /* Given the fact that a participant is either a client or a server the diff --git a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp index 0ae267ff6d3..a31a68b8534 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp @@ -1062,7 +1062,10 @@ class FlowControllerImpl : public FlowController if (async_mode.running.compare_exchange_strong(expected, true)) { // Code for initializing the asynchronous thread. - async_mode.thread = std::thread(&FlowControllerImpl::run, this); + async_mode.thread = create_thread([this]() + { + run(); + }, thread_settings_, "dds.asyn.%u.%u", participant_id_, async_index_); } } @@ -1341,8 +1344,6 @@ class FlowControllerImpl : public FlowController */ void run() { - set_name_to_current_thread("dds.asyn.%u.%u", participant_id_, async_index_); - while (async_mode.running) { // There are writers interested in removing a sample. diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 7be88aad027..ed8a32a0ee4 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -245,10 +245,8 @@ RTPSParticipantImpl::RTPSParticipantImpl( mp_userParticipant->mp_impl = this; uint32_t id_for_thread = static_cast(m_att.participantID); - mp_event_thr.init_thread([id_for_thread]() - { - set_name_to_current_thread("dds.ev.%u", id_for_thread); - }); + const fastdds::rtps::ThreadSettings& thr_config = m_att.timed_events_thread; + mp_event_thr.init_thread(thr_config, "dds.ev.%u", id_for_thread); if (!networkFactoryHasRegisteredTransports()) { @@ -2207,11 +2205,8 @@ bool RTPSParticipantImpl::is_security_enabled_for_reader( security::Logging* RTPSParticipantImpl::create_builtin_logging_plugin() { - return new security::LogTopic([this]() - { - uint32_t participant_id = static_cast(m_att.participantID); - set_name_to_current_thread("dds.slog.%u", participant_id); - }); + uint32_t participant_id = static_cast(m_att.participantID); + return new security::LogTopic(participant_id, m_att.security_log_thread); } #endif // if HAVE_SECURITY diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index d337520d456..297feee6ef9 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -130,6 +130,7 @@ void RTPSReader::init( datasharing_listener_.reset(new DataSharingListener( notification, att.endpoint.data_sharing_configuration().shm_directory(), + att.data_sharing_listener_thread, att.matched_writers_allocation, this)); diff --git a/src/cpp/rtps/resources/ResourceEvent.cpp b/src/cpp/rtps/resources/ResourceEvent.cpp index d5f58eb0a81..66acf31f4c1 100644 --- a/src/cpp/rtps/resources/ResourceEvent.cpp +++ b/src/cpp/rtps/resources/ResourceEvent.cpp @@ -16,13 +16,14 @@ * @file ResourceEvent.cpp */ +#include +#include + #include #include #include "TimedEventImpl.h" - -#include -#include +#include namespace eprosima { namespace fastrtps { @@ -304,7 +305,9 @@ void ResourceEvent::do_timer_actions() } void ResourceEvent::init_thread( - std::function configure_cb) + const fastdds::rtps::ThreadSettings& thread_cfg, + const char* name_fmt, + uint32_t thread_id) { std::lock_guard lock(mutex_); @@ -312,14 +315,10 @@ void ResourceEvent::init_thread( stop_.store(false); resize_collections(); - thread_ = std::thread([this, configure_cb]() + thread_ = eprosima::create_thread([this]() { - if (configure_cb) - { - configure_cb(); - } event_service(); - }); + }, thread_cfg, name_fmt, thread_id); } } /* namespace rtps */ diff --git a/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp b/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp index d57ba8d30dd..91640346586 100644 --- a/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp +++ b/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp @@ -38,7 +38,7 @@ bool PortBasedTransportDescriptor::operator ==( } const ThreadSettings& PortBasedTransportDescriptor::get_thread_config_for_port( - uint32_t port) + uint32_t port) const { auto search = reception_threads_.find(port); if (search != reception_threads_.end()) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 7b4f3ba528f..e4a3c2a111f 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -196,11 +196,10 @@ void TCPTransportInterface::clean() alive_.store(false); keep_alive_event_.cancel(); - if (io_service_timers_thread_) + if (io_service_timers_thread_.joinable()) { io_service_timers_.stop(); - io_service_timers_thread_->join(); - io_service_timers_thread_ = nullptr; + io_service_timers_thread_.join(); } { @@ -242,11 +241,10 @@ void TCPTransportInterface::clean() } } - if (io_service_thread_) + if (io_service_thread_.joinable()) { io_service_.stop(); - io_service_thread_->join(); - io_service_thread_ = nullptr; + io_service_thread_.join(); } } @@ -455,7 +453,6 @@ bool TCPTransportInterface::init( auto ioServiceFunction = [&]() { - set_name_to_current_thread("dds.tcp_accept"); #if ASIO_VERSION >= 101200 asio::executor_work_guard work(io_service_.get_executor()); #else @@ -463,21 +460,22 @@ bool TCPTransportInterface::init( #endif // if ASIO_VERSION >= 101200 io_service_.run(); }; - io_service_thread_ = std::make_shared(ioServiceFunction); + io_service_thread_ = create_thread(ioServiceFunction, configuration()->accept_thread, "dds.tcp_accept"); if (0 < configuration()->keep_alive_frequency_ms) { - io_service_timers_thread_ = std::make_shared([&]() - { - set_name_to_current_thread("dds.tcp_keep"); + auto ioServiceTimersFunction = [&]() + { #if ASIO_VERSION >= 101200 - asio::executor_work_guard work(io_service_timers_. + asio::executor_work_guard work(io_service_timers_. get_executor()); #else - io_service::work work(io_service_timers_); + io_service::work work(io_service_timers_); #endif // if ASIO_VERSION >= 101200 - io_service_timers_.run(); - }); + io_service_timers_.run(); + }; + io_service_timers_thread_ = create_thread(ioServiceTimersFunction, + configuration()->keep_alive_thread, "dds.tcp_keep"); } return true; @@ -839,6 +837,20 @@ void TCPTransportInterface::keep_alive() */ } +void TCPTransportInterface::create_listening_thread( + const std::shared_ptr& channel) +{ + std::weak_ptr channel_weak_ptr = channel; + std::weak_ptr rtcp_manager_weak_ptr = rtcp_message_manager_; + auto fn = [this, channel_weak_ptr, rtcp_manager_weak_ptr]() + { + perform_listen_operation(channel_weak_ptr, rtcp_manager_weak_ptr); + }; + uint32_t port = channel->local_endpoint().port(); + const ThreadSettings& thr_config = configuration()->get_thread_config_for_port(port); + channel->thread(create_thread(fn, thr_config, "dds.tcp.%u", port)); +} + void TCPTransportInterface::perform_listen_operation( std::weak_ptr channel_weak, std::weak_ptr rtcp_manager) @@ -1342,10 +1354,7 @@ void TCPTransportInterface::SocketAccepted( } channel->set_options(configuration()); - std::weak_ptr channel_weak_ptr = channel; - std::weak_ptr rtcp_manager_weak_ptr = rtcp_message_manager_; - channel->thread(std::thread(&TCPTransportInterface::perform_listen_operation, this, - channel_weak_ptr, rtcp_manager_weak_ptr)); + create_listening_thread(channel); EPROSIMA_LOG_INFO(RTCP, "Accepted connection (local: " << IPLocator::to_string(locator) << ", remote: " @@ -1389,10 +1398,7 @@ void TCPTransportInterface::SecureSocketAccepted( } secure_channel->set_options(configuration()); - std::weak_ptr channel_weak_ptr = secure_channel; - std::weak_ptr rtcp_manager_weak_ptr = rtcp_message_manager_; - secure_channel->thread(std::thread(&TCPTransportInterface::perform_listen_operation, this, - channel_weak_ptr, rtcp_manager_weak_ptr)); + create_listening_thread(secure_channel); EPROSIMA_LOG_INFO(RTCP, " Accepted connection (local: " << IPLocator::to_string(locator) << ", remote: " << socket->lowest_layer().remote_endpoint().address() @@ -1434,10 +1440,7 @@ void TCPTransportInterface::SocketConnected( { channel->change_status(TCPChannelResource::eConnectionStatus::eConnected); channel->set_options(configuration()); - - std::weak_ptr rtcp_manager_weak_ptr = rtcp_message_manager_; - channel->thread(std::thread(&TCPTransportInterface::perform_listen_operation, this, - channel_weak_ptr, rtcp_manager_weak_ptr)); + create_listening_thread(channel); } } else diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index f21f61b2e3f..412c905b475 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -15,12 +15,23 @@ #ifndef _FASTDDS_TCP_TRANSPORT_INTERFACE_H_ #define _FASTDDS_TCP_TRANSPORT_INTERFACE_H_ -#include +#include +#include +#include +#include +#include + +#include +#include + #include +#include #include + #include -#include #include +#include + #if TLS_FOUND #define OPENSSL_API_COMPAT 10101 #include @@ -29,14 +40,6 @@ #include -#include -#include -#include -#include -#include -#include -#include - namespace eprosima { namespace fastdds { namespace rtps { @@ -79,8 +82,8 @@ class TCPTransportInterface : public TransportInterface #if TLS_FOUND asio::ssl::context ssl_context_; #endif // if TLS_FOUND - std::shared_ptr io_service_thread_; - std::shared_ptr io_service_timers_thread_; + std::thread io_service_thread_; + std::thread io_service_timers_thread_; std::shared_ptr rtcp_message_manager_; std::mutex rtcp_message_manager_mutex_; std::condition_variable rtcp_message_manager_cv_; @@ -195,6 +198,9 @@ class TCPTransportInterface : public TransportInterface std::shared_ptr& channel, const Locator& remote_locator); + void create_listening_thread( + const std::shared_ptr& channel); + public: friend class RTCPMessageManager; diff --git a/src/cpp/rtps/transport/UDPChannelResource.cpp b/src/cpp/rtps/transport/UDPChannelResource.cpp index 62904444c7c..03efd8737d1 100644 --- a/src/cpp/rtps/transport/UDPChannelResource.cpp +++ b/src/cpp/rtps/transport/UDPChannelResource.cpp @@ -15,7 +15,10 @@ #include #include + +#include #include + #include #include @@ -32,7 +35,8 @@ UDPChannelResource::UDPChannelResource( uint32_t maxMsgSize, const Locator& locator, const std::string& sInterface, - TransportReceiverInterface* receiver) + TransportReceiverInterface* receiver, + const ThreadSettings& thread_config) : ChannelResource(maxMsgSize) , message_receiver_(receiver) , socket_(moveSocket(socket)) @@ -40,7 +44,11 @@ UDPChannelResource::UDPChannelResource( , interface_(sInterface) , transport_(transport) { - thread(std::thread(&UDPChannelResource::perform_listen_operation, this, locator)); + auto fn = [this, locator]() + { + perform_listen_operation(locator); + }; + thread(create_thread(fn, thread_config, "dds.udp.%u", locator.port)); } UDPChannelResource::~UDPChannelResource() @@ -54,8 +62,6 @@ UDPChannelResource::~UDPChannelResource() void UDPChannelResource::perform_listen_operation( Locator input_locator) { - set_name_to_current_thread("dds.udp.%u", input_locator.port); - Locator remote_locator; while (alive()) diff --git a/src/cpp/rtps/transport/UDPChannelResource.h b/src/cpp/rtps/transport/UDPChannelResource.h index 146b164c3f4..a496a89bf13 100644 --- a/src/cpp/rtps/transport/UDPChannelResource.h +++ b/src/cpp/rtps/transport/UDPChannelResource.h @@ -16,7 +16,10 @@ #define _FASTDDS_UDP_CHANNEL_RESOURCE_INFO_ #include + +#include #include + #include namespace eprosima { @@ -110,7 +113,8 @@ class UDPChannelResource : public ChannelResource uint32_t maxMsgSize, const Locator& locator, const std::string& sInterface, - TransportReceiverInterface* receiver); + TransportReceiverInterface* receiver, + const ThreadSettings& thread_config); virtual ~UDPChannelResource() override; diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 19adeb1a91a..820056e26b3 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -231,7 +231,7 @@ UDPChannelResource* UDPTransportInterface::CreateInputChannelResource( eProsimaUDPSocket unicastSocket = OpenAndBindInputSocket(sInterface, IPLocator::getPhysicalPort(locator), is_multicast); UDPChannelResource* p_channel_resource = new UDPChannelResource(this, unicastSocket, maxMsgSize, locator, - sInterface, receiver); + sInterface, receiver, configuration()->get_thread_config_for_port(locator.port)); return p_channel_resource; } diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp index 139e5284a95..321f1870d7a 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp @@ -15,6 +15,7 @@ #ifndef _FASTDDS_SHAREDMEM_CHANNEL_RESOURCE_ #define _FASTDDS_SHAREDMEM_CHANNEL_RESOURCE_ +#include #include #include @@ -39,7 +40,9 @@ class SharedMemChannelResource : public ChannelResource const Locator& locator, TransportReceiverInterface* receiver, const std::string& dump_file, - bool should_init_thread = true) + const ThreadSettings& dump_thr_config, + bool should_init_thread, + const ThreadSettings& thr_config) : ChannelResource() , message_receiver_(receiver) , listener_(listener) @@ -51,13 +54,13 @@ class SharedMemChannelResource : public ChannelResource auto packets_file_consumer = std::unique_ptr( new SHMPacketFileConsumer(dump_file)); - packet_logger_ = std::make_shared>(locator.port); + packet_logger_ = std::make_shared>(locator.port, dump_thr_config); packet_logger_->RegisterConsumer(std::move(packets_file_consumer)); } if (should_init_thread) { - init_thread(locator); + init_thread(locator, thr_config); } } @@ -125,8 +128,6 @@ class SharedMemChannelResource : public ChannelResource void perform_listen_operation( Locator input_locator) { - set_name_to_current_thread("dds.shm.%u", input_locator.port); - Locator remote_locator; while (alive()) @@ -168,9 +169,14 @@ class SharedMemChannelResource : public ChannelResource protected: void init_thread( - const Locator& locator) + const Locator& locator, + const ThreadSettings& thr_config) { - this->thread(std::thread(&SharedMemChannelResource::perform_listen_operation, this, locator)); + auto fn = [this, locator]() + { + perform_listen_operation(locator); + }; + this->thread(create_thread(fn, thr_config, "dds.shm.%u", locator.port)); } /** diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp index 893fef62e0e..80f3ebf0bea 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp @@ -15,10 +15,13 @@ #ifndef _FASTDDS_SHAREDMEM_LOG_H_ #define _FASTDDS_SHAREDMEM_LOG_H_ +#include #include #include + #include #include +#include namespace eprosima { namespace fastdds { @@ -200,8 +203,10 @@ class PacketsLog public: PacketsLog( - uint32_t thread_id) + uint32_t thread_id, + const ThreadSettings& thread_config) : thread_id_(thread_id) + , thread_config_(thread_config) { } @@ -242,7 +247,7 @@ class PacketsLog { std::unique_lock guard(resources_.cv_mutex); - if (!resources_.logging && !resources_.logging_thread) + if (!resources_.logging && !resources_.logging_thread.joinable()) { // already killed return; @@ -286,31 +291,26 @@ class PacketsLog resources_.work = false; } - if (resources_.logging_thread) + if (resources_.logging_thread.joinable()) { resources_.cv.notify_all(); - // The #ifdef workaround here is due to an unsolved MSVC bug, which Microsoft has announced - // they have no intention of solving: https://connect.microsoft.com/VisualStudio/feedback/details/747145 - // Each VS version deals with post-main deallocation of threads in a very different way. - #if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800 - resources_.logging_thread->join(); - #endif // if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800 - resources_.logging_thread.reset(); + resources_.logging_thread.join(); } } - // Note: In VS2013, if you're linking this class statically, you will have to call KillThread before leaving - // main, due to an unsolved MSVC bug. - void QueueLog( const typename TPacketConsumer::Pkt& packet) { { std::unique_lock guard(resources_.cv_mutex); - if (!resources_.logging && !resources_.logging_thread) + if (!resources_.logging && !resources_.logging_thread.joinable()) { resources_.logging = true; - resources_.logging_thread.reset(new std::thread(&PacketsLog::run, this)); + auto fn = [this]() + { + run(); + }; + resources_.logging_thread = create_thread(fn, thread_config_, "dds.shmd.%u", thread_id_); } } @@ -333,7 +333,7 @@ class PacketsLog { eprosima::fastrtps::DBQueue logs; std::vector> consumers; - std::unique_ptr logging_thread; + std::thread logging_thread; // Condition variable segment. std::condition_variable cv; @@ -356,10 +356,10 @@ class PacketsLog Resources resources_; uint32_t thread_id_; + ThreadSettings thread_config_; void run() { - set_name_to_current_thread("dds.shmd.%u", thread_id_); std::unique_lock guard(resources_.cv_mutex); while (resources_.logging) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp index 00095d1f9a7..8c54f702c6d 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp @@ -290,7 +290,7 @@ bool SharedMemTransport::init( auto packets_file_consumer = std::unique_ptr( new SHMPacketFileConsumer(configuration_.rtps_dump_file())); - packet_logger_ = std::make_shared>(0); + packet_logger_ = std::make_shared>(0, configuration_.dump_thread()); packet_logger_->RegisterConsumer(std::move(packets_file_consumer)); } } @@ -341,7 +341,10 @@ SharedMemChannelResource* SharedMemTransport::CreateInputChannelResource( open_mode)->create_listener(), locator, receiver, - configuration_.rtps_dump_file()); + configuration_.rtps_dump_file(), + configuration_.dump_thread(), + true, + configuration_.get_thread_config_for_port(locator.port)); } bool SharedMemTransport::OpenOutputChannel( diff --git a/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp b/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp index a950f74f7ec..641677feec8 100644 --- a/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp @@ -33,11 +33,14 @@ class test_SharedMemChannelResource : public SharedMemChannelResource TransportReceiverInterface* receiver, uint32_t big_buffer_size, uint32_t* big_buffer_size_count) - : SharedMemChannelResource(listener, locator, receiver, std::string(), false) + : SharedMemChannelResource( + listener, locator, receiver, + std::string(), ThreadSettings{}, + false, ThreadSettings{}) , big_buffer_size_(big_buffer_size) , big_buffer_size_count_(big_buffer_size_count) { - init_thread(locator); + init_thread(locator, ThreadSettings{}); } virtual ~test_SharedMemChannelResource() override diff --git a/src/cpp/security/logging/LogTopic.cpp b/src/cpp/security/logging/LogTopic.cpp index b461cc9fc7e..747710d915c 100644 --- a/src/cpp/security/logging/LogTopic.cpp +++ b/src/cpp/security/logging/LogTopic.cpp @@ -1,25 +1,22 @@ #include -#include -#include - #include +#include + namespace eprosima { namespace fastrtps { namespace rtps { namespace security { LogTopic::LogTopic( - std::function thread_init_cb) + uint32_t thread_id, + const fastdds::rtps::ThreadSettings& thr_config) : stop_(false) - , thread_([this, thread_init_cb]() + , thread_( + create_thread( + [this]() { - if (thread_init_cb) - { - thread_init_cb(); - } - while (true) { // Put the thread asleep until there is @@ -37,7 +34,8 @@ LogTopic::LogTopic( publish(*p); } - }) + }, + thr_config, "dds.slog.%u", thread_id)) { // } diff --git a/src/cpp/security/logging/LogTopic.h b/src/cpp/security/logging/LogTopic.h index 47ebcd9be7e..6c19f958708 100644 --- a/src/cpp/security/logging/LogTopic.h +++ b/src/cpp/security/logging/LogTopic.h @@ -18,6 +18,7 @@ #ifndef _FASTDDS_RTPS_SECURITY_LOGGING_LOGTOPIC_H_ #define _FASTDDS_RTPS_SECURITY_LOGGING_LOGTOPIC_H_ +#include #include #include @@ -43,7 +44,8 @@ class LogTopic final : public Logging public: LogTopic( - std::function thread_init_cb = {}); + uint32_t thread_id = 0, + const fastdds::rtps::ThreadSettings& thr_config = {}); ~LogTopic(); private: diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp index 74fd688ba51..8c285d39b0e 100644 --- a/src/cpp/utils/SystemInfo.cpp +++ b/src/cpp/utils/SystemInfo.cpp @@ -214,7 +214,9 @@ const std::string& SystemInfo::get_environment_file() FileWatchHandle SystemInfo::watch_file( std::string filename, - std::function callback) + std::function callback, + const fastdds::rtps::ThreadSettings& watch_thread_config, + const fastdds::rtps::ThreadSettings& callback_thread_config) { #if defined(_WIN32) || defined(__unix__) return FileWatchHandle (new filewatch::FileWatch(filename, @@ -229,10 +231,12 @@ FileWatchHandle SystemInfo::watch_file( // No-op break; } - })); + }, watch_thread_config, callback_thread_config)); #else // defined(_WIN32) || defined(__unix__) static_cast(filename); static_cast(callback); + static_cast(watch_thread_config); + static_cast(callback_thread_config); return FileWatchHandle(); #endif // defined(_WIN32) || defined(__unix__) } diff --git a/src/cpp/utils/SystemInfo.hpp b/src/cpp/utils/SystemInfo.hpp index d2b4420c4be..418da3d9963 100644 --- a/src/cpp/utils/SystemInfo.hpp +++ b/src/cpp/utils/SystemInfo.hpp @@ -25,6 +25,8 @@ #include #include +#include + #include #include @@ -193,12 +195,16 @@ class SystemInfo * * @param [in] filename Path/name of the file to watch. * @param [in] callback Callback to execute when the file changes. + * @param [in] watch_thread_config Thread settings for watch thread. + * @param [in] callback_thread_config Thread settings for callback thread. * * @return The handle that represents the watcher object. */ static FileWatchHandle watch_file( std::string filename, - std::function callback); + std::function callback, + const fastdds::rtps::ThreadSettings& watch_thread_config, + const fastdds::rtps::ThreadSettings& callback_thread_config); /** * Stop a file watcher. diff --git a/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp b/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp index e77c3131fad..cf9ac1edd28 100644 --- a/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp +++ b/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp @@ -22,6 +22,8 @@ #include #include +#include + #include namespace eprosima { @@ -77,6 +79,12 @@ class SharedMemWatchdog } } + static void set_thread_settings( + const ThreadSettings& thr_config) + { + thread_settings() = thr_config; + } + static constexpr std::chrono::milliseconds period() { return std::chrono::milliseconds(1000); @@ -101,11 +109,21 @@ class SharedMemWatchdog std::atomic_bool exit_thread_; + static ThreadSettings& thread_settings() + { + static ThreadSettings s_settings(ThreadSettings{}); + return s_settings; + } + SharedMemWatchdog() : wake_run_(false) , exit_thread_(false) { - thread_run_ = std::thread(&SharedMemWatchdog::run, this); + auto fn = [this]() + { + run(); + }; + thread_run_ = create_thread(fn, thread_settings(), "dds.shm.wdog"); } /** @@ -123,8 +141,6 @@ class SharedMemWatchdog void run() { - set_name_to_current_thread("dds.shm.wdog"); - while (!exit_thread_) { { diff --git a/src/cpp/utils/threading.hpp b/src/cpp/utils/threading.hpp index dc28df06dc7..778d8662c35 100644 --- a/src/cpp/utils/threading.hpp +++ b/src/cpp/utils/threading.hpp @@ -15,8 +15,17 @@ #ifndef UTILS__THREADING_HPP_ #define UTILS__THREADING_HPP_ +#include + namespace eprosima { +// Forward declare dependencies +namespace fastdds { +namespace rtps { +struct ThreadSettings; +} // namespace rtps +} // namespace fastdds + /** * @brief Give a name to the thread calling this function. * @@ -55,6 +64,42 @@ void set_name_to_current_thread( uint32_t arg1, uint32_t arg2); + +/** + * @brief Apply thread settings to the thread calling this function. + * + * @param[in] settings Thread settings to apply. + */ +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& settings); + +/** + * @brief Create and start a thread with custom settings and name. + * + * This wrapper will create a thread on which the incoming functor will be called after + * applying giving it a custom name and applying the thread settings. + * + * @param[in] func Functor with the logic to be run on the created thread. + * @param[in] settings Thread settings to apply to the created thread. + * @param[in] name Name (format) for the created thread. + * @param[in] args Additional arguments to complete the thread name. + * See @ref set_name_to_current_thread for details. + */ +template +std::thread create_thread( + Functor func, + const fastdds::rtps::ThreadSettings& settings, + const char* name, + Args... args) +{ + return std::thread([=]() + { + apply_thread_settings_to_current_thread(settings); + set_name_to_current_thread(name, args ...); + func(); + }); +} + } // eprosima #endif // UTILS__THREADING_HPP_ diff --git a/src/cpp/utils/threading/threading_empty.ipp b/src/cpp/utils/threading/threading_empty.ipp index 30d97890bca..684664fc29a 100644 --- a/src/cpp/utils/threading/threading_empty.ipp +++ b/src/cpp/utils/threading/threading_empty.ipp @@ -32,4 +32,9 @@ void set_name_to_current_thread( { } +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& /*settings*/) +{ +} + } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index efb215bd99b..3131d3642a7 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -12,9 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include + #include -#include -#include + +#include +#include namespace eprosima { @@ -48,4 +53,81 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +static void configure_current_thread_scheduler( + int sched_class, + int sched_priority) +{ + pthread_t self_tid = pthread_self(); + sched_param param; + sched_param current_param; + int current_class; + int result = 0; + bool change_priority = (std::numeric_limits::min() != sched_priority); + + // Get current scheduling parameters + memset(¤t_param, 0, sizeof(current_param)); + pthread_getschedparam(self_tid, ¤t_class, ¤t_param); + + memset(¶m, 0, sizeof(param)); + param.sched_priority = 0; + sched_class = (sched_class == -1) ? current_class : sched_class; + + // + // Set Scheduler Class and Priority + // + + if((sched_class == SCHED_OTHER) || + (sched_class == SCHED_BATCH) || + (sched_class == SCHED_IDLE)) + { + // + // BATCH and IDLE do not have explicit priority values. + // - Requires priorty value to be zero (0). + + result = pthread_setschedparam(self_tid, sched_class, ¶m); + + // + // Sched OTHER has a nice value, that we pull from the priority parameter. + // + + if(sched_class == SCHED_OTHER && change_priority) + { + result = setpriority(PRIO_PROCESS, gettid(), sched_priority); + } + } + else if((sched_class == SCHED_FIFO) || + (sched_class == SCHED_RR)) + { + // + // RT Policies use a different priority numberspace. + // + + param.sched_priority = change_priority ? sched_priority : current_param.sched_priority; + result = pthread_setschedparam(self_tid, sched_class, ¶m); + } + + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring scheduler for thread " << self_tid); + } +} + +static void configure_current_thread_affinity( + uint64_t affinity) +{ + if (affinity <= static_cast(std::numeric_limits::max())) + { + thread_affinity_policy_data_t policy = { static_cast(affinity) }; + pthread_t self_tid = pthread_self(); + thread_policy_set(pthread_mach_thread_np(self_tid), THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, 1); + } +} + +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& settings) +{ + configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); + configure_current_thread_affinity(settings.affinity); +} + } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index 1090d9939c0..5803c3464e1 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -12,9 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include + #include -#include -#include +#include +#include +#include +#include + +#include +#include namespace eprosima { @@ -49,4 +58,120 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +static void configure_current_thread_scheduler( + int sched_class, + int sched_priority) +{ + pthread_t self_tid = pthread_self(); + sched_param param; + sched_param current_param; + int current_class; + int result = 0; + bool change_priority = (std::numeric_limits::min() != sched_priority); + + // Get current scheduling parameters + memset(¤t_param, 0, sizeof(current_param)); + pthread_getschedparam(self_tid, ¤t_class, ¤t_param); + + memset(¶m, 0, sizeof(param)); + param.sched_priority = 0; + sched_class = (sched_class == -1) ? current_class : sched_class; + + // + // Set Scheduler Class and Priority + // + + if((sched_class == SCHED_OTHER) || + (sched_class == SCHED_BATCH) || + (sched_class == SCHED_IDLE)) + { + // + // BATCH and IDLE do not have explicit priority values. + // - Requires priorty value to be zero (0). + + result = pthread_setschedparam(self_tid, sched_class, ¶m); + + // + // Sched OTHER has a nice value, that we pull from the priority parameter. + // + + if(sched_class == SCHED_OTHER && change_priority) + { + result = setpriority(PRIO_PROCESS, gettid(), sched_priority); + } + } + else if((sched_class == SCHED_FIFO) || + (sched_class == SCHED_RR)) + { + // + // RT Policies use a different priority numberspace. + // + + param.sched_priority = change_priority ? sched_priority : current_param.sched_priority; + result = pthread_setschedparam(self_tid, sched_class, ¶m); + } + + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring scheduler for thread " << self_tid); + } +} + +static void configure_current_thread_affinity( + uint64_t affinity_mask) +{ + int a; + int result; + int cpu_count; + cpu_set_t cpu_set; + pthread_t self_tid = pthread_self(); + + result = 0; + + // + // Rebuilt the cpu set from scratch... + // + + CPU_ZERO(&cpu_set); + + // + // If the bit is set in our mask, set it into the cpu_set + // We only consider up to the total number of CPU's the + // system has. + // + cpu_count = get_nprocs_conf(); + + for(a = 0; a < cpu_count; a++) + { + if(0 != (affinity_mask & 1)) + { + CPU_SET(a, &cpu_set); + result++; + } + affinity_mask >>= 1; + } + + if (affinity_mask > 0) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Affinity mask has more processors than the ones present in the system"); + } + + if(result > 0) + { + result = pthread_setaffinity_np(self_tid, sizeof(cpu_set_t), &cpu_set); + } + + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring affinity for thread " << self_tid); + } +} + +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& settings) +{ + configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); + configure_current_thread_affinity(settings.affinity); +} + } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_win32.ipp b/src/cpp/utils/threading/threading_win32.ipp index 1a9f683bd0f..4c3565663f5 100644 --- a/src/cpp/utils/threading/threading_win32.ipp +++ b/src/cpp/utils/threading/threading_win32.ipp @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include +#include + namespace eprosima { template @@ -53,4 +56,35 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +static void configure_current_thread_priority( + int32_t priority) +{ + if (priority != std::numeric_limits::min()) + { + if (0 == SetThreadPriority(GetCurrentThread(), priority)) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << GetLastError() << "' configuring priority for thread " << GetCurrentThread()); + } + } +} + +static void configure_current_thread_affinity( + uint64_t affinity_mask) +{ + if (affinity_mask != 0) + { + if (0 == SetThreadAffinityMask(GetCurrentThread(), affinity_mask)) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << GetLastError() << "' configuring affinity for thread " << GetCurrentThread()); + } + } +} + +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& settings) +{ + configure_current_thread_priority(settings.priority); + configure_current_thread_affinity(settings.affinity); +} + } // namespace eprosima diff --git a/test/mock/rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h b/test/mock/rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h index 7cf26913643..4c16829e480 100644 --- a/test/mock/rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h +++ b/test/mock/rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h @@ -16,6 +16,7 @@ #define _FASTDDS_RTPS_DOMAIN_H_ #include +#include #include #include @@ -44,6 +45,12 @@ class RTPSDomain { public: + static void set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings&, + const fastdds::rtps::ThreadSettings&) + { + } + static void stopAll() { } diff --git a/test/unittest/dds/participant/ParticipantTests.cpp b/test/unittest/dds/participant/ParticipantTests.cpp index 84af1c34e77..91f89279ef4 100644 --- a/test/unittest/dds/participant/ParticipantTests.cpp +++ b/test/unittest/dds/participant/ParticipantTests.cpp @@ -3002,23 +3002,23 @@ TEST(ParticipantTests, UpdatableDomainParticipantQos) // Check that the builtin_controllers_sender_thread can not be changed in an enabled participant participant->get_qos(pqos); - pqos.builtin_controllers_sender_thread().cpu_mask = 1; + pqos.builtin_controllers_sender_thread().affinity = 1; ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY); // Check that the timed_events_thread can not be changed in an enabled participant participant->get_qos(pqos); - pqos.timed_events_thread().cpu_mask = 1; + pqos.timed_events_thread().affinity = 1; ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY); // Check that the discovery_server_thread can not be changed in an enabled participant participant->get_qos(pqos); - pqos.discovery_server_thread().cpu_mask = 1; + pqos.discovery_server_thread().affinity = 1; ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY); #if HAVE_SECURITY // Check that the security_log_thread can not be changed in an enabled participant participant->get_qos(pqos); - pqos.security_log_thread().cpu_mask = 1; + pqos.security_log_thread().affinity = 1; ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY); ASSERT_EQ(DomainParticipantFactory::get_instance()->delete_participant(participant), ReturnCode_t::RETCODE_OK); diff --git a/test/unittest/logging/LogTests.cpp b/test/unittest/logging/LogTests.cpp index aee0743b44f..a6caae4cb87 100644 --- a/test/unittest/logging/LogTests.cpp +++ b/test/unittest/logging/LogTests.cpp @@ -664,6 +664,37 @@ TEST_F(LogTests, flush_n) loggind_thread.join(); } +/** + * The goal of this test is to be able to manually check that the thread settings are applied, using an external + * tool like `htop` in Linux. + * It sets some scheduling configuration for the logging thread, and performs one log every second for 10 seconds, + * giving enough time for the tool to show the scheduling configuration and name of said thread. + */ +TEST_F(LogTests, thread_config) +{ + // Set general verbosity + Log::SetVerbosity(Log::Info); + const unsigned int n_logs = 10; + + // Set thread settings + eprosima::fastdds::rtps::ThreadSettings thr_settings{}; +#if defined(_POSIX_SOURCE) + thr_settings.affinity = 3; + thr_settings.scheduling_policy = SCHED_OTHER; + thr_settings.priority = 1; +#endif // if defined(_POSIX_SOURCE) + Log::SetThreadConfig(thr_settings); + + for (unsigned int i = 0; i < n_logs; i++) + { + EPROSIMA_LOG_INFO(TEST_THREADS, "Info message " << i); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + auto entries = HELPER_WaitForEntries(n_logs); + EXPECT_EQ(entries.size(), n_logs); +} + int main( int argc, char** argv) diff --git a/test/unittest/rtps/attributes/ThreadSettingsTests.cpp b/test/unittest/rtps/attributes/ThreadSettingsTests.cpp index 25a540a2b40..8abce8ff719 100644 --- a/test/unittest/rtps/attributes/ThreadSettingsTests.cpp +++ b/test/unittest/rtps/attributes/ThreadSettingsTests.cpp @@ -27,49 +27,49 @@ TEST(ThreadSettingsTests, EqualOperators) // Fixed scheduling_policy cases settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); @@ -77,43 +77,43 @@ TEST(ThreadSettingsTests, EqualOperators) // Fixed priority cases (not already covered) settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); - // Fixed cpu_mask cases (not already covered) + // Fixed affinity cases (not already covered) settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); @@ -121,14 +121,14 @@ TEST(ThreadSettingsTests, EqualOperators) // Fixed stack_size cases (not already covered) settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); @@ -136,7 +136,7 @@ TEST(ThreadSettingsTests, EqualOperators) // All different settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); diff --git a/test/unittest/transport/PortBasedTransportDescriptorTests.cpp b/test/unittest/transport/PortBasedTransportDescriptorTests.cpp index 88c7b5e7021..28bca4a8bab 100644 --- a/test/unittest/transport/PortBasedTransportDescriptorTests.cpp +++ b/test/unittest/transport/PortBasedTransportDescriptorTests.cpp @@ -65,7 +65,7 @@ TEST_F(PortBasedTransportDescriptorTests, get_thread_config_for_port) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings; set_settings[1234].scheduling_policy = 33; set_settings[1234].priority = 33; - set_settings[1234].cpu_mask = 33; + set_settings[1234].affinity = 33; set_settings[1234].stack_size = 33; ASSERT_TRUE(reception_threads(set_settings)); @@ -86,7 +86,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_thread_config_for_port) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings; set_settings[1234].scheduling_policy = 33; set_settings[1234].priority = 33; - set_settings[1234].cpu_mask = 33; + set_settings[1234].affinity = 33; set_settings[1234].stack_size = 33; ASSERT_TRUE(reception_threads(set_settings)); @@ -116,7 +116,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_default_reception_threads) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; ASSERT_NE(initial_settings, set_settings); @@ -137,7 +137,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_reception_threads) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings; set_settings[1234].scheduling_policy = 33; set_settings[1234].priority = 33; - set_settings[1234].cpu_mask = 33; + set_settings[1234].affinity = 33; set_settings[1234].stack_size = 33; ASSERT_NE(initial_settings, set_settings); @@ -170,7 +170,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; other.default_reception_threads(set_settings); ASSERT_FALSE(*this == other); @@ -185,7 +185,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map; set_settings_map[1234].scheduling_policy = 33; set_settings_map[1234].priority = 33; - set_settings_map[1234].cpu_mask = 33; + set_settings_map[1234].affinity = 33; set_settings_map[1234].stack_size = 33; ASSERT_TRUE(other.reception_threads(set_settings_map)); ASSERT_FALSE(*this == other); @@ -202,7 +202,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; other.default_reception_threads(set_settings); @@ -220,7 +220,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map; set_settings_map[1234].scheduling_policy = 33; set_settings_map[1234].priority = 33; - set_settings_map[1234].cpu_mask = 33; + set_settings_map[1234].affinity = 33; set_settings_map[1234].stack_size = 33; ASSERT_TRUE(other.reception_threads(set_settings_map)); @@ -236,14 +236,14 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; other.default_reception_threads(set_settings); PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map; set_settings_map[1234].scheduling_policy = 33; set_settings_map[1234].priority = 33; - set_settings_map[1234].cpu_mask = 33; + set_settings_map[1234].affinity = 33; set_settings_map[1234].stack_size = 33; ASSERT_TRUE(other.reception_threads(set_settings_map)); @@ -261,14 +261,14 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; other.default_reception_threads(set_settings); PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map; set_settings_map[1234].scheduling_policy = 33; set_settings_map[1234].priority = 33; - set_settings_map[1234].cpu_mask = 33; + set_settings_map[1234].affinity = 33; set_settings_map[1234].stack_size = 33; ASSERT_TRUE(other.reception_threads(set_settings_map)); diff --git a/test/unittest/utils/CMakeLists.txt b/test/unittest/utils/CMakeLists.txt index 893eaeb519c..97cdb5ff8bc 100644 --- a/test/unittest/utils/CMakeLists.txt +++ b/test/unittest/utils/CMakeLists.txt @@ -48,6 +48,10 @@ set(FIXEDSIZEQUEUETESTS_SOURCE set(SYSTEMINFOTESTS_SOURCE SystemInfoTests.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp) include_directories(mock/) diff --git a/test/unittest/utils/SystemInfoTests.cpp b/test/unittest/utils/SystemInfoTests.cpp index 58af62ed4e4..512a5b97147 100644 --- a/test/unittest/utils/SystemInfoTests.cpp +++ b/test/unittest/utils/SystemInfoTests.cpp @@ -238,7 +238,7 @@ TEST_F(SystemInfoTests, FileWatchTest) eprosima::SystemInfo::wait_for_file_closure(filename, _1s); ++times_called_; cv_.notify_all(); - }); + }, {}, {}); // Read contents { diff --git a/thirdparty/filewatch/FileWatch.hpp b/thirdparty/filewatch/FileWatch.hpp index a714d284953..37c462762d9 100644 --- a/thirdparty/filewatch/FileWatch.hpp +++ b/thirdparty/filewatch/FileWatch.hpp @@ -23,6 +23,8 @@ #ifndef FILEWATCHER_H #define FILEWATCHER_H +#include + #include #ifdef _WIN32 @@ -67,6 +69,7 @@ #include #include +namespace eprosima { namespace filewatch { enum class Event { added, @@ -93,35 +96,33 @@ namespace filewatch { public: - FileWatch(T path, UnderpinningRegex pattern, std::function callback) : - _path(path), - _pattern(pattern), - _callback(callback), - _directory(get_directory(path)) + FileWatch( + T path, + UnderpinningRegex pattern, + std::function callback, + const fastdds::rtps::ThreadSettings& watch_thread_config, + const fastdds::rtps::ThreadSettings& callback_thread_config) + : _path(path) + , _pattern(pattern) + , _callback(callback) + , _directory(get_directory(path)) { - init(); + init(watch_thread_config, callback_thread_config); } - FileWatch(T path, std::function callback) : - FileWatch(path, UnderpinningRegex(_regex_all), callback) {} + FileWatch( + T path, + std::function callback, + const fastdds::rtps::ThreadSettings& watch_thread_config, + const fastdds::rtps::ThreadSettings& callback_thread_config) + : FileWatch(path, UnderpinningRegex(_regex_all), callback, watch_thread_config, callback_thread_config) {} ~FileWatch() { destroy(); } - FileWatch(const FileWatch& other) : FileWatch(other._path, other._callback) {} - - FileWatch& operator=(const FileWatch& other) - { - if (this == &other) { return *this; } - - destroy(); - _path = other._path; - _callback = other._callback; - _directory = get_directory(other._path); - init(); - return *this; - } + FileWatch(const FileWatch& other) = delete; + FileWatch& operator=(const FileWatch& other) = delete; // Const memeber varibles don't let me implent moves nicely, if moves are really wanted std::unique_ptr should be used and move that. FileWatch(FileWatch&&) = delete; @@ -201,7 +202,9 @@ namespace filewatch { const static std::size_t event_size = (sizeof(struct inotify_event)); #endif // __unix__ - void init() + void init( + const fastdds::rtps::ThreadSettings& watch_thread_config = {}, + const fastdds::rtps::ThreadSettings& callback_thread_config = {}) { #ifdef _WIN32 _close_event = CreateEvent(NULL, TRUE, FALSE, NULL); @@ -209,9 +212,8 @@ namespace filewatch { throw std::system_error(GetLastError(), std::system_category()); } #endif // WIN32 - _callback_thread = std::move(std::thread([this]() { + _callback_thread = create_thread([this]() { try { - eprosima::set_name_to_current_thread("dds.fwatch.cb"); callback_thread(); } catch (...) { try { @@ -219,10 +221,9 @@ namespace filewatch { } catch (...) {} // set_exception() may throw too } - })); - _watch_thread = std::move(std::thread([this]() { + }, callback_thread_config, "dds.fwatch.cb"); + _watch_thread = create_thread([this]() { try { - eprosima::set_name_to_current_thread("dds.fwatch"); monitor_directory(); } catch (...) { try { @@ -230,7 +231,7 @@ namespace filewatch { } catch (...) {} // set_exception() may throw too } - })); + }, watch_thread_config, "dds.fwatch"); std::future future = _running.get_future(); future.get(); //block until the monitor_directory is up and running @@ -624,5 +625,6 @@ namespace filewatch { template constexpr typename FileWatch::C FileWatch::_regex_all[]; template constexpr typename FileWatch::C FileWatch::_this_directory[]; -} +} // namespace filewatch +} // namespace eprosima #endif