Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10792] Collection of data and notification to listeners PUBLICATION_THROUGHPUT #1955

Merged
merged 4 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/fastdds/statistics/rtps/StatisticsCommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ class StatisticsWriterImpl
/// Notify listeners of DATA / DATA_FRAG counts
void on_data_sent();

/**
* @brief Reports throughtput based on last added sample to history
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @brief Reports throughtput based on last added sample to history
* @brief Reports publication throughtput based on last added sample to writer's history

* @param payload size of the message sent
*/
void on_publish_throughput(
uint32_t payload);

/// Report that a GAP message is sent
void on_gap();

Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ class StatisticsWriterImpl
{
}

/**
* @brief Reports throughtput based on last added sample to history
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @brief Reports throughtput based on last added sample to history
* @brief Reports publication throughtput based on last added sample to writer's history

* @param size of the message sent
*/
inline void on_publish_throughput(
uint32_t)
{
}

/// Report that a GAP message is sent
inline void on_gap()
{
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,9 @@ void StatefulWriter::unsent_change_added_to_history(
{
on_data_sent();
}

// Throughput should be notified even if no matches are available
on_publish_throughput(change->serializedPayload.length);
}

bool StatefulWriter::intraprocess_delivery(
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/writer/StatelessWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,9 @@ void StatelessWriter::unsent_change_added_to_history(
{
on_data_sent();
}

// Throughput should be notified even if no matches are available
on_publish_throughput(change->serializedPayload.length);
}

bool StatelessWriter::intraprocess_delivery(
Expand Down
1 change: 1 addition & 0 deletions src/cpp/statistics/rtps/StatisticsBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct StatisticsWriterAncillary
unsigned long long data_counter = {};
unsigned long long gap_counter = {};
unsigned long long resent_counter = {};
std::chrono::time_point<std::chrono::steady_clock> last_history_change_ = std::chrono::steady_clock::now();
};

struct StatisticsReaderAncillary
Expand Down
43 changes: 38 additions & 5 deletions src/cpp/statistics/rtps/writer/StatisticsWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void StatisticsWriterImpl::on_sample_datas(
// Perform the callbacks
Data data;
// note that the setter sets SAMPLE_DATAS by default
data.sample_identity_count(notification);
data.sample_identity_count(std::move(notification));

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
{
Expand Down Expand Up @@ -101,7 +101,7 @@ void StatisticsWriterImpl::on_data_sent()
// Perform the callbacks
Data data;
// note that the setter sets RESENT_DATAS by default
data.entity_count(notification);
data.entity_count(std::move(notification));
data._d(EventKind::DATA_COUNT);

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
Expand All @@ -120,7 +120,7 @@ void StatisticsWriterImpl::on_heartbeat(
// Perform the callbacks
Data data;
// note that the setter sets RESENT_DATAS by default
data.entity_count(notification);
data.entity_count(std::move(notification));
data._d(EventKind::HEARTBEAT_COUNT);

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
Expand All @@ -142,7 +142,7 @@ void StatisticsWriterImpl::on_gap()
// Perform the callbacks
Data data;
// note that the setter sets RESENT_DATAS by default
data.entity_count(notification);
data.entity_count(std::move(notification));
data._d(EventKind::GAP_COUNT);

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
Expand Down Expand Up @@ -170,10 +170,43 @@ void StatisticsWriterImpl::on_resent_data(
// Perform the callbacks
Data data;
// note that the setter sets RESENT_DATAS by default
data.entity_count(notification);
data.entity_count(std::move(notification));

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
{
listener->on_statistics_data(data);
});
}

void StatisticsWriterImpl::on_publish_throughput(
uint32_t payload)
{
using namespace std;
using namespace chrono;

if (payload > 0 )
{
// update state
time_point<steady_clock> former_timepoint;
auto& current_timepoint = get_members()->last_history_change_;
{
lock_guard<fastrtps::RecursiveTimedMutex> lock(get_statistics_mutex());
former_timepoint = current_timepoint;
current_timepoint = steady_clock::now();
}

EntityData notification;
notification.guid(to_statistics_type(get_guid()));
notification.data(payload / duration_cast<duration<float>>(current_timepoint - former_timepoint).count());

// Perform the callbacks
Data data;
// note that the setter sets PUBLICATION_THROUGHPUT by default
data.entity_data(std::move(notification));

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
{
listener->on_statistics_data(data);
});
}
}
19 changes: 17 additions & 2 deletions test/unittest/statistics/rtps/RTPSStatisticsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ struct MockListener : IListener
case SAMPLE_DATAS:
on_sample_datas(data.sample_identity_count());
break;
case PUBLICATION_THROUGHPUT:
on_publisher_throughput(data.entity_data());
break;
default:
on_unexpected_kind(kind);
break;
Expand All @@ -117,6 +120,7 @@ struct MockListener : IListener
MOCK_METHOD1(on_pdp_packets, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_edp_packets, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_sample_datas, void(const eprosima::fastdds::statistics::SampleIdentityCount&));
MOCK_METHOD1(on_publisher_throughput, void(const eprosima::fastdds::statistics::EntityData&));
MOCK_METHOD1(on_unexpected_kind, void(eprosima::fastdds::statistics::EventKind));
};

Expand Down Expand Up @@ -529,6 +533,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_management)
* - ACKNACK_COUNT callbacks are performed
* - HEARBEAT_COUNT callbacks are performed
* - SAMPLE_DATAS callbacks are performed
* - PUBLICATION_THROUGHPUT callbacks are performed
*/
TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
{
Expand Down Expand Up @@ -581,7 +586,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
// writer callbacks through participant listener
auto participant_writer_listener = make_shared<MockListener>();
ASSERT_TRUE(participant_->add_statistics_listener(participant_writer_listener,
EventKind::DATA_COUNT | EventKind::RESENT_DATAS | EventKind::SAMPLE_DATAS));
EventKind::DATA_COUNT | EventKind::RESENT_DATAS |
EventKind::PUBLICATION_THROUGHPUT | EventKind::SAMPLE_DATAS));

