diff --git a/include/fastdds/dds/domain/qos/DomainParticipantQos.hpp b/include/fastdds/dds/domain/qos/DomainParticipantQos.hpp index d8d690d4e1e..9e26bd10a8c 100644 --- a/include/fastdds/dds/domain/qos/DomainParticipantQos.hpp +++ b/include/fastdds/dds/domain/qos/DomainParticipantQos.hpp @@ -325,6 +325,37 @@ class DomainParticipantQos return flow_controllers_; } + /** + * Getter for builtin flow controllers sender threads ThreadSettings + * + * @return rtps::ThreadSettings reference + */ + rtps::ThreadSettings& builtin_controllers_sender_thread() + { + return builtin_controllers_sender_thread_; + } + + /** + * Getter for builtin flow controllers sender threads ThreadSettings + * + * @return rtps::ThreadSettings reference + */ + const rtps::ThreadSettings& builtin_controllers_sender_thread() const + { + return builtin_controllers_sender_thread_; + } + + /** + * Setter for the builtin flow controllers sender threads ThreadSettings + * + * @param value New ThreadSettings to be set + */ + void builtin_controllers_sender_thread( + const rtps::ThreadSettings& value) + { + builtin_controllers_sender_thread_ = value; + } + /** * Getter for timed event ThreadSettings * @@ -447,6 +478,9 @@ class DomainParticipantQos */ FlowControllerDescriptorList flow_controllers_; + //! Thread settings for the builtin flow controllers sender threads + rtps::ThreadSettings builtin_controllers_sender_thread_; + //! Thread settings for the timed events thread rtps::ThreadSettings timed_events_thread_; diff --git a/include/fastdds/rtps/attributes/RTPSParticipantAttributes.h b/include/fastdds/rtps/attributes/RTPSParticipantAttributes.h index b6ce937ccdf..ba7cc74eae4 100644 --- a/include/fastdds/rtps/attributes/RTPSParticipantAttributes.h +++ b/include/fastdds/rtps/attributes/RTPSParticipantAttributes.h @@ -19,10 +19,14 @@ #ifndef _FASTDDS_RTPSPARTICIPANTPARAMETERS_H_ #define _FASTDDS_RTPSPARTICIPANTPARAMETERS_H_ +#include +#include + #include #include #include #include +#include #include #include #include @@ -31,11 +35,9 @@ #include #include #include +#include #include -#include -#include - namespace eprosima { namespace fastdds { @@ -484,7 +486,8 @@ class RTPSParticipantAttributes (this->useBuiltinTransports == b.useBuiltinTransports) && (this->properties == b.properties) && (this->prefix == b.prefix) && - (this->flow_controllers == b.flow_controllers); + (this->flow_controllers == b.flow_controllers) && + (this->builtin_controllers_sender_thread == b.builtin_controllers_sender_thread); } /** @@ -576,6 +579,9 @@ class RTPSParticipantAttributes //! Flow controllers. FlowControllerDescriptorList flow_controllers; + //! Thread settings for the builtin flow controllers sender threads + fastdds::rtps::ThreadSettings builtin_controllers_sender_thread; + private: //! Name of the participant. diff --git a/include/fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp b/include/fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp index 3d9ca3a10f3..35873f7f994 100644 --- a/include/fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp +++ b/include/fastdds/rtps/flowcontrol/FlowControllerDescriptor.hpp @@ -54,7 +54,7 @@ struct FlowControllerDescriptor uint64_t period_ms = 100; //! Thread settings for the sender thread - ThreadSettings sender_thread_; + ThreadSettings sender_thread; }; diff --git a/src/cpp/fastdds/utils/QosConverters.cpp b/src/cpp/fastdds/utils/QosConverters.cpp index dbfcb7453e4..12bfbe6ae78 100644 --- a/src/cpp/fastdds/utils/QosConverters.cpp +++ b/src/cpp/fastdds/utils/QosConverters.cpp @@ -196,6 +196,7 @@ void set_attributes_from_qos( attr.listenSocketBufferSize = qos.transport().listen_socket_buffer_size; attr.userData = qos.user_data().data_vec(); attr.flow_controllers = qos.flow_controllers(); + attr.builtin_controllers_sender_thread = qos.builtin_controllers_sender_thread(); } void set_qos_from_attributes( diff --git a/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp b/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp index e1e3b9f5464..aa5860dfd63 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerFactory.cpp @@ -20,31 +20,38 @@ void FlowControllerFactory::init( participant_ = participant; // Create default flow controllers. + const ThreadSettings& sender_thread_settings = + (nullptr == participant_) ? ThreadSettings{} + : participant_->getAttributes().builtin_controllers_sender_thread; + // PureSyncFlowController -> used by volatile besteffort writers. flow_controllers_.insert(decltype(flow_controllers_)::value_type( pure_sync_flow_controller_name, std::unique_ptr( new FlowControllerImpl(participant_, nullptr, 0)))); + FlowControllerFifoSchedule>(participant_, nullptr, 0, sender_thread_settings)))); // SyncFlowController -> used by rest of besteffort writers. flow_controllers_.insert(decltype(flow_controllers_)::value_type( sync_flow_controller_name, std::unique_ptr( new FlowControllerImpl(participant_, nullptr, async_controller_index_++)))); + FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++, + sender_thread_settings)))); // AsyncFlowController flow_controllers_.insert(decltype(flow_controllers_)::value_type( async_flow_controller_name, std::unique_ptr( new FlowControllerImpl(participant_, nullptr, async_controller_index_++)))); + FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++, + sender_thread_settings)))); #ifdef FASTDDS_STATISTICS flow_controllers_.insert(decltype(flow_controllers_)::value_type( async_statistics_flow_controller_name, std::unique_ptr( new FlowControllerImpl(participant_, nullptr, async_controller_index_++)))); + FlowControllerFifoSchedule>(participant_, nullptr, async_controller_index_++, + sender_thread_settings)))); #endif // ifndef FASTDDS_STATISTICS } @@ -58,6 +65,8 @@ void FlowControllerFactory::register_flow_controller ( return; } + const ThreadSettings& sender_thread_settings = flow_controller_descr.sender_thread; + if (0 < flow_controller_descr.max_bytes_per_period) { switch (flow_controller_descr.scheduler) @@ -68,7 +77,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr, async_controller_index_++)))); + &flow_controller_descr, async_controller_index_++, sender_thread_settings)))); break; case FlowControllerSchedulerPolicy::ROUND_ROBIN: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -76,7 +85,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr, async_controller_index_++)))); + &flow_controller_descr, async_controller_index_++, sender_thread_settings)))); break; case FlowControllerSchedulerPolicy::HIGH_PRIORITY: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -84,7 +93,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr, async_controller_index_++)))); + &flow_controller_descr, async_controller_index_++, sender_thread_settings)))); break; case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -92,7 +101,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr, async_controller_index_++)))); + &flow_controller_descr, async_controller_index_++, sender_thread_settings)))); break; default: assert(false); @@ -108,7 +117,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr, async_controller_index_++)))); + &flow_controller_descr, async_controller_index_++, sender_thread_settings)))); break; case FlowControllerSchedulerPolicy::ROUND_ROBIN: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -116,7 +125,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr, async_controller_index_++)))); + &flow_controller_descr, async_controller_index_++, sender_thread_settings)))); break; case FlowControllerSchedulerPolicy::HIGH_PRIORITY: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -124,7 +133,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr, async_controller_index_++)))); + &flow_controller_descr, async_controller_index_++, sender_thread_settings)))); break; case FlowControllerSchedulerPolicy::PRIORITY_WITH_RESERVATION: flow_controllers_.insert(decltype(flow_controllers_)::value_type( @@ -132,7 +141,7 @@ void FlowControllerFactory::register_flow_controller ( std::unique_ptr( new FlowControllerImpl(participant_, - &flow_controller_descr, async_controller_index_++)))); + &flow_controller_descr, async_controller_index_++, sender_thread_settings)))); break; default: assert(false); diff --git a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp index 62515ed1ce7..0ae267ff6d3 100644 --- a/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp +++ b/src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp @@ -9,6 +9,7 @@ #include #include "FlowController.hpp" +#include #include #include #include @@ -930,12 +931,13 @@ class FlowControllerImpl : public FlowController FlowControllerImpl( fastrtps::rtps::RTPSParticipantImpl* participant, const FlowControllerDescriptor* descriptor, - uint32_t async_index - ) + uint32_t async_index, + ThreadSettings thread_settings) : participant_(participant) , async_mode(participant, descriptor) , participant_id_(0) , async_index_(async_index) + , thread_settings_(thread_settings) { if (nullptr != participant) { @@ -1486,6 +1488,9 @@ class FlowControllerImpl : public FlowController uint32_t participant_id_ = 0; uint32_t async_index_ = 0; + + //! Thread settings for the sender thread + ThreadSettings thread_settings_; }; } // namespace rtps diff --git a/test/unittest/rtps/flowcontrol/FlowControllerFactoryTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerFactoryTests.cpp index d6d142ec157..ba30728fbb0 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerFactoryTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerFactoryTests.cpp @@ -1,5 +1,19 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #include -#include +#include #include diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp index 801d753830f..e7ae98b337c 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnAsyncTests.cpp @@ -1,3 +1,19 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + #include "FlowControllerPublishModesTests.hpp" using namespace eprosima::fastdds::rtps; @@ -7,7 +23,7 @@ TYPED_TEST(FlowControllerPublishModes, async_publish_mode) { FlowControllerDescriptor flow_controller_descr; FlowControllerImpl async(nullptr, - &flow_controller_descr, 0); + &flow_controller_descr, 0, ThreadSettings{}); async.init(); // Instantiate writers. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp index a95e6144ec6..71781c1748c 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnLimitedAsyncTests.cpp @@ -1,3 +1,19 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + #include "FlowControllerPublishModesTests.hpp" using namespace eprosima::fastdds::rtps; @@ -28,7 +44,7 @@ TYPED_TEST(FlowControllerPublishModes, limited_async_publish_mode) flow_controller_descr.max_bytes_per_period = 10200; flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr, 0); + &flow_controller_descr, 0, ThreadSettings{}); async.init(); // Instantiate writers. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnPureSyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnPureSyncTests.cpp index b06ff64d6a1..b31c5baea09 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnPureSyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnPureSyncTests.cpp @@ -1,3 +1,19 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + #include "FlowControllerPublishModesTests.hpp" using namespace eprosima::fastdds::rtps; @@ -7,7 +23,7 @@ TYPED_TEST(FlowControllerPublishModes, pure_sync_publish_mode) { FlowControllerDescriptor flow_controller_descr; FlowControllerImpl pure_sync(nullptr, - &flow_controller_descr, 0); + &flow_controller_descr, 0, ThreadSettings{}); pure_sync.init(); // Initialize callback to get info. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp index 8c64e3940c7..63ad9fbdd8f 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerPublishModesOnSyncTests.cpp @@ -1,3 +1,19 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + #include "FlowControllerPublishModesTests.hpp" using namespace eprosima::fastdds::rtps; @@ -7,7 +23,7 @@ TYPED_TEST(FlowControllerPublishModes, sync_publish_mode) { FlowControllerDescriptor flow_controller_descr; FlowControllerImpl sync(nullptr, - &flow_controller_descr, 0); + &flow_controller_descr, 0, ThreadSettings{}); sync.init(); // Instantiate writers. diff --git a/test/unittest/rtps/flowcontrol/FlowControllerSchedulersTests.cpp b/test/unittest/rtps/flowcontrol/FlowControllerSchedulersTests.cpp index 2b83a4cd886..f827492eb05 100644 --- a/test/unittest/rtps/flowcontrol/FlowControllerSchedulersTests.cpp +++ b/test/unittest/rtps/flowcontrol/FlowControllerSchedulersTests.cpp @@ -1,8 +1,24 @@ -#include -#include +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. #include +#include +#include + +#include + namespace eprosima { namespace fastrtps { namespace rtps { @@ -90,7 +106,7 @@ TEST_F(FlowControllerSchedulers, Fifo) flow_controller_descr.max_bytes_per_period = 10200; flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr, 0); + &flow_controller_descr, 0, ThreadSettings{}); async.init(); // Instantiate writers. @@ -691,7 +707,7 @@ TEST_F(FlowControllerSchedulers, RoundRobin) flow_controller_descr.max_bytes_per_period = 10200; flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr, 0); + &flow_controller_descr, 0, ThreadSettings{}); async.init(); // Instantiate writers. @@ -1292,7 +1308,7 @@ TEST_F(FlowControllerSchedulers, HighPriority) flow_controller_descr.max_bytes_per_period = 10200; flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr, 0); + &flow_controller_descr, 0, ThreadSettings{}); async.init(); // Instantiate writers. @@ -1916,7 +1932,7 @@ TEST_F(FlowControllerSchedulers, PriorityWithReservation) flow_controller_descr.period_ms = 10; FlowControllerImpl async(nullptr, - &flow_controller_descr, 0); + &flow_controller_descr, 0, ThreadSettings{}); async.init(); // Instantiate writers.