Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HISTORY2HISTORY_LATENCY test and implementation [10789] #1941

Merged
merged 5 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/fastdds/statistics/rtps/StatisticsCommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/SampleIdentity.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/statistics/IListeners.hpp>
#include <fastrtps/utils/TimedMutex.hpp>

Expand Down Expand Up @@ -217,6 +218,15 @@ class StatisticsReaderImpl

// TODO: methods for listeners callbacks

/**
* @brief Report that a sample has been notified to the user.
* @param writer_guid GUID of the writer from where the sample was received.
* @param source_timestamp Source timestamp received from the writer for the sample being notified.
*/
void on_data_notify(
const fastrtps::rtps::GUID_t& writer_guid,
const fastrtps::rtps::Time_t& source_timestamp);

/**
* @brief Report that an ACKNACK message is sent
* @param count current count of ACKNACKs
Expand Down
13 changes: 13 additions & 0 deletions include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
#ifndef _FASTDDS_STATISTICS_RTPS_STATISTICSCOMMON_HPP_
#define _FASTDDS_STATISTICS_RTPS_STATISTICSCOMMON_HPP_

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/SampleIdentity.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/statistics/IListeners.hpp>

namespace eprosima {
Expand Down Expand Up @@ -96,6 +98,17 @@ class StatisticsReaderImpl

// TODO: methods for listeners callbacks

/**
* @brief Report that a sample has been notified to the user.
* @param GUID of the writer from where the sample was received.
* @param Source timestamp received from the writer for the sample being notified.
*/
inline void on_data_notify(
const fastrtps::rtps::GUID_t&,
const fastrtps::rtps::Time_t&)
{
}

