From bf5877c42888de4bae0382b6c067efb495df1f5f Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 27 Jan 2020 08:21:27 +0100 Subject: [PATCH] RTPSMessageGroup_t preallocation (#938) * Refs #7116. Separating RTPSMessageGroup_t into a private header. * Refs #7116. Passing send buffers management responsibility to RTPSParticipantImpl. * Refs #7116. RAII on RTPSMessageGroup_t. * Refs #7116. Protecting access to buffer pool. * Refs #7116. Preallocating one buffer per expected thread. * Refs #7116. Protecting access to buffer pool with specific mutex. * Refs #7116. New SendBuffersManager class. * Refs #7116. Allowing wrapping buffer on RTPSMessageGroup_t. * Refs #7116. Using a single allocation for all initially allocated buffers. * Refs #7116. Adding assertion. * Refs #7116. Fixed alignment. * Refs #7116. Correct calculation of raw buffer size. * Refs #7116. Early return on security initialization failure. * Refs #7116. Making send buffers allocation configurable. * Refs #7116. Adding XML configuration for send_buffers allocation. * Refs #7116. Adding participant allocation attributes to XML parser unit test. * Refs #7165. Fixed warnings. * Refs #7165. Fixed leak. * Refs #7165. Return send buffer even if an exception is thrown. * Refs #7165. Correctly return send buffer. --- .../RTPSParticipantAllocationAttributes.hpp | 25 ++++ include/fastrtps/rtps/common/CDRMessage_t.h | 18 +++ .../fastrtps/rtps/messages/RTPSMessageGroup.h | 2 + include/fastrtps/xmlparser/XMLParser.h | 5 + include/fastrtps/xmlparser/XMLParserCommon.h | 3 + resources/xsd/fastRTPS_profiles.xsd | 8 ++ src/cpp/CMakeLists.txt | 1 + src/cpp/rtps/RTPSDomain.cpp | 16 +-- src/cpp/rtps/messages/RTPSMessageGroup.cpp | 85 +++--------- src/cpp/rtps/messages/RTPSMessageGroup_t.hpp | 116 ++++++++++++++++ src/cpp/rtps/messages/SendBuffersManager.cpp | 124 ++++++++++++++++++ src/cpp/rtps/messages/SendBuffersManager.hpp | 106 +++++++++++++++ .../rtps/participant/RTPSParticipantImpl.cpp | 38 +++++- .../rtps/participant/RTPSParticipantImpl.h | 12 +- src/cpp/xmlparser/XMLElementParser.cpp | 58 ++++++++ src/cpp/xmlparser/XMLParserCommon.cpp | 3 + .../xmlparser/XMLProfileParserTests.cpp | 14 ++ test/unittest/xmlparser/test_xml_profiles.xml | 4 + 18 files changed, 558 insertions(+), 80 deletions(-) create mode 100644 src/cpp/rtps/messages/RTPSMessageGroup_t.hpp create mode 100644 src/cpp/rtps/messages/SendBuffersManager.cpp create mode 100644 src/cpp/rtps/messages/SendBuffersManager.hpp diff --git a/include/fastrtps/rtps/attributes/RTPSParticipantAllocationAttributes.hpp b/include/fastrtps/rtps/attributes/RTPSParticipantAllocationAttributes.hpp index c138961870a..8af4acd9858 100644 --- a/include/fastrtps/rtps/attributes/RTPSParticipantAllocationAttributes.hpp +++ b/include/fastrtps/rtps/attributes/RTPSParticipantAllocationAttributes.hpp @@ -49,6 +49,29 @@ struct RemoteLocatorsAllocationAttributes size_t max_multicast_locators = 1u; }; +/** + * @brief Holds limits for send buffers allocations. + */ +struct SendBuffersAllocationAttributes +{ + /** Initial number of send buffers to allocate. + * + * This attribute controls the initial number of send buffers to be allocated. + * The default value of 0 will perform an initial guess of the number of buffers + * required, based on the number of threads from which a send operation could be + * started. + */ + size_t preallocated_number = 0u; + + /** Whether the number of send buffers is allowed to grow. + * + * This attribute controls how the buffer manager behaves when a send buffer is not + * available. When true, a new buffer will be created. When false, it will wait for a + * buffer to be returned. This is a tradeoff between latency and dynamic allocations. + */ + bool dynamic = false; +}; + /** * @brief Holds allocation limits affecting collections managed by a participant. */ @@ -62,6 +85,8 @@ struct RTPSParticipantAllocationAttributes ResourceLimitedContainerConfig readers; //! Defines the allocation behaviour for collections dependent on the total number of writers per participant. ResourceLimitedContainerConfig writers; + //! Defines the allocation behaviour for the send buffer manager. + SendBuffersAllocationAttributes send_buffers; //! @return the allocation config for the total of readers in the system (participants * readers) ResourceLimitedContainerConfig total_readers() const diff --git a/include/fastrtps/rtps/common/CDRMessage_t.h b/include/fastrtps/rtps/common/CDRMessage_t.h index 6bbe6140051..1bc48cd73d4 100644 --- a/include/fastrtps/rtps/common/CDRMessage_t.h +++ b/include/fastrtps/rtps/common/CDRMessage_t.h @@ -172,6 +172,24 @@ struct RTPS_DllAPI CDRMessage_t{ return *(this); } + void init( + octet* buffer_ptr, + uint32_t size) + { + assert(buffer == nullptr); + wraps = true; + pos = 0; + length = 0; + buffer = buffer_ptr; + max_size = size; + +#if __BIG_ENDIAN__ + msg_endian = BIGEND; +#else + msg_endian = LITTLEEND; +#endif + } + void reserve( uint32_t size) { diff --git a/include/fastrtps/rtps/messages/RTPSMessageGroup.h b/include/fastrtps/rtps/messages/RTPSMessageGroup.h index 8fba7a841c2..9c8f95d96bc 100644 --- a/include/fastrtps/rtps/messages/RTPSMessageGroup.h +++ b/include/fastrtps/rtps/messages/RTPSMessageGroup.h @@ -36,6 +36,7 @@ namespace rtps { class RTPSParticipantImpl; class Endpoint; +class RTPSMessageGroup_t; /** * RTPSMessageGroup Class used to construct a RTPS message. @@ -229,6 +230,7 @@ class Endpoint; std::chrono::steady_clock::time_point max_blocking_time_point_; + std::unique_ptr send_buffer_; }; } /* namespace rtps */ diff --git a/include/fastrtps/xmlparser/XMLParser.h b/include/fastrtps/xmlparser/XMLParser.h index b52ef445cef..0d5b1c0e72d 100644 --- a/include/fastrtps/xmlparser/XMLParser.h +++ b/include/fastrtps/xmlparser/XMLParser.h @@ -486,6 +486,11 @@ class XMLParser rtps::RemoteLocatorsAllocationAttributes& allocation, uint8_t ident); + RTPS_DllAPI static XMLP_ret getXMLSendBuffersAllocationAttributes( + tinyxml2::XMLElement* elem, + rtps::SendBuffersAllocationAttributes& allocation, + uint8_t ident); + RTPS_DllAPI static XMLP_ret getXMLDiscoverySettings( tinyxml2::XMLElement* elem, rtps::DiscoverySettings& settings, diff --git a/include/fastrtps/xmlparser/XMLParserCommon.h b/include/fastrtps/xmlparser/XMLParserCommon.h index ff887150327..5ffda8e781a 100644 --- a/include/fastrtps/xmlparser/XMLParserCommon.h +++ b/include/fastrtps/xmlparser/XMLParserCommon.h @@ -115,6 +115,9 @@ extern const char* MAX_MULTICAST_LOCATORS; extern const char* TOTAL_PARTICIPANTS; extern const char* TOTAL_READERS; extern const char* TOTAL_WRITERS; +extern const char* SEND_BUFFERS; +extern const char* PREALLOCATED_NUMBER; +extern const char* DYNAMIC_LC; /// Publisher-subscriber attributes extern const char* TOPIC; diff --git a/resources/xsd/fastRTPS_profiles.xsd b/resources/xsd/fastRTPS_profiles.xsd index 824ed6e8b12..5811076fd8d 100644 --- a/resources/xsd/fastRTPS_profiles.xsd +++ b/resources/xsd/fastRTPS_profiles.xsd @@ -383,12 +383,20 @@ + + + + + + + + diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index 625dacd622e..47261fba8e6 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -63,6 +63,7 @@ set(${PROJECT_NAME}_source_files rtps/messages/RTPSMessageCreator.cpp rtps/messages/RTPSMessageGroup.cpp rtps/messages/RTPSGapBuilder.cpp + rtps/messages/SendBuffersManager.cpp rtps/messages/MessageReceiver.cpp rtps/messages/submessages/AckNackMsg.hpp rtps/messages/submessages/DataMsg.hpp diff --git a/src/cpp/rtps/RTPSDomain.cpp b/src/cpp/rtps/RTPSDomain.cpp index ba8f64341d3..5882e3351ac 100644 --- a/src/cpp/rtps/RTPSDomain.cpp +++ b/src/cpp/rtps/RTPSDomain.cpp @@ -187,23 +187,23 @@ RTPSParticipant* RTPSDomain::createParticipant( pimpl = new RTPSParticipantImpl(PParam, guidP, p, listen); } -#if HAVE_SECURITY - // Check security was correctly initialized - if (!pimpl->is_security_initialized()) + // Check there is at least one transport registered. + if (!pimpl->networkFactoryHasRegisteredTransports()) { - logError(RTPS_PARTICIPANT, "Cannot create participant due to security initialization error"); + logError(RTPS_PARTICIPANT, "Cannot create participant, because there is any transport"); delete pimpl; return nullptr; } -#endif - // Check there is at least one transport registered. - if (!pimpl->networkFactoryHasRegisteredTransports()) +#if HAVE_SECURITY + // Check security was correctly initialized + if (!pimpl->is_security_initialized()) { - logError(RTPS_PARTICIPANT, "Cannot create participant, because there is any transport"); + logError(RTPS_PARTICIPANT, "Cannot create participant due to security initialization error"); delete pimpl; return nullptr; } +#endif { std::lock_guard guard(m_mutex); diff --git a/src/cpp/rtps/messages/RTPSMessageGroup.cpp b/src/cpp/rtps/messages/RTPSMessageGroup.cpp index b68e52ed55b..90147f3c21d 100644 --- a/src/cpp/rtps/messages/RTPSMessageGroup.cpp +++ b/src/cpp/rtps/messages/RTPSMessageGroup.cpp @@ -23,6 +23,7 @@ #include "../participant/RTPSParticipantImpl.h" #include "../flowcontrol/FlowController.h" #include "RTPSGapBuilder.hpp" +#include "RTPSMessageGroup_t.hpp" #include @@ -32,56 +33,6 @@ namespace eprosima { namespace fastrtps { namespace rtps { -/** - * Class RTPSMessageGroup_t that contains the messages used to send multiples changes as one message. - * @ingroup WRITER_MODULE - */ -class RTPSMessageGroup_t -{ - public: - - RTPSMessageGroup_t() - : rtpsmsg_submessage_(0u) - , rtpsmsg_fullmsg_(0u) -#if HAVE_SECURITY - , rtpsmsg_encrypt_(0u) -#endif - { - } - - void init( -#if HAVE_SECURITY - bool has_security, -#endif - uint32_t payload, - const GuidPrefix_t& participant_guid) - { - - rtpsmsg_fullmsg_.reserve(payload); - rtpsmsg_submessage_.reserve(payload); - -#if HAVE_SECURITY - if (has_security) - { - rtpsmsg_encrypt_.reserve(payload); - } -#endif - - CDRMessage::initCDRMsg(&rtpsmsg_fullmsg_); - RTPSMessageCreator::addHeader(&rtpsmsg_fullmsg_, participant_guid); - } - - CDRMessage_t rtpsmsg_submessage_; - - CDRMessage_t rtpsmsg_fullmsg_; - -#if HAVE_SECURITY - CDRMessage_t rtpsmsg_encrypt_; -#endif -}; - -static thread_local std::unique_ptr tls_group; - bool sort_changes_group (CacheChange_t* c1,CacheChange_t* c2) { return(c1->sequenceNumber < c2->sequenceNumber); @@ -158,6 +109,7 @@ RTPSMessageGroup::RTPSMessageGroup( , encrypt_msg_(nullptr) #endif , max_blocking_time_point_(max_blocking_time_point) + , send_buffer_(participant->get_send_buffer()) { // Avoid warning when neither SECURITY nor DEBUG is used (void)participant; @@ -165,23 +117,8 @@ RTPSMessageGroup::RTPSMessageGroup( assert(participant); assert(endpoint); - if (!tls_group) - { - tls_group.reset(new RTPSMessageGroup_t()); - } - - uint32_t max_payload_size = participant->getMaxMessageSize(); - const GuidPrefix_t& guid_prefix = participant->getGuid().guidPrefix; - -#if HAVE_SECURITY - tls_group->init(participant->is_secure(), max_payload_size, guid_prefix); -#else - tls_group->init(max_payload_size, guid_prefix); -#endif - - full_msg_ = &(tls_group->rtpsmsg_fullmsg_); - submessage_msg_ = &(tls_group->rtpsmsg_submessage_); - + full_msg_ = &(send_buffer_->rtpsmsg_fullmsg_); + submessage_msg_ = &(send_buffer_->rtpsmsg_submessage_); // Init RTPS message. reset_to_header(); @@ -191,7 +128,7 @@ RTPSMessageGroup::RTPSMessageGroup( #if HAVE_SECURITY if (participant->is_secure()) { - encrypt_msg_ = &(tls_group->rtpsmsg_encrypt_); + encrypt_msg_ = &(send_buffer_->rtpsmsg_encrypt_); CDRMessage::initCDRMsg(encrypt_msg_); } #endif @@ -199,7 +136,17 @@ RTPSMessageGroup::RTPSMessageGroup( RTPSMessageGroup::~RTPSMessageGroup() noexcept(false) { - send(); + try + { + send(); + } + catch (...) + { + participant_->return_send_buffer(std::move(send_buffer_)); + throw; + } + + participant_->return_send_buffer(std::move(send_buffer_)); } void RTPSMessageGroup::reset_to_header() diff --git a/src/cpp/rtps/messages/RTPSMessageGroup_t.hpp b/src/cpp/rtps/messages/RTPSMessageGroup_t.hpp new file mode 100644 index 00000000000..75730544ef9 --- /dev/null +++ b/src/cpp/rtps/messages/RTPSMessageGroup_t.hpp @@ -0,0 +1,116 @@ +// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file RTPSMessageGroup_t.hpp + */ + + #ifndef RTPS_MESSAGES_RTPSMESSAGEGROUP_T_HPP + #define RTPS_MESSAGES_RTPSMESSAGEGROUP_T_HPP + +#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + +#include +#include +#include + +namespace eprosima { +namespace fastrtps { +namespace rtps { + + /** + * Class RTPSMessageGroup_t that contains the messages used to send multiples changes as one message. + * @ingroup WRITER_MODULE + */ +class RTPSMessageGroup_t +{ +public: + + RTPSMessageGroup_t( +#if HAVE_SECURITY + bool has_security, +#endif + uint32_t payload, + const GuidPrefix_t& participant_guid) + : rtpsmsg_submessage_(0u) + , rtpsmsg_fullmsg_(0u) +#if HAVE_SECURITY + , rtpsmsg_encrypt_(0u) +#endif + { + rtpsmsg_fullmsg_.reserve(payload); + rtpsmsg_submessage_.reserve(payload); + +#if HAVE_SECURITY + if (has_security) + { + rtpsmsg_encrypt_.reserve(payload); + } +#endif + + init(participant_guid); + } + + RTPSMessageGroup_t( + octet* buffer_ptr, +#if HAVE_SECURITY + bool has_security, +#endif + uint32_t payload, + const GuidPrefix_t& participant_guid) + : rtpsmsg_submessage_(0u) + , rtpsmsg_fullmsg_(0u) +#if HAVE_SECURITY + , rtpsmsg_encrypt_(0u) +#endif + { + rtpsmsg_fullmsg_.init(buffer_ptr, payload); + buffer_ptr += payload; + rtpsmsg_submessage_.init(buffer_ptr, payload); + +#if HAVE_SECURITY + if (has_security) + { + buffer_ptr += payload; + rtpsmsg_encrypt_.init(buffer_ptr, payload); + } +#endif + + init(participant_guid); + } + + inline void init( + const GuidPrefix_t& participant_guid) + { + CDRMessage::initCDRMsg(&rtpsmsg_fullmsg_); + RTPSMessageCreator::addHeader(&rtpsmsg_fullmsg_, participant_guid); + } + + CDRMessage_t rtpsmsg_submessage_; + + CDRMessage_t rtpsmsg_fullmsg_; + +#if HAVE_SECURITY + CDRMessage_t rtpsmsg_encrypt_; +#endif +}; + +} // namespace rtps +} // namespace fastrtps +} // namespace eprosima + +#endif // DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + +#endif // RTPS_MESSAGES_RTPSMESSAGEGROUP_T_HPP + diff --git a/src/cpp/rtps/messages/SendBuffersManager.cpp b/src/cpp/rtps/messages/SendBuffersManager.cpp new file mode 100644 index 00000000000..ff4218723fd --- /dev/null +++ b/src/cpp/rtps/messages/SendBuffersManager.cpp @@ -0,0 +1,124 @@ +// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SendBuffersManager.cpp + */ + +#include "SendBuffersManager.hpp" +#include "../participant/RTPSParticipantImpl.h" + +namespace eprosima { +namespace fastrtps { +namespace rtps { + +SendBuffersManager::SendBuffersManager( + size_t reserved_size, + bool allow_growing) + : allow_growing_(allow_growing) +{ + pool_.reserve(reserved_size); +} + +void SendBuffersManager::init( + const RTPSParticipantImpl* participant) +{ + std::lock_guard guard(mutex_); + + if (n_created_ < pool_.capacity()) + { + const GuidPrefix_t& guid_prefix = participant->getGuid().guidPrefix; + + // Single allocation for the data of all the buffers. + // We align the payload size to the size of a pointer, so all buffers will + // be aligned as if directly allocated. + constexpr size_t align_size = sizeof(octet*) - 1; + uint32_t payload_size = participant->getMaxMessageSize(); + assert(payload_size > 0u); + payload_size = (payload_size + align_size) & ~align_size; + size_t advance = payload_size; +#if HAVE_SECURITY + bool secure = participant->is_secure(); + advance *= secure ? 3 : 2; +#else + advance *= 2; +#endif + size_t data_size = advance * (pool_.capacity() - n_created_); + common_buffer_.assign(data_size, 0); + + octet* raw_buffer = common_buffer_.data(); + while(n_created_ < pool_.capacity()) + { + pool_.emplace_back(new RTPSMessageGroup_t( + raw_buffer, +#if HAVE_SECURITY + secure, +#endif + payload_size, guid_prefix + )); + raw_buffer += advance; + ++n_created_; + } + } +} + +std::unique_ptr SendBuffersManager::get_buffer( + const RTPSParticipantImpl* participant) +{ + std::unique_lock lock(mutex_); + + std::unique_ptr ret_val; + + if (pool_.empty()) + { + if (allow_growing_ || n_created_ < pool_.capacity()) + { + add_one_buffer(participant); + } + else + { + available_cv_.wait(lock); + assert(!pool_.empty()); + } + } + + ret_val = std::move(pool_.back()); + pool_.pop_back(); + + return ret_val; +} + +void SendBuffersManager::return_buffer( + std::unique_ptr && buffer) +{ + std::lock_guard guard(mutex_); + pool_.push_back(std::move(buffer)); + available_cv_.notify_one(); +} + +void SendBuffersManager::add_one_buffer( + const RTPSParticipantImpl* participant) +{ + RTPSMessageGroup_t* new_item = new RTPSMessageGroup_t( +#if HAVE_SECURITY + participant->is_secure(), +#endif + participant->getMaxMessageSize(), participant->getGuid().guidPrefix); + pool_.emplace_back(new_item); + ++n_created_; +} + +} /* namespace rtps */ +} /* namespace fastrtps */ +} /* namespace eprosima */ diff --git a/src/cpp/rtps/messages/SendBuffersManager.hpp b/src/cpp/rtps/messages/SendBuffersManager.hpp new file mode 100644 index 00000000000..38758c4b24f --- /dev/null +++ b/src/cpp/rtps/messages/SendBuffersManager.hpp @@ -0,0 +1,106 @@ +// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SendBuffersManager.hpp + */ + +#ifndef RTPS_MESSAGES_SENDBUFFERSMANAGER_HPP +#define RTPS_MESSAGES_SENDBUFFERSMANAGER_HPP +#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + +#include "RTPSMessageGroup_t.hpp" +#include + +#include // std::vector +#include // std::unique_ptr +#include // std::mutex + +namespace eprosima { +namespace fastrtps { +namespace rtps { + +class RTPSParticipantImpl; + +/** + * Manages a pool of send buffers. + * @ingroup WRITER_MODULE + */ +class SendBuffersManager +{ +public: + + /** + * Construct a SendBuffersManager. + * @param reserved_size Initial size for the pool. + * @param allow_growing Whether we allow creation of more than reserved_size elements. + */ + SendBuffersManager( + size_t reserved_size, + bool allow_growing); + + ~SendBuffersManager() + { + assert(pool_.size() == n_created_); + } + + /** + * Initialization of pool. + * Fills the pool to its reserved capacity. + * @param participant Pointer to the participant creating the pool. + */ + void init( + const RTPSParticipantImpl* participant); + + /** + * Get one buffer from the pool. + * @param participant Pointer to the participant asking for a buffer. + * @return unique pointer to a send buffer. + */ + std::unique_ptr get_buffer( + const RTPSParticipantImpl* participant); + + /** + * Return one buffer to the pool. + * @param buffer unique pointer to the buffer being returned. + */ + void return_buffer( + std::unique_ptr && buffer); + +private: + + void add_one_buffer( + const RTPSParticipantImpl* participant); + + //!Protects all data + std::mutex mutex_; + //!Send buffers pool + std::vector> pool_; + //!Raw buffer shared by the buffers created inside init() + std::vector common_buffer_; + //!Creation counter + std::size_t n_created_ = 0; + //!Whether we allow n_created_ to grow beyond the pool_ capacity. + bool allow_growing_ = true; + //!To wait for a buffer to be returned to the pool. + std::condition_variable available_cv_; +}; + +} /* namespace rtps */ +} /* namespace fastrtps */ +} /* namespace eprosima */ + +#endif + +#endif // RTPS_MESSAGES_SENDBUFFERSMANAGER_HPP diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index ebb91f8796e..667242542f7 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -134,6 +134,11 @@ RTPSParticipantImpl::RTPSParticipantImpl( mp_userParticipant->mp_impl = this; mp_event_thr.init_thread(); + if (!networkFactoryHasRegisteredTransports()) + { + return; + } + // Throughput controller, if the descriptor has valid values if (PParam.throughputController.bytesPerPeriod != UINT32_MAX && PParam.throughputController.periodMillisecs != 0) { @@ -237,12 +242,33 @@ RTPSParticipantImpl::RTPSParticipantImpl( createReceiverResources(m_att.defaultUnicastLocatorList, true); createReceiverResources(m_att.defaultMulticastLocatorList, true); + bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; + size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number; + if(num_send_buffers == 0) + { + // Three buffers (user, events and async writer threads) + num_send_buffers = 3; + // Add one buffer per reception thread + num_send_buffers += m_receiverResourcelist.size(); + } + + // Create buffer pool + send_buffers_.reset(new SendBuffersManager(num_send_buffers, allow_growing_buffers)); + #if HAVE_SECURITY // Start security // TODO(Ricardo) Get returned value in future. m_security_manager_initialized = m_security_manager.init(security_attributes_, PParam.properties, m_is_security_active); + if (!m_security_manager_initialized) + { + // Participant will be deleted, no need to allocate buffers or create builtin endpoints + return; + } #endif + // Allocate all pending send buffers + send_buffers_->init(this); + mp_builtinProtocols = new BuiltinProtocols(); logInfo(RTPS_PARTICIPANT, "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix); @@ -306,8 +332,6 @@ RTPSParticipantImpl::~RTPSParticipantImpl() { disable(); - delete mp_builtinProtocols; - #if HAVE_SECURITY m_security_manager.destroy(); #endif @@ -1234,6 +1258,16 @@ IPersistenceService* RTPSParticipantImpl::get_persistence_service(const Endpoint PersistenceFactory::create_persistence_service(m_att.properties); } +std::unique_ptr RTPSParticipantImpl::get_send_buffer() +{ + return send_buffers_->get_buffer(this); +} + +void RTPSParticipantImpl::return_send_buffer(std::unique_ptr && buffer) +{ + send_buffers_->return_buffer(std::move(buffer)); +} + } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 614a3c73b4a..f1b8c2e69a4 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -46,6 +46,9 @@ #include #include +#include "../messages/RTPSMessageGroup_t.hpp" +#include "../messages/SendBuffersManager.hpp" + #if HAVE_SECURITY #include #include @@ -61,7 +64,9 @@ class TopicAttributes; class MessageReceiver; namespace rtps -{ class RTPSParticipant; +{ + +class RTPSParticipant; class RTPSParticipantListener; class BuiltinProtocols; struct CDRMessage_t; @@ -298,6 +303,9 @@ class RTPSParticipantImpl RTPSWriter* find_local_writer( const GUID_t& writer_guid); + std::unique_ptr get_send_buffer(); + void return_send_buffer(std::unique_ptr && buffer); + private: //!Attributes of the RTPSParticipant RTPSParticipantAttributes m_att; @@ -328,6 +336,8 @@ class RTPSParticipantImpl NetworkFactory m_network_Factory; //!Async writer thread AsyncWriterThread async_thread_; + //!Pool of send buffers + std::unique_ptr send_buffers_; #if HAVE_SECURITY // Security manager diff --git a/src/cpp/xmlparser/XMLElementParser.cpp b/src/cpp/xmlparser/XMLElementParser.cpp index 69fb8367577..a36cb5cbdef 100644 --- a/src/cpp/xmlparser/XMLElementParser.cpp +++ b/src/cpp/xmlparser/XMLElementParser.cpp @@ -37,6 +37,7 @@ XMLP_ret XMLParser::getXMLParticipantAllocationAttributes( + */ @@ -78,6 +79,14 @@ XMLP_ret XMLParser::getXMLParticipantAllocationAttributes( return XMLP_ret::XML_ERROR; } } + else if (strcmp(name, SEND_BUFFERS) == 0) + { + // send_buffers - sendBuffersAllocationConfigType + if (XMLP_ret::XML_OK != getXMLSendBuffersAllocationAttributes(p_aux0, allocation.send_buffers, ident)) + { + return XMLP_ret::XML_ERROR; + } + } else { logError(XMLPARSER, "Invalid element found into 'rtpsParticipantAllocationAttributesType'. Name: " << name); @@ -136,6 +145,55 @@ XMLP_ret XMLParser::getXMLRemoteLocatorsAllocationAttributes( return XMLP_ret::XML_OK; } +XMLP_ret XMLParser::getXMLSendBuffersAllocationAttributes( + tinyxml2::XMLElement* elem, + rtps::SendBuffersAllocationAttributes& allocation, + uint8_t ident) +{ + /* + + + + + + + */ + + tinyxml2::XMLElement *p_aux0 = nullptr; + const char* name = nullptr; + uint32_t tmp; + for (p_aux0 = elem->FirstChildElement(); p_aux0 != NULL; p_aux0 = p_aux0->NextSiblingElement()) + { + name = p_aux0->Name(); + if (strcmp(name, PREALLOCATED_NUMBER) == 0) + { + // preallocated_number - uint32Type + if (XMLP_ret::XML_OK != getXMLUint(p_aux0, &tmp, ident)) + { + return XMLP_ret::XML_ERROR; + } + allocation.preallocated_number = tmp; + } + else if (strcmp(name, DYNAMIC_LC) == 0) + { + // dynamic - boolType + bool tmp_bool = false; + if (XMLP_ret::XML_OK != getXMLBool(p_aux0, &tmp_bool, ident)) + { + return XMLP_ret::XML_ERROR; + } + allocation.dynamic = tmp_bool; + } + else + { + logError(XMLPARSER, "Invalid element found into 'sendBuffersAllocationConfigType'. Name: " << name); + return XMLP_ret::XML_ERROR; + } + } + + return XMLP_ret::XML_OK; +} + XMLP_ret XMLParser::getXMLDiscoverySettings(tinyxml2::XMLElement* elem, rtps::DiscoverySettings& settings, uint8_t ident) { /* diff --git a/src/cpp/xmlparser/XMLParserCommon.cpp b/src/cpp/xmlparser/XMLParserCommon.cpp index 24aa9842332..30e09dd1451 100644 --- a/src/cpp/xmlparser/XMLParserCommon.cpp +++ b/src/cpp/xmlparser/XMLParserCommon.cpp @@ -97,6 +97,9 @@ const char* MAX_MULTICAST_LOCATORS = "max_multicast_locators"; const char* TOTAL_PARTICIPANTS = "total_participants"; const char* TOTAL_READERS = "total_readers"; const char* TOTAL_WRITERS = "total_writers"; +const char* SEND_BUFFERS = "send_buffers"; +const char* PREALLOCATED_NUMBER = "preallocated_number"; +const char* DYNAMIC_LC = "dynamic"; /// Publisher-subscriber attributes const char* TOPIC = "topic"; diff --git a/test/unittest/xmlparser/XMLProfileParserTests.cpp b/test/unittest/xmlparser/XMLProfileParserTests.cpp index 62b740af26b..f07d4036a31 100644 --- a/test/unittest/xmlparser/XMLProfileParserTests.cpp +++ b/test/unittest/xmlparser/XMLProfileParserTests.cpp @@ -108,6 +108,20 @@ TEST_F(XMLProfileParserTests, XMLParserParcipant) LocatorListIterator loc_list_it; PortParameters &port = rtps_atts.port; + EXPECT_EQ(rtps_atts.allocation.locators.max_unicast_locators, 4u); + EXPECT_EQ(rtps_atts.allocation.locators.max_multicast_locators, 1u); + EXPECT_EQ(rtps_atts.allocation.participants.initial, 10u); + EXPECT_EQ(rtps_atts.allocation.participants.maximum, 20u); + EXPECT_EQ(rtps_atts.allocation.participants.increment, 2u); + EXPECT_EQ(rtps_atts.allocation.readers.initial, 10u); + EXPECT_EQ(rtps_atts.allocation.readers.maximum, 20u); + EXPECT_EQ(rtps_atts.allocation.readers.increment, 2u); + EXPECT_EQ(rtps_atts.allocation.writers.initial, 10u); + EXPECT_EQ(rtps_atts.allocation.writers.maximum, 20u); + EXPECT_EQ(rtps_atts.allocation.writers.increment, 2u); + EXPECT_EQ(rtps_atts.allocation.send_buffers.preallocated_number, 127u); + EXPECT_EQ(rtps_atts.allocation.send_buffers.dynamic, true); + IPLocator::setIPv4(locator, 192, 168, 1 , 2); locator.port = 2019; EXPECT_EQ(*rtps_atts.defaultUnicastLocatorList.begin(), locator); diff --git a/test/unittest/xmlparser/test_xml_profiles.xml b/test/unittest/xmlparser/test_xml_profiles.xml index 21eec9bc0d3..c8a8b516cda 100644 --- a/test/unittest/xmlparser/test_xml_profiles.xml +++ b/test/unittest/xmlparser/test_xml_profiles.xml @@ -26,6 +26,10 @@ 20 2 + + 127 + true +