Skip to content

Commit

Permalink
Reduce proxies memory consumption (#916)
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelCompany committed Jan 28, 2020
1 parent 0f27014 commit 1e78f83
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 88 deletions.
39 changes: 19 additions & 20 deletions include/fastrtps/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,12 @@ class ReaderProxy
* Constructor.
* @param times WriterTimes to use in the ReaderProxy.
* @param loc_alloc Maximum number of remote locators to keep in the ReaderProxy.
* @param limits Maximum size allowed for variable length fields on messages
* @param writer Pointer to the StatefulWriter creating the reader proxy.
*/
ReaderProxy(
const WriterTimes& times,
const RemoteLocatorsAllocationAttributes& loc_alloc,
const VariableLengthDataLimits& limits,
StatefulWriter* writer);
StatefulWriter* writer);

/**
* Activate this proxy associating it to a remote reader.
Expand Down Expand Up @@ -252,7 +250,7 @@ class ReaderProxy
*/
inline const GUID_t& guid() const
{
return reader_attributes_.guid();
return locator_info_.remote_guid();
}

/**
Expand All @@ -261,7 +259,7 @@ class ReaderProxy
*/
inline DurabilityKind_t durability_kind() const
{
return reader_attributes_.m_qos.m_durability.durabilityKind();
return durability_kind_;
}

/**
Expand All @@ -270,7 +268,7 @@ class ReaderProxy
*/
inline bool expects_inline_qos() const
{
return reader_attributes_.m_expectsInlineQos;
return expects_inline_qos_;
}

/**
Expand All @@ -279,26 +277,21 @@ class ReaderProxy
*/
inline bool is_reliable() const
{
return reader_attributes_.m_qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS;
return is_reliable_;
}

/**
* Check if the reader represented by this proxy is remote and reliable.
* @return true if the reader represented by this proxy is remote and reliable.
*/
inline bool is_remote_and_reliable() const
inline bool disable_positive_acks() const
{
return !locator_info_.is_local_reader() &&
reader_attributes_.m_qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS;
return disable_positive_acks_;
}

/**
* Get the attributes of the reader represented by this proxy.
* @return the attributes of the reader represented by this proxy.
* Check if the reader represented by this proxy is remote and reliable.
* @return true if the reader represented by this proxy is remote and reliable.
*/
inline const ReaderProxyData& reader_attributes() const
inline bool is_remote_and_reliable() const
{
return reader_attributes_;
return !locator_info_.is_local_reader() && is_reliable_;
}

