Skip to content

Commit

Permalink
Refs #19377: Add ThreadSetting to DomainParticipantQos for builtin fl…
Browse files Browse the repository at this point in the history
…ow controllers

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>
  • Loading branch information
EduPonz committed Sep 21, 2023
1 parent 5b4b305 commit f82f4fa
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 30 deletions.
34 changes: 34 additions & 0 deletions include/fastdds/dds/domain/qos/DomainParticipantQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,37 @@ class DomainParticipantQos
return flow_controllers_;
}

/**
* Getter for builtin flow controllers sender threads ThreadSettings
*
* @return rtps::ThreadSettings reference
*/
rtps::ThreadSettings& builtin_controllers_sender_thread()
{
return builtin_controllers_sender_thread_;
}

/**
* Getter for builtin flow controllers sender threads ThreadSettings
*
* @return rtps::ThreadSettings reference
*/
const rtps::ThreadSettings& builtin_controllers_sender_thread() const
{
return builtin_controllers_sender_thread_;
}

/**
* Setter for the builtin flow controllers sender threads ThreadSettings
*
* @param value New ThreadSettings to be set
*/
void builtin_controllers_sender_thread(
const rtps::ThreadSettings& value)
{
builtin_controllers_sender_thread_ = value;
}

/**
* Getter for timed event ThreadSettings
*
Expand Down Expand Up @@ -447,6 +478,9 @@ class DomainParticipantQos
*/
FlowControllerDescriptorList flow_controllers_;

//! Thread settings for the builtin flow controllers sender threads
rtps::ThreadSettings builtin_controllers_sender_thread_;

//! Thread settings for the timed events thread
rtps::ThreadSettings timed_events_thread_;

Expand Down
14 changes: 10 additions & 4 deletions include/fastdds/rtps/attributes/RTPSParticipantAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
#ifndef _FASTDDS_RTPSPARTICIPANTPARAMETERS_H_
#define _FASTDDS_RTPSPARTICIPANTPARAMETERS_H_

#include <memory>
#include <sstream>

#include <fastdds/rtps/attributes/ExternalLocators.hpp>
#include <fastdds/rtps/attributes/PropertyPolicy.h>
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
#include <fastdds/rtps/attributes/ServerAttributes.h>
#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/PortParameters.h>
#include <fastdds/rtps/common/Time_t.h>
Expand All @@ -31,11 +35,9 @@
#include <fastdds/rtps/flowcontrol/ThroughputControllerDescriptor.h>
#include <fastdds/rtps/resources/ResourceManagement.h>
#include <fastdds/rtps/transport/TransportInterface.h>
#include <fastrtps/fastrtps_dll.h>
#include <fastrtps/utils/fixed_size_string.hpp>

#include <memory>
#include <sstream>

