Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid temporary allocations when not using flow controllers <1.9.x> [7274] #943

Merged
merged 13 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/fastrtps/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ class ReaderProxy
bool change_is_acked(
const SequenceNumber_t& seq_num) const;

/**
* Check if a specific change is marked to be sent to this reader.
* @param[in] seq_num Sequence number of the change to be checked.
* @param[out] is_irrelevant Will be forced to true if change is irrelevant for this reader.
* @return true when the change is marked to be sent, false otherwise.
*/
bool change_is_unsent(
const SequenceNumber_t& seq_num,
bool& is_irrelevant) const;

/**
* Mark all changes up to the one indicated by seq_num as Acknowledged.
* For instance, when seq_num is 30, changes 1-29 are marked as acknowledged.
Expand Down
15 changes: 15 additions & 0 deletions include/fastrtps/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,20 @@ class StatefulWriter : public RTPSWriter
*/
bool ack_timer_expired();

void send_heartbeat_to_all_readers();

void send_changes_separatedly(
SequenceNumber_t max_sequence,
bool& activateHeartbeatPeriod);

void send_all_unsent_changes(
SequenceNumber_t max_sequence,
bool& activateHeartbeatPeriod);

void send_unsent_changes_with_flow_control(
SequenceNumber_t max_sequence,
bool& activateHeartbeatPeriod);

//! True to disable piggyback heartbeats
bool disable_heartbeat_piggyback_;
//! True to disable positive ACKs
Expand All @@ -344,6 +358,7 @@ class StatefulWriter : public RTPSWriter

bool there_are_remote_readers_ = false;

bool readers_dont_share_locators_ = true;

StatefulWriter& operator =(
const StatefulWriter&) = delete;
Expand Down
4 changes: 4 additions & 0 deletions include/fastrtps/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ class StatelessWriter : public RTPSWriter
CacheChange_t* change,
ReaderLocator& reader_locator);

void send_all_unsent_changes();

void send_unsent_changes_with_flow_control();

bool is_inline_qos_expected_ = false;
LocatorList_t fixed_locators_;
ResourceLimitedVector<ReaderLocator> matched_readers_;
Expand Down
145 changes: 78 additions & 67 deletions src/cpp/rtps/flowcontrol/ThroughputController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,119 +21,130 @@
#include <cassert>


