Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19907] Enable configuration of thread setting for all threads #4013

Merged
merged 9 commits into from
Nov 21, 2023
Prev Previous commit
Next Next commit
Apply thread settings (#3874)
* Refs #19436. Added thread creation wrapper infrastructure.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Added empty implementation for apply_thread_settings_to_current_thread.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Refactor on Log.cpp

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Add implementation for setting scheduler and priority.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Add implementation for setting cpu affinity.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Add test setting config for Log thread.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Fix SystemInfoTests link issue.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Changes on ResourceEvent.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Changes on DataSharingListener.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Changes on FlowControllerImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Changes on security LogTopic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Apply settings on SharedMemWatchdog.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Apply settings on SharedMem reception threads.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Apply settings on SharedMem packet dump threads.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Apply settings on UDP reception threads.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Apply settings on TCP accept and keep_alive threads.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Apply settings on TCP reception threads.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Include what you use.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Add MacOS implementation for setting scheduler and priority.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Add MacOS implementation for setting thread affinity.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19437. Member cpu_mask changed to affinity and made it 64 bits.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19437. Windows implementation for thread affinity.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19437. Windows implementation for thread priority.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Made `get_thread_config_for_port` a const method.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19436. Apply suggestions from code review.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Eduardo Ponz Segrelles <eduardoponz@eprosima.com>

* Refs #19435. Some refactors on FileWatch:
- Namespace moved to eprosima::filewatch
- Constructor receives thread settings
- Copy constructors deleted

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. SystemInfo::watch_file receives thread settings.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Added RTPSDomain::set_filewatch_thread_config

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Call RTPSDomain::set_filewatch_thread_config inside DomainParticipantFactory::create_participant

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Change priority default value.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Account for default values in threading_pthread

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Account for default values in threading_osx

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Account for default values in threading_win32

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Use C++ headers.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Documentation updates.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Suggestions on Log test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #19435. Removed unused overload of create_thread.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

---------

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Eduardo Ponz Segrelles <eduardoponz@eprosima.com>
  • Loading branch information
MiguelCompany and EduPonz committed Nov 20, 2023
commit 010a9d9793dfcf076f2a8b144e5e29a61f21e104
13 changes: 13 additions & 0 deletions include/fastdds/rtps/RTPSDomain.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <set>

#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/history/IChangePool.h>
Expand Down Expand Up @@ -54,6 +55,18 @@ class RTPSDomain
{
public:

/**
* Method to set the configuration of the threads created by the file watcher for the environment file.
* In order for these settings to take effect, this method must be called before the first call
* to @ref createParticipant.
*
* @param watch_thread Settings for the thread watching the environment file.
* @param callback_thread Settings for the thread executing the callback when the environment file changed.
*/
RTPS_DllAPI static void set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread);

/**
* Method to shut down all RTPSParticipants, readers, writers, etc.
* It must be called at the end of the process to avoid memory leaks.
Expand Down
14 changes: 9 additions & 5 deletions include/fastdds/rtps/attributes/ThreadSettings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

#include <cstdint>
#include <limits>

#include <fastrtps/fastrtps_dll.h>

Expand All @@ -41,6 +42,7 @@ struct RTPS_DllAPI ThreadSettings
* A value of -1 indicates system default.
*
* This value is platform specific and it is used as-is to configure the specific platform thread.
* It is ignored on Windows platforms.
* Setting this value to something other than the default one may require different privileges
* on different platforms.
*/
Expand All @@ -50,25 +52,27 @@ struct RTPS_DllAPI ThreadSettings
* @brief The thread's priority.
*
* Configures the thread's priority.
* A value of 0 indicates system default.
* A value of -2^31 indicates system default.
*
* This value is platform specific and it is used as-is to configure the specific platform thread.
* Setting this value to something other than the default one may require different privileges
* on different platforms.
*/
int32_t priority = 0;
int32_t priority = std::numeric_limits<int32_t>::min();

/**
* @brief The thread's core affinity.
* @brief The thread's affinity.
*
* cpu_mask is a bit mask for setting the threads affinity to each core individually.
* On some systems (Windows, Linux), this is a bit mask for setting the threads affinity to each core individually.
* On MacOS, this sets the affinity tag for the thread, and the OS tries to share the L2 cache between threads
* with the same affinity.
* A value of 0 indicates no particular affinity.
*
* This value is platform specific and it is used as-is to configure the specific platform thread.
* Setting this value to something other than the default one may require different privileges
* on different platforms.
*/
uint32_t cpu_mask = 0;
uint64_t affinity = 0;

/**
* @brief The thread's stack size in bytes.
Expand Down
18 changes: 12 additions & 6 deletions include/fastdds/rtps/resources/ResourceEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

#include <atomic>
#include <functional>
#include <thread>
#include <vector>

#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {
Expand All @@ -51,11 +52,16 @@ class ResourceEvent
/*!
* @brief Method to initialize the internal thread.
*
* @param[in] configure_cb Function to be called in the context of the started thread
* before calling the internal service routine.
* @param[in] thread_cfg Settings to apply to the created thread.
* @param[in] name_fmt A null-terminated string to be used as the format argument of
* a `snprintf` like function, taking `thread_id` as additional
* argument, and used to give a name to the created thread.
* @param[in] thread_id Single variadic argument passed to the formatting function.
*/
void init_thread(
std::function<void()> configure_cb = {});
const fastdds::rtps::ThreadSettings& thread_cfg = {},
const char* name_fmt = "event %u",
uint32_t thread_id = 0);

