diff --git a/include/fastdds/rtps/writer/RTPSWriter.h b/include/fastdds/rtps/writer/RTPSWriter.h index 94080abd9d8..f341a6ac4e0 100644 --- a/include/fastdds/rtps/writer/RTPSWriter.h +++ b/include/fastdds/rtps/writer/RTPSWriter.h @@ -544,7 +544,7 @@ class RTPSWriter return sent_ok; } - static void add_statistics_sent_submessage( + void add_statistics_sent_submessage( CacheChange_t* change, size_t num_locators); diff --git a/include/fastdds/rtps/writer/StatefulWriter.h b/include/fastdds/rtps/writer/StatefulWriter.h index 60324a57baf..3d46843fda3 100644 --- a/include/fastdds/rtps/writer/StatefulWriter.h +++ b/include/fastdds/rtps/writer/StatefulWriter.h @@ -106,8 +106,6 @@ class StatefulWriter : public RTPSWriter //!WriterTimes WriterTimes m_times; - //! Vector containing all the active ReaderProxies. - ResourceLimitedVector matched_readers_; //! Vector containing all the remote ReaderProxies. ResourceLimitedVector matched_remote_readers_; //! Vector containing all the inactive, ready for reuse, ReaderProxies. diff --git a/include/fastdds/statistics/rtps/StatisticsCommon.hpp b/include/fastdds/statistics/rtps/StatisticsCommon.hpp index 64ddeafc538..956a26e0523 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommon.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommon.hpp @@ -161,11 +161,15 @@ class StatisticsWriterImpl void on_heartbeat( uint32_t count); - /// Report that a DATA message is sent - void on_data(); + /** + * @brief Report that a DATA / DATA_FRAG message is generated + * @param num_destinations number of locators to which the message will be sent + */ + void on_data_generated( + size_t num_destinations); - /// Report that a DATA_FRAG message is sent - void on_data_frag(); + /// Notify listeners of DATA / DATA_FRAG counts + void on_data_sent(); /// Report that a GAP message is sent void on_gap(); diff --git a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp index 635708cc911..a9efa082299 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp @@ -62,13 +62,17 @@ class StatisticsWriterImpl { } - /// Report that a DATA message is sent - inline void on_data() + /** + * @brief Report that a DATA / DATA_FRAG message is generated + * @param number of locators to which the message will be sent + */ + inline void on_data_generated( + size_t) { } - /// Report that a DATA_FRAG message is sent - inline void on_data_frag() + /// Notify listeners of DATA / DATA_FRAG counts + inline void on_data_sent() { } diff --git a/src/cpp/rtps/messages/RTPSMessageGroup.cpp b/src/cpp/rtps/messages/RTPSMessageGroup.cpp index 359d96797e6..8a62003f615 100644 --- a/src/cpp/rtps/messages/RTPSMessageGroup.cpp +++ b/src/cpp/rtps/messages/RTPSMessageGroup.cpp @@ -429,10 +429,6 @@ bool RTPSMessageGroup::add_data( } #endif // if HAVE_SECURITY - // Notify the statistics module, note that only writers add DATAs - assert(nullptr != dynamic_cast(endpoint_)); - static_cast(endpoint_)->on_data(); - return insert_submessage(is_big_submessage); } @@ -530,10 +526,6 @@ bool RTPSMessageGroup::add_data_frag( } #endif // if HAVE_SECURITY - // Notify the statistics module, note that only writers add DATAs - assert(nullptr != dynamic_cast(endpoint_)); - static_cast(endpoint_)->on_data_frag(); - return insert_submessage(false); } diff --git a/src/cpp/rtps/writer/RTPSWriter.cpp b/src/cpp/rtps/writer/RTPSWriter.cpp index 01c7dd02394..90e35200e47 100644 --- a/src/cpp/rtps/writer/RTPSWriter.cpp +++ b/src/cpp/rtps/writer/RTPSWriter.cpp @@ -447,6 +447,7 @@ void RTPSWriter::add_statistics_sent_submessage( #ifdef FASTDDS_STATISTICS change->num_sent_submessages += num_locators; + on_data_generated(num_locators); #endif // ifdef FASTDDS_STATISTICS } diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 1e5ed6ea43f..f42d1eee6f9 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -602,49 +602,60 @@ void StatefulWriter::unsent_change_added_to_history( CacheChange_t* change, const std::chrono::time_point& max_blocking_time) { - std::lock_guard guard(mp_mutex); + bool should_notify_data_sent = false; - if (liveliness_lease_duration_ < c_TimeInfinite) { - mp_RTPSParticipant->wlp()->assert_liveliness( - getGuid(), - liveliness_kind_, - liveliness_lease_duration_); - } - - // Prepare the metadata for datasharing - if (is_datasharing_compatible()) - { - prepare_datasharing_delivery(change); - } + std::lock_guard guard(mp_mutex); - // Now for the rest of readers - if (!matched_remote_readers_.empty() || !matched_datasharing_readers_.empty() || !matched_local_readers_.empty()) - { - if (!isAsync()) + if (liveliness_lease_duration_ < c_TimeInfinite) { - sync_delivery(change, max_blocking_time); + mp_RTPSParticipant->wlp()->assert_liveliness( + getGuid(), + liveliness_kind_, + liveliness_lease_duration_); } - else + + // Prepare the metadata for datasharing + if (is_datasharing_compatible()) { - async_delivery(change, max_blocking_time); + prepare_datasharing_delivery(change); } - if (disable_positive_acks_) + // Now for the rest of readers + if (!matched_remote_readers_.empty() || !matched_datasharing_readers_.empty() || + !matched_local_readers_.empty()) { - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); - auto interval = source_timestamp - now + keep_duration_us_; - assert(interval.count() >= 0); + if (!isAsync()) + { + sync_delivery(change, max_blocking_time); + should_notify_data_sent = true; + } + else + { + async_delivery(change, max_blocking_time); + } + + if (disable_positive_acks_) + { + auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); + auto now = system_clock::now(); + auto interval = source_timestamp - now + keep_duration_us_; + assert(interval.count() >= 0); - ack_event_->update_interval_millisec((double)duration_cast(interval).count()); - ack_event_->restart_timer(max_blocking_time); + ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + ack_event_->restart_timer(max_blocking_time); + } + } + else + { + logInfo(RTPS_WRITER, "No reader proxy to add change."); + check_acked_status(); } } - else + + if (should_notify_data_sent) { - logInfo(RTPS_WRITER, "No reader proxy to add change."); - check_acked_status(); + on_data_sent(); } } @@ -764,39 +775,43 @@ bool StatefulWriter::change_removed_by_history( void StatefulWriter::send_any_unsent_changes() { - std::lock_guard guard(mp_mutex); + { + std::lock_guard guard(mp_mutex); - bool activateHeartbeatPeriod = false; - SequenceNumber_t max_sequence = mp_history->next_sequence_number(); + bool activateHeartbeatPeriod = false; + SequenceNumber_t max_sequence = mp_history->next_sequence_number(); - if (mp_history->getHistorySize() == 0 || getMatchedReadersSize() == 0) - { - send_heartbeat_to_all_readers(); - } - else if (m_separateSendingEnabled) - { - send_changes_separatedly(max_sequence, activateHeartbeatPeriod); - } - else - { - bool no_flow_controllers = m_controllers.empty() && mp_RTPSParticipant->getFlowControllers().empty(); - if (no_flow_controllers || !there_are_remote_readers_) + if (mp_history->getHistorySize() == 0 || getMatchedReadersSize() == 0) { - send_all_unsent_changes(max_sequence, activateHeartbeatPeriod); + send_heartbeat_to_all_readers(); + } + else if (m_separateSendingEnabled) + { + send_changes_separatedly(max_sequence, activateHeartbeatPeriod); } else { - send_unsent_changes_with_flow_control(max_sequence, activateHeartbeatPeriod); + bool no_flow_controllers = m_controllers.empty() && mp_RTPSParticipant->getFlowControllers().empty(); + if (no_flow_controllers || !there_are_remote_readers_) + { + send_all_unsent_changes(max_sequence, activateHeartbeatPeriod); + } + else + { + send_unsent_changes_with_flow_control(max_sequence, activateHeartbeatPeriod); + } + } + + if (activateHeartbeatPeriod) + { + periodic_hb_event_->restart_timer(); } - } - if (activateHeartbeatPeriod) - { - periodic_hb_event_->restart_timer(); + // On VOLATILE writers, remove auto-acked (best effort readers) changes + check_acked_status(); } - // On VOLATILE writers, remove auto-acked (best effort readers) changes - check_acked_status(); + on_data_sent(); logInfo(RTPS_WRITER, "Finish sending unsent changes"); } @@ -955,7 +970,7 @@ void StatefulWriter::send_changes_separatedly( else { SequenceNumber_t max_ack_seq = SequenceNumber_t::unknown(); - auto sent_fun = [num_locators]( + auto sent_fun = [this, num_locators]( CacheChange_t* change, FragmentNumber_t /*frag*/) { diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index bd2fd4103d8..f7de4bac1d8 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -332,95 +332,105 @@ void StatelessWriter::unsent_change_added_to_history( CacheChange_t* change, const std::chrono::time_point& max_blocking_time) { - std::lock_guard guard(mp_mutex); + bool should_notify_data_sent = false; - if (liveliness_lease_duration_ < c_TimeInfinite) { - mp_RTPSParticipant->wlp()->assert_liveliness( - getGuid(), - liveliness_kind_, - liveliness_lease_duration_); - } + std::lock_guard guard(mp_mutex); - // Notify the datasharing readers - // This also prepares the metadata for late-joiners - if (is_datasharing_compatible()) - { - datasharing_delivery(change); - } + if (liveliness_lease_duration_ < c_TimeInfinite) + { + mp_RTPSParticipant->wlp()->assert_liveliness( + getGuid(), + liveliness_kind_, + liveliness_lease_duration_); + } - // Now for the rest of readers - if (!fixed_locators_.empty() || getMatchedReadersSize() > 0) - { - if (!isAsync()) + // Notify the datasharing readers + // This also prepares the metadata for late-joiners + if (is_datasharing_compatible()) + { + datasharing_delivery(change); + } + + // Now for the rest of readers + if (!fixed_locators_.empty() || getMatchedReadersSize() > 0) { - try + if (!isAsync()) { - if (m_separateSendingEnabled) + try { - std::vector guids(1); - for (std::unique_ptr& it : matched_local_readers_) - { - intraprocess_delivery(change, *it); - } - for (std::unique_ptr& it : matched_remote_readers_) + if (m_separateSendingEnabled) { - RTPSMessageGroup group(mp_RTPSParticipant, this, *it, max_blocking_time); - size_t num_locators = it->locators_size(); - send_data_or_fragments(group, change, is_inline_qos_expected_, - [num_locators]( - CacheChange_t* change, - FragmentNumber_t /*frag*/) - { - add_statistics_sent_submessage(change, num_locators); - }); + std::vector guids(1); + for (std::unique_ptr& it : matched_local_readers_) + { + intraprocess_delivery(change, *it); + } + for (std::unique_ptr& it : matched_remote_readers_) + { + RTPSMessageGroup group(mp_RTPSParticipant, this, *it, max_blocking_time); + size_t num_locators = it->locators_size(); + send_data_or_fragments(group, change, is_inline_qos_expected_, + [this, num_locators]( + CacheChange_t* change, + FragmentNumber_t /*frag*/) + { + add_statistics_sent_submessage(change, num_locators); + }); + } } - } - else - { - for (std::unique_ptr& it : matched_local_readers_) + else { - intraprocess_delivery(change, *it); + for (std::unique_ptr& it : matched_local_readers_) + { + intraprocess_delivery(change, *it); + } + + if (there_are_remote_readers_ || !fixed_locators_.empty()) + { + RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); + size_t num_locators = locator_selector_.selected_size() + fixed_locators_.size(); + send_data_or_fragments(group, change, is_inline_qos_expected_, + [this, num_locators]( + CacheChange_t* change, + FragmentNumber_t /*frag*/) + { + add_statistics_sent_submessage(change, num_locators); + }); + } } - if (there_are_remote_readers_ || !fixed_locators_.empty()) + on_sample_datas(change->write_params.sample_identity(), change->num_sent_submessages); + should_notify_data_sent = true; + if (mp_listener != nullptr) { - RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); - size_t num_locators = locator_selector_.selected_size() + fixed_locators_.size(); - send_data_or_fragments(group, change, is_inline_qos_expected_, - [num_locators]( - CacheChange_t* change, - FragmentNumber_t /*frag*/) - { - add_statistics_sent_submessage(change, num_locators); - }); + mp_listener->onWriterChangeReceivedByAll(this, change); } } - - on_sample_datas(change->write_params.sample_identity(), change->num_sent_submessages); - if (mp_listener != nullptr) + catch (const RTPSMessageGroup::timeout&) { - mp_listener->onWriterChangeReceivedByAll(this, change); + logError(RTPS_WRITER, "Max blocking time reached"); } } - catch (const RTPSMessageGroup::timeout&) + else { - logError(RTPS_WRITER, "Max blocking time reached"); + unsent_changes_.push_back(ChangeForReader_t(change)); + mp_RTPSParticipant->async_thread().wake_up(this, max_blocking_time); } } else { - unsent_changes_.push_back(ChangeForReader_t(change)); - mp_RTPSParticipant->async_thread().wake_up(this, max_blocking_time); + logInfo(RTPS_WRITER, "No reader to add change."); + if (mp_listener != nullptr) + { + mp_listener->onWriterChangeReceivedByAll(this, change); + } } } - else + + if (should_notify_data_sent) { - logInfo(RTPS_WRITER, "No reader to add change."); - if (mp_listener != nullptr) - { - mp_listener->onWriterChangeReceivedByAll(this, change); - } + on_data_sent(); } } @@ -550,19 +560,22 @@ void StatelessWriter::update_unsent_changes( void StatelessWriter::send_any_unsent_changes() { - std::lock_guard guard(mp_mutex); - - bool remote_destinations = there_are_remote_readers_ || !fixed_locators_.empty(); - bool no_flow_controllers = flow_controllers_.empty() && mp_RTPSParticipant->getFlowControllers().empty(); - if (!remote_destinations || no_flow_controllers) - { - send_all_unsent_changes(); - } - else { - send_unsent_changes_with_flow_control(); + std::lock_guard guard(mp_mutex); + + bool remote_destinations = there_are_remote_readers_ || !fixed_locators_.empty(); + bool no_flow_controllers = flow_controllers_.empty() && mp_RTPSParticipant->getFlowControllers().empty(); + if (!remote_destinations || no_flow_controllers) + { + send_all_unsent_changes(); + } + else + { + send_unsent_changes_with_flow_control(); + } } + on_data_sent(); logInfo(RTPS_WRITER, "Finish sending unsent changes"); // In case someone is waiting for changes to be sent @@ -636,7 +649,7 @@ void StatelessWriter::send_all_unsent_changes() { auto change = unsentChange.getChange(); bool sent = send_data_or_fragments(group, change, is_inline_qos_expected_, - [num_locators]( + [this, num_locators]( CacheChange_t* change, FragmentNumber_t /*frag*/) { diff --git a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp index a34facb3fe9..32aacc6997f 100644 --- a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp +++ b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp @@ -79,14 +79,23 @@ void StatisticsWriterImpl::on_sample_datas( }); } -void StatisticsWriterImpl::on_data() +void StatisticsWriterImpl::on_data_generated( + size_t num_destinations) +{ + std::lock_guard lock(get_statistics_mutex()); + auto members = get_members(); + members->data_counter += static_cast(num_destinations); +} + +void StatisticsWriterImpl::on_data_sent() { EntityCount notification; notification.guid(to_statistics_type(get_guid())); { std::lock_guard lock(get_statistics_mutex()); - notification.count(++get_members()->data_counter); + auto members = get_members(); + notification.count(members->data_counter); } // Perform the callbacks @@ -101,12 +110,6 @@ void StatisticsWriterImpl::on_data() }); } -void StatisticsWriterImpl::on_data_frag() -{ - // there is no specific EventKind thus it will be redirected to DATA_COUNT - on_data(); -} - void StatisticsWriterImpl::on_heartbeat( uint32_t count) {