/**
* @brief Report that an ACKNACK message is sent
* @param current count of ACKNACKs
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,8 @@ void StatefulReader::NotifyChanges(
{
++total_unread_;

on_data_notify(ch_to_give->writerGUID, ch_to_give->sourceTimestamp);

if (getListener() != nullptr)
{
getListener()->onNewCacheChangeAdded((RTPSReader*)this, ch_to_give);
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ bool StatelessReader::change_received(
update_last_notified(change->writerGUID, change->sequenceNumber);
++total_unread_;

on_data_notify(change->writerGUID, change->sourceTimestamp);

if (getListener() != nullptr)
{
getListener()->onNewCacheChangeAdded(this, change);
Expand Down
3 changes: 1 addition & 2 deletions src/cpp/statistics/rtps/StatisticsBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ bool StatisticsParticipantImpl::are_writers_involved(
{
using namespace fastdds::statistics;

constexpr uint32_t writers_maks = HISTORY2HISTORY_LATENCY \
| PUBLICATION_THROUGHPUT \
constexpr uint32_t writers_maks = PUBLICATION_THROUGHPUT \
| RESENT_DATAS \
| HEARTBEAT_COUNT \
| GAP_COUNT \
Expand Down
27 changes: 27 additions & 0 deletions src/cpp/statistics/rtps/reader/StatisticsReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,33 @@ const GUID_t& StatisticsReaderImpl::get_guid() const
return static_cast<const RTPSReader*>(this)->getGuid();
}

void StatisticsReaderImpl::on_data_notify(
const fastrtps::rtps::GUID_t& writer_guid,
const fastrtps::rtps::Time_t& source_timestamp)
{
// Get current timestamp
fastrtps::rtps::Time_t current_time;
fastrtps::rtps::Time_t::now(current_time);

// Calc latency
auto ns = (current_time - source_timestamp).to_ns();

WriterReaderData notification;
notification.reader_guid(to_statistics_type(get_guid()));
notification.writer_guid(to_statistics_type(writer_guid));
notification.data(static_cast<float>(ns));

// Perform the callback
Data data;
// note that the setter sets HISTORY2HISTORY_LATENCY by default
data.writer_reader_data(notification);

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
{
listener->on_statistics_data(data);
});
}

void StatisticsReaderImpl::on_acknack(
int32_t count)
{
Expand Down
20 changes: 17 additions & 3 deletions test/unittest/statistics/rtps/RTPSStatisticsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ struct MockListener : IListener
auto kind = data._d();
switch (kind)
{
case HISTORY2HISTORY_LATENCY:
on_history_latency(data.writer_reader_data());
break;
case RTPS_SENT:
on_rtps_sent(data.entity2locator_traffic());
break;
Expand Down Expand Up @@ -102,6 +105,7 @@ struct MockListener : IListener
}
}

MOCK_METHOD1(on_history_latency, void(const eprosima::fastdds::statistics::WriterReaderData&));
MOCK_METHOD1(on_rtps_sent, void(const eprosima::fastdds::statistics::Entity2LocatorTraffic&));
MOCK_METHOD1(on_heartbeat_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_acknack_count, void(const eprosima::fastdds::statistics::EntityCount&));
Expand Down Expand Up @@ -519,6 +523,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_management)
/*
* This test checks RTPSParticipant, RTPSWriter and RTPSReader statistics module related APIs.
* - RTPS_SENT callbacks are performed
* - HISTORY2HISTORY_LATENCY callbacks are performed
* - DATA_COUNT callbacks are performed for DATA submessages
* - RESENT_DATAS callbacks are performed for DATA submessages demanded by the readers
* - ACKNACK_COUNT callbacks are performed
Expand Down Expand Up @@ -584,7 +589,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)

// reader callbacks through participant listener
auto participant_reader_listener = make_shared<MockListener>();
ASSERT_TRUE(participant_->add_statistics_listener(participant_reader_listener, EventKind::ACKNACK_COUNT));
ASSERT_TRUE(participant_->add_statistics_listener(participant_reader_listener,
EventKind::ACKNACK_COUNT | EventKind::HISTORY2HISTORY_LATENCY));

// reader specific callbacks
auto reader_listener = make_shared<MockListener>();
Expand Down Expand Up @@ -619,8 +625,12 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
// optionally: ACKNACK_COUNT
EXPECT_CALL(*reader_listener, on_acknack_count)
.Times(AtLeast(1));
EXPECT_CALL(*reader_listener, on_history_latency)
.Times(AtLeast(1));
EXPECT_CALL(*participant_reader_listener, on_acknack_count)
.Times(AtLeast(1));
EXPECT_CALL(*participant_reader_listener, on_history_latency)
.Times(AtLeast(1));

// match writer and reader on a dummy topic
match_endpoints(false, "string", "statisticsSmallTopic");
Expand All @@ -646,12 +656,14 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, EventKind::RTPS_SENT));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_writer_listener,
EventKind::DATA_COUNT | EventKind::RESENT_DATAS | EventKind::SAMPLE_DATAS));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_reader_listener, EventKind::ACKNACK_COUNT));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_reader_listener,
EventKind::ACKNACK_COUNT | EventKind::HISTORY2HISTORY_LATENCY));
}

/*
* This test checks RTPSParticipant, RTPSWriter and RTPSReader statistics module related APIs.
* - participant listeners management with late joiners
* - HISTORY2HISTORY_LATENCY callbacks are performed
* - DATA_COUNT callbacks with DATA_FRAGS are performed
* - NACK_FRAG callbacks assessment
*/
Expand Down Expand Up @@ -694,7 +706,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks_fragmented)
// writer callbacks through participant listener
auto participant_listener = make_shared<MockListener>();
uint32_t mask = EventKind::DATA_COUNT | EventKind::HEARTBEAT_COUNT
| EventKind::ACKNACK_COUNT | EventKind::NACKFRAG_COUNT;
| EventKind::ACKNACK_COUNT | EventKind::NACKFRAG_COUNT | EventKind::HISTORY2HISTORY_LATENCY;
ASSERT_TRUE(participant_->add_statistics_listener(participant_listener, mask));

EXPECT_CALL(*participant_listener, on_data_count)
Expand All @@ -703,6 +715,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks_fragmented)
.Times(AtLeast(1));
EXPECT_CALL(*participant_listener, on_acknack_count)
.Times(AtLeast(1));
EXPECT_CALL(*participant_listener, on_history_latency)
.Times(AtLeast(1));
EXPECT_CALL(*participant_listener, on_nackfrag_count)
.Times(AtLeast(1));

Expand Down