Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10796] RESENT_DATAS tests & implementation #1936

Merged
merged 11 commits into from
May 1, 2021
13 changes: 12 additions & 1 deletion .github/workflows/statistics_coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,27 @@ jobs:
run: |
cat src/Fast-DDS/.github/workflows/statistics_module.meta
colcon build \
--packages-up-to fastrtps --packages-skip fastrtps \
--event-handlers=console_direct+ \
--metas src/Fast-DDS/.github/workflows/statistics_module.meta \
--mixin coverage-gcc
for target in RTPSStatisticsTests StatisticsDomainParticipantTests StatisticsQosTests StatisticsDomainParticipantMockTests
do
colcon build \
--packages-select fastrtps \
--cmake-target $target \
--cmake-target-skip-unavailable \
--event-handlers=console_direct+ \
--metas src/Fast-DDS/.github/workflows/colcon.meta \
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
--mixin coverage-gcc
done

- name: Run tests
run: |
colcon test \
--packages-select fastrtps \
--event-handlers=console_direct+ \
--ctest-args -R Statistics
--ctest-args -R RTPSStatisticsTests -R Statistics

- name: Generate coverage report
run: |
Expand Down
8 changes: 4 additions & 4 deletions include/fastdds/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ class ReaderProxy

/**
* Turns all REQUESTED changes into UNSENT.
* @return true if at least one change changed its status, false otherwise.
* @return the number of changes that changed its status.
*/
bool perform_acknack_response();
uint32_t perform_acknack_response();

/**
* Call this to inform a change was removed from history.
Expand Down Expand Up @@ -466,9 +466,9 @@ class ReaderProxy
* Converts all changes with a given status to a different status.
* @param previous Status to change.
* @param next Status to adopt.
* @return true when at least one change has been modified, false otherwise.
* @return the number of changes that have been modified.
*/
bool convert_status_on_all_changes(
uint32_t convert_status_on_all_changes(
ChangeForReaderStatus_t previous,
ChangeForReaderStatus_t next);

Expand Down
7 changes: 7 additions & 0 deletions include/fastdds/statistics/rtps/StatisticsCommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ class StatisticsWriterImpl

//! Report that a GAP message is sent
void on_gap();

/*
* @brief Report that several changes are marked for redelivery
* @param number of changes to redeliver
*/
void on_resent_data(
uint32_t to_send);
};

// Members are private details
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ class StatisticsWriterImpl
{
}

/*
* @brief Report that several changes are marked for redelivery
* @param number of changes to redeliver
*/
inline void on_resent_data(
uint32_t)
{
}

};