namespace eprosima{
namespace fastrtps{
namespace rtps{

ThroughputController::ThroughputController(const ThroughputControllerDescriptor& descriptor, RTPSWriter* associatedWriter):
mBytesPerPeriod(descriptor.bytesPerPeriod),
mAccumulatedPayloadSize(0),
mPeriodMillisecs(descriptor.periodMillisecs),
mAssociatedParticipant(nullptr),
mAssociatedWriter(associatedWriter)
namespace eprosima {
namespace fastrtps {
namespace rtps {

ThroughputController::ThroughputController(
const ThroughputControllerDescriptor& descriptor,
RTPSWriter* associatedWriter)
: mBytesPerPeriod(descriptor.bytesPerPeriod)
, mAccumulatedPayloadSize(0)
, mPeriodMillisecs(descriptor.periodMillisecs)
, mAssociatedParticipant(nullptr)
, mAssociatedWriter(associatedWriter)
{
}

ThroughputController::ThroughputController(const ThroughputControllerDescriptor& descriptor, RTPSParticipantImpl* associatedParticipant):
mBytesPerPeriod(descriptor.bytesPerPeriod),
mAccumulatedPayloadSize(0),
mPeriodMillisecs(descriptor.periodMillisecs),
mAssociatedParticipant(associatedParticipant),
mAssociatedWriter(nullptr)
ThroughputController::ThroughputController(
const ThroughputControllerDescriptor& descriptor,
RTPSParticipantImpl* associatedParticipant)
: mBytesPerPeriod(descriptor.bytesPerPeriod)
, mAccumulatedPayloadSize(0)
, mPeriodMillisecs(descriptor.periodMillisecs)
, mAssociatedParticipant(associatedParticipant)
, mAssociatedWriter(nullptr)
{
}

void ThroughputController::operator()(RTPSWriterCollector<ReaderLocator*>& changesToSend)
void ThroughputController::operator ()(
RTPSWriterCollector<ReaderLocator*>& changesToSend)
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
process_nts(changesToSend);
}

auto it = changesToSend.items().begin();

while(it != changesToSend.items().end())
{
if(!process_change_nts_(it->cacheChange, it->sequenceNumber, it->fragmentNumber))
break;

++it;
}

changesToSend.items().erase(it, changesToSend.items().end());
void ThroughputController::operator ()(
RTPSWriterCollector<ReaderProxy*>& changesToSend)
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
process_nts(changesToSend);
}

void ThroughputController::operator()(RTPSWriterCollector<ReaderProxy*>& changesToSend)
void ThroughputController::disable()
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
mAssociatedWriter = nullptr;
mAssociatedParticipant = nullptr;
}

template<typename Collector>
void ThroughputController::process_nts(Collector& changesToSend)
{
uint32_t size_to_restore = 0;
auto it = changesToSend.items().begin();

while(it != changesToSend.items().end())
while (
it != changesToSend.items().end() &&
process_change_nts_(it->cacheChange, it->sequenceNumber, it->fragmentNumber, &size_to_restore))
{
if(!process_change_nts_(it->cacheChange, it->sequenceNumber, it->fragmentNumber))
break;

++it;
}

changesToSend.items().erase(it, changesToSend.items().end());
}


void ThroughputController::disable()
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
mAssociatedWriter = nullptr;
mAssociatedParticipant = nullptr;
if (size_to_restore > 0)
{
ScheduleRefresh(size_to_restore);
}
}

bool ThroughputController::process_change_nts_(CacheChange_t* change, const SequenceNumber_t& /*seqNum*/,
const FragmentNumber_t fragNum)
bool ThroughputController::process_change_nts_(
CacheChange_t* change,
const SequenceNumber_t& /*seqNum*/,
const FragmentNumber_t fragNum,
uint32_t* accumulated_size)
{
assert(change != nullptr);

uint32_t dataLength = change->serializedPayload.length;

if (fragNum != 0)
{
dataLength = (fragNum + 1) != change->getFragmentCount() ?
change->getFragmentSize() : change->serializedPayload.length - (fragNum * change->getFragmentSize());
change->getFragmentSize() : change->serializedPayload.length - (fragNum * change->getFragmentSize());
}

if((mAccumulatedPayloadSize + dataLength) <= mBytesPerPeriod)
if ((mAccumulatedPayloadSize + dataLength) <= mBytesPerPeriod)
{
mAccumulatedPayloadSize += dataLength;
ScheduleRefresh(dataLength);
*accumulated_size += dataLength;
return true;
}

return false;
}

void ThroughputController::ScheduleRefresh(uint32_t sizeToRestore)
void ThroughputController::ScheduleRefresh(
uint32_t sizeToRestore)
{
std::shared_ptr<asio::steady_timer> throwawayTimer(std::make_shared<asio::steady_timer>(*FlowController::ControllerService));
std::shared_ptr<asio::steady_timer> throwawayTimer(std::make_shared<asio::steady_timer>(
*FlowController::ControllerService));
auto refresh = [throwawayTimer, this, sizeToRestore]
(const asio::error_code& error)
{
if ((error != asio::error::operation_aborted) &&
FlowController::IsListening(this))
(const asio::error_code& error)
{
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
throwawayTimer->cancel();
mAccumulatedPayloadSize = sizeToRestore > mAccumulatedPayloadSize ? 0 : mAccumulatedPayloadSize - sizeToRestore;

if (mAssociatedWriter)
{
mAssociatedWriter->getRTPSParticipant()->async_thread().wake_up(mAssociatedWriter);
}
else if (mAssociatedParticipant)
if ((error != asio::error::operation_aborted) &&
FlowController::IsListening(this))
{
std::unique_lock<std::recursive_mutex> lock(*mAssociatedParticipant->getParticipantMutex());
for (auto it = mAssociatedParticipant->userWritersListBegin();
it != mAssociatedParticipant->userWritersListEnd(); ++it)
std::unique_lock<std::recursive_mutex> scopedLock(mThroughputControllerMutex);
throwawayTimer->cancel();
mAccumulatedPayloadSize = sizeToRestore > mAccumulatedPayloadSize ?
0 : mAccumulatedPayloadSize - sizeToRestore;

if (mAssociatedWriter)
{
mAssociatedWriter->getRTPSParticipant()->async_thread().wake_up(mAssociatedWriter);
}
else if (mAssociatedParticipant)
{
mAssociatedParticipant->async_thread().wake_up(*it);
std::unique_lock<std::recursive_mutex> lock(*mAssociatedParticipant->getParticipantMutex());
for (auto it = mAssociatedParticipant->userWritersListBegin();
it != mAssociatedParticipant->userWritersListEnd(); ++it)
{
mAssociatedParticipant->async_thread().wake_up(*it);
}
}
}
}
};
};

throwawayTimer->expires_from_now(std::chrono::milliseconds(mPeriodMillisecs));
throwawayTimer->async_wait(refresh);
Expand Down
60 changes: 37 additions & 23 deletions src/cpp/rtps/flowcontrol/ThroughputController.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

#include <thread>

namespace eprosima{
namespace fastrtps{
namespace rtps{
namespace eprosima {
namespace fastrtps {
namespace rtps {

class RTPSWriter;
class RTPSParticipantImpl;
Expand All @@ -35,33 +35,47 @@ class RTPSParticipantImpl;
*/
class ThroughputController : public FlowController
{
public:
ThroughputController(const ThroughputControllerDescriptor&, RTPSWriter* associatedWriter);
ThroughputController(const ThroughputControllerDescriptor&, RTPSParticipantImpl* associatedParticipant);
public:

virtual void operator()(RTPSWriterCollector<ReaderLocator*>& changesToSend) override;
virtual void operator()(RTPSWriterCollector<ReaderProxy*>& changesToSend) override;
ThroughputController(
const ThroughputControllerDescriptor&,
RTPSWriter* associatedWriter);
ThroughputController(
const ThroughputControllerDescriptor&,
RTPSParticipantImpl* associatedParticipant);

virtual void disable() override;
virtual void operator ()(
RTPSWriterCollector<ReaderLocator*>& changesToSend) override;
virtual void operator ()(
RTPSWriterCollector<ReaderProxy*>& changesToSend) override;

private:
virtual void disable() override;

bool process_change_nts_(CacheChange_t* change, const SequenceNumber_t& seqNum,
const FragmentNumber_t fragNum);
private:

uint32_t mBytesPerPeriod;
uint32_t mAccumulatedPayloadSize;
uint32_t mPeriodMillisecs;
std::recursive_mutex mThroughputControllerMutex;
template<typename Collector>
void process_nts(Collector& changesToSend);

RTPSParticipantImpl* mAssociatedParticipant;
RTPSWriter* mAssociatedWriter;
bool process_change_nts_(
CacheChange_t* change,
const SequenceNumber_t& seqNum,
const FragmentNumber_t fragNum,
uint32_t* accumulated_size);

/*
* Schedules the filter to be refreshed in period ms. When it does, its capacity
* will be partially restored, by "sizeToRestore" bytes.
*/
void ScheduleRefresh(uint32_t sizeToRestore);
uint32_t mBytesPerPeriod;
uint32_t mAccumulatedPayloadSize;
uint32_t mPeriodMillisecs;
std::recursive_mutex mThroughputControllerMutex;

RTPSParticipantImpl* mAssociatedParticipant;
RTPSWriter* mAssociatedWriter;

/*
* Schedules the filter to be refreshed in period ms. When it does, its capacity
* will be partially restored, by "sizeToRestore" bytes.
*/
void ScheduleRefresh(
uint32_t sizeToRestore);
};

} // namespace rtps
Expand Down
25 changes: 25 additions & 0 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,31 @@ bool ReaderProxy::change_is_acked(
return !chit->isRelevant() || chit->getStatus() == ACKNOWLEDGED;
}

bool ReaderProxy::change_is_unsent(
const SequenceNumber_t& seq_num,
bool& is_irrelevant) const
{
if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
{
return false;
}

ChangeConstIterator chit = find_change(seq_num);
if (chit == changes_for_reader_.end())
{
// There is a hole in changes_for_reader_
// This means a change was removed.
return false;
}

if (chit->isRelevant())
{
is_irrelevant = false;
}

return chit->getStatus() == UNSENT;
}

void ReaderProxy::acked_changes_set(
const SequenceNumber_t& seq_num)
{
Expand Down
Loading