namespace eprosima {

namespace fastdds {
Expand Down Expand Up @@ -484,7 +486,8 @@ class RTPSParticipantAttributes
(this->useBuiltinTransports == b.useBuiltinTransports) &&
(this->properties == b.properties) &&
(this->prefix == b.prefix) &&
(this->flow_controllers == b.flow_controllers);
(this->flow_controllers == b.flow_controllers) &&
(this->builtin_controllers_sender_thread == b.builtin_controllers_sender_thread);
}

/**
Expand Down Expand Up @@ -576,6 +579,9 @@ class RTPSParticipantAttributes
//! Flow controllers.
FlowControllerDescriptorList flow_controllers;

//! Thread settings for the builtin flow controllers sender threads
fastdds::rtps::ThreadSettings builtin_controllers_sender_thread;

private:

//! Name of the participant.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct FlowControllerDescriptor
uint64_t period_ms = 100;

//! Thread settings for the sender thread
ThreadSettings sender_thread_;
ThreadSettings sender_thread;

};

Expand Down
1 change: 1 addition & 0 deletions src/cpp/fastdds/utils/QosConverters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ void set_attributes_from_qos(
attr.listenSocketBufferSize = qos.transport().listen_socket_buffer_size;
attr.userData = qos.user_data().data_vec();
attr.flow_controllers = qos.flow_controllers();
attr.builtin_controllers_sender_thread = qos.builtin_controllers_sender_thread();
}

void set_qos_from_attributes(
Expand Down
33 changes: 21 additions & 12 deletions src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,38 @@ void FlowControllerFactory::init(
participant_ = participant;
// Create default flow controllers.

const ThreadSettings& sender_thread_settings =
(nullptr == participant_) ? ThreadSettings{}
: participant_->getAttributes().builtin_controllers_sender_thread;

// PureSyncFlowController -> used by volatile besteffort writers.
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
pure_sync_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerPureSyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr, 0))));
FlowControllerFifoSchedule>(participant_, nullptr, 0, sender_thread_settings))));
// SyncFlowController -> used by rest of besteffort writers.
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
sync_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerSyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++,
sender_thread_settings))));
// AsyncFlowController
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
async_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++,
sender_thread_settings))));

#ifdef FASTDDS_STATISTICS
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
async_statistics_flow_controller_name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++))));
FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++,
sender_thread_settings))));
#endif // ifndef FASTDDS_STATISTICS
}

Expand All @@ -58,6 +65,8 @@ void FlowControllerFactory::register_flow_controller (
return;
}

const ThreadSettings& sender_thread_settings = flow_controller_descr.sender_thread;

if (0 < flow_controller_descr.max_bytes_per_period)
{
switch (flow_controller_descr.scheduler)
Expand All @@ -68,31 +77,31 @@ void FlowControllerFactory::register_flow_controller (
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerFifoSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
&flow_controller_descr, async_controller_index_++, sender_thread_settings))));
break;
case FlowControllerSchedulerPolicy::ROUND_ROBIN:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerRoundRobinSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
&flow_controller_descr, async_controller_index_++, sender_thread_settings))));
break;
case FlowControllerSchedulerPolicy::HIGH_PRIORITY:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerHighPrioritySchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
&flow_controller_descr, async_controller_index_++, sender_thread_settings))));
break;
case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerLimitedAsyncPublishMode,
FlowControllerPriorityWithReservationSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
&flow_controller_descr, async_controller_index_++, sender_thread_settings))));
break;
default:
assert(false);
Expand All @@ -108,31 +117,31 @@ void FlowControllerFactory::register_flow_controller (
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerFifoSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
&flow_controller_descr, async_controller_index_++, sender_thread_settings))));
break;
case FlowControllerSchedulerPolicy::ROUND_ROBIN:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerRoundRobinSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
&flow_controller_descr, async_controller_index_++, sender_thread_settings))));
break;
case FlowControllerSchedulerPolicy::HIGH_PRIORITY:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerHighPrioritySchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
&flow_controller_descr, async_controller_index_++, sender_thread_settings))));
break;
case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION:
flow_controllers_.insert(decltype(flow_controllers_)::value_type(
flow_controller_descr.name,
std::unique_ptr<FlowController>(
new FlowControllerImpl<FlowControllerAsyncPublishMode,
FlowControllerPriorityWithReservationSchedule>(participant_,
&flow_controller_descr, async_controller_index_++))));
&flow_controller_descr, async_controller_index_++, sender_thread_settings))));
break;
default:
assert(false);
Expand Down
9 changes: 7 additions & 2 deletions src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <unordered_map>

#include "FlowController.hpp"
#include <fastdds/rtps/attributes/ThreadSettings.hpp>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/writer/RTPSWriter.h>
#include <fastrtps/utils/TimedConditionVariable.hpp>
Expand Down Expand Up @@ -930,12 +931,13 @@ class FlowControllerImpl : public FlowController
FlowControllerImpl(
fastrtps::rtps::RTPSParticipantImpl* participant,
const FlowControllerDescriptor* descriptor,
uint32_t async_index
)
uint32_t async_index,
ThreadSettings thread_settings)
: participant_(participant)
, async_mode(participant, descriptor)
, participant_id_(0)
, async_index_(async_index)
, thread_settings_(thread_settings)
{
if (nullptr != participant)
{
Expand Down Expand Up @@ -1486,6 +1488,9 @@ class FlowControllerImpl : public FlowController

uint32_t participant_id_ = 0;
uint32_t async_index_ = 0;

//! Thread settings for the sender thread
ThreadSettings thread_settings_;
};

} // namespace rtps
Expand Down
16 changes: 15 additions & 1 deletion test/unittest/rtps/flowcontrol/FlowControllerFactoryTests.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rtps/flowcontrol/FlowControllerFactory.hpp>
#include <rtps/flowcontrol//FlowControllerImpl.hpp>
#include <rtps/flowcontrol/FlowControllerImpl.hpp>

#include <gtest/gtest.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <fastdds/rtps/attributes/ThreadSettings.hpp>

#include "FlowControllerPublishModesTests.hpp"

using namespace eprosima::fastdds::rtps;
Expand All @@ -7,7 +23,7 @@ TYPED_TEST(FlowControllerPublishModes, async_publish_mode)
{
FlowControllerDescriptor flow_controller_descr;
FlowControllerImpl<FlowControllerAsyncPublishMode, TypeParam> async(nullptr,
&flow_controller_descr, 0);
&flow_controller_descr, 0, ThreadSettings{});
async.init();

// Instantiate writers.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <fastdds/rtps/attributes/ThreadSettings.hpp>

#include "FlowControllerPublishModesTests.hpp"

using namespace eprosima::fastdds::rtps;
Expand Down Expand Up @@ -28,7 +44,7 @@ TYPED_TEST(FlowControllerPublishModes, limited_async_publish_mode)
flow_controller_descr.max_bytes_per_period = 10200;
flow_controller_descr.period_ms = 10;
FlowControllerImpl<FlowControllerLimitedAsyncPublishModeMock, TypeParam> async(nullptr,
&flow_controller_descr, 0);
&flow_controller_descr, 0, ThreadSettings{});
async.init();

// Instantiate writers.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <fastdds/rtps/attributes/ThreadSettings.hpp>

#include "FlowControllerPublishModesTests.hpp"

using namespace eprosima::fastdds::rtps;
Expand All @@ -7,7 +23,7 @@ TYPED_TEST(FlowControllerPublishModes, pure_sync_publish_mode)
{
FlowControllerDescriptor flow_controller_descr;
FlowControllerImpl<FlowControllerPureSyncPublishMode, TypeParam> pure_sync(nullptr,
&flow_controller_descr, 0);
&flow_controller_descr, 0, ThreadSettings{});
pure_sync.init();

// Initialize callback to get info.
Expand Down
Loading

0 comments on commit f82f4fa

Please sign in to comment.