Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTPSMessageGroup_t preallocation <1.9.x> [7165] #938

Merged
merged 20 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cc15eed
Refs #7116. Separating RTPSMessageGroup_t into a private header.
MiguelCompany Dec 18, 2019
c2517b7
Refs #7116. Passing send buffers management responsibility to RTPSPar…
MiguelCompany Dec 18, 2019
0671cfa
Refs #7116. RAII on RTPSMessageGroup_t.
MiguelCompany Dec 18, 2019
991af62
Refs #7116. Protecting access to buffer pool.
MiguelCompany Dec 18, 2019
21a3fb0
Refs #7116. Preallocating one buffer per expected thread.
MiguelCompany Dec 18, 2019
c39a616
Refs #7116. Protecting access to buffer pool with specific mutex.
MiguelCompany Dec 19, 2019
9526605
Refs #7116. New SendBuffersManager class.
MiguelCompany Dec 19, 2019
e3a9583
Refs #7116. Allowing wrapping buffer on RTPSMessageGroup_t.
MiguelCompany Dec 19, 2019
878109c
Refs #7116. Using a single allocation for all initially allocated buf…
MiguelCompany Dec 19, 2019
efd27d4
Refs #7116. Adding assertion.
MiguelCompany Dec 19, 2019
2433ef1
Refs #7116. Fixed alignment.
MiguelCompany Dec 19, 2019
32fd0c5
Refs #7116. Correct calculation of raw buffer size.
MiguelCompany Dec 19, 2019
9f67eb8
Refs #7116. Early return on security initialization failure.
MiguelCompany Dec 20, 2019
41d0905
Refs #7116. Making send buffers allocation configurable.
MiguelCompany Dec 20, 2019
d691493
Refs #7116. Adding XML configuration for send_buffers allocation.
MiguelCompany Jan 2, 2020
a16a233
Refs #7116. Adding participant allocation attributes to XML parser un…
MiguelCompany Jan 2, 2020
f5b043f
Refs #7165. Fixed warnings.
MiguelCompany Jan 3, 2020
2fe275a
Refs #7165. Fixed leak.
MiguelCompany Jan 3, 2020
897e829
Refs #7165. Return send buffer even if an exception is thrown.
MiguelCompany Jan 3, 2020
094d100
Refs #7165. Correctly return send buffer.
MiguelCompany Jan 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ struct RemoteLocatorsAllocationAttributes
size_t max_multicast_locators = 1u;
};

/**
* @brief Holds limits for send buffers allocations.
*/
struct SendBuffersAllocationAttributes
{
/** Initial number of send buffers to allocate.
*
* This attribute controls the initial number of send buffers to be allocated.
* The default value of 0 will perform an initial guess of the number of buffers
* required, based on the number of threads from which a send operation could be
* started.
*/
size_t preallocated_number = 0u;

/** Whether the number of send buffers is allowed to grow.
*
* This attribute controls how the buffer manager behaves when a send buffer is not
* available. When true, a new buffer will be created. When false, it will wait for a
* buffer to be returned. This is a tradeoff between latency and dynamic allocations.
*/
bool dynamic = false;
};

/**
* @brief Holds allocation limits affecting collections managed by a participant.
*/
Expand All @@ -62,6 +85,8 @@ struct RTPSParticipantAllocationAttributes
ResourceLimitedContainerConfig readers;
//! Defines the allocation behaviour for collections dependent on the total number of writers per participant.
ResourceLimitedContainerConfig writers;
//! Defines the allocation behaviour for the send buffer manager.
SendBuffersAllocationAttributes send_buffers;

//! @return the allocation config for the total of readers in the system (participants * readers)
ResourceLimitedContainerConfig total_readers() const
Expand Down
18 changes: 18 additions & 0 deletions include/fastrtps/rtps/common/CDRMessage_t.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,24 @@ struct RTPS_DllAPI CDRMessage_t{
return *(this);
}

void init(
octet* buffer_ptr,
uint32_t size)
{
assert(buffer == nullptr);
wraps = true;
pos = 0;
length = 0;
buffer = buffer_ptr;
max_size = size;

#if __BIG_ENDIAN__
msg_endian = BIGEND;
#else
msg_endian = LITTLEEND;
#endif
}

