Skip to content

Commit

Permalink
Periodic heartbeat also depending on ReaderProxy's low_mark (#2253)
Browse files Browse the repository at this point in the history
* Refs #12161. Periodic heartbeat also depending on low_mark

Signed-off-by: Ricardo González <ricardo@richiware.dev>

* Refs #12642. Apply suggestions

Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
  • Loading branch information
richiware authored Nov 19, 2021
1 parent f244f42 commit a82ee68
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 30 deletions.
4 changes: 3 additions & 1 deletion include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,11 @@ class ReaderProxy

/*!
* @brief Returns there is some UNACKNOWLEDGED change.
* @param first_seq_in_history Minimum sequence number in the writer history.
* @return There is some UNACKNOWLEDGED change.
*/
bool has_unacknowledged() const;
bool has_unacknowledged(
const SequenceNumber_t& first_seq_in_history) const;

/**
* Get the GUID of the reader represented by this proxy.
Expand Down
8 changes: 7 additions & 1 deletion src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,14 @@ void ReaderProxy::change_has_been_removed(
changes_for_reader_.erase(chit);
}

bool ReaderProxy::has_unacknowledged() const
bool ReaderProxy::has_unacknowledged(
const SequenceNumber_t& first_seq_in_history) const
{
if (first_seq_in_history > changes_low_mark_)
{
return true;
}

for (const ChangeForReader_t& it : changes_for_reader_)
{
if (it.getStatus() == UNACKNOWLEDGED)
Expand Down
54 changes: 26 additions & 28 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1586,38 +1586,30 @@ bool StatefulWriter::send_periodic_heartbeat(
bool unacked_changes = false;
if (!liveliness)
{
SequenceNumber_t firstSeq, lastSeq;

firstSeq = get_seq_num_min();
lastSeq = get_seq_num_max();

if (firstSeq == c_SequenceNumber_Unknown || lastSeq == c_SequenceNumber_Unknown)
SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min();
if (SequenceNumber_t::unknown() == first_seq_to_check_acknowledge)
{
return false;
first_seq_to_check_acknowledge = mp_history->next_sequence_number() - 1;
}
else
{
assert(firstSeq <= lastSeq);

unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
matched_remote_readers_,
[](ReaderProxy* reader)
{
return reader->has_unacknowledged();
}
);
unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
matched_remote_readers_,
[first_seq_to_check_acknowledge](ReaderProxy* reader)
{
return reader->has_unacknowledged(first_seq_to_check_acknowledge);
}
);

if (unacked_changes)
if (unacked_changes)
{
try
{
try
{
//TODO if separating, here sends periodic for all readers, instead of ones needed it.
send_heartbeat_to_all_readers();
}
catch (const RTPSMessageGroup::timeout&)
{
logError(RTPS_WRITER, "Max blocking time reached");
}
//TODO if separating, here sends periodic for all readers, instead of ones needed it.
send_heartbeat_to_all_readers();
}
catch (const RTPSMessageGroup::timeout&)
{
logError(RTPS_WRITER, "Max blocking time reached");
}
}
}
Expand Down Expand Up @@ -1673,7 +1665,13 @@ void StatefulWriter::send_heartbeat_to_nts(
bool liveliness,
bool force /* = false */)
{
if (remoteReaderProxy.is_reliable() && (force || liveliness || remoteReaderProxy.has_unacknowledged()))
SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min();
if (SequenceNumber_t::unknown() == first_seq_to_check_acknowledge)
{
first_seq_to_check_acknowledge = mp_history->next_sequence_number() - 1;
}
if (remoteReaderProxy.is_reliable() &&
(force || liveliness || remoteReaderProxy.has_unacknowledged(first_seq_to_check_acknowledge)))
{
if (remoteReaderProxy.is_local_reader())
{
Expand Down

0 comments on commit a82ee68

Please sign in to comment.