Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19435] Apply thread settings #3874

Merged
merged 38 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
c1499f6
Refs #19436. Added thread creation wrapper infrastructure.
MiguelCompany Sep 13, 2023
9c515c0
Refs #19436. Added empty implementation for apply_thread_settings_to_…
MiguelCompany Sep 13, 2023
8cf6176
Refs #19436. Refactor on Log.cpp
MiguelCompany Sep 13, 2023
fc45783
Refs #19436. Add implementation for setting scheduler and priority.
MiguelCompany Sep 19, 2023
12f5572
Refs #19436. Add implementation for setting cpu affinity.
MiguelCompany Sep 19, 2023
9327aaf
Refs #19436. Add test setting config for Log thread.
MiguelCompany Sep 19, 2023
14ac33e
Refs #19436. Fix SystemInfoTests link issue.
MiguelCompany Sep 20, 2023
b41d405
Refs #19436. Changes on ResourceEvent.
MiguelCompany Sep 20, 2023
0c2448c
Refs #19436. Changes on DataSharingListener.
MiguelCompany Sep 20, 2023
8048446
Refs #19436. Changes on FlowControllerImpl.
MiguelCompany Sep 20, 2023
b8e68a9
Refs #19436. Changes on security LogTopic.
MiguelCompany Sep 20, 2023
8071ff5
Refs #19436. Apply settings on SharedMemWatchdog.
MiguelCompany Sep 20, 2023
a456704
Refs #19436. Apply settings on SharedMem reception threads.
MiguelCompany Sep 21, 2023
974b09e
Refs #19436. Apply settings on SharedMem packet dump threads.
MiguelCompany Sep 21, 2023
30ba24a
Refs #19436. Apply settings on UDP reception threads.
MiguelCompany Sep 21, 2023
580cc07
Refs #19436. Apply settings on TCP accept and keep_alive threads.
MiguelCompany Sep 21, 2023
0a9d3bb
Refs #19436. Apply settings on TCP reception threads.
MiguelCompany Sep 21, 2023
ee8c2db
Refs #19436. Include what you use.
MiguelCompany Sep 21, 2023
c63e9f0
Refs #19436. Add MacOS implementation for setting scheduler and prior…
MiguelCompany Sep 25, 2023
0babc87
Refs #19436. Add MacOS implementation for setting thread affinity.
MiguelCompany Sep 25, 2023
84cc93b
Refs #19437. Member cpu_mask changed to affinity and made it 64 bits.
MiguelCompany Oct 5, 2023
711e4e7
Refs #19437. Windows implementation for thread affinity.
MiguelCompany Oct 5, 2023
cd8ea38
Refs #19437. Windows implementation for thread priority.
MiguelCompany Oct 5, 2023
e88a574
Refs #19436. Made `get_thread_config_for_port` a const method.
MiguelCompany Oct 6, 2023
a9ed0d9
Refs #19436. Apply suggestions from code review.
MiguelCompany Oct 6, 2023
59b53c0
Refs #19435. Some refactors on FileWatch:
MiguelCompany Oct 10, 2023
41c3702
Refs #19435. SystemInfo::watch_file receives thread settings.
MiguelCompany Oct 10, 2023
3245527
Refs #19435. Added RTPSDomain::set_filewatch_thread_config
MiguelCompany Oct 11, 2023
06ef85d
Refs #19435. Call RTPSDomain::set_filewatch_thread_config inside Doma…
MiguelCompany Oct 11, 2023
a9f64d6
Refs #19435. Change priority default value.
MiguelCompany Oct 16, 2023
f1a299f
Refs #19435. Account for default values in threading_pthread
MiguelCompany Oct 16, 2023
393617a
Refs #19435. Account for default values in threading_osx
MiguelCompany Oct 16, 2023
2383e86
Refs #19435. Account for default values in threading_win32
MiguelCompany Oct 16, 2023
76f12a4
Refs #19435. Linters.
MiguelCompany Oct 16, 2023
0c5d7d7
Refs #19435. Use C++ headers.
MiguelCompany Oct 16, 2023
2f7faec
Refs #19435. Documentation updates.
MiguelCompany Oct 16, 2023
68aac31
Refs #19435. Suggestions on Log test.
MiguelCompany Oct 16, 2023
a7ef83c
Refs #19435. Removed unused overload of create_thread.
MiguelCompany Oct 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/fastdds/rtps/attributes/ThreadSettings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ struct RTPS_DllAPI ThreadSettings
int32_t priority = 0;

