diff --git a/.github/workflows/statistics_coverage.yml b/.github/workflows/statistics_coverage.yml index 3f3966f3c0a..f916bd664b4 100644 --- a/.github/workflows/statistics_coverage.yml +++ b/.github/workflows/statistics_coverage.yml @@ -30,16 +30,27 @@ jobs: run: | cat src/Fast-DDS/.github/workflows/statistics_module.meta colcon build \ + --packages-up-to fastrtps --packages-skip fastrtps \ --event-handlers=console_direct+ \ --metas src/Fast-DDS/.github/workflows/statistics_module.meta \ --mixin coverage-gcc + for target in RTPSStatisticsTests StatisticsDomainParticipantTests StatisticsQosTests DomainParticipantStatisticsListenerTests StatisticsDomainParticipantMockTests BlackboxTests_DDS_PIM + do + colcon build \ + --packages-select fastrtps \ + --cmake-target $target \ + --cmake-target-skip-unavailable \ + --event-handlers=console_direct+ \ + --metas src/Fast-DDS/.github/workflows/statistics_module.meta \ + --mixin coverage-gcc + done - name: Run tests run: | colcon test \ --packages-select fastrtps \ --event-handlers=console_direct+ \ - --ctest-args -R Statistics + --ctest-args -R RTPSStatisticsTests -R Statistics - name: Generate coverage report run: | diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index c1ecc1089d6..13644eb7e80 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -235,9 +235,9 @@ class ReaderProxy /** * Turns all REQUESTED changes into UNSENT. - * @return true if at least one change changed its status, false otherwise. + * @return the number of changes that changed its status. */ - bool perform_acknack_response(); + uint32_t perform_acknack_response(); /** * Call this to inform a change was removed from history. @@ -466,9 +466,9 @@ class ReaderProxy * Converts all changes with a given status to a different status. * @param previous Status to change. * @param next Status to adopt. - * @return true when at least one change has been modified, false otherwise. + * @return the number of changes that have been modified. */ - bool convert_status_on_all_changes( + uint32_t convert_status_on_all_changes( ChangeForReaderStatus_t previous, ChangeForReaderStatus_t next); diff --git a/include/fastdds/statistics/rtps/StatisticsCommon.hpp b/include/fastdds/statistics/rtps/StatisticsCommon.hpp index ae40dc92ecf..fbcf1bee85f 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommon.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommon.hpp @@ -159,6 +159,13 @@ class StatisticsWriterImpl //! Report that a GAP message is sent void on_gap(); + + /* + * @brief Report that several changes are marked for redelivery + * @param number of changes to redeliver + */ + void on_resent_data( + uint32_t to_send); }; // Members are private details diff --git a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp index 0ff091972bb..f1ed1afb7dc 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp @@ -65,6 +65,15 @@ class StatisticsWriterImpl { } + /* + * @brief Report that several changes are marked for redelivery + * @param number of changes to redeliver + */ + inline void on_resent_data( + uint32_t) + { + } + }; class StatisticsReaderImpl diff --git a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp index c7b5b5470f1..8a55ac56409 100644 --- a/src/cpp/rtps/writer/ReaderProxy.cpp +++ b/src/cpp/rtps/writer/ReaderProxy.cpp @@ -401,7 +401,7 @@ bool ReaderProxy::process_initial_acknack() { if (is_local_reader()) { - return convert_status_on_all_changes(UNACKNOWLEDGED, UNSENT); + return 0 != convert_status_on_all_changes(UNACKNOWLEDGED, UNSENT); } return true; @@ -498,15 +498,15 @@ bool ReaderProxy::mark_fragment_as_sent_for_change( bool ReaderProxy::perform_nack_supression() { - return convert_status_on_all_changes(UNDERWAY, UNACKNOWLEDGED); + return 0 != convert_status_on_all_changes(UNDERWAY, UNACKNOWLEDGED); } -bool ReaderProxy::perform_acknack_response() +uint32_t ReaderProxy::perform_acknack_response() { return convert_status_on_all_changes(REQUESTED, UNSENT); } -bool ReaderProxy::convert_status_on_all_changes( +uint32_t ReaderProxy::convert_status_on_all_changes( ChangeForReaderStatus_t previous, ChangeForReaderStatus_t next) { @@ -515,17 +515,17 @@ bool ReaderProxy::convert_status_on_all_changes( // NOTE: This is only called for REQUESTED=>UNSENT (acknack response) or // UNDERWAY=>UNACKNOWLEDGED (nack supression) - bool at_least_one_modified = false; + uint32_t changed = 0; for (ChangeForReader_t& change : changes_for_reader_) { if (change.getStatus() == previous) { - at_least_one_modified = true; + ++changed; change.setStatus(next); } } - return at_least_one_modified; + return changed; } void ReaderProxy::change_has_been_removed( diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 99bfe1fcfc3..189b33cad6a 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -2379,14 +2379,20 @@ void StatefulWriter::perform_nack_response() { std::unique_lock lock(mp_mutex); bool must_wake_up_async_thread = false; + uint32_t changes_to_resend = 0; + for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, - [&must_wake_up_async_thread](ReaderProxy* reader) + [&must_wake_up_async_thread, &changes_to_resend](ReaderProxy* reader) { - if (reader->perform_acknack_response() || reader->are_there_gaps()) + uint32_t pending = reader->perform_acknack_response(); + changes_to_resend += pending; + + if ( pending > 0 || reader->are_there_gaps()) { must_wake_up_async_thread = true; // Do not exit the loop, perform_acknack_response must be executed for all readers } + return false; } ); @@ -2395,6 +2401,11 @@ void StatefulWriter::perform_nack_response() { mp_RTPSParticipant->async_thread().wake_up(this); } + + lock.unlock(); + + // Notify the statistics module + on_resent_data(changes_to_resend); } void StatefulWriter::perform_nack_supression( diff --git a/src/cpp/statistics/rtps/StatisticsBase.hpp b/src/cpp/statistics/rtps/StatisticsBase.hpp index 058dc08e6b4..9c75aa5e008 100644 --- a/src/cpp/statistics/rtps/StatisticsBase.hpp +++ b/src/cpp/statistics/rtps/StatisticsBase.hpp @@ -57,6 +57,7 @@ struct StatisticsWriterAncillary { unsigned long long data_counter = {}; unsigned long long gap_counter = {}; + unsigned long long resent_counter = {}; }; struct StatisticsReaderAncillary @@ -69,11 +70,14 @@ template Function StatisticsListenersImpl::for_each_listener( Function f) { - std::lock_guard lock(get_statistics_mutex()); + // Use a collection copy to prevent locking on traversal + std::unique_lock lock(get_statistics_mutex()); + auto listeners = members_->listeners; + lock.unlock(); if (members_) { - for (auto& listener : members_->listeners) + for (auto& listener : listeners) { f(listener); } diff --git a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp index 6bef809256e..080760573dc 100644 --- a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp +++ b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp @@ -128,3 +128,30 @@ void StatisticsWriterImpl::on_gap() listener->on_statistics_data(data); }); } + +void StatisticsWriterImpl::on_resent_data( + uint32_t to_send) +{ + if ( 0 == to_send ) + { + return; + } + + EntityCount notification; + notification.guid(to_statistics_type(get_guid())); + + { + std::lock_guard lock(get_statistics_mutex()); + notification.count(get_members()->resent_counter += to_send); + } + + // Perform the callbacks + Data data; + // note that the setter sets RESENT_DATAS by default + data.entity_count(notification); + + for_each_listener([&data](const std::shared_ptr& listener) + { + listener->on_statistics_data(data); + }); +} diff --git a/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp b/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp index cd084c54820..5fd64c90872 100644 --- a/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp +++ b/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp @@ -69,6 +69,9 @@ struct MockListener : IListener case DATA_COUNT: on_data_count(data.entity_count()); break; + case RESENT_DATAS: + on_resent_count(data.entity_count()); + break; case GAP_COUNT: on_gap_count(data.entity_count()); break; @@ -87,6 +90,7 @@ struct MockListener : IListener MOCK_METHOD1(on_heartbeat_count, void(const eprosima::fastdds::statistics::EntityCount&)); MOCK_METHOD1(on_acknack_count, void(const eprosima::fastdds::statistics::EntityCount&)); MOCK_METHOD1(on_data_count, void(const eprosima::fastdds::statistics::EntityCount&)); + MOCK_METHOD1(on_resent_count, void(const eprosima::fastdds::statistics::EntityCount&)); MOCK_METHOD1(on_gap_count, void(const eprosima::fastdds::statistics::EntityCount&)); MOCK_METHOD1(on_nackfrag_count, void(const eprosima::fastdds::statistics::EntityCount&)); MOCK_METHOD1(on_entity_discovery, void(const eprosima::fastdds::statistics::DiscoveryTime&)); @@ -234,6 +238,30 @@ class RTPSStatisticsTestsImpl writer_history_ = new WriterHistory(history_attributes); WriterAttributes w_att; + w_att.times.heartbeatPeriod.seconds = 0; + w_att.times.heartbeatPeriod.nanosec = 250 * 1000 * 1000; // reduce acknowledgement wait + w_att.endpoint.reliabilityKind = reliability_qos; + w_att.endpoint.durabilityKind = durability_qos; + + writer_ = RTPSDomain::createRTPSWriter(participant_, w_att, writer_history_); + } + + void create_lazy_writer( + uint32_t payloadMaxSize, + fastrtps::rtps::ReliabilityKind_t reliability_qos = fastrtps::rtps::ReliabilityKind_t::RELIABLE, + fastrtps::rtps::DurabilityKind_t durability_qos = fastrtps::rtps::DurabilityKind_t::TRANSIENT_LOCAL) + { + using namespace fastrtps::rtps; + + HistoryAttributes history_attributes; + history_attributes.payloadMaxSize = payloadMaxSize; + writer_history_ = new WriterHistory(history_attributes); + + WriterAttributes w_att; + w_att.times.heartbeatPeriod.seconds = 3; + w_att.times.heartbeatPeriod.nanosec = 0; + w_att.times.nackResponseDelay.seconds = 0; + w_att.times.nackResponseDelay.nanosec = 300 * 1000 * 1000; // increase ACKNACK response delay w_att.endpoint.reliabilityKind = reliability_qos; w_att.endpoint.durabilityKind = durability_qos; @@ -472,6 +500,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_management) * This test checks RTPSParticipant, RTPSWriter and RTPSReader statistics module related APIs. * - RTPS_SENT callbacks are performed * - DATA_COUNT callbacks are performed for DATA submessages + * - RESENT_DATAS callbacks are performed for DATA submessages demanded by the readers * - ACKNACK_COUNT callbacks are performed * - HEARBEAT_COUNT callbacks are performed */ @@ -482,6 +511,39 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) using namespace fastrtps::rtps; using namespace std; + // make sure some messages are lost to assure the RESENT_DATAS callback + set_transport_filter( + DATA, + [](fastrtps::rtps::CDRMessage_t& msg)-> bool + { + static unsigned int samples_filtered = 0; + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.3 Data Submessage + EntityId_t readerID, writerID; + SequenceNumber_t sn; + + msg.pos += 2; // flags + msg.pos += 2; // octets to inline quos + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + CDRMessage::readSequenceNumber(&msg, &sn); + + // restore buffer pos + msg.pos = old_pos; + + // generate losses + if ( samples_filtered < 10 // only a few times (mind the interfaces) + && (writerID.value[3] & 0xC0) == 0 // only user endpoints + && (sn == SequenceNumber_t{0, 1})) // only first sample + { + ++samples_filtered; + return true; + } + + return false; + }); + // create the testing endpoints uint16_t length = 255; create_endpoints(length, RELIABLE); @@ -492,7 +554,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) // writer callbacks through participant listener auto participant_writer_listener = make_shared(); - ASSERT_TRUE(participant_->add_statistics_listener(participant_writer_listener, EventKind::DATA_COUNT)); + ASSERT_TRUE(participant_->add_statistics_listener(participant_writer_listener, + EventKind::DATA_COUNT | EventKind::RESENT_DATAS)); // writer specific callbacks auto writer_listener = make_shared(); @@ -518,9 +581,13 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) .Times(AtLeast(1)); EXPECT_CALL(*writer_listener, on_data_count) .Times(AtLeast(1)); + EXPECT_CALL(*writer_listener, on_resent_count) + .Times(AtLeast(1)); EXPECT_CALL(*participant_writer_listener, on_data_count) .Times(AtLeast(1)); + EXPECT_CALL(*participant_writer_listener, on_resent_count) + .Times(AtLeast(1)); // + RTPSReader: SUBSCRIPTION_THROUGHPUT, // SAMPLE_DATAS & PHYSICAL_DATA @@ -552,7 +619,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) EXPECT_TRUE(reader_->remove_statistics_listener(reader_listener)); EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, EventKind::RTPS_SENT)); - EXPECT_TRUE(participant_->remove_statistics_listener(participant_writer_listener, EventKind::DATA_COUNT)); + EXPECT_TRUE(participant_->remove_statistics_listener(participant_writer_listener, + EventKind::DATA_COUNT | EventKind::RESENT_DATAS)); EXPECT_TRUE(participant_->remove_statistics_listener(participant_reader_listener, EventKind::ACKNACK_COUNT)); } @@ -658,6 +726,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_gap_callback) .Times(AtLeast(1)); EXPECT_CALL(*writer_listener, on_data_count) .Times(AtLeast(1)); + EXPECT_CALL(*writer_listener, on_resent_count) + .Times(AtLeast(1)); EXPECT_CALL(*participant_writer_listener, on_gap_count) .Times(AtLeast(1)); @@ -684,9 +754,6 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_gap_callback) // match writer and reader on a dummy topic match_endpoints(false, "string", "statisticsSmallTopic"); - // std::this_thread::sleep_for(std::chrono::seconds(10)); - - // wait for reception EXPECT_TRUE(reader_->wait_for_unread_cache(Duration_t(5, 0))); @@ -758,6 +825,112 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_discovery_callbacks) EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, EventKind::DISCOVERED_ENTITY)); } +/* + * This test checks improves coverity by asserting that every RESENT_DATAS callback is associated with a message + */ +TEST_F(RTPSStatisticsTests, statistics_rpts_avoid_empty_resent_callbacks) +{ + using namespace ::testing; + using namespace fastrtps; + using namespace fastrtps::rtps; + using namespace std; + + // The history must be cleared after the acknack is received + // but before the response is processed + atomic_bool acknack_sent(false); + + // filter out all user DATAs + set_transport_filter( + DATA, + [](fastrtps::rtps::CDRMessage_t& msg)-> bool + { + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.3 Data Submessage + EntityId_t readerID, writerID; + SequenceNumber_t sn; + + msg.pos += 2; // flags + msg.pos += 2; // octets to inline quos + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + CDRMessage::readSequenceNumber(&msg, &sn); + + // restore buffer pos + msg.pos = old_pos; + + // filter user traffic + return ((writerID.value[3] & 0xC0) == 0); + }); + + set_transport_filter( + ACKNACK, + [&acknack_sent](fastrtps::rtps::CDRMessage_t& msg)-> bool + { + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.2 Acknack Submessage + EntityId_t readerID, writerID; + SequenceNumberSet_t snSet; + + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + snSet = CDRMessage::readSequenceNumberSet(&msg); + + // restore buffer pos + msg.pos = old_pos; + + // The acknack has been sent + acknack_sent = !snSet.empty(); + + // we are not filtering + return false; + }); + + // create the listeners and set expectations + auto writer_listener = make_shared(); + + // check callbacks on data exchange + EXPECT_CALL(*writer_listener, on_heartbeat_count) + .Times(AtLeast(1)); + EXPECT_CALL(*writer_listener, on_data_count) + .Times(AtLeast(1)); + EXPECT_CALL(*writer_listener, on_resent_count) + .Times(0); // never called + + // create the writer, reader is a late joiner + uint16_t length = 255; + create_lazy_writer(length, RELIABLE, TRANSIENT_LOCAL); + + // writer specific callbacks + ASSERT_TRUE(writer_->add_statistics_listener(writer_listener)); + + // create the late joiner as VOLATILE + create_reader(length, RELIABLE, VOLATILE); + + // match writer and reader on a dummy topic + match_endpoints(false, "string", "statisticsSmallTopic"); + + // add a sample to the writer history that cannot be delivered + // due to the filter + write_small_sample(length); + + // wait for the acknack to be sent before clearing the history + while (!acknack_sent) + { + this_thread::sleep_for(chrono::milliseconds(100)); + } + + // remove the sample from the writer + ASSERT_TRUE(writer_history_->remove_change(SequenceNumber_t{ 0, 1 })); + + // give room to process the ACKNACK + this_thread::sleep_for(chrono::milliseconds(400)); + + // release the listeners + EXPECT_TRUE(writer_->remove_statistics_listener(writer_listener)); +} + } // namespace rtps } // namespace statistics } // namespace fastdds