class StatisticsReaderImpl
Expand Down
14 changes: 7 additions & 7 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ bool ReaderProxy::process_initial_acknack()
{
if (is_local_reader())
{
return convert_status_on_all_changes(UNACKNOWLEDGED, UNSENT);
return 0 != convert_status_on_all_changes(UNACKNOWLEDGED, UNSENT);
}

return true;
Expand Down Expand Up @@ -498,15 +498,15 @@ bool ReaderProxy::mark_fragment_as_sent_for_change(

bool ReaderProxy::perform_nack_supression()
{
return convert_status_on_all_changes(UNDERWAY, UNACKNOWLEDGED);
return 0 != convert_status_on_all_changes(UNDERWAY, UNACKNOWLEDGED);
}

bool ReaderProxy::perform_acknack_response()
uint32_t ReaderProxy::perform_acknack_response()
{
return convert_status_on_all_changes(REQUESTED, UNSENT);
}

bool ReaderProxy::convert_status_on_all_changes(
uint32_t ReaderProxy::convert_status_on_all_changes(
ChangeForReaderStatus_t previous,
ChangeForReaderStatus_t next)
{
Expand All @@ -515,17 +515,17 @@ bool ReaderProxy::convert_status_on_all_changes(
// NOTE: This is only called for REQUESTED=>UNSENT (acknack response) or
// UNDERWAY=>UNACKNOWLEDGED (nack supression)

bool at_least_one_modified = false;
uint32_t changed = 0;
for (ChangeForReader_t& change : changes_for_reader_)
{
if (change.getStatus() == previous)
{
at_least_one_modified = true;
++changed;
change.setStatus(next);
}
}

return at_least_one_modified;
return changed;
}

void ReaderProxy::change_has_been_removed(
Expand Down
15 changes: 13 additions & 2 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2379,14 +2379,20 @@ void StatefulWriter::perform_nack_response()
{
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
bool must_wake_up_async_thread = false;
uint32_t changes_to_resend = 0;

for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
[&must_wake_up_async_thread](ReaderProxy* reader)
[&must_wake_up_async_thread, &changes_to_resend](ReaderProxy* reader)
{
if (reader->perform_acknack_response() || reader->are_there_gaps())
uint32_t pending = reader->perform_acknack_response();
changes_to_resend += pending;

if ( pending > 0 || reader->are_there_gaps())
{
must_wake_up_async_thread = true;
// Do not exit the loop, perform_acknack_response must be executed for all readers
}

return false;
}
);
Expand All @@ -2395,6 +2401,11 @@ void StatefulWriter::perform_nack_response()
{
mp_RTPSParticipant->async_thread().wake_up(this);
}

lock.unlock();

// Notify the statistics module
on_resent_data(changes_to_resend);
}

void StatefulWriter::perform_nack_supression(
Expand Down
8 changes: 6 additions & 2 deletions src/cpp/statistics/rtps/StatisticsBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct StatisticsWriterAncillary
{
unsigned long long data_counter = {};
unsigned long long gap_counter = {};
unsigned long long resent_counter = {};
};

struct StatisticsReaderAncillary
Expand All @@ -69,11 +70,14 @@ template<class Function>
Function StatisticsListenersImpl::for_each_listener(
Function f)
{
std::lock_guard<fastrtps::RecursiveTimedMutex> lock(get_statistics_mutex());
// Use a collection copy to prevent locking on traversal
std::unique_lock<fastrtps::RecursiveTimedMutex> lock(get_statistics_mutex());
auto listeners = members_->listeners;
lock.unlock();

if (members_)
{
for (auto& listener : members_->listeners)
for (auto& listener : listeners)
{
f(listener);
}
Expand Down
27 changes: 27 additions & 0 deletions src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,30 @@ void StatisticsWriterImpl::on_gap()
listener->on_statistics_data(data);
});
}

void StatisticsWriterImpl::on_resent_data(
uint32_t to_send)
{
if ( 0 == to_send )
{
return;
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
}

EntityCount notification;
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
notification.guid(to_statistics_type(get_guid()));

{
std::lock_guard<fastrtps::RecursiveTimedMutex> lock(get_statistics_mutex());
notification.count(get_members()->resent_counter += to_send);
}

// Perform the callbacks
Data data;
// note that the setter sets RESENT_DATAS by default
data.entity_count(notification);

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
{
listener->on_statistics_data(data);
});
}
55 changes: 50 additions & 5 deletions test/unittest/statistics/rtps/RTPSStatisticsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ struct MockListener : IListener
case DATA_COUNT:
on_data_count(data.entity_count());
break;
case RESENT_DATAS:
on_resent_count(data.entity_count());
break;
case GAP_COUNT:
on_gap_count(data.entity_count());
break;
Expand All @@ -87,6 +90,7 @@ struct MockListener : IListener
MOCK_METHOD1(on_heartbeat_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_acknack_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_data_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_resent_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_gap_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_nackfrag_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_entity_discovery, void(const eprosima::fastdds::statistics::DiscoveryTime&));
Expand Down Expand Up @@ -234,6 +238,8 @@ class RTPSStatisticsTestsImpl
writer_history_ = new WriterHistory(history_attributes);

WriterAttributes w_att;
w_att.times.heartbeatPeriod.seconds = 0;
w_att.times.heartbeatPeriod.nanosec = 250 * 1000 * 1000; // reduce acknowledgement wait
w_att.endpoint.reliabilityKind = reliability_qos;
w_att.endpoint.durabilityKind = durability_qos;

Expand Down Expand Up @@ -472,6 +478,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_management)
* This test checks RTPSParticipant, RTPSWriter and RTPSReader statistics module related APIs.
* - RTPS_SENT callbacks are performed
* - DATA_COUNT callbacks are performed for DATA submessages
* - RESENT_DATAS callbacks are performed for DATA submessages demanded by the readers
* - ACKNACK_COUNT callbacks are performed
* - HEARBEAT_COUNT callbacks are performed
*/
Expand All @@ -482,6 +489,39 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
using namespace fastrtps::rtps;
using namespace std;

// make sure some messages are lost to assure the RESENT_DATAS callback
set_transport_filter(
DATA,
[](fastrtps::rtps::CDRMessage_t& msg)-> bool
{
static unsigned int samples_filtered = 0;
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.3 Data Submessage
EntityId_t readerID, writerID;
SequenceNumber_t sn;

msg.pos += 2; // flags
msg.pos += 2; // octets to inline quos
CDRMessage::readEntityId(&msg, &readerID);
CDRMessage::readEntityId(&msg, &writerID);
CDRMessage::readSequenceNumber(&msg, &sn);

// restore buffer pos
msg.pos = old_pos;

// generate losses
if ( samples_filtered < 10 // only a few times (mind the interfaces)
&& (writerID.value[3] & 0xC0) == 0 // only user endpoints
&& (sn == SequenceNumber_t{0, 1})) // only first sample
{
++samples_filtered;
return true;
}

return false;
});

// create the testing endpoints
uint16_t length = 255;
create_endpoints(length, RELIABLE);
Expand All @@ -492,7 +532,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)

// writer callbacks through participant listener
auto participant_writer_listener = make_shared<MockListener>();
ASSERT_TRUE(participant_->add_statistics_listener(participant_writer_listener, EventKind::DATA_COUNT));
ASSERT_TRUE(participant_->add_statistics_listener(participant_writer_listener,
EventKind::DATA_COUNT | EventKind::RESENT_DATAS));

// writer specific callbacks
auto writer_listener = make_shared<MockListener>();
Expand All @@ -518,9 +559,13 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_data_count)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_resent_count)
.Times(AtLeast(1));

