Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19435] Apply thread settings #3874

Merged
merged 38 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c1499f6
Refs #19436. Added thread creation wrapper infrastructure.
MiguelCompany Sep 13, 2023
9c515c0
Refs #19436. Added empty implementation for apply_thread_settings_to_…
MiguelCompany Sep 13, 2023
8cf6176
Refs #19436. Refactor on Log.cpp
MiguelCompany Sep 13, 2023
fc45783
Refs #19436. Add implementation for setting scheduler and priority.
MiguelCompany Sep 19, 2023
12f5572
Refs #19436. Add implementation for setting cpu affinity.
MiguelCompany Sep 19, 2023
9327aaf
Refs #19436. Add test setting config for Log thread.
MiguelCompany Sep 19, 2023
14ac33e
Refs #19436. Fix SystemInfoTests link issue.
MiguelCompany Sep 20, 2023
b41d405
Refs #19436. Changes on ResourceEvent.
MiguelCompany Sep 20, 2023
0c2448c
Refs #19436. Changes on DataSharingListener.
MiguelCompany Sep 20, 2023
8048446
Refs #19436. Changes on FlowControllerImpl.
MiguelCompany Sep 20, 2023
b8e68a9
Refs #19436. Changes on security LogTopic.
MiguelCompany Sep 20, 2023
8071ff5
Refs #19436. Apply settings on SharedMemWatchdog.
MiguelCompany Sep 20, 2023
a456704
Refs #19436. Apply settings on SharedMem reception threads.
MiguelCompany Sep 21, 2023
974b09e
Refs #19436. Apply settings on SharedMem packet dump threads.
MiguelCompany Sep 21, 2023
30ba24a
Refs #19436. Apply settings on UDP reception threads.
MiguelCompany Sep 21, 2023
580cc07
Refs #19436. Apply settings on TCP accept and keep_alive threads.
MiguelCompany Sep 21, 2023
0a9d3bb
Refs #19436. Apply settings on TCP reception threads.
MiguelCompany Sep 21, 2023
ee8c2db
Refs #19436. Include what you use.
MiguelCompany Sep 21, 2023
c63e9f0
Refs #19436. Add MacOS implementation for setting scheduler and prior…
MiguelCompany Sep 25, 2023
0babc87
Refs #19436. Add MacOS implementation for setting thread affinity.
MiguelCompany Sep 25, 2023
84cc93b
Refs #19437. Member cpu_mask changed to affinity and made it 64 bits.
MiguelCompany Oct 5, 2023
711e4e7
Refs #19437. Windows implementation for thread affinity.
MiguelCompany Oct 5, 2023
cd8ea38
Refs #19437. Windows implementation for thread priority.
MiguelCompany Oct 5, 2023
e88a574
Refs #19436. Made `get_thread_config_for_port` a const method.
MiguelCompany Oct 6, 2023
a9ed0d9
Refs #19436. Apply suggestions from code review.
MiguelCompany Oct 6, 2023
59b53c0
Refs #19435. Some refactors on FileWatch:
MiguelCompany Oct 10, 2023
41c3702
Refs #19435. SystemInfo::watch_file receives thread settings.
MiguelCompany Oct 10, 2023
3245527
Refs #19435. Added RTPSDomain::set_filewatch_thread_config
MiguelCompany Oct 11, 2023
06ef85d
Refs #19435. Call RTPSDomain::set_filewatch_thread_config inside Doma…
MiguelCompany Oct 11, 2023
a9f64d6
Refs #19435. Change priority default value.
MiguelCompany Oct 16, 2023
f1a299f
Refs #19435. Account for default values in threading_pthread
MiguelCompany Oct 16, 2023
393617a
Refs #19435. Account for default values in threading_osx
MiguelCompany Oct 16, 2023
2383e86
Refs #19435. Account for default values in threading_win32
MiguelCompany Oct 16, 2023
76f12a4
Refs #19435. Linters.
MiguelCompany Oct 16, 2023
0c5d7d7
Refs #19435. Use C++ headers.
MiguelCompany Oct 16, 2023
2f7faec
Refs #19435. Documentation updates.
MiguelCompany Oct 16, 2023
68aac31
Refs #19435. Suggestions on Log test.
MiguelCompany Oct 16, 2023
a7ef83c
Refs #19435. Removed unused overload of create_thread.
MiguelCompany Oct 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/fastdds/rtps/RTPSDomain.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <set>