/**
* @brief The thread's core affinity.
* @brief The thread's affinity.
*
* cpu_mask is a bit mask for setting the threads affinity to each core individually.
* On some systems, this is a bit mask for setting the threads affinity to each core individually.
EduPonz marked this conversation as resolved.
Show resolved Hide resolved
* A value of 0 indicates no particular affinity.
*
* This value is platform specific and it is used as-is to configure the specific platform thread.
* Setting this value to something other than the default one may require different privileges
* on different platforms.
*/
uint32_t cpu_mask = 0;
uint64_t affinity = 0;

/**
* @brief The thread's stack size in bytes.
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/attributes/ThreadSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bool ThreadSettings::operator ==(
{
return (scheduling_policy == rhs.scheduling_policy &&
priority == rhs.priority &&
cpu_mask == rhs.cpu_mask &&
affinity == rhs.affinity &&
stack_size == rhs.stack_size);
}

Expand Down
15 changes: 10 additions & 5 deletions src/cpp/utils/threading/threading_osx.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <limits>

#include <pthread.h>
#include <string.h>
#include <stdio.h>
Expand Down Expand Up @@ -103,18 +105,21 @@ static void configure_current_thread_scheduler(
}

static void configure_current_thread_affinity(
uint32_t affinity_mask)
uint64_t affinity)
{
thread_affinity_policy_data_t policy = { m_affinityMask };
pthread_t self_tid = pthread_self();
thread_policy_set(pthread_mach_thread_np(self_tid), THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, 1);
if (affinity <= static_cast<uint64_t>(std::numeric_limits<integer_t>::max()))
{
thread_affinity_policy_data_t policy = { static_cast<integer_t>(affinity) };
pthread_t self_tid = pthread_self();
thread_policy_set(pthread_mach_thread_np(self_tid), THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, 1);
}
}

void apply_thread_settings_to_current_thread(
const fastdds::rtps::ThreadSettings& settings)
{
configure_current_thread_scheduler(settings.scheduling_policy, settings.priority);
configure_current_thread_affinity(settings.cpu_mask);
configure_current_thread_affinity(settings.affinity);
}

} // namespace eprosima
21 changes: 13 additions & 8 deletions src/cpp/utils/threading/threading_pthread.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ static void configure_current_thread_scheduler(
}

static void configure_current_thread_affinity(
uint32_t affinity_mask)
{
uint64_t affinity_mask)
{
int a;
int result;
int cpu_count;
cpu_set_t cpu_set;
cpu_set_t cpu_set;
pthread_t self_tid = pthread_self();

result = 0;
Expand All @@ -129,16 +129,21 @@ static void configure_current_thread_affinity(
// We only consider up to the total number of CPU's the
// system has.
//

cpu_count = get_nprocs_conf();

for(a = 0; a < cpu_count; a++)
{
if(affinity_mask & (1 << a))
{
if(0 != (affinity_mask & 1))
{
CPU_SET(a, &cpu_set);
result++;
}
affinity_mask >>= 1;
}

if (affinity_mask > 0)
{
EPROSIMA_LOG_ERROR(SYSTEM, "Affinity mask has more processors than the ones present in the system");
}

if(result > 0)
Expand All @@ -156,7 +161,7 @@ void apply_thread_settings_to_current_thread(
const fastdds::rtps::ThreadSettings& settings)
{
configure_current_thread_scheduler(settings.scheduling_policy, settings.priority);
configure_current_thread_affinity(settings.cpu_mask);
configure_current_thread_affinity(settings.affinity);
}

} // namespace eprosima
8 changes: 4 additions & 4 deletions test/unittest/dds/participant/ParticipantTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3002,23 +3002,23 @@ TEST(ParticipantTests, UpdatableDomainParticipantQos)

// Check that the builtin_controllers_sender_thread can not be changed in an enabled participant
participant->get_qos(pqos);
pqos.builtin_controllers_sender_thread().cpu_mask = 1;
pqos.builtin_controllers_sender_thread().affinity = 1;
ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY);

// Check that the timed_events_thread can not be changed in an enabled participant
participant->get_qos(pqos);
pqos.timed_events_thread().cpu_mask = 1;
pqos.timed_events_thread().affinity = 1;
ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY);

// Check that the discovery_server_thread can not be changed in an enabled participant
participant->get_qos(pqos);
pqos.discovery_server_thread().cpu_mask = 1;
pqos.discovery_server_thread().affinity = 1;
ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY);

#if HAVE_SECURITY
// Check that the security_log_thread can not be changed in an enabled participant
participant->get_qos(pqos);
pqos.security_log_thread().cpu_mask = 1;
pqos.security_log_thread().affinity = 1;
ASSERT_EQ(participant->set_qos(pqos), ReturnCode_t::RETCODE_IMMUTABLE_POLICY);

ASSERT_EQ(DomainParticipantFactory::get_instance()->delete_participant(participant), ReturnCode_t::RETCODE_OK);
Expand Down
2 changes: 1 addition & 1 deletion test/unittest/logging/LogTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ TEST_F(LogTests, thread_config)
// Set thread settings
eprosima::fastdds::rtps::ThreadSettings thr_settings{};
#if defined(_POSIX_SOURCE)
thr_settings.cpu_mask = 3;
thr_settings.affinity = 3;
thr_settings.scheduling_policy = SCHED_OTHER;
thr_settings.priority = 1;
#endif // if defined(_POSIX_SOURCE)
Expand Down
34 changes: 17 additions & 17 deletions test/unittest/rtps/attributes/ThreadSettingsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,116 +27,116 @@ TEST(ThreadSettingsTests, EqualOperators)
// Fixed scheduling_policy cases
settings_2.scheduling_policy = settings_1.scheduling_policy;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask;
settings_2.affinity = settings_1.affinity;
settings_2.stack_size = settings_1.stack_size;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy;
settings_2.priority = settings_1.priority;
settings_2.cpu_mask = settings_1.cpu_mask + 1;
settings_2.affinity = settings_1.affinity + 1;
settings_2.stack_size = settings_1.stack_size;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy;
settings_2.priority = settings_1.priority;
settings_2.cpu_mask = settings_1.cpu_mask;
settings_2.affinity = settings_1.affinity;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask + 1;
settings_2.affinity = settings_1.affinity + 1;
settings_2.stack_size = settings_1.stack_size;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy;
settings_2.priority = settings_1.priority;
settings_2.cpu_mask = settings_1.cpu_mask + 1;
settings_2.affinity = settings_1.affinity + 1;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask;
settings_2.affinity = settings_1.affinity;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask + 1;
settings_2.affinity = settings_1.affinity + 1;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

// Fixed priority cases (not already covered)
settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority;
settings_2.cpu_mask = settings_1.cpu_mask;
settings_2.affinity = settings_1.affinity;
settings_2.stack_size = settings_1.stack_size;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority;
settings_2.cpu_mask = settings_1.cpu_mask + 1;
settings_2.affinity = settings_1.affinity + 1;
settings_2.stack_size = settings_1.stack_size;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority;
settings_2.cpu_mask = settings_1.cpu_mask;
settings_2.affinity = settings_1.affinity;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority;
settings_2.cpu_mask = settings_1.cpu_mask + 1;
settings_2.affinity = settings_1.affinity + 1;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

// Fixed cpu_mask cases (not already covered)
// Fixed affinity cases (not already covered)
settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask;
settings_2.affinity = settings_1.affinity;
settings_2.stack_size = settings_1.stack_size;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask;
settings_2.affinity = settings_1.affinity;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

// Fixed stack_size cases (not already covered)
settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask + 1;
settings_2.affinity = settings_1.affinity + 1;
settings_2.stack_size = settings_1.stack_size;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask;
settings_2.affinity = settings_1.affinity;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);

// All different
settings_2.scheduling_policy = settings_1.scheduling_policy + 1;
settings_2.priority = settings_1.priority + 1;
settings_2.cpu_mask = settings_1.cpu_mask + 1;
settings_2.affinity = settings_1.affinity + 1;
settings_2.stack_size = settings_1.stack_size + 1;
ASSERT_FALSE(settings_1 == settings_2);
ASSERT_TRUE(settings_1 != settings_2);
Expand Down
24 changes: 12 additions & 12 deletions test/unittest/transport/PortBasedTransportDescriptorTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ TEST_F(PortBasedTransportDescriptorTests, get_thread_config_for_port)
PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings;
set_settings[1234].scheduling_policy = 33;
set_settings[1234].priority = 33;
set_settings[1234].cpu_mask = 33;
set_settings[1234].affinity = 33;
set_settings[1234].stack_size = 33;

ASSERT_TRUE(reception_threads(set_settings));
Expand All @@ -86,7 +86,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_thread_config_for_port)
PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings;
set_settings[1234].scheduling_policy = 33;
set_settings[1234].priority = 33;
set_settings[1234].cpu_mask = 33;
set_settings[1234].affinity = 33;
set_settings[1234].stack_size = 33;
ASSERT_TRUE(reception_threads(set_settings));

Expand Down Expand Up @@ -116,7 +116,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_default_reception_threads)
ThreadSettings set_settings;
set_settings.scheduling_policy = 33;
set_settings.priority = 33;
set_settings.cpu_mask = 33;
set_settings.affinity = 33;
set_settings.stack_size = 33;

ASSERT_NE(initial_settings, set_settings);
Expand All @@ -137,7 +137,7 @@ TEST_F(PortBasedTransportDescriptorTests, set_reception_threads)
PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings;
set_settings[1234].scheduling_policy = 33;
set_settings[1234].priority = 33;
set_settings[1234].cpu_mask = 33;
set_settings[1234].affinity = 33;
set_settings[1234].stack_size = 33;

ASSERT_NE(initial_settings, set_settings);
Expand Down Expand Up @@ -170,7 +170,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator)
ThreadSettings set_settings;
set_settings.scheduling_policy = 33;
set_settings.priority = 33;
set_settings.cpu_mask = 33;
set_settings.affinity = 33;
set_settings.stack_size = 33;
other.default_reception_threads(set_settings);
ASSERT_FALSE(*this == other);
Expand All @@ -185,7 +185,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator)
PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map;
set_settings_map[1234].scheduling_policy = 33;
set_settings_map[1234].priority = 33;
set_settings_map[1234].cpu_mask = 33;
set_settings_map[1234].affinity = 33;
set_settings_map[1234].stack_size = 33;
ASSERT_TRUE(other.reception_threads(set_settings_map));
ASSERT_FALSE(*this == other);
Expand All @@ -202,7 +202,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator)
ThreadSettings set_settings;
set_settings.scheduling_policy = 33;
set_settings.priority = 33;
set_settings.cpu_mask = 33;
set_settings.affinity = 33;
set_settings.stack_size = 33;
other.default_reception_threads(set_settings);

Expand All @@ -220,7 +220,7 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator)
PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map;
set_settings_map[1234].scheduling_policy = 33;
set_settings_map[1234].priority = 33;
set_settings_map[1234].cpu_mask = 33;
set_settings_map[1234].affinity = 33;
set_settings_map[1234].stack_size = 33;
ASSERT_TRUE(other.reception_threads(set_settings_map));

Expand All @@ -236,14 +236,14 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator)
ThreadSettings set_settings;
set_settings.scheduling_policy = 33;
set_settings.priority = 33;
set_settings.cpu_mask = 33;
set_settings.affinity = 33;
set_settings.stack_size = 33;
other.default_reception_threads(set_settings);

PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map;
set_settings_map[1234].scheduling_policy = 33;
set_settings_map[1234].priority = 33;
set_settings_map[1234].cpu_mask = 33;
set_settings_map[1234].affinity = 33;
set_settings_map[1234].stack_size = 33;
ASSERT_TRUE(other.reception_threads(set_settings_map));

Expand All @@ -261,14 +261,14 @@ TEST_F(PortBasedTransportDescriptorTests, equal_operator)
ThreadSettings set_settings;
set_settings.scheduling_policy = 33;
set_settings.priority = 33;
set_settings.cpu_mask = 33;
set_settings.affinity = 33;
set_settings.stack_size = 33;
other.default_reception_threads(set_settings);

PortBasedTransportDescriptor::ReceptionThreadsConfigMap set_settings_map;
set_settings_map[1234].scheduling_policy = 33;
set_settings_map[1234].priority = 33;
set_settings_map[1234].cpu_mask = 33;
set_settings_map[1234].affinity = 33;
set_settings_map[1234].stack_size = 33;
ASSERT_TRUE(other.reception_threads(set_settings_map));

Expand Down