Skip to content

Commit

Permalink
Refs 5751. Built-in endpoints history configuration
Browse files Browse the repository at this point in the history
This is a port of #907 from 1.9.x
  • Loading branch information
IkerLuengo committed Mar 3, 2020
1 parent dcd6942 commit 6f862d0
Show file tree
Hide file tree
Showing 16 changed files with 516 additions and 380 deletions.
10 changes: 10 additions & 0 deletions include/fastdds/rtps/attributes/RTPSParticipantAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ typedef enum ParticipantFilteringFlags : uint32_t
FILTER_SAME_PROCESS = 0x4
} ParticipantFilteringFlags_t;

#define BUILTIN_DATA_MAX_SIZE 5000

//! PDP factory for EXTERNAL type
class PDP;
class BuiltinProtocols;
Expand Down Expand Up @@ -294,9 +296,15 @@ class BuiltinAttributes
//! Memory policy for builtin readers
MemoryManagementPolicy_t readerHistoryMemoryPolicy = MemoryManagementPolicy_t::PREALLOCATED_MEMORY_MODE;

//! Maximum payload size for builtin readers
uint32_t readerPayloadSize = BUILTIN_DATA_MAX_SIZE;

//! Memory policy for builtin writers
MemoryManagementPolicy_t writerHistoryMemoryPolicy= MemoryManagementPolicy_t::PREALLOCATED_MEMORY_MODE;

//! Maximum payload size for builtin writers
uint32_t writerPayloadSize = BUILTIN_DATA_MAX_SIZE;

//! Mutation tries if the port is being used.
uint32_t mutation_tries = 100u;

Expand All @@ -318,7 +326,9 @@ class BuiltinAttributes
(this->metatrafficMulticastLocatorList == b.metatrafficMulticastLocatorList) &&
(this->initialPeersList == b.initialPeersList) &&
(this->readerHistoryMemoryPolicy == b.readerHistoryMemoryPolicy) &&
(this->readerPayloadSize == b.readerPayloadSize) &&
(this->writerHistoryMemoryPolicy == b.writerHistoryMemoryPolicy) &&
(this->writerPayloadSize == b.writerPayloadSize) &&
(this->mutation_tries == b.mutation_tries) &&
(this->avoid_builtin_multicast == b.avoid_builtin_multicast);
}
Expand Down
4 changes: 0 additions & 4 deletions include/fastdds/rtps/builtin/data/ParticipantProxyData.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@

#include <chrono>

#define DISCOVERY_PARTICIPANT_DATA_MAX_SIZE 5000
#define DISCOVERY_TOPIC_DATA_MAX_SIZE 500
#define DISCOVERY_PUBLICATION_DATA_MAX_SIZE 5000
#define DISCOVERY_SUBSCRIPTION_DATA_MAX_SIZE 5000
#define BUILTIN_PARTICIPANT_DATA_MAX_SIZE 100
#define TYPELOOKUP_DATA_MAX_SIZE 5000

Expand Down
2 changes: 2 additions & 0 deletions include/fastrtps/xmlparser/XMLParserCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ extern const char* PUBREADER_SUBWRITER;
extern const char* STATIC_ENDPOINT_XML;
extern const char* READER_HIST_MEM_POLICY;
extern const char* WRITER_HIST_MEM_POLICY;
extern const char* READER_PAYLOAD_SIZE;
extern const char* WRITER_PAYLOAD_SIZE;
extern const char* MUTATION_TRIES;
extern const char* ACCESS_SCOPE;
extern const char* ENABLED;
Expand Down
2 changes: 2 additions & 0 deletions resources/xsd/fastRTPS_profiles.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@
<xs:element name="initialPeersList" type="locatorListType" minOccurs="0"/>
<xs:element name="readerHistoryMemoryPolicy" type="historyMemoryPolicyType" minOccurs="0"/>
<xs:element name="writerHistoryMemoryPolicy" type="historyMemoryPolicyType" minOccurs="0"/>
<xs:element name="readerPayloadSize" type="uint32Type" minOccurs="0"/>
<xs:element name="writerPayloadSize" type="uint32Type" minOccurs="0"/>
<xs:element name="mutation_tries" type="uint32Type" minOccurs="0"/>
<xs:element name="avoid_builtin_multicast" type="boolType" minOccurs="0"/>
</xs:all>
Expand Down
94 changes: 52 additions & 42 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,35 @@
#include <mutex>