void reserve(
uint32_t size)
{
Expand Down
2 changes: 2 additions & 0 deletions include/fastrtps/rtps/messages/RTPSMessageGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace rtps {

class RTPSParticipantImpl;
class Endpoint;
class RTPSMessageGroup_t;

/**
* RTPSMessageGroup Class used to construct a RTPS message.
Expand Down Expand Up @@ -229,6 +230,7 @@ class Endpoint;

std::chrono::steady_clock::time_point max_blocking_time_point_;

std::unique_ptr<RTPSMessageGroup_t> send_buffer_;
};

} /* namespace rtps */
Expand Down
5 changes: 5 additions & 0 deletions include/fastrtps/xmlparser/XMLParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,11 @@ class XMLParser
rtps::RemoteLocatorsAllocationAttributes& allocation,
uint8_t ident);

RTPS_DllAPI static XMLP_ret getXMLSendBuffersAllocationAttributes(
tinyxml2::XMLElement* elem,
rtps::SendBuffersAllocationAttributes& allocation,
uint8_t ident);

RTPS_DllAPI static XMLP_ret getXMLDiscoverySettings(
tinyxml2::XMLElement* elem,
rtps::DiscoverySettings& settings,
Expand Down
3 changes: 3 additions & 0 deletions include/fastrtps/xmlparser/XMLParserCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ extern const char* MAX_MULTICAST_LOCATORS;
extern const char* TOTAL_PARTICIPANTS;
extern const char* TOTAL_READERS;
extern const char* TOTAL_WRITERS;
extern const char* SEND_BUFFERS;
extern const char* PREALLOCATED_NUMBER;
extern const char* DYNAMIC_LC;

/// Publisher-subscriber attributes
extern const char* TOPIC;
Expand Down
8 changes: 8 additions & 0 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,20 @@
</xs:all>
</xs:complexType>

<xs:complexType name="sendBuffersAllocationConfigType">
<xs:all minOccurs="0">
<xs:element name="preallocated_number" type="uint32Type" minOccurs="0"/>
<xs:element name="dynamic" type="boolType" minOccurs="0"/>
</xs:all>
</xs:complexType>

<xs:complexType name="rtpsParticipantAllocationAttributesType">
<xs:all minOccurs="0">
<xs:element name="remote_locators" type="remoteLocatorsAllocationConfigType" minOccurs="0"/>
<xs:element name="total_participants" type="containerAllocationConfigType" minOccurs="0"/>
<xs:element name="total_readers" type="containerAllocationConfigType" minOccurs="0"/>
<xs:element name="total_writers" type="containerAllocationConfigType" minOccurs="0"/>
<xs:element name="send_buffers" type="sendBuffersAllocationConfigType" minOccurs="0"/>
</xs:all>
</xs:complexType>

Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ set(${PROJECT_NAME}_source_files
rtps/messages/RTPSMessageCreator.cpp
rtps/messages/RTPSMessageGroup.cpp
rtps/messages/RTPSGapBuilder.cpp
rtps/messages/SendBuffersManager.cpp
rtps/messages/MessageReceiver.cpp
rtps/messages/submessages/AckNackMsg.hpp
rtps/messages/submessages/DataMsg.hpp
Expand Down
16 changes: 8 additions & 8 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,23 +187,23 @@ RTPSParticipant* RTPSDomain::createParticipant(
pimpl = new RTPSParticipantImpl(PParam, guidP, p, listen);
}

#if HAVE_SECURITY
// Check security was correctly initialized
if (!pimpl->is_security_initialized())
// Check there is at least one transport registered.
if (!pimpl->networkFactoryHasRegisteredTransports())
{
logError(RTPS_PARTICIPANT, "Cannot create participant due to security initialization error");
logError(RTPS_PARTICIPANT, "Cannot create participant, because there is any transport");
delete pimpl;
return nullptr;
}
#endif

// Check there is at least one transport registered.
if (!pimpl->networkFactoryHasRegisteredTransports())
#if HAVE_SECURITY
// Check security was correctly initialized
if (!pimpl->is_security_initialized())
{
logError(RTPS_PARTICIPANT, "Cannot create participant, because there is any transport");
logError(RTPS_PARTICIPANT, "Cannot create participant due to security initialization error");
delete pimpl;
return nullptr;
}
#endif

{
std::lock_guard<std::mutex> guard(m_mutex);
Expand Down
85 changes: 16 additions & 69 deletions src/cpp/rtps/messages/RTPSMessageGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "../participant/RTPSParticipantImpl.h"
#include "../flowcontrol/FlowController.h"
#include "RTPSGapBuilder.hpp"
#include "RTPSMessageGroup_t.hpp"

#include <fastrtps/log/Log.h>

Expand All @@ -32,56 +33,6 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

/**
* Class RTPSMessageGroup_t that contains the messages used to send multiples changes as one message.
* @ingroup WRITER_MODULE
*/
class RTPSMessageGroup_t
{
public:

RTPSMessageGroup_t()
: rtpsmsg_submessage_(0u)
, rtpsmsg_fullmsg_(0u)
#if HAVE_SECURITY
, rtpsmsg_encrypt_(0u)
#endif
{
}

void init(
#if HAVE_SECURITY
bool has_security,
#endif
uint32_t payload,
const GuidPrefix_t& participant_guid)
{

rtpsmsg_fullmsg_.reserve(payload);
rtpsmsg_submessage_.reserve(payload);

#if HAVE_SECURITY
if (has_security)
{
rtpsmsg_encrypt_.reserve(payload);
}
#endif

CDRMessage::initCDRMsg(&rtpsmsg_fullmsg_);
RTPSMessageCreator::addHeader(&rtpsmsg_fullmsg_, participant_guid);
}

CDRMessage_t rtpsmsg_submessage_;

CDRMessage_t rtpsmsg_fullmsg_;

#if HAVE_SECURITY
CDRMessage_t rtpsmsg_encrypt_;
#endif
};

static thread_local std::unique_ptr<RTPSMessageGroup_t> tls_group;

bool sort_changes_group (CacheChange_t* c1,CacheChange_t* c2)
{
return(c1->sequenceNumber < c2->sequenceNumber);
Expand Down Expand Up @@ -158,30 +109,16 @@ RTPSMessageGroup::RTPSMessageGroup(
, encrypt_msg_(nullptr)
#endif
, max_blocking_time_point_(max_blocking_time_point)
, send_buffer_(participant->get_send_buffer())
{
// Avoid warning when neither SECURITY nor DEBUG is used
(void)participant;

assert(participant);
assert(endpoint);

if (!tls_group)
{
tls_group.reset(new RTPSMessageGroup_t());
}

uint32_t max_payload_size = participant->getMaxMessageSize();
const GuidPrefix_t& guid_prefix = participant->getGuid().guidPrefix;

#if HAVE_SECURITY
tls_group->init(participant->is_secure(), max_payload_size, guid_prefix);
#else
tls_group->init(max_payload_size, guid_prefix);
#endif

full_msg_ = &(tls_group->rtpsmsg_fullmsg_);
submessage_msg_ = &(tls_group->rtpsmsg_submessage_);

full_msg_ = &(send_buffer_->rtpsmsg_fullmsg_);
submessage_msg_ = &(send_buffer_->rtpsmsg_submessage_);

// Init RTPS message.
reset_to_header();
Expand All @@ -191,15 +128,25 @@ RTPSMessageGroup::RTPSMessageGroup(
#if HAVE_SECURITY
if (participant->is_secure())
{
encrypt_msg_ = &(tls_group->rtpsmsg_encrypt_);
encrypt_msg_ = &(send_buffer_->rtpsmsg_encrypt_);
CDRMessage::initCDRMsg(encrypt_msg_);
}
#endif
}

RTPSMessageGroup::~RTPSMessageGroup() noexcept(false)
{
send();
try
{
send();
}
catch (...)
{
participant_->return_send_buffer(std::move(send_buffer_));
throw;
}

participant_->return_send_buffer(std::move(send_buffer_));
}

void RTPSMessageGroup::reset_to_header()
Expand Down
Loading