Skip to content

Commit

Permalink
Avoid temporary allocations when not using flow controllers
Browse files Browse the repository at this point in the history
This is a port of #943

* Refs #7013. Uncrustify

* Refs #7013. Templating duplicated code.

* Refs #7013. Reducing number of scheduled timed events.

* Refs #7060. Avoiding usage of RTPSWriterCollector on StatelessWriter.

* Refs #7060. Separated methods on StatefulWriter.

* Refs #7060. Allow separate sending on async wrtiters.

* Refs #7060. Sending separatedly when readers do not share locators.

* Refs #7060. Adding ReaderProxy::change_is_unsent.

* Refs #7060. Sending all fragments when flow controllers not defined.

* Refs #7060. Improving send_changes_separatedly.

* Refs #7060. Fixed failing tests.

* Refs #7060. When history is empty, async thread just sends heartbeat.

* Edit documentation

Co-authored-by: IkerLuengo <57146230+IkerLuengo@users.noreply.github.com>
  • Loading branch information
IkerLuengo and IkerLuengo committed Mar 2, 2020
1 parent 9b7f143 commit cf59876
Show file tree
Hide file tree
Showing 8 changed files with 908 additions and 447 deletions.
10 changes: 10 additions & 0 deletions include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ class ReaderProxy
bool change_is_acked(
const SequenceNumber_t& seq_num) const;

/**
* Check if a specific change is marked to be sent to this reader.
* @param[in] seq_num Sequence number of the change to be checked.
* @param[out] is_irrelevant Will be forced to true if change is irrelevant for this reader.
* @return true when the change is marked to be sent, false otherwise.
*/
bool change_is_unsent(
const SequenceNumber_t& seq_num,
bool& is_irrelevant) const;

/**
* Mark all changes up to the one indicated by seq_num as Acknowledged.
* For instance, when seq_num is 30, changes 1-29 are marked as acknowledged.
Expand Down
16 changes: 16 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,20 @@ class StatefulWriter : public RTPSWriter
*/
bool ack_timer_expired();

void send_heartbeat_to_all_readers();

void send_changes_separatedly(
SequenceNumber_t max_sequence,
bool& activateHeartbeatPeriod);

void send_all_unsent_changes(
SequenceNumber_t max_sequence,
bool& activateHeartbeatPeriod);

void send_unsent_changes_with_flow_control(
SequenceNumber_t max_sequence,
bool& activateHeartbeatPeriod);

//! True to disable piggyback heartbeats
bool disable_heartbeat_piggyback_;
//! True to disable positive ACKs
Expand All @@ -345,6 +359,8 @@ class StatefulWriter : public RTPSWriter

bool there_are_remote_readers_ = false;

bool readers_dont_share_locators_ = true;

StatefulWriter& operator =(
const StatefulWriter&) = delete;
};
Expand Down
4 changes: 4 additions & 0 deletions include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ class StatelessWriter : public RTPSWriter
CacheChange_t* change,
ReaderLocator& reader_locator);

void send_all_unsent_changes();

void send_unsent_changes_with_flow_control();

bool is_inline_qos_expected_ = false;
LocatorList_t fixed_locators_;
ResourceLimitedVector<ReaderLocator> matched_readers_;
Expand Down
145 changes: 78 additions & 67 deletions src/cpp/rtps/flowcontrol/ThroughputController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,119 +22,130 @@
#include <cassert>


