Skip to content

Commit

Permalink
RTPSMessageGroup_t preallocation (#938)
Browse files Browse the repository at this point in the history
* Refs #7116. Separating RTPSMessageGroup_t into a private header.

* Refs #7116. Passing send buffers management responsibility to RTPSParticipantImpl.

* Refs #7116. RAII on RTPSMessageGroup_t.

* Refs #7116. Protecting access to buffer pool.

* Refs #7116. Preallocating one buffer per expected thread.

* Refs #7116. Protecting access to buffer pool with specific mutex.

* Refs #7116. New SendBuffersManager class.

* Refs #7116. Allowing wrapping buffer on RTPSMessageGroup_t.

* Refs #7116. Using a single allocation for all initially allocated buffers.

* Refs #7116. Adding assertion.

* Refs #7116. Fixed alignment.

* Refs #7116. Correct calculation of raw buffer size.

* Refs #7116. Early return on security initialization failure.

* Refs #7116. Making send buffers allocation configurable.

* Refs #7116. Adding XML configuration for send_buffers allocation.

* Refs #7116. Adding participant allocation attributes to XML parser unit test.

* Refs #7165. Fixed warnings.

* Refs #7165. Fixed leak.

* Refs #7165. Return send buffer even if an exception is thrown.

* Refs #7165. Correctly return send buffer.
  • Loading branch information
MiguelCompany authored Jan 27, 2020
1 parent 136e594 commit bf5877c
Show file tree
Hide file tree
Showing 18 changed files with 558 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ struct RemoteLocatorsAllocationAttributes
size_t max_multicast_locators = 1u;
};

/**
* @brief Holds limits for send buffers allocations.
*/
struct SendBuffersAllocationAttributes
{
/** Initial number of send buffers to allocate.
*
* This attribute controls the initial number of send buffers to be allocated.
* The default value of 0 will perform an initial guess of the number of buffers
* required, based on the number of threads from which a send operation could be
* started.
*/
size_t preallocated_number = 0u;

/** Whether the number of send buffers is allowed to grow.
*
* This attribute controls how the buffer manager behaves when a send buffer is not
* available. When true, a new buffer will be created. When false, it will wait for a
* buffer to be returned. This is a tradeoff between latency and dynamic allocations.
*/
bool dynamic = false;
};

/**
* @brief Holds allocation limits affecting collections managed by a participant.
*/
Expand All @@ -62,6 +85,8 @@ struct RTPSParticipantAllocationAttributes
ResourceLimitedContainerConfig readers;
//! Defines the allocation behaviour for collections dependent on the total number of writers per participant.
ResourceLimitedContainerConfig writers;
//! Defines the allocation behaviour for the send buffer manager.
SendBuffersAllocationAttributes send_buffers;

//! @return the allocation config for the total of readers in the system (participants * readers)
ResourceLimitedContainerConfig total_readers() const
Expand Down
18 changes: 18 additions & 0 deletions include/fastrtps/rtps/common/CDRMessage_t.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,24 @@ struct RTPS_DllAPI CDRMessage_t{
return *(this);
}

void init(
octet* buffer_ptr,
uint32_t size)
{
assert(buffer == nullptr);
wraps = true;
pos = 0;
length = 0;
buffer = buffer_ptr;
max_size = size;

#if __BIG_ENDIAN__
msg_endian = BIGEND;
#else
msg_endian = LITTLEEND;
#endif
}

