From 8883a2707d837b612285a5fb2da07bf41ced8369 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 4 May 2021 10:37:29 +0200 Subject: [PATCH 1/5] Refs 11446. Refactored StatisticsWriterImpl::on_data. Signed-off-by: Miguel Company --- .../fastdds/statistics/rtps/StatisticsCommon.hpp | 11 ++++++----- .../statistics/rtps/StatisticsCommonEmpty.hpp | 13 ++++++------- src/cpp/rtps/messages/RTPSMessageGroup.cpp | 8 -------- .../statistics/rtps/writer/StatisticsWriterImpl.cpp | 13 +++++-------- 4 files changed, 17 insertions(+), 28 deletions(-) diff --git a/include/fastdds/statistics/rtps/StatisticsCommon.hpp b/include/fastdds/statistics/rtps/StatisticsCommon.hpp index 64ddeafc538..2f994e9eb85 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommon.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommon.hpp @@ -161,11 +161,12 @@ class StatisticsWriterImpl void on_heartbeat( uint32_t count); - /// Report that a DATA message is sent - void on_data(); - - /// Report that a DATA_FRAG message is sent - void on_data_frag(); + /** + * @brief Report that a DATA / DATA_FRAG message is sent + * @param num_destinations number of locators to which the message is sent + */ + void on_data( + size_t num_destinations); /// Report that a GAP message is sent void on_gap(); diff --git a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp index 635708cc911..fc39241b6ff 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp @@ -62,13 +62,12 @@ class StatisticsWriterImpl { } - /// Report that a DATA message is sent - inline void on_data() - { - } - - /// Report that a DATA_FRAG message is sent - inline void on_data_frag() + /** + * @brief Report that a DATA / DATA_FRAG message is sent + * @param number of locators to which the message is sent + */ + inline void on_data( + size_t) { } diff --git a/src/cpp/rtps/messages/RTPSMessageGroup.cpp b/src/cpp/rtps/messages/RTPSMessageGroup.cpp index 359d96797e6..8a62003f615 100644 --- a/src/cpp/rtps/messages/RTPSMessageGroup.cpp +++ b/src/cpp/rtps/messages/RTPSMessageGroup.cpp @@ -429,10 +429,6 @@ bool RTPSMessageGroup::add_data( } #endif // if HAVE_SECURITY - // Notify the statistics module, note that only writers add DATAs - assert(nullptr != dynamic_cast(endpoint_)); - static_cast(endpoint_)->on_data(); - return insert_submessage(is_big_submessage); } @@ -530,10 +526,6 @@ bool RTPSMessageGroup::add_data_frag( } #endif // if HAVE_SECURITY - // Notify the statistics module, note that only writers add DATAs - assert(nullptr != dynamic_cast(endpoint_)); - static_cast(endpoint_)->on_data_frag(); - return insert_submessage(false); } diff --git a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp index a34facb3fe9..f27f120a793 100644 --- a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp +++ b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp @@ -79,14 +79,17 @@ void StatisticsWriterImpl::on_sample_datas( }); } -void StatisticsWriterImpl::on_data() +void StatisticsWriterImpl::on_data( + size_t num_destinations) { EntityCount notification; notification.guid(to_statistics_type(get_guid())); { std::lock_guard lock(get_statistics_mutex()); - notification.count(++get_members()->data_counter); + auto members = get_members(); + members->data_counter += static_cast(num_destinations); + notification.count(members->data_counter); } // Perform the callbacks @@ -101,12 +104,6 @@ void StatisticsWriterImpl::on_data() }); } -void StatisticsWriterImpl::on_data_frag() -{ - // there is no specific EventKind thus it will be redirected to DATA_COUNT - on_data(); -} - void StatisticsWriterImpl::on_heartbeat( uint32_t count) { From 2f22e29d2fd75fcfd1774f71c52f0ffa18308d11 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 4 May 2021 10:37:56 +0200 Subject: [PATCH 2/5] Refs 11446. Changes on RTPSWriter. Signed-off-by: Miguel Company --- include/fastdds/rtps/writer/RTPSWriter.h | 2 +- src/cpp/rtps/writer/RTPSWriter.cpp | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/include/fastdds/rtps/writer/RTPSWriter.h b/include/fastdds/rtps/writer/RTPSWriter.h index 94080abd9d8..f341a6ac4e0 100644 --- a/include/fastdds/rtps/writer/RTPSWriter.h +++ b/include/fastdds/rtps/writer/RTPSWriter.h @@ -544,7 +544,7 @@ class RTPSWriter return sent_ok; } - static void add_statistics_sent_submessage( + void add_statistics_sent_submessage( CacheChange_t* change, size_t num_locators); diff --git a/src/cpp/rtps/writer/RTPSWriter.cpp b/src/cpp/rtps/writer/RTPSWriter.cpp index 01c7dd02394..8db4804a79a 100644 --- a/src/cpp/rtps/writer/RTPSWriter.cpp +++ b/src/cpp/rtps/writer/RTPSWriter.cpp @@ -447,6 +447,7 @@ void RTPSWriter::add_statistics_sent_submessage( #ifdef FASTDDS_STATISTICS change->num_sent_submessages += num_locators; + on_data(num_locators); #endif // ifdef FASTDDS_STATISTICS } From 099c346c82ce5a901088aa76e2e526bd5b33b344 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 4 May 2021 14:53:55 +0200 Subject: [PATCH 3/5] Refs 11446. Sepparate on_data generated / sent. Signed-off-by: Miguel Company --- .../statistics/rtps/StatisticsCommon.hpp | 9 +- .../statistics/rtps/StatisticsCommonEmpty.hpp | 11 +- src/cpp/rtps/writer/RTPSWriter.cpp | 2 +- src/cpp/rtps/writer/StatefulWriter.cpp | 121 +++++++------ src/cpp/rtps/writer/StatelessWriter.cpp | 159 ++++++++++-------- .../rtps/writer/StatisticsWriterImpl.cpp | 10 +- 6 files changed, 177 insertions(+), 135 deletions(-) diff --git a/include/fastdds/statistics/rtps/StatisticsCommon.hpp b/include/fastdds/statistics/rtps/StatisticsCommon.hpp index 2f994e9eb85..956a26e0523 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommon.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommon.hpp @@ -162,12 +162,15 @@ class StatisticsWriterImpl uint32_t count); /** - * @brief Report that a DATA / DATA_FRAG message is sent - * @param num_destinations number of locators to which the message is sent + * @brief Report that a DATA / DATA_FRAG message is generated + * @param num_destinations number of locators to which the message will be sent */ - void on_data( + void on_data_generated( size_t num_destinations); + /// Notify listeners of DATA / DATA_FRAG counts + void on_data_sent(); + /// Report that a GAP message is sent void on_gap(); diff --git a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp index fc39241b6ff..a9efa082299 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp @@ -63,14 +63,19 @@ class StatisticsWriterImpl } /** - * @brief Report that a DATA / DATA_FRAG message is sent - * @param number of locators to which the message is sent + * @brief Report that a DATA / DATA_FRAG message is generated + * @param number of locators to which the message will be sent */ - inline void on_data( + inline void on_data_generated( size_t) { } + /// Notify listeners of DATA / DATA_FRAG counts + inline void on_data_sent() + { + } + /// Report that a GAP message is sent inline void on_gap() { diff --git a/src/cpp/rtps/writer/RTPSWriter.cpp b/src/cpp/rtps/writer/RTPSWriter.cpp index 8db4804a79a..90e35200e47 100644 --- a/src/cpp/rtps/writer/RTPSWriter.cpp +++ b/src/cpp/rtps/writer/RTPSWriter.cpp @@ -447,7 +447,7 @@ void RTPSWriter::add_statistics_sent_submessage( #ifdef FASTDDS_STATISTICS change->num_sent_submessages += num_locators; - on_data(num_locators); + on_data_generated(num_locators); #endif // ifdef FASTDDS_STATISTICS } diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 1e5ed6ea43f..4cc965521f2 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -602,49 +602,60 @@ void StatefulWriter::unsent_change_added_to_history( CacheChange_t* change, const std::chrono::time_point& max_blocking_time) { - std::lock_guard guard(mp_mutex); + bool should_notify_data_sent = false; - if (liveliness_lease_duration_ < c_TimeInfinite) { - mp_RTPSParticipant->wlp()->assert_liveliness( - getGuid(), - liveliness_kind_, - liveliness_lease_duration_); - } - - // Prepare the metadata for datasharing - if (is_datasharing_compatible()) - { - prepare_datasharing_delivery(change); - } + std::lock_guard guard(mp_mutex); - // Now for the rest of readers - if (!matched_remote_readers_.empty() || !matched_datasharing_readers_.empty() || !matched_local_readers_.empty()) - { - if (!isAsync()) + if (liveliness_lease_duration_ < c_TimeInfinite) { - sync_delivery(change, max_blocking_time); + mp_RTPSParticipant->wlp()->assert_liveliness( + getGuid(), + liveliness_kind_, + liveliness_lease_duration_); } - else + + // Prepare the metadata for datasharing + if (is_datasharing_compatible()) { - async_delivery(change, max_blocking_time); + prepare_datasharing_delivery(change); } - if (disable_positive_acks_) + // Now for the rest of readers + if (!matched_remote_readers_.empty() || !matched_datasharing_readers_.empty() || + !matched_local_readers_.empty()) { - auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); - auto now = system_clock::now(); - auto interval = source_timestamp - now + keep_duration_us_; - assert(interval.count() >= 0); + if (!isAsync()) + { + sync_delivery(change, max_blocking_time); + should_notify_data_sent = true; + } + else + { + async_delivery(change, max_blocking_time); + } + + if (disable_positive_acks_) + { + auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns()); + auto now = system_clock::now(); + auto interval = source_timestamp - now + keep_duration_us_; + assert(interval.count() >= 0); - ack_event_->update_interval_millisec((double)duration_cast(interval).count()); - ack_event_->restart_timer(max_blocking_time); + ack_event_->update_interval_millisec((double)duration_cast(interval).count()); + ack_event_->restart_timer(max_blocking_time); + } + } + else + { + logInfo(RTPS_WRITER, "No reader proxy to add change."); + check_acked_status(); } } - else + + if (should_notify_data_sent) { - logInfo(RTPS_WRITER, "No reader proxy to add change."); - check_acked_status(); + on_data_sent(); } } @@ -764,39 +775,43 @@ bool StatefulWriter::change_removed_by_history( void StatefulWriter::send_any_unsent_changes() { - std::lock_guard guard(mp_mutex); + { + std::lock_guard guard(mp_mutex); - bool activateHeartbeatPeriod = false; - SequenceNumber_t max_sequence = mp_history->next_sequence_number(); + bool activateHeartbeatPeriod = false; + SequenceNumber_t max_sequence = mp_history->next_sequence_number(); - if (mp_history->getHistorySize() == 0 || getMatchedReadersSize() == 0) - { - send_heartbeat_to_all_readers(); - } - else if (m_separateSendingEnabled) - { - send_changes_separatedly(max_sequence, activateHeartbeatPeriod); - } - else - { - bool no_flow_controllers = m_controllers.empty() && mp_RTPSParticipant->getFlowControllers().empty(); - if (no_flow_controllers || !there_are_remote_readers_) + if (mp_history->getHistorySize() == 0 || getMatchedReadersSize() == 0) { - send_all_unsent_changes(max_sequence, activateHeartbeatPeriod); + send_heartbeat_to_all_readers(); + } + else if (m_separateSendingEnabled) + { + send_changes_separatedly(max_sequence, activateHeartbeatPeriod); } else { - send_unsent_changes_with_flow_control(max_sequence, activateHeartbeatPeriod); + bool no_flow_controllers = m_controllers.empty() && mp_RTPSParticipant->getFlowControllers().empty(); + if (no_flow_controllers || !there_are_remote_readers_) + { + send_all_unsent_changes(max_sequence, activateHeartbeatPeriod); + } + else + { + send_unsent_changes_with_flow_control(max_sequence, activateHeartbeatPeriod); + } + } + + if (activateHeartbeatPeriod) + { + periodic_hb_event_->restart_timer(); } - } - if (activateHeartbeatPeriod) - { - periodic_hb_event_->restart_timer(); + // On VOLATILE writers, remove auto-acked (best effort readers) changes + check_acked_status(); } - // On VOLATILE writers, remove auto-acked (best effort readers) changes - check_acked_status(); + on_data_sent(); logInfo(RTPS_WRITER, "Finish sending unsent changes"); } diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index bd2fd4103d8..69697e7a7fd 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -332,95 +332,105 @@ void StatelessWriter::unsent_change_added_to_history( CacheChange_t* change, const std::chrono::time_point& max_blocking_time) { - std::lock_guard guard(mp_mutex); + bool should_notify_data_sent = false; - if (liveliness_lease_duration_ < c_TimeInfinite) { - mp_RTPSParticipant->wlp()->assert_liveliness( - getGuid(), - liveliness_kind_, - liveliness_lease_duration_); - } + std::lock_guard guard(mp_mutex); - // Notify the datasharing readers - // This also prepares the metadata for late-joiners - if (is_datasharing_compatible()) - { - datasharing_delivery(change); - } + if (liveliness_lease_duration_ < c_TimeInfinite) + { + mp_RTPSParticipant->wlp()->assert_liveliness( + getGuid(), + liveliness_kind_, + liveliness_lease_duration_); + } - // Now for the rest of readers - if (!fixed_locators_.empty() || getMatchedReadersSize() > 0) - { - if (!isAsync()) + // Notify the datasharing readers + // This also prepares the metadata for late-joiners + if (is_datasharing_compatible()) + { + datasharing_delivery(change); + } + + // Now for the rest of readers + if (!fixed_locators_.empty() || getMatchedReadersSize() > 0) { - try + if (!isAsync()) { - if (m_separateSendingEnabled) + try { - std::vector guids(1); - for (std::unique_ptr& it : matched_local_readers_) - { - intraprocess_delivery(change, *it); - } - for (std::unique_ptr& it : matched_remote_readers_) + if (m_separateSendingEnabled) { - RTPSMessageGroup group(mp_RTPSParticipant, this, *it, max_blocking_time); - size_t num_locators = it->locators_size(); - send_data_or_fragments(group, change, is_inline_qos_expected_, - [num_locators]( - CacheChange_t* change, - FragmentNumber_t /*frag*/) - { - add_statistics_sent_submessage(change, num_locators); - }); + std::vector guids(1); + for (std::unique_ptr& it : matched_local_readers_) + { + intraprocess_delivery(change, *it); + } + for (std::unique_ptr& it : matched_remote_readers_) + { + RTPSMessageGroup group(mp_RTPSParticipant, this, *it, max_blocking_time); + size_t num_locators = it->locators_size(); + send_data_or_fragments(group, change, is_inline_qos_expected_, + [num_locators]( + CacheChange_t* change, + FragmentNumber_t /*frag*/) + { + add_statistics_sent_submessage(change, num_locators); + }); + } } - } - else - { - for (std::unique_ptr& it : matched_local_readers_) + else { - intraprocess_delivery(change, *it); + for (std::unique_ptr& it : matched_local_readers_) + { + intraprocess_delivery(change, *it); + } + + if (there_are_remote_readers_ || !fixed_locators_.empty()) + { + RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); + size_t num_locators = locator_selector_.selected_size() + fixed_locators_.size(); + send_data_or_fragments(group, change, is_inline_qos_expected_, + [num_locators]( + CacheChange_t* change, + FragmentNumber_t /*frag*/) + { + add_statistics_sent_submessage(change, num_locators); + }); + } } - if (there_are_remote_readers_ || !fixed_locators_.empty()) + on_sample_datas(change->write_params.sample_identity(), change->num_sent_submessages); + should_notify_data_sent = true; + if (mp_listener != nullptr) { - RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); - size_t num_locators = locator_selector_.selected_size() + fixed_locators_.size(); - send_data_or_fragments(group, change, is_inline_qos_expected_, - [num_locators]( - CacheChange_t* change, - FragmentNumber_t /*frag*/) - { - add_statistics_sent_submessage(change, num_locators); - }); + mp_listener->onWriterChangeReceivedByAll(this, change); } } - - on_sample_datas(change->write_params.sample_identity(), change->num_sent_submessages); - if (mp_listener != nullptr) + catch (const RTPSMessageGroup::timeout&) { - mp_listener->onWriterChangeReceivedByAll(this, change); + logError(RTPS_WRITER, "Max blocking time reached"); } } - catch (const RTPSMessageGroup::timeout&) + else { - logError(RTPS_WRITER, "Max blocking time reached"); + unsent_changes_.push_back(ChangeForReader_t(change)); + mp_RTPSParticipant->async_thread().wake_up(this, max_blocking_time); } } else { - unsent_changes_.push_back(ChangeForReader_t(change)); - mp_RTPSParticipant->async_thread().wake_up(this, max_blocking_time); + logInfo(RTPS_WRITER, "No reader to add change."); + if (mp_listener != nullptr) + { + mp_listener->onWriterChangeReceivedByAll(this, change); + } } } - else + + if (should_notify_data_sent) { - logInfo(RTPS_WRITER, "No reader to add change."); - if (mp_listener != nullptr) - { - mp_listener->onWriterChangeReceivedByAll(this, change); - } + on_data_sent(); } } @@ -550,19 +560,22 @@ void StatelessWriter::update_unsent_changes( void StatelessWriter::send_any_unsent_changes() { - std::lock_guard guard(mp_mutex); - - bool remote_destinations = there_are_remote_readers_ || !fixed_locators_.empty(); - bool no_flow_controllers = flow_controllers_.empty() && mp_RTPSParticipant->getFlowControllers().empty(); - if (!remote_destinations || no_flow_controllers) - { - send_all_unsent_changes(); - } - else { - send_unsent_changes_with_flow_control(); + std::lock_guard guard(mp_mutex); + + bool remote_destinations = there_are_remote_readers_ || !fixed_locators_.empty(); + bool no_flow_controllers = flow_controllers_.empty() && mp_RTPSParticipant->getFlowControllers().empty(); + if (!remote_destinations || no_flow_controllers) + { + send_all_unsent_changes(); + } + else + { + send_unsent_changes_with_flow_control(); + } } + on_data_sent(); logInfo(RTPS_WRITER, "Finish sending unsent changes"); // In case someone is waiting for changes to be sent diff --git a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp index f27f120a793..32aacc6997f 100644 --- a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp +++ b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp @@ -79,8 +79,15 @@ void StatisticsWriterImpl::on_sample_datas( }); } -void StatisticsWriterImpl::on_data( +void StatisticsWriterImpl::on_data_generated( size_t num_destinations) +{ + std::lock_guard lock(get_statistics_mutex()); + auto members = get_members(); + members->data_counter += static_cast(num_destinations); +} + +void StatisticsWriterImpl::on_data_sent() { EntityCount notification; notification.guid(to_statistics_type(get_guid())); @@ -88,7 +95,6 @@ void StatisticsWriterImpl::on_data( { std::lock_guard lock(get_statistics_mutex()); auto members = get_members(); - members->data_counter += static_cast(num_destinations); notification.count(members->data_counter); } From c0e7095d80acd64c53dc1dbd9abaf6e946013d82 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Tue, 4 May 2021 16:36:24 +0200 Subject: [PATCH 4/5] Refs 11446. Removed unused vector matched_readers_. Signed-off-by: Miguel Company --- include/fastdds/rtps/writer/StatefulWriter.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/fastdds/rtps/writer/StatefulWriter.h b/include/fastdds/rtps/writer/StatefulWriter.h index 60324a57baf..3d46843fda3 100644 --- a/include/fastdds/rtps/writer/StatefulWriter.h +++ b/include/fastdds/rtps/writer/StatefulWriter.h @@ -106,8 +106,6 @@ class StatefulWriter : public RTPSWriter //!WriterTimes WriterTimes m_times; - //! Vector containing all the active ReaderProxies. - ResourceLimitedVector matched_readers_; //! Vector containing all the remote ReaderProxies. ResourceLimitedVector matched_remote_readers_; //! Vector containing all the inactive, ready for reuse, ReaderProxies. From 75aebbf335349f961a82e6b404daf97d5feb7115 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Fri, 7 May 2021 11:17:00 +0200 Subject: [PATCH 5/5] Refs 11446. Fix after rebase. Signed-off-by: Miguel Company --- src/cpp/rtps/writer/StatefulWriter.cpp | 2 +- src/cpp/rtps/writer/StatelessWriter.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 4cc965521f2..f42d1eee6f9 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -970,7 +970,7 @@ void StatefulWriter::send_changes_separatedly( else { SequenceNumber_t max_ack_seq = SequenceNumber_t::unknown(); - auto sent_fun = [num_locators]( + auto sent_fun = [this, num_locators]( CacheChange_t* change, FragmentNumber_t /*frag*/) { diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index 69697e7a7fd..f7de4bac1d8 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -371,7 +371,7 @@ void StatelessWriter::unsent_change_added_to_history( RTPSMessageGroup group(mp_RTPSParticipant, this, *it, max_blocking_time); size_t num_locators = it->locators_size(); send_data_or_fragments(group, change, is_inline_qos_expected_, - [num_locators]( + [this, num_locators]( CacheChange_t* change, FragmentNumber_t /*frag*/) { @@ -391,7 +391,7 @@ void StatelessWriter::unsent_change_added_to_history( RTPSMessageGroup group(mp_RTPSParticipant, this, *this, max_blocking_time); size_t num_locators = locator_selector_.selected_size() + fixed_locators_.size(); send_data_or_fragments(group, change, is_inline_qos_expected_, - [num_locators]( + [this, num_locators]( CacheChange_t* change, FragmentNumber_t /*frag*/) { @@ -649,7 +649,7 @@ void StatelessWriter::send_all_unsent_changes() { auto change = unsentChange.getChange(); bool sent = send_data_or_fragments(group, change, is_inline_qos_expected_, - [num_locators]( + [this, num_locators]( CacheChange_t* change, FragmentNumber_t /*frag*/) {