namespace eprosima {
namespace fastrtps{
namespace fastrtps {
namespace rtps {

bool EDPClient::processLocalReaderProxyData(
RTPSReader* local_reader,
ReaderProxyData* rdata)
RTPSReader* local_reader,
ReaderProxyData* rdata)
{
logInfo(RTPS_EDP,rdata->guid().entityId);
logInfo(RTPS_EDP, rdata->guid().entityId);
(void)local_reader;

auto* writer = &subscriptions_writer_;

#if HAVE_SECURITY
if(local_reader->getAttributes().security_attributes().is_discovery_protected)
if (local_reader->getAttributes().security_attributes().is_discovery_protected)
{
writer = &subscriptions_secure_writer_;
}
#endif

if(writer->first != nullptr)
if (writer->first != nullptr)
{
// TODO(Ricardo) Write a getCdrSerializedPayload for ReaderProxyData.
CacheChange_t* change = writer->first->new_change([]() -> uint32_t
{return DISCOVERY_SUBSCRIPTION_DATA_MAX_SIZE;},
ALIVE,rdata->key());
CacheChange_t* change = writer->first->new_change([this]() -> uint32_t
{
return mp_PDP->builtin_attributes().writerPayloadSize;
},
ALIVE, rdata->key());

if(change !=nullptr)
if (change != nullptr)
{
CDRMessage_t aux_msg(change->serializedPayload);

Expand All @@ -74,9 +76,9 @@ bool EDPClient::processLocalReaderProxyData(

{
std::unique_lock<RecursiveTimedMutex> lock(*writer->second->getMutex());
for(auto ch = writer->second->changesBegin(); ch != writer->second->changesEnd(); ++ch)
for (auto ch = writer->second->changesBegin(); ch != writer->second->changesEnd(); ++ch)
{
if((*ch)->instanceHandle == change->instanceHandle)
if ((*ch)->instanceHandle == change->instanceHandle)
{
writer->second->remove_change(*ch);
break;
Expand Down Expand Up @@ -104,27 +106,29 @@ bool EDPClient::processLocalReaderProxyData(
}

bool EDPClient::processLocalWriterProxyData(
RTPSWriter* local_writer,
WriterProxyData* wdata)
RTPSWriter* local_writer,
WriterProxyData* wdata)
{
logInfo(RTPS_EDP, wdata->guid().entityId);
(void)local_writer;

auto* writer = &publications_writer_;

#if HAVE_SECURITY
if(local_writer->getAttributes().security_attributes().is_discovery_protected)
if (local_writer->getAttributes().security_attributes().is_discovery_protected)
{
writer = &publications_secure_writer_;
}
#endif

if(writer->first !=nullptr)
if (writer->first != nullptr)
{
CacheChange_t* change = writer->first->new_change([]() -> uint32_t
{return DISCOVERY_PUBLICATION_DATA_MAX_SIZE;},
ALIVE, wdata->key());
if(change != nullptr)
CacheChange_t* change = writer->first->new_change([this]() -> uint32_t
{
return mp_PDP->builtin_attributes().writerPayloadSize;
},
ALIVE, wdata->key());
if (change != nullptr)
{
//wdata->toParameterList();

Expand All @@ -143,9 +147,9 @@ bool EDPClient::processLocalWriterProxyData(

{
std::unique_lock<RecursiveTimedMutex> lock(*writer->second->getMutex());
for(auto ch = writer->second->changesBegin(); ch != writer->second->changesEnd(); ++ch)
for (auto ch = writer->second->changesBegin(); ch != writer->second->changesEnd(); ++ch)
{
if((*ch)->instanceHandle == change->instanceHandle)
if ((*ch)->instanceHandle == change->instanceHandle)
{
writer->second->remove_change(*ch);
break;
Expand All @@ -170,33 +174,36 @@ bool EDPClient::processLocalWriterProxyData(
return true;
}

bool EDPClient::removeLocalWriter(RTPSWriter* W)
bool EDPClient::removeLocalWriter(
RTPSWriter* W)
{
logInfo(RTPS_EDP,W->getGuid().entityId);
logInfo(RTPS_EDP, W->getGuid().entityId);

auto* writer = &publications_writer_;

#if HAVE_SECURITY
if(W->getAttributes().security_attributes().is_discovery_protected)
if (W->getAttributes().security_attributes().is_discovery_protected)
{
writer = &publications_secure_writer_;
}
#endif

if(writer->first!=nullptr)
if (writer->first != nullptr)
{
InstanceHandle_t iH;
iH = W->getGuid();
CacheChange_t* change = writer->first->new_change([]() -> uint32_t
{return DISCOVERY_PUBLICATION_DATA_MAX_SIZE;},
NOT_ALIVE_DISPOSED_UNREGISTERED,iH);
if(change != nullptr)
CacheChange_t* change = writer->first->new_change([this]() -> uint32_t
{
return mp_PDP->builtin_attributes().writerPayloadSize;
},
NOT_ALIVE_DISPOSED_UNREGISTERED, iH);
if (change != nullptr)
{
{
std::lock_guard<RecursiveTimedMutex> guard(*writer->second->getMutex());
for(auto ch = writer->second->changesBegin(); ch != writer->second->changesEnd(); ++ch)
for (auto ch = writer->second->changesBegin(); ch != writer->second->changesEnd(); ++ch)
{
if((*ch)->instanceHandle == change->instanceHandle)
if ((*ch)->instanceHandle == change->instanceHandle)
{
writer->second->remove_change(*ch);
break;
Expand All @@ -219,33 +226,36 @@ bool EDPClient::removeLocalWriter(RTPSWriter* W)
return mp_PDP->removeWriterProxyData(W->getGuid());
}

bool EDPClient::removeLocalReader(RTPSReader* R)
bool EDPClient::removeLocalReader(
RTPSReader* R)
{
logInfo(RTPS_EDP,R->getGuid().entityId);
logInfo(RTPS_EDP, R->getGuid().entityId);

auto* writer = &subscriptions_writer_;

#if HAVE_SECURITY
if(R->getAttributes().security_attributes().is_discovery_protected)
if (R->getAttributes().security_attributes().is_discovery_protected)
{
writer = &subscriptions_secure_writer_;
}
#endif

if(writer->first!=nullptr)
if (writer->first != nullptr)
{
InstanceHandle_t iH;
iH = (R->getGuid());
CacheChange_t* change = writer->first->new_change([]() -> uint32_t
{return DISCOVERY_SUBSCRIPTION_DATA_MAX_SIZE;},
NOT_ALIVE_DISPOSED_UNREGISTERED,iH);
if(change != nullptr)
CacheChange_t* change = writer->first->new_change([this]() -> uint32_t
{
return mp_PDP->builtin_attributes().writerPayloadSize;
},
NOT_ALIVE_DISPOSED_UNREGISTERED, iH);
if (change != nullptr)
{
{
std::lock_guard<RecursiveTimedMutex> guard(*writer->second->getMutex());
for(auto ch = writer->second->changesBegin(); ch != writer->second->changesEnd(); ++ch)
for (auto ch = writer->second->changesBegin(); ch != writer->second->changesEnd(); ++ch)
{
if((*ch)->instanceHandle == change->instanceHandle)
if ((*ch)->instanceHandle == change->instanceHandle)
{
writer->second->remove_change(*ch);
break;
Expand Down
Loading

0 comments on commit 6f862d0

Please sign in to comment.