From c1499f692d4771dce21191754e674edbe8b5e096 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 13 Sep 2023 11:05:45 +0200 Subject: [PATCH 01/38] Refs #19436. Added thread creation wrapper infrastructure. Signed-off-by: Miguel Company --- src/cpp/utils/threading.hpp | 69 +++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/src/cpp/utils/threading.hpp b/src/cpp/utils/threading.hpp index dc28df06dc7..7d7b9d43166 100644 --- a/src/cpp/utils/threading.hpp +++ b/src/cpp/utils/threading.hpp @@ -15,8 +15,17 @@ #ifndef UTILS__THREADING_HPP_ #define UTILS__THREADING_HPP_ +#include + namespace eprosima { +// Forward declare dependencies +namespace fastdds { +namespace rtps { +struct ThreadSettings; +} // namespace rtps +} // namespace fastdds + /** * @brief Give a name to the thread calling this function. * @@ -55,6 +64,66 @@ void set_name_to_current_thread( uint32_t arg1, uint32_t arg2); + +/** + * @brief Apply thread settings to the thread calling this function. + * + * @param[in] settings Thread settings to apply. + */ +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& settings); + +/** + * @brief Create and start a thread with custom settings and name. + * + * This wrapper will create a thread on which the incoming functor will be called after + * applying giving it a custom name and applying the thread settings. + * + * @param[in] func Functor with the logic to be run on the created thread. + * @param[in] settings Thread settings to apply to the created thread. + * @param[in] name Name (format) for the created thread. + * @param[in] args Additional arguments to complete the thread name. + * See @ref set_name_to_current_thread for details. + */ +template +std::thread create_thread( + const Functor& func, + const fastdds::rtps::ThreadSettings& settings, + const char* name, + Args... args) +{ + return std::thread([&]() + { + apply_thread_settings_to_current_thread(settings); + set_name_to_current_thread(name, args ...); + func(); + }); +} + +/** + * @brief Create and start a thread with custom name. + * + * This wrapper will create a thread on which the incoming functor will be called after + * applying giving it a custom name. + * + * @param[in] func Functor with the logic to be run on the created thread. + * @param[in] name Name (format) for the created thread. + * @param[in] args Additional arguments to complete the thread name. + * See @ref set_name_to_current_thread for details. + */ +template +std::thread create_thread( + const Functor& func, + const char* name, + Args... args) +{ + return std::thread([&]() + { + set_name_to_current_thread(name, args ...); + func(); + }); +} + } // eprosima #endif // UTILS__THREADING_HPP_ From 9c515c09297666cb20c476d69bf150e1d39aa212 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 13 Sep 2023 11:09:20 +0200 Subject: [PATCH 02/38] Refs #19436. Added empty implementation for apply_thread_settings_to_current_thread. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_empty.ipp | 5 +++++ src/cpp/utils/threading/threading_osx.ipp | 5 +++++ src/cpp/utils/threading/threading_pthread.ipp | 5 +++++ src/cpp/utils/threading/threading_win32.ipp | 5 +++++ 4 files changed, 20 insertions(+) diff --git a/src/cpp/utils/threading/threading_empty.ipp b/src/cpp/utils/threading/threading_empty.ipp index 30d97890bca..684664fc29a 100644 --- a/src/cpp/utils/threading/threading_empty.ipp +++ b/src/cpp/utils/threading/threading_empty.ipp @@ -32,4 +32,9 @@ void set_name_to_current_thread( { } +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& /*settings*/) +{ +} + } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index efb215bd99b..e89ab5aeddc 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -48,4 +48,9 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& /*settings*/) +{ +} + } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index 1090d9939c0..c5bee670c04 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -49,4 +49,9 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& /*settings*/) +{ +} + } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_win32.ipp b/src/cpp/utils/threading/threading_win32.ipp index 1a9f683bd0f..22400f5a437 100644 --- a/src/cpp/utils/threading/threading_win32.ipp +++ b/src/cpp/utils/threading/threading_win32.ipp @@ -53,4 +53,9 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +void apply_thread_settings_to_current_thread( + const fastdds::rtps::ThreadSettings& /*settings*/) +{ +} + } // namespace eprosima From 8cf6176924418a3aefa516946cf3200a408ce0fa Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 13 Sep 2023 11:31:45 +0200 Subject: [PATCH 03/38] Refs #19436. Refactor on Log.cpp Signed-off-by: Miguel Company --- src/cpp/fastdds/log/Log.cpp | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/cpp/fastdds/log/Log.cpp b/src/cpp/fastdds/log/Log.cpp index ff33c5f5b96..0652d1145c0 100644 --- a/src/cpp/fastdds/log/Log.cpp +++ b/src/cpp/fastdds/log/Log.cpp @@ -134,14 +134,17 @@ struct LogResources void SetThreadConfig( const rtps::ThreadSettings& config) { - static_cast(config); - return; + std::lock_guard guard(cv_mutex_); + thread_settings_ = config; } //! Returns the logging_ engine to configuration defaults. void Reset() { - std::unique_lock configGuard(config_mutex_); + rtps::ThreadSettings thr_config{}; + SetThreadConfig(thr_config); + + std::lock_guard configGuard(config_mutex_); category_filter_.reset(); filename_filter_.reset(); error_string_filter_.reset(); @@ -162,7 +165,7 @@ struct LogResources { std::unique_lock guard(cv_mutex_); - if (!logging_ && !logging_thread_) + if (!logging_ && !logging_thread_.joinable()) { // already killed return; @@ -237,19 +240,13 @@ struct LogResources work_ = false; } - if (logging_thread_) + if (logging_thread_.joinable()) { cv_.notify_all(); - // The #ifdef workaround here is due to an unsolved MSVC bug, which Microsoft has announced - // they have no intention of solving: https://connect.microsoft.com/VisualStudio/feedback/details/747145 - // Each VS version deals with post-main deallocation of threads in a very different way. -#if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800 - if (logging_thread_->joinable() && logging_thread_->get_id() != std::this_thread::get_id()) + if (logging_thread_.get_id() != std::this_thread::get_id()) { - logging_thread_->join(); + logging_thread_.join(); } -#endif // if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800 - logging_thread_.reset(); } } @@ -258,17 +255,19 @@ struct LogResources void StartThread() { std::unique_lock guard(cv_mutex_); - if (!logging_ && !logging_thread_) + if (!logging_ && !logging_thread_.joinable()) { logging_ = true; - logging_thread_.reset(new std::thread(&LogResources::run, this)); + auto thread_fn = [this]() + { + run(); + }; + logging_thread_ = eprosima::create_thread(thread_fn, thread_settings_, "dds.log"); } } void run() { - set_name_to_current_thread("dds.log"); - std::unique_lock guard(cv_mutex_); while (logging_) @@ -343,7 +342,7 @@ struct LogResources fastrtps::DBQueue logs_; std::vector> consumers_; - std::unique_ptr logging_thread_; + std::thread logging_thread_; // Condition variable segment. std::condition_variable cv_; @@ -361,7 +360,7 @@ struct LogResources std::unique_ptr error_string_filter_; std::atomic verbosity_; - + rtps::ThreadSettings thread_settings_; }; std::shared_ptr get_log_resources() From fc45783c72ad8a1914b96515ab1fb7671531a253 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 19 Sep 2023 15:20:28 +0200 Subject: [PATCH 04/38] Refs #19436. Add implementation for setting scheduler and priority. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_pthread.ipp | 62 ++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index c5bee670c04..81edc3eeae8 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -13,8 +13,14 @@ // limitations under the License. #include -#include #include +#include +#include +#include +#include +#include + +#include namespace eprosima { @@ -49,9 +55,61 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +static void configure_current_thread_scheduler( + int sched_class, + int sched_priority) +{ + pthread_t self_tid = pthread_self(); + sched_param param; + int result = 0; + + memset(¶m, 0, sizeof(param)); + param.sched_priority = 0; + + // + // Set Scheduler Class and Priority + // + + if((sched_class == SCHED_OTHER) || + (sched_class == SCHED_BATCH) || + (sched_class == SCHED_IDLE)) + { + // + // BATCH and IDLE do not have explicit priority values. + // - Requires priorty value to be zero (0). + + result = pthread_setschedparam(self_tid, sched_class, ¶m); + + // + // Sched OTHER has a nice value, that we pull from the priority parameter. + // + + if(sched_class == SCHED_OTHER) + { + result = setpriority(PRIO_PROCESS, gettid(), sched_priority); + } + } + else if((sched_class == SCHED_FIFO) || + (sched_class == SCHED_RR)) + { + // + // RT Policies use a different priority numberspace. + // + + param.sched_priority = sched_priority; + result = pthread_setschedparam(self_tid, sched_class, ¶m); + } + + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring scheduler for thread " << self_tid); + } +} + void apply_thread_settings_to_current_thread( - const fastdds::rtps::ThreadSettings& /*settings*/) + const fastdds::rtps::ThreadSettings& settings) { + configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); } } // namespace eprosima From 12f55726daedf16a8fbd8ba05b361209806d0dc2 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 19 Sep 2023 15:21:14 +0200 Subject: [PATCH 05/38] Refs #19436. Add implementation for setting cpu affinity. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_pthread.ipp | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index 81edc3eeae8..018888d6169 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -106,10 +106,56 @@ static void configure_current_thread_scheduler( } } +static void configure_current_thread_affinity( + uint32_t affinity_mask) +{ + int a; + int result; + int cpu_count; + cpu_set_t cpu_set; + pthread_t self_tid = pthread_self(); + + result = 0; + + // + // Rebuilt the cpu set from scratch... + // + + CPU_ZERO(&cpu_set); + + // + // If the bit is set in our mask, set it into the cpu_set + // We only consider up to the total number of CPU's the + // system has. + // + + cpu_count = get_nprocs_conf(); + + for(a = 0; a < cpu_count; a++) + { + if(affinity_mask & (1 << a)) + { + CPU_SET(a, &cpu_set); + result++; + } + } + + if(result > 0) + { + result = pthread_setaffinity_np(self_tid, sizeof(cpu_set_t), &cpu_set); + } + + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring affinity for thread " << self_tid); + } +} + void apply_thread_settings_to_current_thread( const fastdds::rtps::ThreadSettings& settings) { configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); + configure_current_thread_affinity(settings.cpu_mask); } } // namespace eprosima From 9327aaf143bd4415c9e577b39a60288eb31518dd Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 19 Sep 2023 16:18:39 +0200 Subject: [PATCH 06/38] Refs #19436. Add test setting config for Log thread. Signed-off-by: Miguel Company --- test/unittest/logging/LogTests.cpp | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/test/unittest/logging/LogTests.cpp b/test/unittest/logging/LogTests.cpp index aee0743b44f..6eb6bfa5c8a 100644 --- a/test/unittest/logging/LogTests.cpp +++ b/test/unittest/logging/LogTests.cpp @@ -664,6 +664,31 @@ TEST_F(LogTests, flush_n) loggind_thread.join(); } +TEST_F(LogTests, thread_config) +{ + // Set general verbosity + Log::SetVerbosity(Log::Info); + unsigned int n_logs = 1000; + + // Set thread settings + eprosima::fastdds::rtps::ThreadSettings thr_settings{}; +#if defined(_POSIX_SOURCE) + thr_settings.cpu_mask = 3; + thr_settings.scheduling_policy = SCHED_OTHER; + thr_settings.priority = 1; +#endif // if defined(_POSIX_SOURCE) + Log::SetThreadConfig(thr_settings); + + for (unsigned int i = 0; i < n_logs; i++) + { + EPROSIMA_LOG_INFO(TEST_THREADS, "Info message " << i); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + + auto entries = HELPER_WaitForEntries(n_logs); + EXPECT_EQ(entries.size(), n_logs); +} + int main( int argc, char** argv) From 14ac33e013c616b460df0735979325996ceafaf2 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 20 Sep 2023 09:19:49 +0200 Subject: [PATCH 07/38] Refs #19436. Fix SystemInfoTests link issue. Signed-off-by: Miguel Company --- test/unittest/utils/CMakeLists.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/unittest/utils/CMakeLists.txt b/test/unittest/utils/CMakeLists.txt index 893eaeb519c..97cdb5ff8bc 100644 --- a/test/unittest/utils/CMakeLists.txt +++ b/test/unittest/utils/CMakeLists.txt @@ -48,6 +48,10 @@ set(FIXEDSIZEQUEUETESTS_SOURCE set(SYSTEMINFOTESTS_SOURCE SystemInfoTests.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp) include_directories(mock/) From b41d405a169dc34367471e84518b7af5783cc7d3 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 20 Sep 2023 10:35:56 +0200 Subject: [PATCH 08/38] Refs #19436. Changes on ResourceEvent. Signed-off-by: Miguel Company --- .../fastdds/rtps/resources/ResourceEvent.h | 18 ++++++++++++------ .../discovery/participant/PDPServer.cpp | 9 ++++----- .../rtps/participant/RTPSParticipantImpl.cpp | 6 ++---- src/cpp/rtps/resources/ResourceEvent.cpp | 19 +++++++++---------- 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/include/fastdds/rtps/resources/ResourceEvent.h b/include/fastdds/rtps/resources/ResourceEvent.h index 3709f3ff120..5cb68311720 100644 --- a/include/fastdds/rtps/resources/ResourceEvent.h +++ b/include/fastdds/rtps/resources/ResourceEvent.h @@ -22,14 +22,15 @@ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include -#include - #include #include #include #include +#include +#include +#include + namespace eprosima { namespace fastrtps { namespace rtps { @@ -51,11 +52,16 @@ class ResourceEvent /*! * @brief Method to initialize the internal thread. * - * @param[in] configure_cb Function to be called in the context of the started thread - * before calling the internal service routine. + * @param[in] thread_cfg Settings to apply to the created thread. + * @param[in] name_fmt A null-terminated string to be used as the format argument of + * a `snprintf` like function, taking `thread_id` as additional + * argument, and used to give a name to the created thread. + * @param[in] thread_id Single variadic argument passed to the formatting function. */ void init_thread( - std::function configure_cb = {}); + const fastdds::rtps::ThreadSettings& thread_cfg = {}, + const char* name_fmt = "event %u", + uint32_t thread_id = 0); void stop_thread(); diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 6e524dd8e7b..cb39e120049 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -162,11 +162,10 @@ bool PDPServer::init( getRTPSParticipant()->enableReader(edp->publications_reader_.first); // Initialize server dedicated thread. - uint32_t id_for_thread = static_cast(getRTPSParticipant()->getRTPSParticipantAttributes().participantID); - resource_event_thread_.init_thread([id_for_thread]() - { - set_name_to_current_thread("dds.ds_ev.%u", id_for_thread); - }); + const RTPSParticipantAttributes& part_attr = getRTPSParticipant()->getRTPSParticipantAttributes(); + uint32_t id_for_thread = static_cast(part_attr.participantID); + const fastdds::rtps::ThreadSettings& thr_config = part_attr.discovery_server_thread; + resource_event_thread_.init_thread(thr_config, "dds.ds_ev.%u", id_for_thread); /* Given the fact that a participant is either a client or a server the diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index dcc0b4f5d88..038edcc18b3 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -245,10 +245,8 @@ RTPSParticipantImpl::RTPSParticipantImpl( mp_userParticipant->mp_impl = this; uint32_t id_for_thread = static_cast(m_att.participantID); - mp_event_thr.init_thread([id_for_thread]() - { - set_name_to_current_thread("dds.ev.%u", id_for_thread); - }); + const fastdds::rtps::ThreadSettings& thr_config = m_att.timed_events_thread; + mp_event_thr.init_thread(thr_config, "dds.ev.%u", id_for_thread); if (!networkFactoryHasRegisteredTransports()) { diff --git a/src/cpp/rtps/resources/ResourceEvent.cpp b/src/cpp/rtps/resources/ResourceEvent.cpp index d5f58eb0a81..66acf31f4c1 100644 --- a/src/cpp/rtps/resources/ResourceEvent.cpp +++ b/src/cpp/rtps/resources/ResourceEvent.cpp @@ -16,13 +16,14 @@ * @file ResourceEvent.cpp */ +#include +#include + #include #include #include "TimedEventImpl.h" - -#include -#include +#include namespace eprosima { namespace fastrtps { @@ -304,7 +305,9 @@ void ResourceEvent::do_timer_actions() } void ResourceEvent::init_thread( - std::function configure_cb) + const fastdds::rtps::ThreadSettings& thread_cfg, + const char* name_fmt, + uint32_t thread_id) { std::lock_guard lock(mutex_); @@ -312,14 +315,10 @@ void ResourceEvent::init_thread( stop_.store(false); resize_collections(); - thread_ = std::thread([this, configure_cb]() + thread_ = eprosima::create_thread([this]() { - if (configure_cb) - { - configure_cb(); - } event_service(); - }); + }, thread_cfg, name_fmt, thread_id); } } /* namespace rtps */ From 0c2448c87a768d237d43cbaa8483fbe3290c6dc6 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 20 Sep 2023 10:51:01 +0200 Subject: [PATCH 09/38] Refs #19436. Changes on DataSharingListener. Signed-off-by: Miguel Company --- .../rtps/DataSharing/DataSharingListener.cpp | 18 ++++++++---------- .../rtps/DataSharing/DataSharingListener.hpp | 16 ++++++++++------ src/cpp/rtps/reader/RTPSReader.cpp | 1 + 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.cpp b/src/cpp/rtps/DataSharing/DataSharingListener.cpp index a358767d317..e4aed898a11 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.cpp @@ -31,6 +31,7 @@ namespace rtps { DataSharingListener::DataSharingListener( std::shared_ptr notification, const std::string& datasharing_pools_directory, + const fastdds::rtps::ThreadSettings& thr_config, ResourceLimitedContainerConfig limits, RTPSReader* reader) : notification_(notification) @@ -39,6 +40,7 @@ DataSharingListener::DataSharingListener( , writer_pools_(limits) , writer_pools_changed_(false) , datasharing_pools_directory_(datasharing_pools_directory) + , thread_config_(thr_config) { } @@ -50,8 +52,6 @@ DataSharingListener::~DataSharingListener() void DataSharingListener::run() { - set_name_to_current_thread("dds.dsha.%u", reader_->getGuid().entityId.to_uint32() & 0x0000FFFF); - std::unique_lock lock(notification_->notification_->notification_mutex, std::defer_lock); while (is_running_.load()) { @@ -100,13 +100,15 @@ void DataSharingListener::start() } // Initialize the thread - listening_thread_ = new std::thread(&DataSharingListener::run, this); + uint32_t thread_id = reader_->getGuid().entityId.to_uint32() & 0x0000FFFF; + listening_thread_ = create_thread([this]() + { + run(); + }, thread_config_, "dds.dsha.%u", thread_id); } void DataSharingListener::stop() { - std::thread* thr = nullptr; - { std::lock_guard guard(mutex_); @@ -116,15 +118,11 @@ void DataSharingListener::stop() { return; } - - thr = listening_thread_; - listening_thread_ = nullptr; } // Notify the thread and wait for it to finish notification_->notify(); - thr->join(); - delete thr; + listening_thread_.join(); } void DataSharingListener::process_new_data () diff --git a/src/cpp/rtps/DataSharing/DataSharingListener.hpp b/src/cpp/rtps/DataSharing/DataSharingListener.hpp index 41d27b4029c..3afc0bce6f5 100644 --- a/src/cpp/rtps/DataSharing/DataSharingListener.hpp +++ b/src/cpp/rtps/DataSharing/DataSharingListener.hpp @@ -19,15 +19,17 @@ #ifndef RTPS_DATASHARING_DATASHARINGLISTENER_HPP #define RTPS_DATASHARING_DATASHARINGLISTENER_HPP +#include +#include +#include + #include +#include +#include + #include #include #include -#include - -#include -#include -#include namespace eprosima { namespace fastrtps { @@ -46,6 +48,7 @@ class DataSharingListener : public IDataSharingListener DataSharingListener( std::shared_ptr notification, const std::string& datasharing_pools_directory, + const fastdds::rtps::ThreadSettings& thr_config, ResourceLimitedContainerConfig limits, RTPSReader* reader); @@ -111,10 +114,11 @@ class DataSharingListener : public IDataSharingListener std::shared_ptr notification_; std::atomic is_running_; RTPSReader* reader_; - std::thread* listening_thread_; + std::thread listening_thread_; ResourceLimitedVector writer_pools_; std::atomic writer_pools_changed_; std::string datasharing_pools_directory_; + fastdds::rtps::ThreadSettings thread_config_; mutable std::mutex mutex_; }; diff --git a/src/cpp/rtps/reader/RTPSReader.cpp b/src/cpp/rtps/reader/RTPSReader.cpp index d337520d456..297feee6ef9 100644 --- a/src/cpp/rtps/reader/RTPSReader.cpp +++ b/src/cpp/rtps/reader/RTPSReader.cpp @@ -130,6 +130,7 @@ void RTPSReader::init( datasharing_listener_.reset(new DataSharingListener( notification, att.endpoint.data_sharing_configuration().shm_directory(), + att.data_sharing_listener_thread, att.matched_writers_allocation, this)); From 80484469678084e1c6c45f4415b5cab816f04d7e Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 20 Sep 2023 10:57:57 +0200 Subject: [PATCH 10/38] Refs #19436. Changes on FlowControllerImpl. Signed-off-by: Miguel Company --- src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp index 0ae267ff6d3..a31a68b8534 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp @@ -1062,7 +1062,10 @@ class FlowControllerImpl : public FlowController if (async_mode.running.compare_exchange_strong(expected, true)) { // Code for initializing the asynchronous thread. - async_mode.thread = std::thread(&FlowControllerImpl::run, this); + async_mode.thread = create_thread([this]() + { + run(); + }, thread_settings_, "dds.asyn.%u.%u", participant_id_, async_index_); } } @@ -1341,8 +1344,6 @@ class FlowControllerImpl : public FlowController */ void run() { - set_name_to_current_thread("dds.asyn.%u.%u", participant_id_, async_index_); - while (async_mode.running) { // There are writers interested in removing a sample. From b8e68a91f2f1c1dfcddf65d27946b6ac2ce6d366 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 20 Sep 2023 14:27:37 +0200 Subject: [PATCH 11/38] Refs #19436. Changes on security LogTopic. Signed-off-by: Miguel Company --- .../rtps/participant/RTPSParticipantImpl.cpp | 7 ++----- src/cpp/security/logging/LogTopic.cpp | 20 +++++++++---------- src/cpp/security/logging/LogTopic.h | 4 +++- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 038edcc18b3..cf3cf50e4cd 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -2199,11 +2199,8 @@ bool RTPSParticipantImpl::is_security_enabled_for_reader( security::Logging* RTPSParticipantImpl::create_builtin_logging_plugin() { - return new security::LogTopic([this]() - { - uint32_t participant_id = static_cast(m_att.participantID); - set_name_to_current_thread("dds.slog.%u", participant_id); - }); + uint32_t participant_id = static_cast(m_att.participantID); + return new security::LogTopic(participant_id, m_att.security_log_thread); } #endif // if HAVE_SECURITY diff --git a/src/cpp/security/logging/LogTopic.cpp b/src/cpp/security/logging/LogTopic.cpp index b461cc9fc7e..747710d915c 100644 --- a/src/cpp/security/logging/LogTopic.cpp +++ b/src/cpp/security/logging/LogTopic.cpp @@ -1,25 +1,22 @@ #include -#include -#include - #include +#include + namespace eprosima { namespace fastrtps { namespace rtps { namespace security { LogTopic::LogTopic( - std::function thread_init_cb) + uint32_t thread_id, + const fastdds::rtps::ThreadSettings& thr_config) : stop_(false) - , thread_([this, thread_init_cb]() + , thread_( + create_thread( + [this]() { - if (thread_init_cb) - { - thread_init_cb(); - } - while (true) { // Put the thread asleep until there is @@ -37,7 +34,8 @@ LogTopic::LogTopic( publish(*p); } - }) + }, + thr_config, "dds.slog.%u", thread_id)) { // } diff --git a/src/cpp/security/logging/LogTopic.h b/src/cpp/security/logging/LogTopic.h index 47ebcd9be7e..6c19f958708 100644 --- a/src/cpp/security/logging/LogTopic.h +++ b/src/cpp/security/logging/LogTopic.h @@ -18,6 +18,7 @@ #ifndef _FASTDDS_RTPS_SECURITY_LOGGING_LOGTOPIC_H_ #define _FASTDDS_RTPS_SECURITY_LOGGING_LOGTOPIC_H_ +#include #include #include @@ -43,7 +44,8 @@ class LogTopic final : public Logging public: LogTopic( - std::function thread_init_cb = {}); + uint32_t thread_id = 0, + const fastdds::rtps::ThreadSettings& thr_config = {}); ~LogTopic(); private: From 8071ff5af7e8c52f1096cba882cd4f100c591f47 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 20 Sep 2023 14:55:56 +0200 Subject: [PATCH 12/38] Refs #19436. Apply settings on SharedMemWatchdog. Signed-off-by: Miguel Company --- .../domain/DomainParticipantFactory.cpp | 3 +++ .../utils/shared_memory/SharedMemWatchdog.hpp | 22 ++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/cpp/fastdds/domain/DomainParticipantFactory.cpp b/src/cpp/fastdds/domain/DomainParticipantFactory.cpp index 12049b35d0f..7e837332dd9 100644 --- a/src/cpp/fastdds/domain/DomainParticipantFactory.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantFactory.cpp @@ -35,6 +35,7 @@ #include #include #include +#include using namespace eprosima::fastrtps::xmlparser; @@ -422,6 +423,8 @@ void DomainParticipantFactory::set_qos( (void) first_time; //As all the Qos can always be updated and none of them need to be sent to = from; + + rtps::SharedMemWatchdog::set_thread_settings(to.shm_watchdog_thread()); } ReturnCode_t DomainParticipantFactory::check_qos( diff --git a/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp b/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp index e77c3131fad..cf9ac1edd28 100644 --- a/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp +++ b/src/cpp/utils/shared_memory/SharedMemWatchdog.hpp @@ -22,6 +22,8 @@ #include #include +#include + #include namespace eprosima { @@ -77,6 +79,12 @@ class SharedMemWatchdog } } + static void set_thread_settings( + const ThreadSettings& thr_config) + { + thread_settings() = thr_config; + } + static constexpr std::chrono::milliseconds period() { return std::chrono::milliseconds(1000); @@ -101,11 +109,21 @@ class SharedMemWatchdog std::atomic_bool exit_thread_; + static ThreadSettings& thread_settings() + { + static ThreadSettings s_settings(ThreadSettings{}); + return s_settings; + } + SharedMemWatchdog() : wake_run_(false) , exit_thread_(false) { - thread_run_ = std::thread(&SharedMemWatchdog::run, this); + auto fn = [this]() + { + run(); + }; + thread_run_ = create_thread(fn, thread_settings(), "dds.shm.wdog"); } /** @@ -123,8 +141,6 @@ class SharedMemWatchdog void run() { - set_name_to_current_thread("dds.shm.wdog"); - while (!exit_thread_) { { From a456704ffd5ae3aa477e20a8ff29b1e5558ea7fd Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 21 Sep 2023 09:20:13 +0200 Subject: [PATCH 13/38] Refs #19436. Apply settings on SharedMem reception threads. Signed-off-by: Miguel Company --- .../shared_mem/SharedMemChannelResource.hpp | 17 +++++++++++------ .../transport/shared_mem/SharedMemTransport.cpp | 4 +++- .../test_SharedMemChannelResource.hpp | 4 ++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp index 139e5284a95..c7145d434a9 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp @@ -15,6 +15,7 @@ #ifndef _FASTDDS_SHAREDMEM_CHANNEL_RESOURCE_ #define _FASTDDS_SHAREDMEM_CHANNEL_RESOURCE_ +#include #include #include @@ -39,7 +40,8 @@ class SharedMemChannelResource : public ChannelResource const Locator& locator, TransportReceiverInterface* receiver, const std::string& dump_file, - bool should_init_thread = true) + bool should_init_thread, + const ThreadSettings& thr_config) : ChannelResource() , message_receiver_(receiver) , listener_(listener) @@ -57,7 +59,7 @@ class SharedMemChannelResource : public ChannelResource if (should_init_thread) { - init_thread(locator); + init_thread(locator, thr_config); } } @@ -125,8 +127,6 @@ class SharedMemChannelResource : public ChannelResource void perform_listen_operation( Locator input_locator) { - set_name_to_current_thread("dds.shm.%u", input_locator.port); - Locator remote_locator; while (alive()) @@ -168,9 +168,14 @@ class SharedMemChannelResource : public ChannelResource protected: void init_thread( - const Locator& locator) + const Locator& locator, + const ThreadSettings& thr_config) { - this->thread(std::thread(&SharedMemChannelResource::perform_listen_operation, this, locator)); + auto fn = [this, locator]() + { + perform_listen_operation(locator); + }; + this->thread(create_thread(fn, thr_config, "dds.shm.%u", locator.port)); } /** diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp index 00095d1f9a7..dc03804325a 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp @@ -341,7 +341,9 @@ SharedMemChannelResource* SharedMemTransport::CreateInputChannelResource( open_mode)->create_listener(), locator, receiver, - configuration_.rtps_dump_file()); + configuration_.rtps_dump_file(), + true, + configuration_.get_thread_config_for_port(locator.port)); } bool SharedMemTransport::OpenOutputChannel( diff --git a/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp b/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp index a950f74f7ec..bdf6981328d 100644 --- a/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp @@ -33,11 +33,11 @@ class test_SharedMemChannelResource : public SharedMemChannelResource TransportReceiverInterface* receiver, uint32_t big_buffer_size, uint32_t* big_buffer_size_count) - : SharedMemChannelResource(listener, locator, receiver, std::string(), false) + : SharedMemChannelResource(listener, locator, receiver, std::string(), false, ThreadSettings{}) , big_buffer_size_(big_buffer_size) , big_buffer_size_count_(big_buffer_size_count) { - init_thread(locator); + init_thread(locator, ThreadSettings{}); } virtual ~test_SharedMemChannelResource() override From 974b09e0be0f2bd48b288abfec09da46ca2dce6a Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 21 Sep 2023 09:35:00 +0200 Subject: [PATCH 14/38] Refs #19436. Apply settings on SharedMem packet dump threads. Signed-off-by: Miguel Company --- .../shared_mem/SharedMemChannelResource.hpp | 3 +- .../transport/shared_mem/SharedMemLog.hpp | 34 +++++++++---------- .../shared_mem/SharedMemTransport.cpp | 3 +- .../test_SharedMemChannelResource.hpp | 5 ++- 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp index c7145d434a9..321f1870d7a 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp @@ -40,6 +40,7 @@ class SharedMemChannelResource : public ChannelResource const Locator& locator, TransportReceiverInterface* receiver, const std::string& dump_file, + const ThreadSettings& dump_thr_config, bool should_init_thread, const ThreadSettings& thr_config) : ChannelResource() @@ -53,7 +54,7 @@ class SharedMemChannelResource : public ChannelResource auto packets_file_consumer = std::unique_ptr( new SHMPacketFileConsumer(dump_file)); - packet_logger_ = std::make_shared>(locator.port); + packet_logger_ = std::make_shared>(locator.port, dump_thr_config); packet_logger_->RegisterConsumer(std::move(packets_file_consumer)); } diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp index 893fef62e0e..80f3ebf0bea 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemLog.hpp @@ -15,10 +15,13 @@ #ifndef _FASTDDS_SHAREDMEM_LOG_H_ #define _FASTDDS_SHAREDMEM_LOG_H_ +#include #include #include + #include #include +#include namespace eprosima { namespace fastdds { @@ -200,8 +203,10 @@ class PacketsLog public: PacketsLog( - uint32_t thread_id) + uint32_t thread_id, + const ThreadSettings& thread_config) : thread_id_(thread_id) + , thread_config_(thread_config) { } @@ -242,7 +247,7 @@ class PacketsLog { std::unique_lock guard(resources_.cv_mutex); - if (!resources_.logging && !resources_.logging_thread) + if (!resources_.logging && !resources_.logging_thread.joinable()) { // already killed return; @@ -286,31 +291,26 @@ class PacketsLog resources_.work = false; } - if (resources_.logging_thread) + if (resources_.logging_thread.joinable()) { resources_.cv.notify_all(); - // The #ifdef workaround here is due to an unsolved MSVC bug, which Microsoft has announced - // they have no intention of solving: https://connect.microsoft.com/VisualStudio/feedback/details/747145 - // Each VS version deals with post-main deallocation of threads in a very different way. - #if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800 - resources_.logging_thread->join(); - #endif // if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800 - resources_.logging_thread.reset(); + resources_.logging_thread.join(); } } - // Note: In VS2013, if you're linking this class statically, you will have to call KillThread before leaving - // main, due to an unsolved MSVC bug. - void QueueLog( const typename TPacketConsumer::Pkt& packet) { { std::unique_lock guard(resources_.cv_mutex); - if (!resources_.logging && !resources_.logging_thread) + if (!resources_.logging && !resources_.logging_thread.joinable()) { resources_.logging = true; - resources_.logging_thread.reset(new std::thread(&PacketsLog::run, this)); + auto fn = [this]() + { + run(); + }; + resources_.logging_thread = create_thread(fn, thread_config_, "dds.shmd.%u", thread_id_); } } @@ -333,7 +333,7 @@ class PacketsLog { eprosima::fastrtps::DBQueue logs; std::vector> consumers; - std::unique_ptr logging_thread; + std::thread logging_thread; // Condition variable segment. std::condition_variable cv; @@ -356,10 +356,10 @@ class PacketsLog Resources resources_; uint32_t thread_id_; + ThreadSettings thread_config_; void run() { - set_name_to_current_thread("dds.shmd.%u", thread_id_); std::unique_lock guard(resources_.cv_mutex); while (resources_.logging) diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp index dc03804325a..8c54f702c6d 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp @@ -290,7 +290,7 @@ bool SharedMemTransport::init( auto packets_file_consumer = std::unique_ptr( new SHMPacketFileConsumer(configuration_.rtps_dump_file())); - packet_logger_ = std::make_shared>(0); + packet_logger_ = std::make_shared>(0, configuration_.dump_thread()); packet_logger_->RegisterConsumer(std::move(packets_file_consumer)); } } @@ -342,6 +342,7 @@ SharedMemChannelResource* SharedMemTransport::CreateInputChannelResource( locator, receiver, configuration_.rtps_dump_file(), + configuration_.dump_thread(), true, configuration_.get_thread_config_for_port(locator.port)); } diff --git a/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp b/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp index bdf6981328d..641677feec8 100644 --- a/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/test_SharedMemChannelResource.hpp @@ -33,7 +33,10 @@ class test_SharedMemChannelResource : public SharedMemChannelResource TransportReceiverInterface* receiver, uint32_t big_buffer_size, uint32_t* big_buffer_size_count) - : SharedMemChannelResource(listener, locator, receiver, std::string(), false, ThreadSettings{}) + : SharedMemChannelResource( + listener, locator, receiver, + std::string(), ThreadSettings{}, + false, ThreadSettings{}) , big_buffer_size_(big_buffer_size) , big_buffer_size_count_(big_buffer_size_count) { From 30ba24ab414703100fbbfead9798c83c061b2085 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 21 Sep 2023 10:33:50 +0200 Subject: [PATCH 15/38] Refs #19436. Apply settings on UDP reception threads. Signed-off-by: Miguel Company --- src/cpp/rtps/transport/UDPChannelResource.cpp | 14 ++++++++++---- src/cpp/rtps/transport/UDPChannelResource.h | 6 +++++- src/cpp/rtps/transport/UDPTransportInterface.cpp | 2 +- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/cpp/rtps/transport/UDPChannelResource.cpp b/src/cpp/rtps/transport/UDPChannelResource.cpp index 62904444c7c..03efd8737d1 100644 --- a/src/cpp/rtps/transport/UDPChannelResource.cpp +++ b/src/cpp/rtps/transport/UDPChannelResource.cpp @@ -15,7 +15,10 @@ #include #include + +#include #include + #include #include @@ -32,7 +35,8 @@ UDPChannelResource::UDPChannelResource( uint32_t maxMsgSize, const Locator& locator, const std::string& sInterface, - TransportReceiverInterface* receiver) + TransportReceiverInterface* receiver, + const ThreadSettings& thread_config) : ChannelResource(maxMsgSize) , message_receiver_(receiver) , socket_(moveSocket(socket)) @@ -40,7 +44,11 @@ UDPChannelResource::UDPChannelResource( , interface_(sInterface) , transport_(transport) { - thread(std::thread(&UDPChannelResource::perform_listen_operation, this, locator)); + auto fn = [this, locator]() + { + perform_listen_operation(locator); + }; + thread(create_thread(fn, thread_config, "dds.udp.%u", locator.port)); } UDPChannelResource::~UDPChannelResource() @@ -54,8 +62,6 @@ UDPChannelResource::~UDPChannelResource() void UDPChannelResource::perform_listen_operation( Locator input_locator) { - set_name_to_current_thread("dds.udp.%u", input_locator.port); - Locator remote_locator; while (alive()) diff --git a/src/cpp/rtps/transport/UDPChannelResource.h b/src/cpp/rtps/transport/UDPChannelResource.h index 146b164c3f4..a496a89bf13 100644 --- a/src/cpp/rtps/transport/UDPChannelResource.h +++ b/src/cpp/rtps/transport/UDPChannelResource.h @@ -16,7 +16,10 @@ #define _FASTDDS_UDP_CHANNEL_RESOURCE_INFO_ #include + +#include #include + #include namespace eprosima { @@ -110,7 +113,8 @@ class UDPChannelResource : public ChannelResource uint32_t maxMsgSize, const Locator& locator, const std::string& sInterface, - TransportReceiverInterface* receiver); + TransportReceiverInterface* receiver, + const ThreadSettings& thread_config); virtual ~UDPChannelResource() override; diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 19adeb1a91a..820056e26b3 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -231,7 +231,7 @@ UDPChannelResource* UDPTransportInterface::CreateInputChannelResource( eProsimaUDPSocket unicastSocket = OpenAndBindInputSocket(sInterface, IPLocator::getPhysicalPort(locator), is_multicast); UDPChannelResource* p_channel_resource = new UDPChannelResource(this, unicastSocket, maxMsgSize, locator, - sInterface, receiver); + sInterface, receiver, configuration()->get_thread_config_for_port(locator.port)); return p_channel_resource; } From 580cc073accecbf3f1548a09dffb924b88dc2e19 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 21 Sep 2023 11:10:09 +0200 Subject: [PATCH 16/38] Refs #19436. Apply settings on TCP accept and keep_alive threads. Signed-off-by: Miguel Company --- .../rtps/transport/TCPTransportInterface.cpp | 28 +++++++++---------- .../rtps/transport/TCPTransportInterface.h | 4 +-- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 72fd3b9834b..2dc03fda8db 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -196,11 +196,10 @@ void TCPTransportInterface::clean() alive_.store(false); keep_alive_event_.cancel(); - if (io_service_timers_thread_) + if (io_service_timers_thread_.joinable()) { io_service_timers_.stop(); - io_service_timers_thread_->join(); - io_service_timers_thread_ = nullptr; + io_service_timers_thread_.join(); } { @@ -242,11 +241,10 @@ void TCPTransportInterface::clean() } } - if (io_service_thread_) + if (io_service_thread_.joinable()) { io_service_.stop(); - io_service_thread_->join(); - io_service_thread_ = nullptr; + io_service_thread_.join(); } } @@ -455,7 +453,6 @@ bool TCPTransportInterface::init( auto ioServiceFunction = [&]() { - set_name_to_current_thread("dds.tcp_accept"); #if ASIO_VERSION >= 101200 asio::executor_work_guard work(io_service_.get_executor()); #else @@ -463,21 +460,22 @@ bool TCPTransportInterface::init( #endif // if ASIO_VERSION >= 101200 io_service_.run(); }; - io_service_thread_ = std::make_shared(ioServiceFunction); + io_service_thread_ = create_thread(ioServiceFunction, configuration()->accept_thread, "dds.tcp_accept"); if (0 < configuration()->keep_alive_frequency_ms) { - io_service_timers_thread_ = std::make_shared([&]() - { - set_name_to_current_thread("dds.tcp_keep"); + auto ioServiceTimersFunction = [&]() + { #if ASIO_VERSION >= 101200 - asio::executor_work_guard work(io_service_timers_. + asio::executor_work_guard work(io_service_timers_. get_executor()); #else - io_service::work work(io_service_timers_); + io_service::work work(io_service_timers_); #endif // if ASIO_VERSION >= 101200 - io_service_timers_.run(); - }); + io_service_timers_.run(); + }; + io_service_timers_thread_ = create_thread(ioServiceTimersFunction, + configuration()->keep_alive_thread, "dds.tcp_keep"); } return true; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index f21f61b2e3f..337c9a34ac6 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -79,8 +79,8 @@ class TCPTransportInterface : public TransportInterface #if TLS_FOUND asio::ssl::context ssl_context_; #endif // if TLS_FOUND - std::shared_ptr io_service_thread_; - std::shared_ptr io_service_timers_thread_; + std::thread io_service_thread_; + std::thread io_service_timers_thread_; std::shared_ptr rtcp_message_manager_; std::mutex rtcp_message_manager_mutex_; std::condition_variable rtcp_message_manager_cv_; From 0a9d3bbc53b3381e391dd45071842f8694ced7c1 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 21 Sep 2023 11:52:55 +0200 Subject: [PATCH 17/38] Refs #19436. Apply settings on TCP reception threads. Signed-off-by: Miguel Company --- .../rtps/transport/TCPTransportInterface.cpp | 29 +++++++++++-------- .../rtps/transport/TCPTransportInterface.h | 26 ++++++++++------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 2dc03fda8db..2d9d50fe020 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -835,6 +835,20 @@ void TCPTransportInterface::keep_alive() */ } +void TCPTransportInterface::create_listening_thread( + const std::shared_ptr& channel) +{ + std::weak_ptr channel_weak_ptr = channel; + std::weak_ptr rtcp_manager_weak_ptr = rtcp_message_manager_; + auto fn = [this, channel_weak_ptr, rtcp_manager_weak_ptr]() + { + perform_listen_operation(channel_weak_ptr, rtcp_manager_weak_ptr); + }; + uint32_t port = channel->local_endpoint().port(); + const ThreadSettings& thr_config = configuration()->get_thread_config_for_port(port); + channel->thread(create_thread(fn, thr_config, "dds.tcp.%u", port)); +} + void TCPTransportInterface::perform_listen_operation( std::weak_ptr channel_weak, std::weak_ptr rtcp_manager) @@ -1338,10 +1352,7 @@ void TCPTransportInterface::SocketAccepted( } channel->set_options(configuration()); - std::weak_ptr channel_weak_ptr = channel; - std::weak_ptr rtcp_manager_weak_ptr = rtcp_message_manager_; - channel->thread(std::thread(&TCPTransportInterface::perform_listen_operation, this, - channel_weak_ptr, rtcp_manager_weak_ptr)); + create_listening_thread(channel); EPROSIMA_LOG_INFO(RTCP, "Accepted connection (local: " << IPLocator::to_string(locator) << ", remote: " @@ -1385,10 +1396,7 @@ void TCPTransportInterface::SecureSocketAccepted( } secure_channel->set_options(configuration()); - std::weak_ptr channel_weak_ptr = secure_channel; - std::weak_ptr rtcp_manager_weak_ptr = rtcp_message_manager_; - secure_channel->thread(std::thread(&TCPTransportInterface::perform_listen_operation, this, - channel_weak_ptr, rtcp_manager_weak_ptr)); + create_listening_thread(secure_channel); EPROSIMA_LOG_INFO(RTCP, " Accepted connection (local: " << IPLocator::to_string(locator) << ", remote: " << socket->lowest_layer().remote_endpoint().address() @@ -1430,10 +1438,7 @@ void TCPTransportInterface::SocketConnected( { channel->change_status(TCPChannelResource::eConnectionStatus::eConnected); channel->set_options(configuration()); - - std::weak_ptr rtcp_manager_weak_ptr = rtcp_message_manager_; - channel->thread(std::thread(&TCPTransportInterface::perform_listen_operation, this, - channel_weak_ptr, rtcp_manager_weak_ptr)); + create_listening_thread(channel); } } else diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index 337c9a34ac6..412c905b475 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -15,12 +15,23 @@ #ifndef _FASTDDS_TCP_TRANSPORT_INTERFACE_H_ #define _FASTDDS_TCP_TRANSPORT_INTERFACE_H_ -#include +#include +#include +#include +#include +#include + +#include +#include + #include +#include #include + #include -#include #include +#include + #if TLS_FOUND #define OPENSSL_API_COMPAT 10101 #include @@ -29,14 +40,6 @@ #include -#include -#include -#include -#include -#include -#include -#include - namespace eprosima { namespace fastdds { namespace rtps { @@ -195,6 +198,9 @@ class TCPTransportInterface : public TransportInterface std::shared_ptr& channel, const Locator& remote_locator); + void create_listening_thread( + const std::shared_ptr& channel); + public: friend class RTCPMessageManager; From ee8c2db21aed2213f002270c7a52d4e00d1255c2 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 21 Sep 2023 15:14:32 +0200 Subject: [PATCH 18/38] Refs #19436. Include what you use. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_osx.ipp | 2 ++ src/cpp/utils/threading/threading_pthread.ipp | 1 + src/cpp/utils/threading/threading_win32.ipp | 2 ++ 3 files changed, 5 insertions(+) diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index e89ab5aeddc..8cbf489e53e 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -16,6 +16,8 @@ #include #include +#include + namespace eprosima { template diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index 018888d6169..bdc0f67eef9 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -21,6 +21,7 @@ #include #include +#include namespace eprosima { diff --git a/src/cpp/utils/threading/threading_win32.ipp b/src/cpp/utils/threading/threading_win32.ipp index 22400f5a437..a3739153d86 100644 --- a/src/cpp/utils/threading/threading_win32.ipp +++ b/src/cpp/utils/threading/threading_win32.ipp @@ -16,6 +16,8 @@ #include #include +#include + namespace eprosima { template From c63e9f02885897690dd0f44904991d3040817957 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 25 Sep 2023 14:27:39 +0200 Subject: [PATCH 19/38] Refs #19436. Add MacOS implementation for setting scheduler and priority. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_osx.ipp | 55 ++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index 8cbf489e53e..109ccbbbf4b 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -16,6 +16,7 @@ #include #include +#include #include namespace eprosima { @@ -50,9 +51,61 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +static void configure_current_thread_scheduler( + int sched_class, + int sched_priority) +{ + pthread_t self_tid = pthread_self(); + sched_param param; + int result = 0; + + memset(¶m, 0, sizeof(param)); + param.sched_priority = 0; + + // + // Set Scheduler Class and Priority + // + + if((sched_class == SCHED_OTHER) || + (sched_class == SCHED_BATCH) || + (sched_class == SCHED_IDLE)) + { + // + // BATCH and IDLE do not have explicit priority values. + // - Requires priorty value to be zero (0). + + result = pthread_setschedparam(self_tid, sched_class, ¶m); + + // + // Sched OTHER has a nice value, that we pull from the priority parameter. + // + + if(sched_class == SCHED_OTHER) + { + result = setpriority(PRIO_PROCESS, gettid(), sched_priority); + } + } + else if((sched_class == SCHED_FIFO) || + (sched_class == SCHED_RR)) + { + // + // RT Policies use a different priority numberspace. + // + + param.sched_priority = sched_priority; + result = pthread_setschedparam(self_tid, sched_class, ¶m); + } + + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring scheduler for thread " << self_tid); + } +} + void apply_thread_settings_to_current_thread( - const fastdds::rtps::ThreadSettings& /*settings*/) + const fastdds::rtps::ThreadSettings& settings) { + configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); } } // namespace eprosima From 0babc874a81174d1b4b5ae22bf2f4a098c519e9d Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 25 Sep 2023 16:47:48 +0200 Subject: [PATCH 20/38] Refs #19436. Add MacOS implementation for setting thread affinity. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_osx.ipp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index 109ccbbbf4b..e7afe658c7f 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -102,10 +102,19 @@ static void configure_current_thread_scheduler( } } +static void configure_current_thread_affinity( + uint32_t affinity_mask) +{ + thread_affinity_policy_data_t policy = { m_affinityMask }; + pthread_t self_tid = pthread_self(); + thread_policy_set(pthread_mach_thread_np(self_tid), THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, 1); +} + void apply_thread_settings_to_current_thread( const fastdds::rtps::ThreadSettings& settings) { configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); + configure_current_thread_affinity(settings.cpu_mask); } } // namespace eprosima From 84cc93b071fe709c9dc3ab326da2696ca1a57173 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 5 Oct 2023 12:52:48 +0200 Subject: [PATCH 21/38] Refs #19437. Member cpu_mask changed to affinity and made it 64 bits. Signed-off-by: Miguel Company --- .../rtps/attributes/ThreadSettings.hpp | 6 ++-- src/cpp/rtps/attributes/ThreadSettings.cpp | 2 +- src/cpp/utils/threading/threading_osx.ipp | 15 +++++--- src/cpp/utils/threading/threading_pthread.ipp | 21 +++++++----- .../dds/participant/ParticipantTests.cpp | 8 ++--- test/unittest/logging/LogTests.cpp | 2 +- .../rtps/attributes/ThreadSettingsTests.cpp | 34 +++++++++---------- .../PortBasedTransportDescriptorTests.cpp | 24 ++++++------- 8 files changed, 61 insertions(+), 51 deletions(-) diff --git a/include/fastdds/rtps/attributes/ThreadSettings.hpp b/include/fastdds/rtps/attributes/ThreadSettings.hpp index bd6c9ad34ae..00a3d5c5f2f 100644 --- a/include/fastdds/rtps/attributes/ThreadSettings.hpp +++ b/include/fastdds/rtps/attributes/ThreadSettings.hpp @@ -59,16 +59,16 @@ struct RTPS_DllAPI ThreadSettings int32_t priority = 0; /** - * @brief The thread's core affinity. + * @brief The thread's affinity. * - * cpu_mask is a bit mask for setting the threads affinity to each core individually. + * On some systems, this is a bit mask for setting the threads affinity to each core individually. * A value of 0 indicates no particular affinity. * * This value is platform specific and it is used as-is to configure the specific platform thread. * Setting this value to something other than the default one may require different privileges * on different platforms. */ - uint32_t cpu_mask = 0; + uint64_t affinity = 0; /** * @brief The thread's stack size in bytes. diff --git a/src/cpp/rtps/attributes/ThreadSettings.cpp b/src/cpp/rtps/attributes/ThreadSettings.cpp index 8ee925711d1..0c2761539f6 100644 --- a/src/cpp/rtps/attributes/ThreadSettings.cpp +++ b/src/cpp/rtps/attributes/ThreadSettings.cpp @@ -21,7 +21,7 @@ bool ThreadSettings::operator ==( { return (scheduling_policy == rhs.scheduling_policy && priority == rhs.priority && - cpu_mask == rhs.cpu_mask && + affinity == rhs.affinity && stack_size == rhs.stack_size); } diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index e7afe658c7f..034230900c4 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include #include #include @@ -103,18 +105,21 @@ static void configure_current_thread_scheduler( } static void configure_current_thread_affinity( - uint32_t affinity_mask) + uint64_t affinity) { - thread_affinity_policy_data_t policy = { m_affinityMask }; - pthread_t self_tid = pthread_self(); - thread_policy_set(pthread_mach_thread_np(self_tid), THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, 1); + if (affinity <= static_cast(std::numeric_limits::max())) + { + thread_affinity_policy_data_t policy = { static_cast(affinity) }; + pthread_t self_tid = pthread_self(); + thread_policy_set(pthread_mach_thread_np(self_tid), THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, 1); + } } void apply_thread_settings_to_current_thread( const fastdds::rtps::ThreadSettings& settings) { configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); - configure_current_thread_affinity(settings.cpu_mask); + configure_current_thread_affinity(settings.affinity); } } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index bdc0f67eef9..42c6f3356d9 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -108,12 +108,12 @@ static void configure_current_thread_scheduler( } static void configure_current_thread_affinity( - uint32_t affinity_mask) -{ + uint64_t affinity_mask) +{ int a; int result; int cpu_count; - cpu_set_t cpu_set; + cpu_set_t cpu_set; pthread_t self_tid = pthread_self(); result = 0; @@ -129,16 +129,21 @@ static void configure_current_thread_affinity( // We only consider up to the total number of CPU's the // system has. // - cpu_count = get_nprocs_conf(); - + for(a = 0; a < cpu_count; a++) { - if(affinity_mask & (1 << a)) - { + if(0 != (affinity_mask & 1)) + { CPU_SET(a, &cpu_set); result++; } + affinity_mask >>= 1; + } + + if (affinity_mask > 0) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Affinity mask has more processors than the ones present in the system"); } if(result > 0) @@ -156,7 +161,7 @@ void apply_thread_settings_to_current_thread( const fastdds::rtps::ThreadSettings& settings) { configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); - configure_current_thread_affinity(settings.cpu_mask); + configure_current_thread_affinity(settings.affinity); } } // namespace eprosima diff --git a/test/unittest/dds/participant/ParticipantTests.cpp b/test/unittest/dds/participant/ParticipantTests.cpp index 84af1c34e77..91f89279ef4 100644 --- a/test/unittest/dds/participant/ParticipantTests.cpp +++ b/test/unittest/dds/participant/ParticipantTests.cpp @@ -3002,23 +3002,23 @@ TEST(ParticipantTests, UpdatableDomainParticipantQos) // Check that the builtin_controllers_sender_thread can not be changed in an enabled participant participant->get_qos(pqos); - pqos.builtin_controllers_sender_thread().cpu_mask = 1; + pqos.builtin_controllers_sender_thread().affinity = 1; ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY); // Check that the timed_events_thread can not be changed in an enabled participant participant->get_qos(pqos); - pqos.timed_events_thread().cpu_mask = 1; + pqos.timed_events_thread().affinity = 1; ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY); // Check that the discovery_server_thread can not be changed in an enabled participant participant->get_qos(pqos); - pqos.discovery_server_thread().cpu_mask = 1; + pqos.discovery_server_thread().affinity = 1; ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY); #if HAVE_SECURITY // Check that the security_log_thread can not be changed in an enabled participant participant->get_qos(pqos); - pqos.security_log_thread().cpu_mask = 1; + pqos.security_log_thread().affinity = 1; ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY); ASSERT_EQ(DomainParticipantFactory::get_instance()->delete_participant(participant), ReturnCode_t::RETCODE_OK); diff --git a/test/unittest/logging/LogTests.cpp b/test/unittest/logging/LogTests.cpp index 6eb6bfa5c8a..1ee93155f47 100644 --- a/test/unittest/logging/LogTests.cpp +++ b/test/unittest/logging/LogTests.cpp @@ -673,7 +673,7 @@ TEST_F(LogTests, thread_config) // Set thread settings eprosima::fastdds::rtps::ThreadSettings thr_settings{}; #if defined(_POSIX_SOURCE) - thr_settings.cpu_mask = 3; + thr_settings.affinity = 3; thr_settings.scheduling_policy = SCHED_OTHER; thr_settings.priority = 1; #endif // if defined(_POSIX_SOURCE) diff --git a/test/unittest/rtps/attributes/ThreadSettingsTests.cpp b/test/unittest/rtps/attributes/ThreadSettingsTests.cpp index 25a540a2b40..8abce8ff719 100644 --- a/test/unittest/rtps/attributes/ThreadSettingsTests.cpp +++ b/test/unittest/rtps/attributes/ThreadSettingsTests.cpp @@ -27,49 +27,49 @@ TEST(ThreadSettingsTests, EqualOperators) // Fixed scheduling_policy cases settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); @@ -77,43 +77,43 @@ TEST(ThreadSettingsTests, EqualOperators) // Fixed priority cases (not already covered) settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); - // Fixed cpu_mask cases (not already covered) + // Fixed affinity cases (not already covered) settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); @@ -121,14 +121,14 @@ TEST(ThreadSettingsTests, EqualOperators) // Fixed stack_size cases (not already covered) settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask; + settings_2.affinity = settings_1.affinity; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); @@ -136,7 +136,7 @@ TEST(ThreadSettingsTests, EqualOperators) // All different settings_2.scheduling_policy = settings_1.scheduling_policy + 1; settings_2.priority = settings_1.priority + 1; - settings_2.cpu_mask = settings_1.cpu_mask + 1; + settings_2.affinity = settings_1.affinity + 1; settings_2.stack_size = settings_1.stack_size + 1; ASSERT_FALSE(settings_1 == settings_2); ASSERT_TRUE(settings_1 != settings_2); diff --git a/test/unittest/transport/PortBasedTransportDescriptorTests.cpp b/test/unittest/transport/PortBasedTransportDescriptorTests.cpp index 88c7b5e7021..28bca4a8bab 100644 --- a/test/unittest/transport/PortBasedTransportDescriptorTests.cpp +++ b/test/unittest/transport/PortBasedTransportDescriptorTests.cpp @@ -65,7 +65,7 @@ TEST_F(PortBasedTransportDescriptorTests, get_thread_config_for_port) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings; set_settings[1234].scheduling_policy = 33; set_settings[1234].priority = 33; - set_settings[1234].cpu_mask = 33; + set_settings[1234].affinity = 33; set_settings[1234].stack_size = 33; ASSERT_TRUE(reception_threads(set_settings)); @@ -86,7 +86,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_thread_config_for_port) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings; set_settings[1234].scheduling_policy = 33; set_settings[1234].priority = 33; - set_settings[1234].cpu_mask = 33; + set_settings[1234].affinity = 33; set_settings[1234].stack_size = 33; ASSERT_TRUE(reception_threads(set_settings)); @@ -116,7 +116,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_default_reception_threads) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; ASSERT_NE(initial_settings, set_settings); @@ -137,7 +137,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_reception_threads) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings; set_settings[1234].scheduling_policy = 33; set_settings[1234].priority = 33; - set_settings[1234].cpu_mask = 33; + set_settings[1234].affinity = 33; set_settings[1234].stack_size = 33; ASSERT_NE(initial_settings, set_settings); @@ -170,7 +170,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; other.default_reception_threads(set_settings); ASSERT_FALSE(*this == other); @@ -185,7 +185,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map; set_settings_map[1234].scheduling_policy = 33; set_settings_map[1234].priority = 33; - set_settings_map[1234].cpu_mask = 33; + set_settings_map[1234].affinity = 33; set_settings_map[1234].stack_size = 33; ASSERT_TRUE(other.reception_threads(set_settings_map)); ASSERT_FALSE(*this == other); @@ -202,7 +202,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; other.default_reception_threads(set_settings); @@ -220,7 +220,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map; set_settings_map[1234].scheduling_policy = 33; set_settings_map[1234].priority = 33; - set_settings_map[1234].cpu_mask = 33; + set_settings_map[1234].affinity = 33; set_settings_map[1234].stack_size = 33; ASSERT_TRUE(other.reception_threads(set_settings_map)); @@ -236,14 +236,14 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; other.default_reception_threads(set_settings); PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map; set_settings_map[1234].scheduling_policy = 33; set_settings_map[1234].priority = 33; - set_settings_map[1234].cpu_mask = 33; + set_settings_map[1234].affinity = 33; set_settings_map[1234].stack_size = 33; ASSERT_TRUE(other.reception_threads(set_settings_map)); @@ -261,14 +261,14 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator) ThreadSettings set_settings; set_settings.scheduling_policy = 33; set_settings.priority = 33; - set_settings.cpu_mask = 33; + set_settings.affinity = 33; set_settings.stack_size = 33; other.default_reception_threads(set_settings); PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map; set_settings_map[1234].scheduling_policy = 33; set_settings_map[1234].priority = 33; - set_settings_map[1234].cpu_mask = 33; + set_settings_map[1234].affinity = 33; set_settings_map[1234].stack_size = 33; ASSERT_TRUE(other.reception_threads(set_settings_map)); From 711e4e7b3614e46cc2c1ca914c395df842b2f843 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 5 Oct 2023 13:03:18 +0200 Subject: [PATCH 22/38] Refs #19437. Windows implementation for thread affinity. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_win32.ipp | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/cpp/utils/threading/threading_win32.ipp b/src/cpp/utils/threading/threading_win32.ipp index a3739153d86..f35f6dfbe92 100644 --- a/src/cpp/utils/threading/threading_win32.ipp +++ b/src/cpp/utils/threading/threading_win32.ipp @@ -55,9 +55,22 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +static void configure_current_thread_affinity( + uint64_t affinity_mask) +{ + if (affinity_mask != 0) + { + if (0 == SetThreadAffinityMask(GetCurrentThread(), affinity_mask)) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << GetLastError() << "' configuring affinity for thread " << GetCurrentThread()); + } + } +} + void apply_thread_settings_to_current_thread( - const fastdds::rtps::ThreadSettings& /*settings*/) + const fastdds::rtps::ThreadSettings& settings) { + configure_current_thread_affinity(settings.affinity); } } // namespace eprosima From cd8ea380d72f2733eec3b53a4cce9129649aadba Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 5 Oct 2023 13:12:44 +0200 Subject: [PATCH 23/38] Refs #19437. Windows implementation for thread priority. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_win32.ipp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/cpp/utils/threading/threading_win32.ipp b/src/cpp/utils/threading/threading_win32.ipp index f35f6dfbe92..886671eefa7 100644 --- a/src/cpp/utils/threading/threading_win32.ipp +++ b/src/cpp/utils/threading/threading_win32.ipp @@ -55,6 +55,18 @@ void set_name_to_current_thread( set_name_to_current_thread_impl(fmt, arg1, arg2); } +static void configure_current_thread_priority( + int32_t priority) +{ + if (priority != 0) + { + if (0 == SetThreadPriority(GetCurrentThread(), priority)) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << GetLastError() << "' configuring priority for thread " << GetCurrentThread()); + } + } +} + static void configure_current_thread_affinity( uint64_t affinity_mask) { @@ -70,6 +82,7 @@ static void configure_current_thread_affinity( void apply_thread_settings_to_current_thread( const fastdds::rtps::ThreadSettings& settings) { + configure_current_thread_priority(settings.priority); configure_current_thread_affinity(settings.affinity); } From e88a5742eb93edaf3222f8d3104e0e2e8527c809 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 6 Oct 2023 09:48:36 +0200 Subject: [PATCH 24/38] Refs #19436. Made `get_thread_config_for_port` a const method. Signed-off-by: Miguel Company --- include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp | 2 +- src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp b/include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp index e34002ac0a5..e35ad1dfa33 100644 --- a/include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp +++ b/include/fastdds/rtps/transport/PortBasedTransportDescriptor.hpp @@ -77,7 +77,7 @@ class PortBasedTransportDescriptor : public TransportDescriptorInterface * @return The ThreadSettings for the given port. */ virtual RTPS_DllAPI const ThreadSettings& get_thread_config_for_port( - uint32_t port); + uint32_t port) const; virtual RTPS_DllAPI bool set_thread_config_for_port( const uint32_t& port, diff --git a/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp b/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp index d57ba8d30dd..91640346586 100644 --- a/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp +++ b/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp @@ -38,7 +38,7 @@ bool PortBasedTransportDescriptor::operator ==( } const ThreadSettings& PortBasedTransportDescriptor::get_thread_config_for_port( - uint32_t port) + uint32_t port) const { auto search = reception_threads_.find(port); if (search != reception_threads_.end()) From a9ed0d9411ee411a98cf6062bb112031d4e5f5e4 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 6 Oct 2023 11:16:22 +0200 Subject: [PATCH 25/38] Refs #19436. Apply suggestions from code review. Signed-off-by: Miguel Company Co-authored-by: Eduardo Ponz Segrelles --- src/cpp/utils/threading.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpp/utils/threading.hpp b/src/cpp/utils/threading.hpp index 7d7b9d43166..100b49d87b0 100644 --- a/src/cpp/utils/threading.hpp +++ b/src/cpp/utils/threading.hpp @@ -87,12 +87,12 @@ void apply_thread_settings_to_current_thread( */ template std::thread create_thread( - const Functor& func, + Functor func, const fastdds::rtps::ThreadSettings& settings, const char* name, Args... args) { - return std::thread([&]() + return std::thread([=]() { apply_thread_settings_to_current_thread(settings); set_name_to_current_thread(name, args ...); From 59b53c080b1c56c0857cdafe07ac3b974da8b37e Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 10 Oct 2023 16:30:49 +0200 Subject: [PATCH 26/38] Refs #19435. Some refactors on FileWatch: - Namespace moved to eprosima::filewatch - Constructor receives thread settings - Copy constructors deleted Signed-off-by: Miguel Company --- src/cpp/utils/SystemInfo.cpp | 2 +- thirdparty/filewatch/FileWatch.hpp | 60 +++++++++++++++--------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp index 74fd688ba51..42f365720df 100644 --- a/src/cpp/utils/SystemInfo.cpp +++ b/src/cpp/utils/SystemInfo.cpp @@ -229,7 +229,7 @@ FileWatchHandle SystemInfo::watch_file( // No-op break; } - })); + }, {}, {})); #else // defined(_WIN32) || defined(__unix__) static_cast(filename); static_cast(callback); diff --git a/thirdparty/filewatch/FileWatch.hpp b/thirdparty/filewatch/FileWatch.hpp index a714d284953..37c462762d9 100644 --- a/thirdparty/filewatch/FileWatch.hpp +++ b/thirdparty/filewatch/FileWatch.hpp @@ -23,6 +23,8 @@ #ifndef FILEWATCHER_H #define FILEWATCHER_H +#include + #include #ifdef _WIN32 @@ -67,6 +69,7 @@ #include #include +namespace eprosima { namespace filewatch { enum class Event { added, @@ -93,35 +96,33 @@ namespace filewatch { public: - FileWatch(T path, UnderpinningRegex pattern, std::function callback) : - _path(path), - _pattern(pattern), - _callback(callback), - _directory(get_directory(path)) + FileWatch( + T path, + UnderpinningRegex pattern, + std::function callback, + const fastdds::rtps::ThreadSettings& watch_thread_config, + const fastdds::rtps::ThreadSettings& callback_thread_config) + : _path(path) + , _pattern(pattern) + , _callback(callback) + , _directory(get_directory(path)) { - init(); + init(watch_thread_config, callback_thread_config); } - FileWatch(T path, std::function callback) : - FileWatch(path, UnderpinningRegex(_regex_all), callback) {} + FileWatch( + T path, + std::function callback, + const fastdds::rtps::ThreadSettings& watch_thread_config, + const fastdds::rtps::ThreadSettings& callback_thread_config) + : FileWatch(path, UnderpinningRegex(_regex_all), callback, watch_thread_config, callback_thread_config) {} ~FileWatch() { destroy(); } - FileWatch(const FileWatch& other) : FileWatch(other._path, other._callback) {} - - FileWatch& operator=(const FileWatch& other) - { - if (this == &other) { return *this; } - - destroy(); - _path = other._path; - _callback = other._callback; - _directory = get_directory(other._path); - init(); - return *this; - } + FileWatch(const FileWatch& other) = delete; + FileWatch& operator=(const FileWatch& other) = delete; // Const memeber varibles don't let me implent moves nicely, if moves are really wanted std::unique_ptr should be used and move that. FileWatch(FileWatch&&) = delete; @@ -201,7 +202,9 @@ namespace filewatch { const static std::size_t event_size = (sizeof(struct inotify_event)); #endif // __unix__ - void init() + void init( + const fastdds::rtps::ThreadSettings& watch_thread_config = {}, + const fastdds::rtps::ThreadSettings& callback_thread_config = {}) { #ifdef _WIN32 _close_event = CreateEvent(NULL, TRUE, FALSE, NULL); @@ -209,9 +212,8 @@ namespace filewatch { throw std::system_error(GetLastError(), std::system_category()); } #endif // WIN32 - _callback_thread = std::move(std::thread([this]() { + _callback_thread = create_thread([this]() { try { - eprosima::set_name_to_current_thread("dds.fwatch.cb"); callback_thread(); } catch (...) { try { @@ -219,10 +221,9 @@ namespace filewatch { } catch (...) {} // set_exception() may throw too } - })); - _watch_thread = std::move(std::thread([this]() { + }, callback_thread_config, "dds.fwatch.cb"); + _watch_thread = create_thread([this]() { try { - eprosima::set_name_to_current_thread("dds.fwatch"); monitor_directory(); } catch (...) { try { @@ -230,7 +231,7 @@ namespace filewatch { } catch (...) {} // set_exception() may throw too } - })); + }, watch_thread_config, "dds.fwatch"); std::future future = _running.get_future(); future.get(); //block until the monitor_directory is up and running @@ -624,5 +625,6 @@ namespace filewatch { template constexpr typename FileWatch::C FileWatch::_regex_all[]; template constexpr typename FileWatch::C FileWatch::_this_directory[]; -} +} // namespace filewatch +} // namespace eprosima #endif From 41c37026ddf342dc97bf9347ce706c2c3f9ffc93 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 10 Oct 2023 16:38:57 +0200 Subject: [PATCH 27/38] Refs #19435. SystemInfo::watch_file receives thread settings. Signed-off-by: Miguel Company --- src/cpp/rtps/RTPSDomain.cpp | 2 +- src/cpp/utils/SystemInfo.cpp | 8 ++++++-- src/cpp/utils/SystemInfo.hpp | 8 +++++++- test/unittest/utils/SystemInfoTests.cpp | 2 +- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/cpp/rtps/RTPSDomain.cpp b/src/cpp/rtps/RTPSDomain.cpp index d89595b498f..22ea30afe87 100644 --- a/src/cpp/rtps/RTPSDomain.cpp +++ b/src/cpp/rtps/RTPSDomain.cpp @@ -144,7 +144,7 @@ RTPSParticipant* RTPSDomainImpl::createParticipant( if (!filename.empty() && SystemInfo::file_exists(filename)) { // Create filewatch - instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback); + instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback, {}, {}); } else if (!filename.empty()) { diff --git a/src/cpp/utils/SystemInfo.cpp b/src/cpp/utils/SystemInfo.cpp index 42f365720df..8c285d39b0e 100644 --- a/src/cpp/utils/SystemInfo.cpp +++ b/src/cpp/utils/SystemInfo.cpp @@ -214,7 +214,9 @@ const std::string& SystemInfo::get_environment_file() FileWatchHandle SystemInfo::watch_file( std::string filename, - std::function callback) + std::function callback, + const fastdds::rtps::ThreadSettings& watch_thread_config, + const fastdds::rtps::ThreadSettings& callback_thread_config) { #if defined(_WIN32) || defined(__unix__) return FileWatchHandle (new filewatch::FileWatch(filename, @@ -229,10 +231,12 @@ FileWatchHandle SystemInfo::watch_file( // No-op break; } - }, {}, {})); + }, watch_thread_config, callback_thread_config)); #else // defined(_WIN32) || defined(__unix__) static_cast(filename); static_cast(callback); + static_cast(watch_thread_config); + static_cast(callback_thread_config); return FileWatchHandle(); #endif // defined(_WIN32) || defined(__unix__) } diff --git a/src/cpp/utils/SystemInfo.hpp b/src/cpp/utils/SystemInfo.hpp index d2b4420c4be..418da3d9963 100644 --- a/src/cpp/utils/SystemInfo.hpp +++ b/src/cpp/utils/SystemInfo.hpp @@ -25,6 +25,8 @@ #include #include +#include + #include #include @@ -193,12 +195,16 @@ class SystemInfo * * @param [in] filename Path/name of the file to watch. * @param [in] callback Callback to execute when the file changes. + * @param [in] watch_thread_config Thread settings for watch thread. + * @param [in] callback_thread_config Thread settings for callback thread. * * @return The handle that represents the watcher object. */ static FileWatchHandle watch_file( std::string filename, - std::function callback); + std::function callback, + const fastdds::rtps::ThreadSettings& watch_thread_config, + const fastdds::rtps::ThreadSettings& callback_thread_config); /** * Stop a file watcher. diff --git a/test/unittest/utils/SystemInfoTests.cpp b/test/unittest/utils/SystemInfoTests.cpp index 58af62ed4e4..075a4ae0c84 100644 --- a/test/unittest/utils/SystemInfoTests.cpp +++ b/test/unittest/utils/SystemInfoTests.cpp @@ -238,7 +238,7 @@ TEST_F(SystemInfoTests, FileWatchTest) eprosima::SystemInfo::wait_for_file_closure(filename, _1s); ++times_called_; cv_.notify_all(); - }); + }, {}, {}); // Read contents { From 324552769836e9113a20b6dfeb1cc11582eaec58 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 11 Oct 2023 09:54:08 +0200 Subject: [PATCH 28/38] Refs #19435. Added RTPSDomain::set_filewatch_thread_config Signed-off-by: Miguel Company --- include/fastdds/rtps/RTPSDomain.h | 13 ++++++++++++ src/cpp/rtps/RTPSDomain.cpp | 21 ++++++++++++++++++- src/cpp/rtps/RTPSDomainImpl.hpp | 15 +++++++++++++ .../rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h | 7 +++++++ 4 files changed, 55 insertions(+), 1 deletion(-) diff --git a/include/fastdds/rtps/RTPSDomain.h b/include/fastdds/rtps/RTPSDomain.h index 287ad1f792d..401a1d1a6b3 100644 --- a/include/fastdds/rtps/RTPSDomain.h +++ b/include/fastdds/rtps/RTPSDomain.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -54,6 +55,18 @@ class RTPSDomain { public: + /** + * Method to set the configuration of the threads created by the file watcher for the environment file. + * In order for these settings to take effect, this method must be called before the first call + * to @ref createParticipant. + * + * @param watch_thread Settings for the thread watching the environment file. + * @param callback_thread Settings for the thread executing the callback when the environment file changed. + */ + RTPS_DllAPI static void set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings& watch_thread, + const fastdds::rtps::ThreadSettings& callback_thread); + /** * Method to shut down all RTPSParticipants, readers, writers, etc. * It must be called at the end of the process to avoid memory leaks. diff --git a/src/cpp/rtps/RTPSDomain.cpp b/src/cpp/rtps/RTPSDomain.cpp index 22ea30afe87..0b385ea4359 100644 --- a/src/cpp/rtps/RTPSDomain.cpp +++ b/src/cpp/rtps/RTPSDomain.cpp @@ -66,6 +66,13 @@ std::shared_ptr RTPSDomainImpl::get_instance() return instance; } +void RTPSDomain::set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings& watch_thread, + const fastdds::rtps::ThreadSettings& callback_thread) +{ + RTPSDomainImpl::set_filewatch_thread_config(watch_thread, callback_thread); +} + void RTPSDomain::stopAll() { RTPSDomainImpl::stopAll(); @@ -143,8 +150,10 @@ RTPSParticipant* RTPSDomainImpl::createParticipant( std::string filename = SystemInfo::get_environment_file(); if (!filename.empty() && SystemInfo::file_exists(filename)) { + std::lock_guard guard(instance->m_mutex); // Create filewatch - instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback, {}, {}); + instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback, + instance->watch_thread_config_, instance->callback_thread_config_); } else if (!filename.empty()) { @@ -778,6 +787,16 @@ void RTPSDomainImpl::file_watch_callback() } } +void RTPSDomainImpl::set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings& watch_thread, + const fastdds::rtps::ThreadSettings& callback_thread) +{ + auto instance = get_instance(); + std::lock_guard guard(instance->m_mutex); + instance->watch_thread_config_ = watch_thread; + instance->callback_thread_config_ = callback_thread; +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/RTPSDomainImpl.hpp b/src/cpp/rtps/RTPSDomainImpl.hpp index fefa90b6001..88025e52404 100644 --- a/src/cpp/rtps/RTPSDomainImpl.hpp +++ b/src/cpp/rtps/RTPSDomainImpl.hpp @@ -25,6 +25,7 @@ #include #endif // defined(_WIN32) || defined(__unix__) +#include #include #include #include @@ -203,6 +204,18 @@ class RTPSDomainImpl */ static void file_watch_callback(); + /** + * Method to set the configuration of the threads created by the file watcher for the environment file. + * In order for these settings to take effect, this method must be called before the first call + * to @ref createParticipant. + * + * @param watch_thread Settings for the thread watching the environment file. + * @param callback_thread Settings for the thread executing the callback when the environment file changed. + */ + static void set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings& watch_thread, + const fastdds::rtps::ThreadSettings& callback_thread); + private: /** @@ -252,6 +265,8 @@ class RTPSDomainImpl std::unordered_map m_RTPSParticipantIDs; FileWatchHandle file_watch_handle_; + fastdds::rtps::ThreadSettings watch_thread_config_; + fastdds::rtps::ThreadSettings callback_thread_config_; }; } // namespace rtps diff --git a/test/mock/rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h b/test/mock/rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h index 7cf26913643..4c16829e480 100644 --- a/test/mock/rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h +++ b/test/mock/rtps/RTPSDomain/fastdds/rtps/RTPSDomain.h @@ -16,6 +16,7 @@ #define _FASTDDS_RTPS_DOMAIN_H_ #include +#include #include #include @@ -44,6 +45,12 @@ class RTPSDomain { public: + static void set_filewatch_thread_config( + const fastdds::rtps::ThreadSettings&, + const fastdds::rtps::ThreadSettings&) + { + } + static void stopAll() { } From 06ef85d64104b63750ee5e7f454c3c49036b1eed Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 11 Oct 2023 10:49:52 +0200 Subject: [PATCH 29/38] Refs #19435. Call RTPSDomain::set_filewatch_thread_config inside DomainParticipantFactory::create_participant Signed-off-by: Miguel Company --- src/cpp/fastdds/domain/DomainParticipantFactory.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cpp/fastdds/domain/DomainParticipantFactory.cpp b/src/cpp/fastdds/domain/DomainParticipantFactory.cpp index 7e837332dd9..da4989e16a4 100644 --- a/src/cpp/fastdds/domain/DomainParticipantFactory.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantFactory.cpp @@ -157,6 +157,8 @@ DomainParticipant* DomainParticipantFactory::create_participant( { load_profiles(); + RTPSDomain::set_filewatch_thread_config(factory_qos_.file_watch_threads(), factory_qos_.file_watch_threads()); + const DomainParticipantQos& pqos = (&qos == &PARTICIPANT_QOS_DEFAULT) ? default_participant_qos_ : qos; DomainParticipant* dom_part = new DomainParticipant(mask); From a9f64d6b8fabeace5de85d3360f2bee03e54a378 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 16 Oct 2023 10:28:54 +0200 Subject: [PATCH 30/38] Refs #19435. Change priority default value. Signed-off-by: Miguel Company --- include/fastdds/rtps/attributes/ThreadSettings.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/include/fastdds/rtps/attributes/ThreadSettings.hpp b/include/fastdds/rtps/attributes/ThreadSettings.hpp index 00a3d5c5f2f..b3a78becfd8 100644 --- a/include/fastdds/rtps/attributes/ThreadSettings.hpp +++ b/include/fastdds/rtps/attributes/ThreadSettings.hpp @@ -17,6 +17,7 @@ */ #include +#include #include @@ -50,13 +51,13 @@ struct RTPS_DllAPI ThreadSettings * @brief The thread's priority. * * Configures the thread's priority. - * A value of 0 indicates system default. + * A value of -2^31 indicates system default. * * This value is platform specific and it is used as-is to configure the specific platform thread. * Setting this value to something other than the default one may require different privileges * on different platforms. */ - int32_t priority = 0; + int32_t priority = std::numeric_limits::min(); /** * @brief The thread's affinity. From f1a299fa9b20ada8cd5fa4f21117341f6a135e14 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 16 Oct 2023 10:30:39 +0200 Subject: [PATCH 31/38] Refs #19435. Account for default values in threading_pthread Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_pthread.ipp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index 42c6f3356d9..b22902b0a06 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include #include #include @@ -62,10 +64,18 @@ static void configure_current_thread_scheduler( { pthread_t self_tid = pthread_self(); sched_param param; + sched_param current_param; + int current_class; int result = 0; + bool change_priority = (std::numeric_limits::min() != sched_priority); + // Get current scheduling parameters + memset(¤t_param, 0, sizeof(current_param)); + pthread_getschedparam(self_tid, ¤t_class, ¤t_param); + memset(¶m, 0, sizeof(param)); param.sched_priority = 0; + sched_class = (sched_class == -1) ? current_class : sched_class; // // Set Scheduler Class and Priority @@ -85,7 +95,7 @@ static void configure_current_thread_scheduler( // Sched OTHER has a nice value, that we pull from the priority parameter. // - if(sched_class == SCHED_OTHER) + if(sched_class == SCHED_OTHER && change_priority) { result = setpriority(PRIO_PROCESS, gettid(), sched_priority); } @@ -97,7 +107,7 @@ static void configure_current_thread_scheduler( // RT Policies use a different priority numberspace. // - param.sched_priority = sched_priority; + param.sched_priority = change_priority ? sched_priority : current_param.sched_priority; result = pthread_setschedparam(self_tid, sched_class, ¶m); } From 393617ae8a463ff9721f6479744ce1fc2cab2ef1 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 16 Oct 2023 10:33:10 +0200 Subject: [PATCH 32/38] Refs #19435. Account for default values in threading_osx Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_osx.ipp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index 034230900c4..52ba8b4acd2 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -59,10 +59,18 @@ static void configure_current_thread_scheduler( { pthread_t self_tid = pthread_self(); sched_param param; + sched_param current_param; + int current_class; int result = 0; + bool change_priority = (std::numeric_limits::min() != sched_priority); + // Get current scheduling parameters + memset(¤t_param, 0, sizeof(current_param)); + pthread_getschedparam(self_tid, ¤t_class, ¤t_param); + memset(¶m, 0, sizeof(param)); param.sched_priority = 0; + sched_class = (sched_class == -1) ? current_class : sched_class; // // Set Scheduler Class and Priority @@ -82,7 +90,7 @@ static void configure_current_thread_scheduler( // Sched OTHER has a nice value, that we pull from the priority parameter. // - if(sched_class == SCHED_OTHER) + if(sched_class == SCHED_OTHER && change_priority) { result = setpriority(PRIO_PROCESS, gettid(), sched_priority); } @@ -94,7 +102,7 @@ static void configure_current_thread_scheduler( // RT Policies use a different priority numberspace. // - param.sched_priority = sched_priority; + param.sched_priority = change_priority ? sched_priority : current_param.sched_priority; result = pthread_setschedparam(self_tid, sched_class, ¶m); } From 2383e86aade047e6df71fd87368967cce804b302 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 16 Oct 2023 10:35:23 +0200 Subject: [PATCH 33/38] Refs #19435. Account for default values in threading_win32 Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_win32.ipp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cpp/utils/threading/threading_win32.ipp b/src/cpp/utils/threading/threading_win32.ipp index 886671eefa7..4c3565663f5 100644 --- a/src/cpp/utils/threading/threading_win32.ipp +++ b/src/cpp/utils/threading/threading_win32.ipp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -58,7 +59,7 @@ void set_name_to_current_thread( static void configure_current_thread_priority( int32_t priority) { - if (priority != 0) + if (priority != std::numeric_limits::min()) { if (0 == SetThreadPriority(GetCurrentThread(), priority)) { From 76f12a47e9003f54c26d6c9f531a26eb92abb5df Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 16 Oct 2023 10:39:02 +0200 Subject: [PATCH 34/38] Refs #19435. Linters. Signed-off-by: Miguel Company --- src/cpp/fastdds/domain/DomainParticipantFactory.cpp | 2 +- test/unittest/utils/SystemInfoTests.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cpp/fastdds/domain/DomainParticipantFactory.cpp b/src/cpp/fastdds/domain/DomainParticipantFactory.cpp index da4989e16a4..ed27c18d7c5 100644 --- a/src/cpp/fastdds/domain/DomainParticipantFactory.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantFactory.cpp @@ -158,7 +158,7 @@ DomainParticipant* DomainParticipantFactory::create_participant( load_profiles(); RTPSDomain::set_filewatch_thread_config(factory_qos_.file_watch_threads(), factory_qos_.file_watch_threads()); - + const DomainParticipantQos& pqos = (&qos == &PARTICIPANT_QOS_DEFAULT) ? default_participant_qos_ : qos; DomainParticipant* dom_part = new DomainParticipant(mask); diff --git a/test/unittest/utils/SystemInfoTests.cpp b/test/unittest/utils/SystemInfoTests.cpp index 075a4ae0c84..512a5b97147 100644 --- a/test/unittest/utils/SystemInfoTests.cpp +++ b/test/unittest/utils/SystemInfoTests.cpp @@ -238,7 +238,7 @@ TEST_F(SystemInfoTests, FileWatchTest) eprosima::SystemInfo::wait_for_file_closure(filename, _1s); ++times_called_; cv_.notify_all(); - }, {}, {}); + }, {}, {}); // Read contents { From 0c5d7d736f6d09bf099ddd4958f3ac6f2440520e Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 16 Oct 2023 10:42:52 +0200 Subject: [PATCH 35/38] Refs #19435. Use C++ headers. Signed-off-by: Miguel Company --- src/cpp/utils/threading/threading_osx.ipp | 4 ++-- src/cpp/utils/threading/threading_pthread.ipp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index 52ba8b4acd2..3131d3642a7 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include #include -#include -#include #include #include diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index b22902b0a06..5803c3464e1 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include #include -#include -#include #include #include #include From 2f7faecff56dfcb5ca485aa27923f92150752539 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 16 Oct 2023 10:50:01 +0200 Subject: [PATCH 36/38] Refs #19435. Documentation updates. Signed-off-by: Miguel Company --- include/fastdds/rtps/attributes/ThreadSettings.hpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/include/fastdds/rtps/attributes/ThreadSettings.hpp b/include/fastdds/rtps/attributes/ThreadSettings.hpp index b3a78becfd8..ed2a85f0b9b 100644 --- a/include/fastdds/rtps/attributes/ThreadSettings.hpp +++ b/include/fastdds/rtps/attributes/ThreadSettings.hpp @@ -42,6 +42,7 @@ struct RTPS_DllAPI ThreadSettings * A value of -1 indicates system default. * * This value is platform specific and it is used as-is to configure the specific platform thread. + * It is ignored on Windows platforms. * Setting this value to something other than the default one may require different privileges * on different platforms. */ @@ -62,7 +63,9 @@ struct RTPS_DllAPI ThreadSettings /** * @brief The thread's affinity. * - * On some systems, this is a bit mask for setting the threads affinity to each core individually. + * On some systems (Windows, Linux), this is a bit mask for setting the threads affinity to each core individually. + * On MacOS, this sets the affinity tag for the thread, and the OS tries to share the L2 cache between threads + * with the same affinity. * A value of 0 indicates no particular affinity. * * This value is platform specific and it is used as-is to configure the specific platform thread. From 68aac31daf506ef8a41075c826145791a4bab05b Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 16 Oct 2023 14:24:10 +0200 Subject: [PATCH 37/38] Refs #19435. Suggestions on Log test. Signed-off-by: Miguel Company --- test/unittest/logging/LogTests.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/unittest/logging/LogTests.cpp b/test/unittest/logging/LogTests.cpp index 1ee93155f47..a6caae4cb87 100644 --- a/test/unittest/logging/LogTests.cpp +++ b/test/unittest/logging/LogTests.cpp @@ -664,11 +664,17 @@ TEST_F(LogTests, flush_n) loggind_thread.join(); } +/** + * The goal of this test is to be able to manually check that the thread settings are applied, using an external + * tool like `htop` in Linux. + * It sets some scheduling configuration for the logging thread, and performs one log every second for 10 seconds, + * giving enough time for the tool to show the scheduling configuration and name of said thread. + */ TEST_F(LogTests, thread_config) { // Set general verbosity Log::SetVerbosity(Log::Info); - unsigned int n_logs = 1000; + const unsigned int n_logs = 10; // Set thread settings eprosima::fastdds::rtps::ThreadSettings thr_settings{}; @@ -682,7 +688,7 @@ TEST_F(LogTests, thread_config) for (unsigned int i = 0; i < n_logs; i++) { EPROSIMA_LOG_INFO(TEST_THREADS, "Info message " << i); - std::this_thread::sleep_for(std::chrono::milliseconds(250)); + std::this_thread::sleep_for(std::chrono::seconds(1)); } auto entries = HELPER_WaitForEntries(n_logs); From a7ef83c37f36f4bcb2806aa220c92800e5b393ce Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 17 Oct 2023 09:55:10 +0200 Subject: [PATCH 38/38] Refs #19435. Removed unused overload of create_thread. Signed-off-by: Miguel Company --- src/cpp/utils/threading.hpp | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/src/cpp/utils/threading.hpp b/src/cpp/utils/threading.hpp index 100b49d87b0..778d8662c35 100644 --- a/src/cpp/utils/threading.hpp +++ b/src/cpp/utils/threading.hpp @@ -100,30 +100,6 @@ std::thread create_thread( }); } -/** - * @brief Create and start a thread with custom name. - * - * This wrapper will create a thread on which the incoming functor will be called after - * applying giving it a custom name. - * - * @param[in] func Functor with the logic to be run on the created thread. - * @param[in] name Name (format) for the created thread. - * @param[in] args Additional arguments to complete the thread name. - * See @ref set_name_to_current_thread for details. - */ -template -std::thread create_thread( - const Functor& func, - const char* name, - Args... args) -{ - return std::thread([&]() - { - set_name_to_current_thread(name, args ...); - func(); - }); -} - } // eprosima #endif // UTILS__THREADING_HPP_