namespace eprosima{
namespace fastrtps{
namespace rtps{

ThroughputController::ThroughputController(const ThroughputControllerDescriptor& descriptor, RTPSWriter* associatedWriter):
mBytesPerPeriod(descriptor.bytesPerPeriod),
mAccumulatedPayloadSize(0),
mPeriodMillisecs(descriptor.periodMillisecs),
mAssociatedParticipant(nullptr),
mAssociatedWriter(associatedWriter)
namespace eprosima {
namespace fastrtps {
namespace rtps {

ThroughputController::ThroughputController(
const ThroughputControllerDescriptor& descriptor,
RTPSWriter* associatedWriter)
: mBytesPerPeriod(descriptor.bytesPerPeriod)
, mAccumulatedPayloadSize(0)
, mPeriodMillisecs(descriptor.periodMillisecs)
, mAssociatedParticipant(nullptr)
, mAssociatedWriter(associatedWriter)
{
}

ThroughputController::ThroughputController(const ThroughputControllerDescriptor& descriptor, RTPSParticipantImpl* associatedParticipant):
mBytesPerPeriod(descriptor.bytesPerPeriod),
mAccumulatedPayloadSize(0),
mPeriodMillisecs(descriptor.periodMillisecs),
mAssociatedParticipant(associatedParticipant),
mAssociatedWriter(nullptr)
ThroughputController::ThroughputController(
const ThroughputControllerDescriptor& descriptor,
RTPSParticipantImpl* associatedParticipant)
: mBytesPerPeriod(descriptor.bytesPerPeriod)
, mAccumulatedPayloadSize(0)
, mPeriodMillisecs(descriptor.periodMillisecs)
, mAssociatedParticipant(associatedParticipant)
, mAssociatedWriter(nullptr)
{
}

void ThroughputController::operator()(RTPSWriterCollector<ReaderLocator*>& changesToSend)
void ThroughputController::operator ()(
RTPSWriterCollector<ReaderLocator*>& changesToSend)
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
process_nts(changesToSend);
}

auto it = changesToSend.items().begin();

while(it != changesToSend.items().end())
{
if(!process_change_nts_(it->cacheChange, it->sequenceNumber, it->fragmentNumber))
break;

++it;
}

changesToSend.items().erase(it, changesToSend.items().end());
void ThroughputController::operator ()(
RTPSWriterCollector<ReaderProxy*>& changesToSend)
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
process_nts(changesToSend);
}

void ThroughputController::operator()(RTPSWriterCollector<ReaderProxy*>& changesToSend)
void ThroughputController::disable()
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
mAssociatedWriter = nullptr;
mAssociatedParticipant = nullptr;
}

template<typename Collector>
void ThroughputController::process_nts(Collector& changesToSend)
{
uint32_t size_to_restore = 0;
auto it = changesToSend.items().begin();

while(it != changesToSend.items().end())
while (
it != changesToSend.items().end() &&
process_change_nts_(it->cacheChange, it->sequenceNumber, it->fragmentNumber, &size_to_restore))
{
if(!process_change_nts_(it->cacheChange, it->sequenceNumber, it->fragmentNumber))
break;

++it;
}

changesToSend.items().erase(it, changesToSend.items().end());
}


void ThroughputController::disable()
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
mAssociatedWriter = nullptr;
mAssociatedParticipant = nullptr;
if (size_to_restore > 0)
{
ScheduleRefresh(size_to_restore);
}
}

bool ThroughputController::process_change_nts_(CacheChange_t* change, const SequenceNumber_t& /*seqNum*/,
const FragmentNumber_t fragNum)
bool ThroughputController::process_change_nts_(
CacheChange_t* change,
const SequenceNumber_t& /*seqNum*/,
const FragmentNumber_t fragNum,
uint32_t* accumulated_size)
{
assert(change != nullptr);

uint32_t dataLength = change->serializedPayload.length;

if (fragNum != 0)
{
dataLength = (fragNum + 1) != change->getFragmentCount() ?
change->getFragmentSize() : change->serializedPayload.length - (fragNum * change->getFragmentSize());
change->getFragmentSize() : change->serializedPayload.length - (fragNum * change->getFragmentSize());
}

if((mAccumulatedPayloadSize + dataLength) <= mBytesPerPeriod)
if ((mAccumulatedPayloadSize + dataLength) <= mBytesPerPeriod)
{
mAccumulatedPayloadSize += dataLength;
ScheduleRefresh(dataLength);
*accumulated_size += dataLength;
return true;
}

return false;
}

void ThroughputController::ScheduleRefresh(uint32_t sizeToRestore)
void ThroughputController::ScheduleRefresh(
uint32_t sizeToRestore)
{
std::shared_ptr<asio::steady_timer> throwawayTimer(std::make_shared<asio::steady_timer>(*FlowController::ControllerService));
std::shared_ptr<asio::steady_timer> throwawayTimer(std::make_shared<asio::steady_timer>(
*FlowController::ControllerService));
auto refresh = [throwawayTimer, this, sizeToRestore]
(const asio::error_code& error)
{
if ((error != asio::error::operation_aborted) &&
FlowController::IsListening(this))
(const asio::error_code& error)
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
throwawayTimer->cancel();
mAccumulatedPayloadSize = sizeToRestore > mAccumulatedPayloadSize ? 0 : mAccumulatedPayloadSize - sizeToRestore;

if (mAssociatedWriter)
{
mAssociatedWriter->getRTPSParticipant()->async_thread().wake_up(mAssociatedWriter);
}
else if (mAssociatedParticipant)
if ((error != asio::error::operation_aborted) &&
FlowController::IsListening(this))
{
std::unique_lock<std::recursive_mutex> lock(*mAssociatedParticipant->getParticipantMutex());
for (auto it = mAssociatedParticipant->userWritersListBegin();
it != mAssociatedParticipant->userWritersListEnd(); ++it)
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
throwawayTimer->cancel();
mAccumulatedPayloadSize = sizeToRestore > mAccumulatedPayloadSize ?
0 : mAccumulatedPayloadSize - sizeToRestore;

if (mAssociatedWriter)
{
mAssociatedWriter->getRTPSParticipant()->async_thread().wake_up(mAssociatedWriter);
}
else if (mAssociatedParticipant)
{
mAssociatedParticipant->async_thread().wake_up(*it);
std::unique_lock<std::recursive_mutex> lock(*mAssociatedParticipant->getParticipantMutex());
for (auto it = mAssociatedParticipant->userWritersListBegin();
it != mAssociatedParticipant->userWritersListEnd(); ++it)
{
mAssociatedParticipant->async_thread().wake_up(*it);
}
}
}
}
};
};

throwawayTimer->expires_from_now(std::chrono::milliseconds(mPeriodMillisecs));
throwawayTimer->async_wait(refresh);
Expand Down
60 changes: 37 additions & 23 deletions src/cpp/rtps/flowcontrol/ThroughputController.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

#include <thread>

namespace eprosima{
namespace fastrtps{
namespace rtps{
namespace eprosima {
namespace fastrtps {
namespace rtps {

class RTPSWriter;
class RTPSParticipantImpl;
Expand All @@ -35,33 +35,47 @@ class RTPSParticipantImpl;
*/
class ThroughputController : public FlowController
{
public:
ThroughputController(const ThroughputControllerDescriptor&, RTPSWriter* associatedWriter);
ThroughputController(const ThroughputControllerDescriptor&, RTPSParticipantImpl* associatedParticipant);
public:

virtual void operator()(RTPSWriterCollector<ReaderLocator*>& changesToSend) override;
virtual void operator()(RTPSWriterCollector<ReaderProxy*>& changesToSend) override;
ThroughputController(
const ThroughputControllerDescriptor&,
RTPSWriter* associatedWriter);
ThroughputController(
const ThroughputControllerDescriptor&,
RTPSParticipantImpl* associatedParticipant);

virtual void disable() override;
virtual void operator ()(
RTPSWriterCollector<ReaderLocator*>& changesToSend) override;
virtual void operator ()(
RTPSWriterCollector<ReaderProxy*>& changesToSend) override;

private:
virtual void disable() override;

bool process_change_nts_(CacheChange_t* change, const SequenceNumber_t& seqNum,
const FragmentNumber_t fragNum);
private:

uint32_t mBytesPerPeriod;
uint32_t mAccumulatedPayloadSize;
uint32_t mPeriodMillisecs;
std::recursive_mutex mThroughputControllerMutex;
template<typename Collector>
void process_nts(Collector& changesToSend);

RTPSParticipantImpl* mAssociatedParticipant;
RTPSWriter* mAssociatedWriter;
bool process_change_nts_(
CacheChange_t* change,
const SequenceNumber_t& seqNum,
const FragmentNumber_t fragNum,
uint32_t* accumulated_size);

/*
* Schedules the filter to be refreshed in period ms. When it does, its capacity
* will be partially restored, by "sizeToRestore" bytes.
*/
void ScheduleRefresh(uint32_t sizeToRestore);
uint32_t mBytesPerPeriod;
uint32_t mAccumulatedPayloadSize;
uint32_t mPeriodMillisecs;
std::recursive_mutex mThroughputControllerMutex;

RTPSParticipantImpl* mAssociatedParticipant;
RTPSWriter* mAssociatedWriter;

/*
* Schedules the filter to be refreshed in period ms. When it does, its capacity
* will be partially restored, by "sizeToRestore" bytes.
*/
void ScheduleRefresh(
uint32_t sizeToRestore);
};

} // namespace rtps
Expand Down
25 changes: 25 additions & 0 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,31 @@ bool ReaderProxy::change_is_acked(
return !chit->isRelevant() || chit->getStatus() == ACKNOWLEDGED;
}

bool ReaderProxy::change_is_unsent(
const SequenceNumber_t& seq_num,
bool& is_irrelevant) const
{
if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
{
return false;
}

ChangeConstIterator chit = find_change(seq_num);
if (chit == changes_for_reader_.end())
{
// There is a hole in changes_for_reader_
// This means a change was removed.
return false;
}

if (chit->isRelevant())
{
is_irrelevant = false;
}

return chit->getStatus() == UNSENT;
}

void ReaderProxy::acked_changes_set(
const SequenceNumber_t& seq_num)
{
Expand Down
Loading

0 comments on commit cf59876

Please sign in to comment.