Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DATA_COUNT to account for number of destinations [11446] #1949

Merged
merged 5 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ class RTPSWriter
return sent_ok;
}

static void add_statistics_sent_submessage(
void add_statistics_sent_submessage(
CacheChange_t* change,
size_t num_locators);

Expand Down
2 changes: 0 additions & 2 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ class StatefulWriter : public RTPSWriter
//!WriterTimes
WriterTimes m_times;

//! Vector containing all the active ReaderProxies.
ResourceLimitedVector<ReaderProxy*> matched_readers_;
//! Vector containing all the remote ReaderProxies.
ResourceLimitedVector<ReaderProxy*> matched_remote_readers_;
//! Vector containing all the inactive, ready for reuse, ReaderProxies.
Expand Down
12 changes: 8 additions & 4 deletions include/fastdds/statistics/rtps/StatisticsCommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,15 @@ class StatisticsWriterImpl
void on_heartbeat(
uint32_t count);

/// Report that a DATA message is sent
void on_data();
/**
* @brief Report that a DATA / DATA_FRAG message is generated
* @param num_destinations number of locators to which the message will be sent
*/
void on_data_generated(
size_t num_destinations);

/// Report that a DATA_FRAG message is sent
void on_data_frag();
/// Notify listeners of DATA / DATA_FRAG counts
void on_data_sent();

/// Report that a GAP message is sent
void on_gap();
Expand Down
12 changes: 8 additions & 4 deletions include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,17 @@ class StatisticsWriterImpl
{
}

/// Report that a DATA message is sent
inline void on_data()
/**
* @brief Report that a DATA / DATA_FRAG message is generated
* @param number of locators to which the message will be sent
*/
inline void on_data_generated(
size_t)
{
}

/// Report that a DATA_FRAG message is sent
inline void on_data_frag()
/// Notify listeners of DATA / DATA_FRAG counts
inline void on_data_sent()
{
}

Expand Down
8 changes: 0 additions & 8 deletions src/cpp/rtps/messages/RTPSMessageGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,6 @@ bool RTPSMessageGroup::add_data(
}
#endif // if HAVE_SECURITY

// Notify the statistics module, note that only writers add DATAs
assert(nullptr != dynamic_cast<RTPSWriter*>(endpoint_));
static_cast<RTPSWriter*>(endpoint_)->on_data();

return insert_submessage(is_big_submessage);
}

Expand Down Expand Up @@ -530,10 +526,6 @@ bool RTPSMessageGroup::add_data_frag(
}
#endif // if HAVE_SECURITY

// Notify the statistics module, note that only writers add DATAs
assert(nullptr != dynamic_cast<RTPSWriter*>(endpoint_));
static_cast<RTPSWriter*>(endpoint_)->on_data_frag();

return insert_submessage(false);
}

Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/writer/RTPSWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ void RTPSWriter::add_statistics_sent_submessage(

#ifdef FASTDDS_STATISTICS
change->num_sent_submessages += num_locators;
on_data_generated(num_locators);
#endif // ifdef FASTDDS_STATISTICS
}

Expand Down
123 changes: 69 additions & 54 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,49 +602,60 @@ void StatefulWriter::unsent_change_added_to_history(
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
bool should_notify_data_sent = false;

if (liveliness_lease_duration_ < c_TimeInfinite)
{
mp_RTPSParticipant->wlp()->assert_liveliness(
getGuid(),
liveliness_kind_,
liveliness_lease_duration_);
}

// Prepare the metadata for datasharing
if (is_datasharing_compatible())
{
prepare_datasharing_delivery(change);
}
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);

// Now for the rest of readers
if (!matched_remote_readers_.empty() || !matched_datasharing_readers_.empty() || !matched_local_readers_.empty())
{
if (!isAsync())
if (liveliness_lease_duration_ < c_TimeInfinite)
{
sync_delivery(change, max_blocking_time);
mp_RTPSParticipant->wlp()->assert_liveliness(
getGuid(),
liveliness_kind_,
liveliness_lease_duration_);
}
else

// Prepare the metadata for datasharing
if (is_datasharing_compatible())
{
async_delivery(change, max_blocking_time);
prepare_datasharing_delivery(change);
}

if (disable_positive_acks_)
// Now for the rest of readers
if (!matched_remote_readers_.empty() || !matched_datasharing_readers_.empty() ||
!matched_local_readers_.empty())
{
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
if (!isAsync())
{
sync_delivery(change, max_blocking_time);
should_notify_data_sent = true;
}
else
{
async_delivery(change, max_blocking_time);
}

if (disable_positive_acks_)
{
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->restart_timer(max_blocking_time);
ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->restart_timer(max_blocking_time);
}
}
else
{
logInfo(RTPS_WRITER, "No reader proxy to add change.");
check_acked_status();
}
}
else

if (should_notify_data_sent)
{
logInfo(RTPS_WRITER, "No reader proxy to add change.");
check_acked_status();
on_data_sent();
}
}

Expand Down Expand Up @@ -764,39 +775,43 @@ bool StatefulWriter::change_removed_by_history(

void StatefulWriter::send_any_unsent_changes()
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);

bool activateHeartbeatPeriod = false;
SequenceNumber_t max_sequence = mp_history->next_sequence_number();
bool activateHeartbeatPeriod = false;
SequenceNumber_t max_sequence = mp_history->next_sequence_number();

if (mp_history->getHistorySize() == 0 || getMatchedReadersSize() == 0)
{
send_heartbeat_to_all_readers();
}
else if (m_separateSendingEnabled)
{
send_changes_separatedly(max_sequence, activateHeartbeatPeriod);
}
else
{
bool no_flow_controllers = m_controllers.empty() && mp_RTPSParticipant->getFlowControllers().empty();
if (no_flow_controllers || !there_are_remote_readers_)
if (mp_history->getHistorySize() == 0 || getMatchedReadersSize() == 0)
{
send_all_unsent_changes(max_sequence, activateHeartbeatPeriod);
send_heartbeat_to_all_readers();
}
else if (m_separateSendingEnabled)
{
send_changes_separatedly(max_sequence, activateHeartbeatPeriod);
}
else
{
send_unsent_changes_with_flow_control(max_sequence, activateHeartbeatPeriod);
bool no_flow_controllers = m_controllers.empty() && mp_RTPSParticipant->getFlowControllers().empty();
if (no_flow_controllers || !there_are_remote_readers_)
{
send_all_unsent_changes(max_sequence, activateHeartbeatPeriod);
}
else
{
send_unsent_changes_with_flow_control(max_sequence, activateHeartbeatPeriod);
}
}

if (activateHeartbeatPeriod)
{
periodic_hb_event_->restart_timer();
}
}

if (activateHeartbeatPeriod)
{
periodic_hb_event_->restart_timer();
// On VOLATILE writers, remove auto-acked (best effort readers) changes
check_acked_status();
}

// On VOLATILE writers, remove auto-acked (best effort readers) changes
check_acked_status();
on_data_sent();

logInfo(RTPS_WRITER, "Finish sending unsent changes");
}
Expand Down Expand Up @@ -955,7 +970,7 @@ void StatefulWriter::send_changes_separatedly(
else
{
SequenceNumber_t max_ack_seq = SequenceNumber_t::unknown();
auto sent_fun = [num_locators](
auto sent_fun = [this, num_locators](
CacheChange_t* change,
FragmentNumber_t /*frag*/)
{
Expand Down
Loading