From 08ef0e127c60e9e16973b0de0397500a968528b8 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Wed, 12 May 2021 15:52:51 +0200 Subject: [PATCH 1/4] [10792] PUBLICATION_THROUGHPUT implementation --- .../statistics/rtps/StatisticsCommon.hpp | 7 +++ .../statistics/rtps/StatisticsCommonEmpty.hpp | 9 ++++ src/cpp/rtps/writer/StatefulWriter.cpp | 3 ++ src/cpp/rtps/writer/StatelessWriter.cpp | 3 ++ src/cpp/statistics/rtps/StatisticsBase.hpp | 1 + .../rtps/writer/StatisticsWriterImpl.cpp | 43 ++++++++++++++++--- 6 files changed, 61 insertions(+), 5 deletions(-) diff --git a/include/fastdds/statistics/rtps/StatisticsCommon.hpp b/include/fastdds/statistics/rtps/StatisticsCommon.hpp index 8aed89d281e..bb8a160e03f 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommon.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommon.hpp @@ -172,6 +172,13 @@ class StatisticsWriterImpl /// Notify listeners of DATA / DATA_FRAG counts void on_data_sent(); + /** + * @brief Reports throughtput based on last added sample to history + * @param payload size of the message sent + */ + void on_publish_throughput( + uint32_t payload); + /// Report that a GAP message is sent void on_gap(); diff --git a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp index 47301e6ce59..c4d5aafd01e 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp @@ -78,6 +78,15 @@ class StatisticsWriterImpl { } + /** + * @brief Reports throughtput based on last added sample to history + * @param size of the message sent + */ + inline void on_publish_throughput( + uint32_t) + { + } + /// Report that a GAP message is sent inline void on_gap() { diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index f42d1eee6f9..950e74685f7 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -657,6 +657,9 @@ void StatefulWriter::unsent_change_added_to_history( { on_data_sent(); } + + // Throughput should be notified even if no matches are available + on_publish_throughput(change->serializedPayload.length); } bool StatefulWriter::intraprocess_delivery( diff --git a/src/cpp/rtps/writer/StatelessWriter.cpp b/src/cpp/rtps/writer/StatelessWriter.cpp index f7de4bac1d8..661aef2c90a 100644 --- a/src/cpp/rtps/writer/StatelessWriter.cpp +++ b/src/cpp/rtps/writer/StatelessWriter.cpp @@ -432,6 +432,9 @@ void StatelessWriter::unsent_change_added_to_history( { on_data_sent(); } + + // Throughput should be notified even if no matches are available + on_publish_throughput(change->serializedPayload.length); } bool StatelessWriter::intraprocess_delivery( diff --git a/src/cpp/statistics/rtps/StatisticsBase.hpp b/src/cpp/statistics/rtps/StatisticsBase.hpp index 87c117b34c7..5e01cae5f51 100644 --- a/src/cpp/statistics/rtps/StatisticsBase.hpp +++ b/src/cpp/statistics/rtps/StatisticsBase.hpp @@ -59,6 +59,7 @@ struct StatisticsWriterAncillary unsigned long long data_counter = {}; unsigned long long gap_counter = {}; unsigned long long resent_counter = {}; + std::chrono::time_point last_history_change_ = std::chrono::steady_clock::now(); }; struct StatisticsReaderAncillary diff --git a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp index 32aacc6997f..2f96e06490a 100644 --- a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp +++ b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp @@ -71,7 +71,7 @@ void StatisticsWriterImpl::on_sample_datas( // Perform the callbacks Data data; // note that the setter sets SAMPLE_DATAS by default - data.sample_identity_count(notification); + data.sample_identity_count(std::move(notification)); for_each_listener([&data](const std::shared_ptr& listener) { @@ -101,7 +101,7 @@ void StatisticsWriterImpl::on_data_sent() // Perform the callbacks Data data; // note that the setter sets RESENT_DATAS by default - data.entity_count(notification); + data.entity_count(std::move(notification)); data._d(EventKind::DATA_COUNT); for_each_listener([&data](const std::shared_ptr& listener) @@ -120,7 +120,7 @@ void StatisticsWriterImpl::on_heartbeat( // Perform the callbacks Data data; // note that the setter sets RESENT_DATAS by default - data.entity_count(notification); + data.entity_count(std::move(notification)); data._d(EventKind::HEARTBEAT_COUNT); for_each_listener([&data](const std::shared_ptr& listener) @@ -142,7 +142,7 @@ void StatisticsWriterImpl::on_gap() // Perform the callbacks Data data; // note that the setter sets RESENT_DATAS by default - data.entity_count(notification); + data.entity_count(std::move(notification)); data._d(EventKind::GAP_COUNT); for_each_listener([&data](const std::shared_ptr& listener) @@ -170,10 +170,43 @@ void StatisticsWriterImpl::on_resent_data( // Perform the callbacks Data data; // note that the setter sets RESENT_DATAS by default - data.entity_count(notification); + data.entity_count(std::move(notification)); for_each_listener([&data](const std::shared_ptr& listener) { listener->on_statistics_data(data); }); } + +void StatisticsWriterImpl::on_publish_throughput( + uint32_t payload) +{ + using namespace std; + using namespace chrono; + + if (payload > 0 ) + { + // update state + time_point former_timepoint; + auto & current_timepoint = get_members()->last_history_change_; + { + lock_guard lock(get_statistics_mutex()); + former_timepoint = current_timepoint; + current_timepoint = steady_clock::now(); + } + + EntityData notification; + notification.guid(to_statistics_type(get_guid())); + notification.data(payload / duration_cast>(current_timepoint - former_timepoint).count()); + + // Perform the callbacks + Data data; + // note that the setter sets PUBLICATION_THROUGHPUT by default + data.entity_data(std::move(notification)); + + for_each_listener([&data](const std::shared_ptr& listener) + { + listener->on_statistics_data(data); + }); + } +} From 04e51c9f3922eb30c5721a98f33ada5a80521083 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Wed, 12 May 2021 15:53:17 +0200 Subject: [PATCH 2/4] [10792] PUBLICATION_THROUGHPUT testing --- .../statistics/rtps/RTPSStatisticsTests.cpp | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp b/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp index 067a5dffbab..3c9c23cc0bd 100644 --- a/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp +++ b/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp @@ -99,6 +99,9 @@ struct MockListener : IListener case SAMPLE_DATAS: on_sample_datas(data.sample_identity_count()); break; + case PUBLICATION_THROUGHPUT: + on_publisher_throughput(data.entity_data()); + break; default: on_unexpected_kind(kind); break; @@ -117,6 +120,7 @@ struct MockListener : IListener MOCK_METHOD1(on_pdp_packets, void(const eprosima::fastdds::statistics::EntityCount&)); MOCK_METHOD1(on_edp_packets, void(const eprosima::fastdds::statistics::EntityCount&)); MOCK_METHOD1(on_sample_datas, void(const eprosima::fastdds::statistics::SampleIdentityCount&)); + MOCK_METHOD1(on_publisher_throughput, void(const eprosima::fastdds::statistics::EntityData&)); MOCK_METHOD1(on_unexpected_kind, void(eprosima::fastdds::statistics::EventKind)); }; @@ -529,6 +533,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_management) * - ACKNACK_COUNT callbacks are performed * - HEARBEAT_COUNT callbacks are performed * - SAMPLE_DATAS callbacks are performed + * - PUBLICATION_THROUGHPUT callbacks are performed */ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) { @@ -581,7 +586,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) // writer callbacks through participant listener auto participant_writer_listener = make_shared(); ASSERT_TRUE(participant_->add_statistics_listener(participant_writer_listener, - EventKind::DATA_COUNT | EventKind::RESENT_DATAS | EventKind::SAMPLE_DATAS)); + EventKind::DATA_COUNT | EventKind::RESENT_DATAS | + EventKind::PUBLICATION_THROUGHPUT | EventKind::SAMPLE_DATAS)); // writer specific callbacks auto writer_listener = make_shared(); @@ -612,6 +618,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) .Times(AtLeast(1)); EXPECT_CALL(*writer_listener, on_sample_datas) .Times(AtLeast(1)); + EXPECT_CALL(*writer_listener, on_publisher_throughput) + .Times(AtLeast(1)); EXPECT_CALL(*participant_writer_listener, on_data_count) .Times(AtLeast(1)); @@ -619,6 +627,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) .Times(AtLeast(1)); EXPECT_CALL(*participant_writer_listener, on_sample_datas) .Times(AtLeast(1)); + EXPECT_CALL(*participant_writer_listener, on_publisher_throughput) + .Times(AtLeast(1)); // + RTPSReader: SUBSCRIPTION_THROUGHPUT, // SAMPLE_DATAS & PHYSICAL_DATA @@ -655,7 +665,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, EventKind::RTPS_SENT)); EXPECT_TRUE(participant_->remove_statistics_listener(participant_writer_listener, - EventKind::DATA_COUNT | EventKind::RESENT_DATAS | EventKind::SAMPLE_DATAS)); + EventKind::DATA_COUNT | EventKind::RESENT_DATAS | + EventKind::PUBLICATION_THROUGHPUT | EventKind::SAMPLE_DATAS)); EXPECT_TRUE(participant_->remove_statistics_listener(participant_reader_listener, EventKind::ACKNACK_COUNT | EventKind::HISTORY2HISTORY_LATENCY)); } @@ -769,6 +780,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_gap_callback) .Times(AtLeast(1)); EXPECT_CALL(*writer_listener, on_sample_datas) .Times(AtLeast(1)); + EXPECT_CALL(*writer_listener, on_publisher_throughput) + .Times(AtLeast(1)); EXPECT_CALL(*participant_writer_listener, on_gap_count) .Times(AtLeast(1)); @@ -943,6 +956,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_avoid_empty_resent_callbacks) .Times(AtLeast(1)); EXPECT_CALL(*writer_listener, on_sample_datas) .Times(AtLeast(1)); + EXPECT_CALL(*writer_listener, on_publisher_throughput) + .Times(AtLeast(1)); EXPECT_CALL(*writer_listener, on_resent_count) .Times(0); // never called From 8bd40368f10cb303dbcc318a320fba1347e1468b Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Wed, 12 May 2021 16:03:10 +0200 Subject: [PATCH 3/4] [10792] linter --- include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp | 2 +- src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp index c4d5aafd01e..5e0181a7f54 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp @@ -84,7 +84,7 @@ class StatisticsWriterImpl */ inline void on_publish_throughput( uint32_t) - { + { } /// Report that a GAP message is sent diff --git a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp index 2f96e06490a..c201aee783d 100644 --- a/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp +++ b/src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp @@ -188,7 +188,7 @@ void StatisticsWriterImpl::on_publish_throughput( { // update state time_point former_timepoint; - auto & current_timepoint = get_members()->last_history_change_; + auto& current_timepoint = get_members()->last_history_change_; { lock_guard lock(get_statistics_mutex()); former_timepoint = current_timepoint; From 1fced99bb940212bca27173c06e8d88b665881f6 Mon Sep 17 00:00:00 2001 From: Miguel Barro Date: Mon, 17 May 2021 12:48:27 +0200 Subject: [PATCH 4/4] [10792] addressing reviewer's comments --- include/fastdds/statistics/rtps/StatisticsCommon.hpp | 2 +- include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/fastdds/statistics/rtps/StatisticsCommon.hpp b/include/fastdds/statistics/rtps/StatisticsCommon.hpp index bb8a160e03f..2b0aa4f550f 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommon.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommon.hpp @@ -173,7 +173,7 @@ class StatisticsWriterImpl void on_data_sent(); /** - * @brief Reports throughtput based on last added sample to history + * @brief Reports publication throughtput based on last added sample to writer's history * @param payload size of the message sent */ void on_publish_throughput( diff --git a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp index 5e0181a7f54..c68fe16cc2d 100644 --- a/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp +++ b/include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp @@ -79,7 +79,7 @@ class StatisticsWriterImpl } /** - * @brief Reports throughtput based on last added sample to history + * @brief Reports publication throughtput based on last added sample to writer's history * @param size of the message sent */ inline void on_publish_throughput(