/**
Expand Down Expand Up @@ -399,8 +392,14 @@ class ReaderProxy
bool is_active_;
//!Reader locator information
ReaderLocator locator_info_;
//!Attributes of the Remote Reader
ReaderProxyData reader_attributes_;
//!Taken from QoS
DurabilityKind_t durability_kind_;
//!Taken from QoS
bool expects_inline_qos_;
//!Taken from QoS
bool is_reliable_;
//!Taken from QoS
bool disable_positive_acks_;
//!Pointer to the associated StatefulWriter.
StatefulWriter* writer_;
//!Set of the changes and its state.
Expand Down
12 changes: 6 additions & 6 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ StatefulReader::StatefulReader(
const RTPSParticipantAttributes& part_att = pimpl->getRTPSParticipantAttributes();
for (size_t n = 0; n < att.matched_writers_allocation.initial; ++n)
{
matched_writers_pool_.push_back(new WriterProxy(this, part_att.allocation.locators, part_att.allocation.data_limits, proxy_changes_config_));
matched_writers_pool_.push_back(new WriterProxy(this, part_att.allocation.locators, proxy_changes_config_));
}
}

Expand Down Expand Up @@ -126,7 +126,7 @@ bool StatefulReader::matched_writer_add(
if (matched_writers_.size() + matched_writers_pool_.size() < max_readers)
{
const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->getRTPSParticipantAttributes();
wp = new WriterProxy(this, part_att.allocation.locators, part_att.allocation.data_limits, proxy_changes_config_);
wp = new WriterProxy(this, part_att.allocation.locators, proxy_changes_config_);
}
else
{
Expand Down Expand Up @@ -214,7 +214,7 @@ bool StatefulReader::matched_writer_remove(

wproxy = *it;
matched_writers_.erase(it);
remove_persistence_guid(wproxy->guid(), wproxy->attributes().persistence_guid());
remove_persistence_guid(wproxy->guid(), wproxy->persistence_guid());
break;
}
}
Expand Down Expand Up @@ -311,7 +311,7 @@ bool StatefulReader::processDataMsg(
if (liveliness_lease_duration_ < c_TimeInfinite)
{
if (liveliness_kind_ == MANUAL_BY_TOPIC_LIVELINESS_QOS ||
pWP->attributes().m_qos.m_liveliness.kind == MANUAL_BY_TOPIC_LIVELINESS_QOS)
pWP->liveliness_kind() == MANUAL_BY_TOPIC_LIVELINESS_QOS)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if (wlp != nullptr)
Expand Down Expand Up @@ -406,7 +406,7 @@ bool StatefulReader::processDataFragMsg(
if (liveliness_lease_duration_ < c_TimeInfinite)
{
if (liveliness_kind_ == MANUAL_BY_TOPIC_LIVELINESS_QOS ||
pWP->attributes().m_qos.m_liveliness.kind == MANUAL_BY_TOPIC_LIVELINESS_QOS)
pWP->liveliness_kind() == MANUAL_BY_TOPIC_LIVELINESS_QOS)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
Expand Down Expand Up @@ -541,7 +541,7 @@ bool StatefulReader::processHeartbeatMsg(
if (liveliness_lease_duration_ < c_TimeInfinite)
{
if (liveliness_kind_ == MANUAL_BY_TOPIC_LIVELINESS_QOS ||
writer->attributes().m_qos.m_liveliness.kind == MANUAL_BY_TOPIC_LIVELINESS_QOS)
writer->liveliness_kind() == MANUAL_BY_TOPIC_LIVELINESS_QOS)
{
auto wlp = this->mp_RTPSParticipant->wlp();
if ( wlp != nullptr)
Expand Down
36 changes: 23 additions & 13 deletions src/cpp/rtps/reader/WriterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ constexpr size_t changes_node_size = memory::set_node_size<std::pair<size_t, Seq
WriterProxy::WriterProxy(
StatefulReader* reader,
const RemoteLocatorsAllocationAttributes& loc_alloc,
const VariableLengthDataLimits& limits,
const ResourceLimitedContainerConfig& changes_allocation)
: reader_(reader)
, attributes_(loc_alloc.max_unicast_locators, loc_alloc.max_multicast_locators, limits)
, heartbeat_response_(nullptr)
, initial_acknack_(nullptr)
, last_heartbeat_count_(0)
Expand All @@ -74,6 +72,9 @@ WriterProxy::WriterProxy(
, guid_as_vector_(ResourceLimitedContainerConfig::fixed_size_configuration(1u))
, guid_prefix_as_vector_(ResourceLimitedContainerConfig::fixed_size_configuration(1u))
, is_on_same_process_(false)
, ownership_strength_(0)
, liveliness_kind_(AUTOMATIC_LIVELINESS_QOS)
, locators_entry_(loc_alloc.max_unicast_locators, loc_alloc.max_multicast_locators)
{
//Create Events
heartbeat_response_ =
Expand Down Expand Up @@ -108,11 +109,16 @@ void WriterProxy::start(
heartbeat_response_->update_interval(reader_->getTimes().heartbeatResponseDelay);
initial_acknack_->update_interval(reader_->getTimes().initialAcknackDelay);

attributes_ = attributes;
guid_as_vector_.push_back(attributes_.guid());
guid_prefix_as_vector_.push_back(attributes_.guid().guidPrefix);
locators_entry_.remote_guid = attributes.guid();
guid_as_vector_.push_back(attributes.guid());
guid_prefix_as_vector_.push_back(attributes.guid().guidPrefix);
persistence_guid_ = attributes.persistence_guid();
is_alive_ = true;
is_on_same_process_ = RTPSDomainImpl::should_intraprocess_between(reader_->getGuid(), attributes_.guid());
is_on_same_process_ = RTPSDomainImpl::should_intraprocess_between(reader_->getGuid(), attributes.guid());
ownership_strength_ = attributes.m_qos.m_ownershipStrength.value;
liveliness_kind_ = attributes.m_qos.m_liveliness.kind;
locators_entry_.unicast = attributes.remote_locators().unicast;
locators_entry_.multicast = attributes.remote_locators().multicast;

initial_acknack_->restart_timer();
loaded_from_storage(initial_sequence);
Expand All @@ -126,7 +132,9 @@ void WriterProxy::update(
#endif

assert(is_alive_);
attributes_ = attributes;
ownership_strength_ = attributes.m_qos.m_ownershipStrength.value;
locators_entry_.unicast = attributes.remote_locators().unicast;
locators_entry_.multicast = attributes.remote_locators().multicast;
}

void WriterProxy::stop()
Expand All @@ -140,7 +148,9 @@ void WriterProxy::stop()
void WriterProxy::clear()
{
is_alive_ = false;
attributes_.guid(c_Guid_Unknown);
locators_entry_.unicast.clear();
locators_entry_.multicast.clear();
locators_entry_.remote_guid = c_Guid_Unknown;
last_heartbeat_count_ = 0;
heartbeat_final_flag_.store(false);
guid_as_vector_.clear();
Expand All @@ -165,7 +175,7 @@ void WriterProxy::missing_changes_update(
assert(get_mutex_owner() == get_thread_id());
#endif

logInfo(RTPS_READER, attributes_.guid().entityId << ": changes up to seq_num: " << seq_num << " missing.");
logInfo(RTPS_READER, guid().entityId << ": changes up to seq_num: " << seq_num << " missing.");

// Check was not removed from container.
if (seq_num > changes_from_writer_low_mark_)
Expand All @@ -184,7 +194,7 @@ void WriterProxy::lost_changes_update(
assert(get_mutex_owner() == get_thread_id());
#endif

logInfo(RTPS_READER, attributes_.guid().entityId << ": up to seq_num: " << seq_num);
logInfo(RTPS_READER, guid().entityId << ": up to seq_num: " << seq_num);

// Check was not removed from container.
if (seq_num > changes_from_writer_low_mark_)
Expand All @@ -208,7 +218,7 @@ void WriterProxy::lost_changes_update(
bool WriterProxy::received_change_set(
const SequenceNumber_t& seq_num)
{
logInfo(RTPS_READER, attributes_.guid().entityId << ": seq_num: " << seq_num);
logInfo(RTPS_READER, guid().entityId << ": seq_num: " << seq_num);
return received_change_set(seq_num, true);
}

Expand Down Expand Up @@ -464,11 +474,11 @@ void WriterProxy::perform_initial_ack_nack() const
SequenceNumberSet_t sns(SequenceNumber_t(0, 0));
if (is_on_same_process_)
{
RTPSWriter* writer = RTPSDomainImpl::find_local_writer(attributes_.guid());
RTPSWriter* writer = RTPSDomainImpl::find_local_writer(guid());
if (writer)
{
bool tmp;
writer->process_acknack(attributes_.guid(), reader_->getGuid(), 1, SequenceNumberSet_t(), false, tmp);
writer->process_acknack(guid(), reader_->getGuid(), 1, SequenceNumberSet_t(), false, tmp);
}
}
else
Expand Down
42 changes: 24 additions & 18 deletions src/cpp/rtps/reader/WriterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
#include <fastrtps/rtps/common/Types.h>
#include <fastrtps/rtps/common/Locator.h>
#include <fastrtps/rtps/common/CacheChange.h>
#include <fastrtps/rtps/attributes/ReaderAttributes.h>
#include <fastrtps/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
#include <fastrtps/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
#include <fastrtps/rtps/builtin/data/WriterProxyData.h>
#include <fastrtps/rtps/common/LocatorSelectorEntry.hpp>

#include <foonathan/memory/container.hpp>
#include <foonathan/memory/memory_pool.hpp>
Expand Down Expand Up @@ -68,7 +68,6 @@ class WriterProxy : public RTPSMessageSenderInterface
WriterProxy(
StatefulReader* reader,
const RemoteLocatorsAllocationAttributes& loc_alloc,
const VariableLengthDataLimits& limits,
const ResourceLimitedContainerConfig& changes_allocation);

/**
Expand Down Expand Up @@ -158,22 +157,23 @@ class WriterProxy : public RTPSMessageSenderInterface
size_t unknown_missing_changes_up_to(
const SequenceNumber_t& seq_num) const;

/**
* Get the attributes of the writer represented by this proxy.
* @return const reference to the attributes of the writer represented by this proxy.
*/
inline const WriterProxyData& attributes() const
{
return attributes_;
}