void stop_thread();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PortBasedTransportDescriptor : public TransportDescriptorInterface
* @return The ThreadSettings for the given port.
*/
virtual RTPS_DllAPI const ThreadSettings& get_thread_config_for_port(
uint32_t port);
uint32_t port) const;

virtual RTPS_DllAPI bool set_thread_config_for_port(
const uint32_t& port,
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/fastdds/domain/DomainParticipantFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
#include <statistics/fastdds/domain/DomainParticipantImpl.hpp>
#include <utils/SystemInfo.hpp>
#include <utils/shared_memory/SharedMemWatchdog.hpp>

using namespace eprosima::fastrtps::xmlparser;

Expand Down Expand Up @@ -156,6 +157,8 @@ DomainParticipant* DomainParticipantFactory::create_participant(
{
load_profiles();

RTPSDomain::set_filewatch_thread_config(factory_qos_.file_watch_threads(), factory_qos_.file_watch_threads());

const DomainParticipantQos& pqos = (&qos == &PARTICIPANT_QOS_DEFAULT) ? default_participant_qos_ : qos;

DomainParticipant* dom_part = new DomainParticipant(mask);
Expand Down Expand Up @@ -422,6 +425,8 @@ void DomainParticipantFactory::set_qos(
(void) first_time;
//As all the Qos can always be updated and none of them need to be sent
to = from;

rtps::SharedMemWatchdog::set_thread_settings(to.shm_watchdog_thread());
}

ReturnCode_t DomainParticipantFactory::check_qos(
Expand Down
37 changes: 18 additions & 19 deletions src/cpp/fastdds/log/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,17 @@ struct LogResources
void SetThreadConfig(
const rtps::ThreadSettings& config)
{
static_cast<void>(config);
return;
std::lock_guard<std::mutex> guard(cv_mutex_);
thread_settings_ = config;
}

//! Returns the logging_ engine to configuration defaults.
void Reset()
{
std::unique_lock<std::mutex> configGuard(config_mutex_);
rtps::ThreadSettings thr_config{};
SetThreadConfig(thr_config);

std::lock_guard<std::mutex> configGuard(config_mutex_);
category_filter_.reset();
filename_filter_.reset();
error_string_filter_.reset();
Expand All @@ -162,7 +165,7 @@ struct LogResources
{
std::unique_lock<std::mutex> guard(cv_mutex_);

if (!logging_ && !logging_thread_)
if (!logging_ && !logging_thread_.joinable())
{
// already killed
return;
Expand Down Expand Up @@ -237,19 +240,13 @@ struct LogResources
work_ = false;
}

if (logging_thread_)
if (logging_thread_.joinable())
{
cv_.notify_all();
// The #ifdef workaround here is due to an unsolved MSVC bug, which Microsoft has announced
// they have no intention of solving: https://connect.microsoft.com/VisualStudio/feedback/details/747145
// Each VS version deals with post-main deallocation of threads in a very different way.
#if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800
if (logging_thread_->joinable() && logging_thread_->get_id() != std::this_thread::get_id())
if (logging_thread_.get_id() != std::this_thread::get_id())
{
logging_thread_->join();
logging_thread_.join();
}
#endif // if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800
logging_thread_.reset();
}
}

Expand All @@ -258,17 +255,19 @@ struct LogResources
void StartThread()
{
std::unique_lock<std::mutex> guard(cv_mutex_);
if (!logging_ && !logging_thread_)
if (!logging_ && !logging_thread_.joinable())
{
logging_ = true;
logging_thread_.reset(new std::thread(&LogResources::run, this));
auto thread_fn = [this]()
{
run();
};
logging_thread_ = eprosima::create_thread(thread_fn, thread_settings_, "dds.log");
}
}

void run()
{
set_name_to_current_thread("dds.log");

std::unique_lock<std::mutex> guard(cv_mutex_);

while (logging_)
Expand Down Expand Up @@ -343,7 +342,7 @@ struct LogResources

fastrtps::DBQueue<Log::Entry> logs_;
std::vector<std::unique_ptr<LogConsumer>> consumers_;
std::unique_ptr<std::thread> logging_thread_;
std::thread logging_thread_;

// Condition variable segment.
std::condition_variable cv_;
Expand All @@ -361,7 +360,7 @@ struct LogResources
std::unique_ptr<std::regex> error_string_filter_;

std::atomic<Log::Kind> verbosity_;

rtps::ThreadSettings thread_settings_;
};

std::shared_ptr<LogResources> get_log_resources()
Expand Down
18 changes: 8 additions & 10 deletions src/cpp/rtps/DataSharing/DataSharingListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace rtps {
DataSharingListener::DataSharingListener(
std::shared_ptr<DataSharingNotification> notification,
const std::string& datasharing_pools_directory,
const fastdds::rtps::ThreadSettings& thr_config,
ResourceLimitedContainerConfig limits,
RTPSReader* reader)
: notification_(notification)
Expand All @@ -39,6 +40,7 @@ DataSharingListener::DataSharingListener(
, writer_pools_(limits)
, writer_pools_changed_(false)
, datasharing_pools_directory_(datasharing_pools_directory)
, thread_config_(thr_config)
{
}

Expand All @@ -50,8 +52,6 @@ DataSharingListener::~DataSharingListener()

void DataSharingListener::run()
{
set_name_to_current_thread("dds.dsha.%u", reader_->getGuid().entityId.to_uint32() & 0x0000FFFF);

std::unique_lock<Segment::mutex> lock(notification_->notification_->notification_mutex, std::defer_lock);
while (is_running_.load())
{
Expand Down Expand Up @@ -100,13 +100,15 @@ void DataSharingListener::start()
}

// Initialize the thread
listening_thread_ = new std::thread(&DataSharingListener::run, this);
uint32_t thread_id = reader_->getGuid().entityId.to_uint32() & 0x0000FFFF;
listening_thread_ = create_thread([this]()
{
run();
}, thread_config_, "dds.dsha.%u", thread_id);
}

void DataSharingListener::stop()
{
std::thread* thr = nullptr;

{
std::lock_guard<std::mutex> guard(mutex_);

Expand All @@ -116,15 +118,11 @@ void DataSharingListener::stop()
{
return;
}

thr = listening_thread_;
listening_thread_ = nullptr;
}

// Notify the thread and wait for it to finish
notification_->notify();
thr->join();
delete thr;
listening_thread_.join();
}

void DataSharingListener::process_new_data ()
Expand Down
16 changes: 10 additions & 6 deletions src/cpp/rtps/DataSharing/DataSharingListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
#ifndef RTPS_DATASHARING_DATASHARINGLISTENER_HPP
#define RTPS_DATASHARING_DATASHARINGLISTENER_HPP

#include <atomic>
#include <map>
#include <memory>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <rtps/DataSharing/IDataSharingListener.hpp>
#include <rtps/DataSharing/DataSharingNotification.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <memory>
#include <atomic>
#include <map>

namespace eprosima {
namespace fastrtps {
Expand All @@ -46,6 +48,7 @@ class DataSharingListener : public IDataSharingListener
DataSharingListener(
std::shared_ptr<DataSharingNotification> notification,
const std::string& datasharing_pools_directory,
const fastdds::rtps::ThreadSettings& thr_config,
ResourceLimitedContainerConfig limits,
RTPSReader* reader);

Expand Down Expand Up @@ -111,10 +114,11 @@ class DataSharingListener : public IDataSharingListener
std::shared_ptr<DataSharingNotification> notification_;
std::atomic<bool> is_running_;
RTPSReader* reader_;
std::thread* listening_thread_;
std::thread listening_thread_;
ResourceLimitedVector<WriterInfo> writer_pools_;
std::atomic<bool> writer_pools_changed_;
std::string datasharing_pools_directory_;
fastdds::rtps::ThreadSettings thread_config_;
mutable std::mutex mutex_;

};
Expand Down
21 changes: 20 additions & 1 deletion src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ std::shared_ptr<RTPSDomainImpl> RTPSDomainImpl::get_instance()
return instance;
}

void RTPSDomain::set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread)
{
RTPSDomainImpl::set_filewatch_thread_config(watch_thread, callback_thread);
}

void RTPSDomain::stopAll()
{
RTPSDomainImpl::stopAll();
Expand Down Expand Up @@ -143,8 +150,10 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(
std::string filename = SystemInfo::get_environment_file();
if (!filename.empty() && SystemInfo::file_exists(filename))
{
std::lock_guard<std::mutex> guard(instance->m_mutex);
// Create filewatch
instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback);
instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback,
instance->watch_thread_config_, instance->callback_thread_config_);
}
else if (!filename.empty())
{
Expand Down Expand Up @@ -778,6 +787,16 @@ void RTPSDomainImpl::file_watch_callback()
}
}

void RTPSDomainImpl::set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread)
{
auto instance = get_instance();
std::lock_guard<std::mutex> guard(instance->m_mutex);
instance->watch_thread_config_ = watch_thread;
instance->callback_thread_config_ = callback_thread;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Loading