From a82ee6806771d40edad4975b3aacc2110498bd12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Gonz=C3=A1lez?= Date: Fri, 19 Nov 2021 07:43:26 +0100 Subject: [PATCH] Periodic heartbeat also depending on ReaderProxy's low_mark (#2253) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refs #12161. Periodic heartbeat also depending on low_mark Signed-off-by: Ricardo González * Refs #12642. Apply suggestions Signed-off-by: Ricardo González Moreno --- include/fastdds/rtps/writer/ReaderProxy.h | 4 +- src/cpp/rtps/writer/ReaderProxy.cpp | 8 +++- src/cpp/rtps/writer/StatefulWriter.cpp | 54 +++++++++++------------ 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index ee32f7f5c94..b593f833bfd 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -219,9 +219,11 @@ class ReaderProxy /*! * @brief Returns there is some UNACKNOWLEDGED change. + * @param first_seq_in_history Minimum sequence number in the writer history. * @return There is some UNACKNOWLEDGED change. */ - bool has_unacknowledged() const; + bool has_unacknowledged( + const SequenceNumber_t& first_seq_in_history) const; /** * Get the GUID of the reader represented by this proxy. diff --git a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp index 9b90c648f6e..7315c6b93e5 100644 --- a/src/cpp/rtps/writer/ReaderProxy.cpp +++ b/src/cpp/rtps/writer/ReaderProxy.cpp @@ -560,8 +560,14 @@ void ReaderProxy::change_has_been_removed( changes_for_reader_.erase(chit); } -bool ReaderProxy::has_unacknowledged() const +bool ReaderProxy::has_unacknowledged( + const SequenceNumber_t& first_seq_in_history) const { + if (first_seq_in_history > changes_low_mark_) + { + return true; + } + for (const ChangeForReader_t& it : changes_for_reader_) { if (it.getStatus() == UNACKNOWLEDGED) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index df3aea84eed..699948edd55 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -1586,38 +1586,30 @@ bool StatefulWriter::send_periodic_heartbeat( bool unacked_changes = false; if (!liveliness) { - SequenceNumber_t firstSeq, lastSeq; - - firstSeq = get_seq_num_min(); - lastSeq = get_seq_num_max(); - - if (firstSeq == c_SequenceNumber_Unknown || lastSeq == c_SequenceNumber_Unknown) + SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min(); + if (SequenceNumber_t::unknown() == first_seq_to_check_acknowledge) { - return false; + first_seq_to_check_acknowledge = mp_history->next_sequence_number() - 1; } - else - { - assert(firstSeq <= lastSeq); - unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_, - matched_remote_readers_, - [](ReaderProxy* reader) - { - return reader->has_unacknowledged(); - } - ); + unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_, + matched_remote_readers_, + [first_seq_to_check_acknowledge](ReaderProxy* reader) + { + return reader->has_unacknowledged(first_seq_to_check_acknowledge); + } + ); - if (unacked_changes) + if (unacked_changes) + { + try { - try - { - //TODO if separating, here sends periodic for all readers, instead of ones needed it. - send_heartbeat_to_all_readers(); - } - catch (const RTPSMessageGroup::timeout&) - { - logError(RTPS_WRITER, "Max blocking time reached"); - } + //TODO if separating, here sends periodic for all readers, instead of ones needed it. + send_heartbeat_to_all_readers(); + } + catch (const RTPSMessageGroup::timeout&) + { + logError(RTPS_WRITER, "Max blocking time reached"); } } } @@ -1673,7 +1665,13 @@ void StatefulWriter::send_heartbeat_to_nts( bool liveliness, bool force /* = false */) { - if (remoteReaderProxy.is_reliable() && (force || liveliness || remoteReaderProxy.has_unacknowledged())) + SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min(); + if (SequenceNumber_t::unknown() == first_seq_to_check_acknowledge) + { + first_seq_to_check_acknowledge = mp_history->next_sequence_number() - 1; + } + if (remoteReaderProxy.is_reliable() && + (force || liveliness || remoteReaderProxy.has_unacknowledged(first_seq_to_check_acknowledge))) { if (remoteReaderProxy.is_local_reader()) {