/**
* Get the GUID of the writer represented by this proxy.
* @return const reference to the GUID of the writer represented by this proxy.
*/
inline const GUID_t& guid() const
{
return attributes_.guid();
return locators_entry_.remote_guid;
}

inline const GUID_t& persistence_guid() const
{
return persistence_guid_;
}

inline LivelinessQosPolicyKind liveliness_kind() const
{
return liveliness_kind_;
}

/**
Expand All @@ -182,7 +182,7 @@ class WriterProxy : public RTPSMessageSenderInterface
*/
inline uint32_t ownership_strength() const
{
return attributes_.m_qos.m_ownershipStrength.value;
return ownership_strength_;
}

/**
Expand All @@ -191,9 +191,9 @@ class WriterProxy : public RTPSMessageSenderInterface
*/
inline const ResourceLimitedVector<Locator_t>& remote_locators_shrinked() const
{
return attributes_.remote_locators().unicast.empty() ?
attributes_.remote_locators().multicast :
attributes_.remote_locators().unicast;
return locators_entry_.unicast.empty() ?
locators_entry_.multicast :
locators_entry_.unicast;
}

/**
Expand Down Expand Up @@ -339,8 +339,6 @@ class WriterProxy : public RTPSMessageSenderInterface

//! Pointer to associated StatefulReader.
StatefulReader* reader_;
//! Parameters of the WriterProxy
WriterProxyData attributes_;
//!Timed event to postpone the heartbeatResponse.
TimedEvent* heartbeat_response_;
//! Timed event to send initial acknack.
Expand Down Expand Up @@ -371,6 +369,14 @@ class WriterProxy : public RTPSMessageSenderInterface
ResourceLimitedVector<GuidPrefix_t> guid_prefix_as_vector_;
//! Is the writer on the same process
bool is_on_same_process_;
//! Taken from QoS
uint32_t ownership_strength_;
//! Taken from QoS
LivelinessQosPolicyKind liveliness_kind_;
//! Taken from proxy data
GUID_t persistence_guid_;
//! Taken from proxy data
LocatorSelectorEntry locators_entry_;

using ChangeIterator = decltype(changes_received_)::iterator;

Expand Down
Loading

0 comments on commit 1e78f83

Please sign in to comment.