#include <fastdds/rtps/attributes/RTPSParticipantAttributes.h>
#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastdds/rtps/history/IChangePool.h>
Expand Down Expand Up @@ -54,6 +55,18 @@ class RTPSDomain
{
public:

/**
* Method to set the configuration of the threads created by the file watcher for the environment file.
* In order for these settings to take effect, this method must be called before the first call
* to @ref createParticipant.
*
* @param watch_thread Settings for the thread watching the environment file.
* @param callback_thread Settings for the thread executing the callback when the environment file changed.
*/
RTPS_DllAPI static void set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread);

/**
* Method to shut down all RTPSParticipants, readers, writers, etc.
* It must be called at the end of the process to avoid memory leaks.
Expand Down
6 changes: 3 additions & 3 deletions include/fastdds/rtps/attributes/ThreadSettings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ struct RTPS_DllAPI ThreadSettings
int32_t priority = 0;

/**
* @brief The thread's core affinity.
* @brief The thread's affinity.
*
* cpu_mask is a bit mask for setting the threads affinity to each core individually.
* On some systems, this is a bit mask for setting the threads affinity to each core individually.
EduPonz marked this conversation as resolved.
Show resolved Hide resolved
* A value of 0 indicates no particular affinity.
*
* This value is platform specific and it is used as-is to configure the specific platform thread.
* Setting this value to something other than the default one may require different privileges
* on different platforms.
*/
uint32_t cpu_mask = 0;
uint64_t affinity = 0;

/**
* @brief The thread's stack size in bytes.
Expand Down
18 changes: 12 additions & 6 deletions include/fastdds/rtps/resources/ResourceEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

#include <atomic>
#include <functional>
#include <thread>
#include <vector>

#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {
Expand All @@ -51,11 +52,16 @@ class ResourceEvent
/*!
* @brief Method to initialize the internal thread.
*
* @param[in] configure_cb Function to be called in the context of the started thread
* before calling the internal service routine.
* @param[in] thread_cfg Settings to apply to the created thread.
* @param[in] name_fmt A null-terminated string to be used as the format argument of
* a `snprintf` like function, taking `thread_id` as additional
* argument, and used to give a name to the created thread.
* @param[in] thread_id Single variadic argument passed to the formatting function.
*/
void init_thread(
std::function<void()> configure_cb = {});
const fastdds::rtps::ThreadSettings& thread_cfg = {},
const char* name_fmt = "event %u",
uint32_t thread_id = 0);

void stop_thread();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PortBasedTransportDescriptor : public TransportDescriptorInterface
* @return The ThreadSettings for the given port.
*/
virtual RTPS_DllAPI const ThreadSettings& get_thread_config_for_port(
uint32_t port);
uint32_t port) const;

