Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19375] Setting infraestructure for naming threads #3821

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions include/fastdds/rtps/resources/ResourceEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
#include <fastrtps/utils/TimedMutex.hpp>
#include <fastrtps/utils/TimedConditionVariable.hpp>

#include <thread>
#include <atomic>
#include <functional>
#include <thread>
#include <vector>

namespace eprosima {
Expand All @@ -49,8 +50,12 @@ class ResourceEvent

/*!
* @brief Method to initialize the internal thread.
*
* @param[in] configure_cb Function to be called in the context of the started thread
* before calling the internal service routine.
*/
void init_thread();
void init_thread(
std::function<void()> configure_cb = {});

void stop_thread();

Expand Down
3 changes: 3 additions & 0 deletions src/cpp/fastdds/log/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fastdds/dds/log/StdoutErrConsumer.hpp>
#include <fastdds/dds/log/Colors.hpp>
#include <utils/SystemInfo.hpp>
#include <utils/threading.hpp>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -258,6 +259,8 @@ struct LogResources

void run()
{
set_name_to_current_thread("dds.log");

std::unique_lock<std::mutex> guard(cv_mutex_);

while (logging_)
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/DataSharing/DataSharingListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <rtps/DataSharing/DataSharingListener.hpp>
#include <fastdds/rtps/reader/RTPSReader.h>
#include <utils/threading.hpp>

#include <memory>
#include <mutex>
Expand Down Expand Up @@ -49,6 +50,8 @@ DataSharingListener::~DataSharingListener()

void DataSharingListener::run()
{
set_name_to_current_thread("dds.dsha.%u", reader_->getGuid().entityId.to_uint32() & 0x0000FFFF);

std::unique_lock<Segment::mutex> lock(notification_->notification_->notification_mutex, std::defer_lock);
while (is_running_.load())
{
Expand Down
8 changes: 7 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@

#include <rtps/builtin/discovery/database/backup/SharedBackupFunctions.hpp>

#include <utils/threading.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {
Expand Down Expand Up @@ -160,7 +162,11 @@ bool PDPServer::init(
getRTPSParticipant()->enableReader(edp->publications_reader_.first);

// Initialize server dedicated thread.
resource_event_thread_.init_thread();
uint32_t id_for_thread = static_cast<uint32_t>(getRTPSParticipant()->getRTPSParticipantAttributes().participantID);
resource_event_thread_.init_thread([id_for_thread]()
{
set_name_to_current_thread("dds.ds_ev.%u", id_for_thread);
});

/*
Given the fact that a participant is either a client or a server the
Expand Down
26 changes: 14 additions & 12 deletions src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,26 @@ void FlowControllerFactory::init(
pure_sync_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerPureSyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr))));
FlowControllerFifoSchedule>(participant_, nullptr, 0))));
// SyncFlowController -> used by rest of besteffort writers.
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
sync_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerSyncPublishMode,
JesusPoderoso marked this conversation as resolved.
Show resolved Hide resolved
FlowControllerFifoSchedule>(participant_, nullptr))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));
// AsyncFlowController
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
async_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));

#ifdef FASTDDS_STATISTICS
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
async_statistics_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));
#endif // ifndef FASTDDS_STATISTICS
}

Expand All @@ -67,31 +67,32 @@ void FlowControllerFactory::register_flow_controller (
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, &flow_controller_descr))));
FlowControllerFifoSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::ROUND_ROBIN:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerRoundRobinSchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::HIGH_PRIORITY:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerHighPrioritySchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerPriorityWithReservationSchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
default:
assert(false);
Expand All @@ -106,31 +107,32 @@ void FlowControllerFactory::register_flow_controller (
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, &flow_controller_descr))));
FlowControllerFifoSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::ROUND_ROBIN:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerRoundRobinSchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::HIGH_PRIORITY:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerHighPrioritySchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerPriorityWithReservationSchedule>(participant_,
&flow_controller_descr))));
&flow_controller_descr, async_controller_index_++))));
break;
default:
assert(false);
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/flowcontrol/FlowControllerFactory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class FlowControllerFactory
//! Stores the created flow controllers.
std::map<std::string, std::unique_ptr<FlowController>> flow_controllers_;

//! Counter used for thread identification
uint32_t async_controller_index_ = 0;

};

} // namespace rtps
Expand Down
17 changes: 16 additions & 1 deletion src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "FlowController.hpp"
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/writer/RTPSWriter.h>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <utils/threading.hpp>

#include <atomic>
#include <cassert>
Expand Down Expand Up @@ -926,11 +928,19 @@ class FlowControllerImpl : public FlowController

FlowControllerImpl(
fastrtps::rtps::RTPSParticipantImpl* participant,
const FlowControllerDescriptor* descriptor
const FlowControllerDescriptor* descriptor,
uint32_t async_index
)
: participant_(participant)
, async_mode(participant, descriptor)
, participant_id_(0)
, async_index_(async_index)
{
if (nullptr != participant)
{
participant_id_ = static_cast<uint32_t>(participant->getRTPSParticipantAttributes().participantID);
}

uint32_t limitation = get_max_payload();

if (std::numeric_limits<uint32_t>::max() != limitation)
Expand Down Expand Up @@ -1257,6 +1267,8 @@ class FlowControllerImpl : public FlowController
*/
void run()
{
set_name_to_current_thread("dds.asyn.%u.%u", participant_id_, async_index_);

while (async_mode.running)
{
// There are writers interested in removing a sample.
Expand Down Expand Up @@ -1399,6 +1411,9 @@ class FlowControllerImpl : public FlowController

// async_mode must be destroyed before sched.
publish_mode async_mode;

uint32_t participant_id_ = 0;
uint32_t async_index_ = 0;
};

} // namespace rtps
Expand Down
22 changes: 20 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/persistence/PersistenceService.h>
#include <statistics/rtps/GuidUtils.hpp>
#include <utils/threading.hpp>

#if HAVE_SECURITY
#include <security/logging/LogTopic.h>
#endif // HAVE_SECURITY

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -137,7 +142,7 @@ RTPSParticipantImpl::RTPSParticipantImpl(
, internal_metatraffic_locators_(false)
, internal_default_locators_(false)
#if HAVE_SECURITY
, m_security_manager(this)
, m_security_manager(this, *this)
#endif // if HAVE_SECURITY
, mp_participantListener(plisten)
, mp_userParticipant(par)
Expand Down Expand Up @@ -239,7 +244,11 @@ RTPSParticipantImpl::RTPSParticipantImpl(
}

mp_userParticipant->mp_impl = this;
mp_event_thr.init_thread();
uint32_t id_for_thread = static_cast<uint32_t>(m_att.participantID);
mp_event_thr.init_thread([id_for_thread]()
{
set_name_to_current_thread("dds.ev.%u", id_for_thread);
});

if (!networkFactoryHasRegisteredTransports())
{
Expand Down Expand Up @@ -2186,6 +2195,15 @@ bool RTPSParticipantImpl::is_security_enabled_for_reader(
return false;
}

security::Logging* RTPSParticipantImpl::create_builtin_logging_plugin()
{
return new security::LogTopic([this]()
{
uint32_t participant_id = static_cast<uint32_t>(m_att.participantID);
set_name_to_current_thread("dds.slog.%u", participant_id);
});
}

#endif // if HAVE_SECURITY

PDP* RTPSParticipantImpl::pdp()
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include <fastdds/rtps/Endpoint.h>
#include <fastdds/rtps/security/accesscontrol/ParticipantSecurityAttributes.h>
#include <rtps/security/SecurityManager.h>
#include <rtps/security/SecurityPluginFactory.h>
#endif // if HAVE_SECURITY

namespace eprosima {
Expand Down Expand Up @@ -106,6 +107,9 @@ class WLP;
*/
class RTPSParticipantImpl
: public fastdds::statistics::StatisticsParticipantImpl
#if HAVE_SECURITY
, private security::SecurityPluginFactory
#endif // if HAVE_SECURITY
{
/*
Receiver Control block is a struct we use to encapsulate the resources that take part in message reception.
Expand Down Expand Up @@ -398,6 +402,8 @@ class RTPSParticipantImpl
bool is_security_enabled_for_reader(
const ReaderAttributes& reader_attributes);

security::Logging* create_builtin_logging_plugin() override;

#endif // if HAVE_SECURITY

PDPSimple* pdpsimple();
Expand Down
12 changes: 10 additions & 2 deletions src/cpp/rtps/resources/ResourceEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,23 @@ void ResourceEvent::do_timer_actions()
}
}

void ResourceEvent::init_thread()
void ResourceEvent::init_thread(
std::function<void()> configure_cb)
{
std::lock_guard<TimedMutex> lock(mutex_);

allow_vector_manipulation_ = false;
stop_.store(false);
resize_collections();

thread_ = std::thread(&ResourceEvent::event_service, this);
thread_ = std::thread([this, configure_cb]()
{
if (configure_cb)
{
configure_cb();
}
event_service();
});
}

} /* namespace rtps */
Expand Down
Loading