void reserve(
uint32_t size)
{
Expand Down
2 changes: 2 additions & 0 deletions include/fastrtps/rtps/messages/RTPSMessageGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace rtps {

class RTPSParticipantImpl;
class Endpoint;
class RTPSMessageGroup_t;

/**
* RTPSMessageGroup Class used to construct a RTPS message.
Expand Down Expand Up @@ -229,6 +230,7 @@ class Endpoint;

std::chrono::steady_clock::time_point max_blocking_time_point_;

std::unique_ptr<RTPSMessageGroup_t> send_buffer_;
};

} /* namespace rtps */
Expand Down
5 changes: 5 additions & 0 deletions include/fastrtps/xmlparser/XMLParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ class XMLParser
rtps::RemoteLocatorsAllocationAttributes& allocation,
uint8_t ident);

RTPS_DllAPI static XMLP_ret getXMLSendBuffersAllocationAttributes(
tinyxml2::XMLElement* elem,
rtps::SendBuffersAllocationAttributes& allocation,
uint8_t ident);

RTPS_DllAPI static XMLP_ret getXMLDiscoverySettings(
tinyxml2::XMLElement* elem,
rtps::DiscoverySettings& settings,
Expand Down
3 changes: 3 additions & 0 deletions include/fastrtps/xmlparser/XMLParserCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ extern const char* MAX_MULTICAST_LOCATORS;
extern const char* TOTAL_PARTICIPANTS;
extern const char* TOTAL_READERS;
extern const char* TOTAL_WRITERS;
extern const char* SEND_BUFFERS;
extern const char* PREALLOCATED_NUMBER;
extern const char* DYNAMIC_LC;

/// Publisher-subscriber attributes
extern const char* TOPIC;
Expand Down
8 changes: 8 additions & 0 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,20 @@
</xs:all>
</xs:complexType>

<xs:complexType name="sendBuffersAllocationConfigType">
<xs:all minOccurs="0">
<xs:element name="preallocated_number" type="uint32Type" minOccurs="0"/>
<xs:element name="dynamic" type="boolType" minOccurs="0"/>
</xs:all>
</xs:complexType>

<xs:complexType name="rtpsParticipantAllocationAttributesType">
<xs:all minOccurs="0">
<xs:element name="remote_locators" type="remoteLocatorsAllocationConfigType" minOccurs="0"/>
<xs:element name="total_participants" type="containerAllocationConfigType" minOccurs="0"/>
<xs:element name="total_readers" type="containerAllocationConfigType" minOccurs="0"/>
<xs:element name="total_writers" type="containerAllocationConfigType" minOccurs="0"/>
<xs:element name="send_buffers" type="sendBuffersAllocationConfigType" minOccurs="0"/>
</xs:all>
</xs:complexType>

Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ set(${PROJECT_NAME}_source_files
rtps/messages/RTPSMessageCreator.cpp
rtps/messages/RTPSMessageGroup.cpp
rtps/messages/RTPSGapBuilder.cpp
rtps/messages/SendBuffersManager.cpp
rtps/messages/MessageReceiver.cpp
rtps/messages/submessages/AckNackMsg.hpp
rtps/messages/submessages/DataMsg.hpp
Expand Down
16 changes: 8 additions & 8 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,23 +187,23 @@ RTPSParticipant* RTPSDomain::createParticipant(
pimpl = new RTPSParticipantImpl(PParam, guidP, p, listen);
}

#if HAVE_SECURITY
// Check security was correctly initialized
if (!pimpl->is_security_initialized())
// Check there is at least one transport registered.
if (!pimpl->networkFactoryHasRegisteredTransports())
{
logError(RTPS_PARTICIPANT, "Cannot create participant due to security initialization error");
logError(RTPS_PARTICIPANT, "Cannot create participant, because there is any transport");
delete pimpl;
return nullptr;
}
#endif

// Check there is at least one transport registered.
if (!pimpl->networkFactoryHasRegisteredTransports())
#if HAVE_SECURITY
// Check security was correctly initialized
if (!pimpl->is_security_initialized())
{
logError(RTPS_PARTICIPANT, "Cannot create participant, because there is any transport");
logError(RTPS_PARTICIPANT, "Cannot create participant due to security initialization error");
delete pimpl;
return nullptr;
}
#endif

{
std::lock_guard<std::mutex> guard(m_mutex);
Expand Down
85 changes: 16 additions & 69 deletions src/cpp/rtps/messages/RTPSMessageGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "../participant/RTPSParticipantImpl.h"
#include "../flowcontrol/FlowController.h"
#include "RTPSGapBuilder.hpp"
#include "RTPSMessageGroup_t.hpp"

#include <fastrtps/log/Log.h>

Expand All @@ -32,56 +33,6 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

/**
* Class RTPSMessageGroup_t that contains the messages used to send multiples changes as one message.
* @ingroup WRITER_MODULE
*/
class RTPSMessageGroup_t
{
public:

RTPSMessageGroup_t()
: rtpsmsg_submessage_(0u)
, rtpsmsg_fullmsg_(0u)
#if HAVE_SECURITY
, rtpsmsg_encrypt_(0u)
#endif
{
}

void init(
#if HAVE_SECURITY
bool has_security,
#endif
uint32_t payload,
const GuidPrefix_t& participant_guid)
{

rtpsmsg_fullmsg_.reserve(payload);
rtpsmsg_submessage_.reserve(payload);

#if HAVE_SECURITY
if (has_security)
{
rtpsmsg_encrypt_.reserve(payload);
}
#endif

CDRMessage::initCDRMsg(&rtpsmsg_fullmsg_);
RTPSMessageCreator::addHeader(&rtpsmsg_fullmsg_, participant_guid);
}

CDRMessage_t rtpsmsg_submessage_;

CDRMessage_t rtpsmsg_fullmsg_;

#if HAVE_SECURITY
CDRMessage_t rtpsmsg_encrypt_;
#endif
};

static thread_local std::unique_ptr<RTPSMessageGroup_t> tls_group;

bool sort_changes_group (CacheChange_t* c1,CacheChange_t* c2)
{
return(c1->sequenceNumber < c2->sequenceNumber);
Expand Down Expand Up @@ -158,30 +109,16 @@ RTPSMessageGroup::RTPSMessageGroup(
, encrypt_msg_(nullptr)
#endif
, max_blocking_time_point_(max_blocking_time_point)
, send_buffer_(participant->get_send_buffer())
{
// Avoid warning when neither SECURITY nor DEBUG is used
(void)participant;

assert(participant);
assert(endpoint);

if (!tls_group)
{
tls_group.reset(new RTPSMessageGroup_t());
}

uint32_t max_payload_size = participant->getMaxMessageSize();
const GuidPrefix_t& guid_prefix = participant->getGuid().guidPrefix;

#if HAVE_SECURITY
tls_group->init(participant->is_secure(), max_payload_size, guid_prefix);
#else
tls_group->init(max_payload_size, guid_prefix);
#endif

full_msg_ = &(tls_group->rtpsmsg_fullmsg_);
submessage_msg_ = &(tls_group->rtpsmsg_submessage_);

full_msg_ = &(send_buffer_->rtpsmsg_fullmsg_);
submessage_msg_ = &(send_buffer_->rtpsmsg_submessage_);

// Init RTPS message.
reset_to_header();
Expand All @@ -191,15 +128,25 @@ RTPSMessageGroup::RTPSMessageGroup(
#if HAVE_SECURITY
if (participant->is_secure())
{
encrypt_msg_ = &(tls_group->rtpsmsg_encrypt_);
encrypt_msg_ = &(send_buffer_->rtpsmsg_encrypt_);
CDRMessage::initCDRMsg(encrypt_msg_);
}
#endif
}

RTPSMessageGroup::~RTPSMessageGroup() noexcept(false)
{
send();
try
{
send();
}
catch (...)
{
participant_->return_send_buffer(std::move(send_buffer_));
throw;
}

participant_->return_send_buffer(std::move(send_buffer_));
}

void RTPSMessageGroup::reset_to_header()
Expand Down
Loading

0 comments on commit bf5877c

Please sign in to comment.