// writer specific callbacks
auto writer_listener = make_shared<MockListener>();
Expand Down Expand Up @@ -612,13 +618,17 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_sample_datas)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_publisher_throughput)
.Times(AtLeast(1));

EXPECT_CALL(*participant_writer_listener, on_data_count)
.Times(AtLeast(1));
EXPECT_CALL(*participant_writer_listener, on_resent_count)
.Times(AtLeast(1));
EXPECT_CALL(*participant_writer_listener, on_sample_datas)
.Times(AtLeast(1));
EXPECT_CALL(*participant_writer_listener, on_publisher_throughput)
.Times(AtLeast(1));

// + RTPSReader: SUBSCRIPTION_THROUGHPUT,
// SAMPLE_DATAS & PHYSICAL_DATA
Expand Down Expand Up @@ -655,7 +665,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)

EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, EventKind::RTPS_SENT));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_writer_listener,
EventKind::DATA_COUNT | EventKind::RESENT_DATAS | EventKind::SAMPLE_DATAS));
EventKind::DATA_COUNT | EventKind::RESENT_DATAS |
EventKind::PUBLICATION_THROUGHPUT | EventKind::SAMPLE_DATAS));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_reader_listener,
EventKind::ACKNACK_COUNT | EventKind::HISTORY2HISTORY_LATENCY));
}
Expand Down Expand Up @@ -769,6 +780,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_gap_callback)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_sample_datas)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_publisher_throughput)
.Times(AtLeast(1));

EXPECT_CALL(*participant_writer_listener, on_gap_count)
.Times(AtLeast(1));
Expand Down Expand Up @@ -943,6 +956,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_avoid_empty_resent_callbacks)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_sample_datas)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_publisher_throughput)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_resent_count)
.Times(0); // never called

Expand Down