virtual RTPS_DllAPI bool set_thread_config_for_port(
const uint32_t& port,
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/fastdds/domain/DomainParticipantFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
#include <statistics/fastdds/domain/DomainParticipantImpl.hpp>
#include <utils/SystemInfo.hpp>
#include <utils/shared_memory/SharedMemWatchdog.hpp>

using namespace eprosima::fastrtps::xmlparser;

Expand Down Expand Up @@ -156,6 +157,8 @@ DomainParticipant* DomainParticipantFactory::create_participant(
{
load_profiles();

RTPSDomain::set_filewatch_thread_config(factory_qos_.file_watch_threads(), factory_qos_.file_watch_threads());

const DomainParticipantQos& pqos = (&qos == &PARTICIPANT_QOS_DEFAULT) ? default_participant_qos_ : qos;

DomainParticipant* dom_part = new DomainParticipant(mask);
Expand Down Expand Up @@ -422,6 +425,8 @@ void DomainParticipantFactory::set_qos(
(void) first_time;
//As all the Qos can always be updated and none of them need to be sent
to = from;

rtps::SharedMemWatchdog::set_thread_settings(to.shm_watchdog_thread());
}

ReturnCode_t DomainParticipantFactory::check_qos(
Expand Down
37 changes: 18 additions & 19 deletions src/cpp/fastdds/log/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,17 @@ struct LogResources
void SetThreadConfig(
const rtps::ThreadSettings& config)
{
static_cast<void>(config);
return;
std::lock_guard<std::mutex> guard(cv_mutex_);
thread_settings_ = config;
}

//! Returns the logging_ engine to configuration defaults.
void Reset()
{
std::unique_lock<std::mutex> configGuard(config_mutex_);
rtps::ThreadSettings thr_config{};
SetThreadConfig(thr_config);

std::lock_guard<std::mutex> configGuard(config_mutex_);
category_filter_.reset();
filename_filter_.reset();
error_string_filter_.reset();
Expand All @@ -162,7 +165,7 @@ struct LogResources
{
std::unique_lock<std::mutex> guard(cv_mutex_);

if (!logging_ && !logging_thread_)
if (!logging_ && !logging_thread_.joinable())
{
// already killed
return;
Expand Down Expand Up @@ -237,19 +240,13 @@ struct LogResources
work_ = false;
}

if (logging_thread_)
if (logging_thread_.joinable())
{
cv_.notify_all();
// The #ifdef workaround here is due to an unsolved MSVC bug, which Microsoft has announced
// they have no intention of solving: https://connect.microsoft.com/VisualStudio/feedback/details/747145
// Each VS version deals with post-main deallocation of threads in a very different way.
#if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800
if (logging_thread_->joinable() && logging_thread_->get_id() != std::this_thread::get_id())
if (logging_thread_.get_id() != std::this_thread::get_id())
{
logging_thread_->join();
logging_thread_.join();
}
#endif // if !defined(_WIN32) || defined(FASTRTPS_STATIC_LINK) || _MSC_VER >= 1800
logging_thread_.reset();
}
}

Expand All @@ -258,17 +255,19 @@ struct LogResources
void StartThread()
{
std::unique_lock<std::mutex> guard(cv_mutex_);
if (!logging_ && !logging_thread_)
if (!logging_ && !logging_thread_.joinable())
{
logging_ = true;
logging_thread_.reset(new std::thread(&LogResources::run, this));
auto thread_fn = [this]()
{
run();
};
logging_thread_ = eprosima::create_thread(thread_fn, thread_settings_, "dds.log");
}
}

void run()
{
set_name_to_current_thread("dds.log");

std::unique_lock<std::mutex> guard(cv_mutex_);

while (logging_)
Expand Down Expand Up @@ -343,7 +342,7 @@ struct LogResources

fastrtps::DBQueue<Log::Entry> logs_;
std::vector<std::unique_ptr<LogConsumer>> consumers_;
std::unique_ptr<std::thread> logging_thread_;
std::thread logging_thread_;

// Condition variable segment.
std::condition_variable cv_;
Expand All @@ -361,7 +360,7 @@ struct LogResources
std::unique_ptr<std::regex> error_string_filter_;

std::atomic<Log::Kind> verbosity_;

rtps::ThreadSettings thread_settings_;
};

std::shared_ptr<LogResources> get_log_resources()
Expand Down
18 changes: 8 additions & 10 deletions src/cpp/rtps/DataSharing/DataSharingListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace rtps {
DataSharingListener::DataSharingListener(
std::shared_ptr<DataSharingNotification> notification,
const std::string& datasharing_pools_directory,
const fastdds::rtps::ThreadSettings& thr_config,
ResourceLimitedContainerConfig limits,
RTPSReader* reader)
: notification_(notification)
Expand All @@ -39,6 +40,7 @@ DataSharingListener::DataSharingListener(
, writer_pools_(limits)
, writer_pools_changed_(false)
, datasharing_pools_directory_(datasharing_pools_directory)
, thread_config_(thr_config)
{
}

Expand All @@ -50,8 +52,6 @@ DataSharingListener::~DataSharingListener()

void DataSharingListener::run()
{
set_name_to_current_thread("dds.dsha.%u", reader_->getGuid().entityId.to_uint32() & 0x0000FFFF);

std::unique_lock<Segment::mutex> lock(notification_->notification_->notification_mutex, std::defer_lock);
while (is_running_.load())
{
Expand Down Expand Up @@ -100,13 +100,15 @@ void DataSharingListener::start()
}

// Initialize the thread
listening_thread_ = new std::thread(&DataSharingListener::run, this);
uint32_t thread_id = reader_->getGuid().entityId.to_uint32() & 0x0000FFFF;
listening_thread_ = create_thread([this]()
{
run();
}, thread_config_, "dds.dsha.%u", thread_id);
}

void DataSharingListener::stop()
{
std::thread* thr = nullptr;

{
std::lock_guard<std::mutex> guard(mutex_);

Expand All @@ -116,15 +118,11 @@ void DataSharingListener::stop()
{
return;
}

thr = listening_thread_;
listening_thread_ = nullptr;
}

// Notify the thread and wait for it to finish
notification_->notify();
thr->join();
delete thr;
listening_thread_.join();
}

void DataSharingListener::process_new_data ()
Expand Down
16 changes: 10 additions & 6 deletions src/cpp/rtps/DataSharing/DataSharingListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
#ifndef RTPS_DATASHARING_DATASHARINGLISTENER_HPP
#define RTPS_DATASHARING_DATASHARINGLISTENER_HPP

#include <atomic>
#include <map>
#include <memory>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <rtps/DataSharing/IDataSharingListener.hpp>
#include <rtps/DataSharing/DataSharingNotification.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <memory>
#include <atomic>
#include <map>

namespace eprosima {
namespace fastrtps {
Expand All @@ -46,6 +48,7 @@ class DataSharingListener : public IDataSharingListener
DataSharingListener(
std::shared_ptr<DataSharingNotification> notification,
const std::string& datasharing_pools_directory,
const fastdds::rtps::ThreadSettings& thr_config,
ResourceLimitedContainerConfig limits,
RTPSReader* reader);

Expand Down Expand Up @@ -111,10 +114,11 @@ class DataSharingListener : public IDataSharingListener
std::shared_ptr<DataSharingNotification> notification_;
std::atomic<bool> is_running_;
RTPSReader* reader_;
std::thread* listening_thread_;
std::thread listening_thread_;
ResourceLimitedVector<WriterInfo> writer_pools_;
std::atomic<bool> writer_pools_changed_;
std::string datasharing_pools_directory_;
fastdds::rtps::ThreadSettings thread_config_;
mutable std::mutex mutex_;

};
Expand Down
21 changes: 20 additions & 1 deletion src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ std::shared_ptr<RTPSDomainImpl> RTPSDomainImpl::get_instance()
return instance;
}

void RTPSDomain::set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread)
{
RTPSDomainImpl::set_filewatch_thread_config(watch_thread, callback_thread);
}

void RTPSDomain::stopAll()
{
RTPSDomainImpl::stopAll();
Expand Down Expand Up @@ -143,8 +150,10 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(
std::string filename = SystemInfo::get_environment_file();
if (!filename.empty() && SystemInfo::file_exists(filename))
{
std::lock_guard<std::mutex> guard(instance->m_mutex);
// Create filewatch
instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback);
instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback,
instance->watch_thread_config_, instance->callback_thread_config_);
}
else if (!filename.empty())
{
Expand Down Expand Up @@ -778,6 +787,16 @@ void RTPSDomainImpl::file_watch_callback()
}
}

void RTPSDomainImpl::set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread)
{
auto instance = get_instance();
std::lock_guard<std::mutex> guard(instance->m_mutex);
instance->watch_thread_config_ = watch_thread;
instance->callback_thread_config_ = callback_thread;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
15 changes: 15 additions & 0 deletions src/cpp/rtps/RTPSDomainImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <FileWatch.hpp>
#endif // defined(_WIN32) || defined(__unix__)

#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastrtps/rtps/reader/RTPSReader.h>
#include <fastrtps/rtps/RTPSDomain.h>
#include <fastrtps/rtps/writer/RTPSWriter.h>
Expand Down Expand Up @@ -203,6 +204,18 @@ class RTPSDomainImpl
*/
static void file_watch_callback();

/**
* Method to set the configuration of the threads created by the file watcher for the environment file.
* In order for these settings to take effect, this method must be called before the first call
* to @ref createParticipant.
*
* @param watch_thread Settings for the thread watching the environment file.
* @param callback_thread Settings for the thread executing the callback when the environment file changed.
*/
static void set_filewatch_thread_config(
const fastdds::rtps::ThreadSettings& watch_thread,
const fastdds::rtps::ThreadSettings& callback_thread);

private:

/**
Expand Down Expand Up @@ -252,6 +265,8 @@ class RTPSDomainImpl
std::unordered_map<uint32_t, ParticipantIDState> m_RTPSParticipantIDs;

FileWatchHandle file_watch_handle_;
fastdds::rtps::ThreadSettings watch_thread_config_;
fastdds::rtps::ThreadSettings callback_thread_config_;
};

} // namespace rtps
Expand Down
Loading