EXPECT_CALL(*participant_writer_listener, on_data_count)
.Times(AtLeast(1));
EXPECT_CALL(*participant_writer_listener, on_resent_count)
.Times(AtLeast(1));

// + RTPSReader: SUBSCRIPTION_THROUGHPUT,
// SAMPLE_DATAS & PHYSICAL_DATA
Expand Down Expand Up @@ -552,7 +597,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
EXPECT_TRUE(reader_->remove_statistics_listener(reader_listener));

EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, EventKind::RTPS_SENT));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_writer_listener, EventKind::DATA_COUNT));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_writer_listener,
EventKind::DATA_COUNT | EventKind::RESENT_DATAS));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_reader_listener, EventKind::ACKNACK_COUNT));
}

Expand Down Expand Up @@ -658,6 +704,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_gap_callback)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_data_count)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_resent_count)
.Times(AtLeast(1));

EXPECT_CALL(*participant_writer_listener, on_gap_count)
.Times(AtLeast(1));
Expand All @@ -684,9 +732,6 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_gap_callback)
// match writer and reader on a dummy topic
match_endpoints(false, "string", "statisticsSmallTopic");

// std::this_thread::sleep_for(std::chrono::seconds(10));


// wait for reception
EXPECT_TRUE(reader_->wait_for_unread_cache(Duration_t(5, 0)));

Expand Down