Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10793] Collection of data and notification to listeners SUBSCRIPTION_THROUGHPUT #1964

Merged
merged 3 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions include/fastdds/statistics/rtps/StatisticsCommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,9 @@ class StatisticsReaderImpl

/**
* Create the auxiliary structure
* TODO: enable when a member is added to StatisticsReaderAncillary
* @return nullptr on failure
*/
StatisticsReaderAncillary* get_members() const
{
return nullptr;
}
StatisticsReaderAncillary* get_members() const;

/**
* Retrieve endpoint mutexes from derived class
Expand Down Expand Up @@ -251,6 +247,13 @@ class StatisticsReaderImpl
*/
void on_nackfrag(
int32_t count);

/**
* @brief Reports subscription throughtput based on last added sample to reader's history
* @param payload size of the message received
*/
void on_subscribe_throughput(
uint32_t payload);
};

} // namespace statistics
Expand Down
9 changes: 9 additions & 0 deletions include/fastdds/statistics/rtps/StatisticsCommonEmpty.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ class StatisticsReaderImpl
{
}

/**
* @brief Reports subscription throughtput based on last added sample to reader's history
* @param size of the message received
*/
inline void on_subscribe_throughput(
uint32_t)
{
}

};

} // namespace statistics
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,9 @@ bool StatefulReader::change_received(

NotifyChanges(prox);

// statistics callback
on_subscribe_throughput(a_change->serializedPayload.length);

return ret;
}

Expand Down
4 changes: 4 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ bool StatelessReader::change_received(
}

new_notification_cv_.notify_all();

// statistics callback
on_subscribe_throughput(change->serializedPayload.length);

return true;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/cpp/statistics/rtps/StatisticsBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct StatisticsWriterAncillary
struct StatisticsReaderAncillary
: public StatisticsAncillary
{
std::chrono::time_point<std::chrono::steady_clock> last_history_change_ = std::chrono::steady_clock::now();
};

// lambda function to traverse the listener collection
Expand Down
46 changes: 39 additions & 7 deletions src/cpp/statistics/rtps/reader/StatisticsReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ StatisticsReaderImpl::StatisticsReaderImpl()
init_statistics<StatisticsReaderAncillary>();
}

/* Uncomment when a member is added to the StatisticsReaderAncillary
StatisticsReaderAncillary* StatisticsReaderImpl::get_members() const
{
StatisticsReaderAncillary* StatisticsReaderImpl::get_members() const
{
static_assert(
std::is_base_of<StatisticsAncillary,StatisticsReaderAncillary>::value,
"Auxiliary structure must derive from StatisticsAncillary");
std::is_base_of<StatisticsAncillary, StatisticsReaderAncillary>::value,
"Auxiliary structure must derive from StatisticsAncillary");

return static_cast<StatisticsReaderAncillary*>(get_aux_members());
}
*/
}

RecursiveTimedMutex& StatisticsReaderImpl::get_statistics_mutex()
{
Expand Down Expand Up @@ -124,3 +122,37 @@ void StatisticsReaderImpl::on_nackfrag(
listener->on_statistics_data(data);
});
}

void StatisticsReaderImpl::on_subscribe_throughput(
uint32_t payload)
{
using namespace std;
using namespace chrono;

if (payload > 0 )
{
// update state
time_point<steady_clock> former_timepoint;
auto& current_timepoint = get_members()->last_history_change_;
{
lock_guard<fastrtps::RecursiveTimedMutex> lock(get_statistics_mutex());
former_timepoint = current_timepoint;
current_timepoint = steady_clock::now();
}

EntityData notification;
notification.guid(to_statistics_type(get_guid()));
notification.data(payload / duration_cast<duration<float>>(current_timepoint - former_timepoint).count());

// Perform the callbacks
Data data;
// note that the setter sets PUBLICATION_THROUGHPUT by default
data.entity_data(std::move(notification));
data._d(EventKind::SUBSCRIPTION_THROUGHPUT);

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
{
listener->on_statistics_data(data);
});
}
}
16 changes: 14 additions & 2 deletions test/unittest/statistics/rtps/RTPSStatisticsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ struct MockListener : IListener
case PUBLICATION_THROUGHPUT:
on_publisher_throughput(data.entity_data());
break;
case SUBSCRIPTION_THROUGHPUT:
on_subscriber_throughput(data.entity_data());
break;
default:
on_unexpected_kind(kind);
break;
Expand All @@ -125,6 +128,7 @@ struct MockListener : IListener
MOCK_METHOD1(on_edp_packets, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_sample_datas, void(const eprosima::fastdds::statistics::SampleIdentityCount&));
MOCK_METHOD1(on_publisher_throughput, void(const eprosima::fastdds::statistics::EntityData&));
MOCK_METHOD1(on_subscriber_throughput, void(const eprosima::fastdds::statistics::EntityData&));
MOCK_METHOD1(on_unexpected_kind, void(eprosima::fastdds::statistics::EventKind));
};

Expand Down Expand Up @@ -538,6 +542,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_management)
* - HEARBEAT_COUNT callbacks are performed
* - SAMPLE_DATAS callbacks are performed
* - PUBLICATION_THROUGHPUT callbacks are performed
* - SUBSCRIPTION_THROUGHPUT callbacks are performed
*/
TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
{
Expand Down Expand Up @@ -601,7 +606,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
// reader callbacks through participant listener
auto participant_reader_listener = make_shared<MockListener>();
ASSERT_TRUE(participant_->add_statistics_listener(participant_reader_listener,
EventKind::ACKNACK_COUNT | EventKind::HISTORY2HISTORY_LATENCY));
EventKind::ACKNACK_COUNT | EventKind::HISTORY2HISTORY_LATENCY |
EventKind::SUBSCRIPTION_THROUGHPUT));

// reader specific callbacks
auto reader_listener = make_shared<MockListener>();
Expand Down Expand Up @@ -644,10 +650,15 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
.Times(AtLeast(1));
EXPECT_CALL(*reader_listener, on_history_latency)
.Times(AtLeast(1));
EXPECT_CALL(*reader_listener, on_subscriber_throughput)
.Times(AtLeast(1));

EXPECT_CALL(*participant_reader_listener, on_acknack_count)
.Times(AtLeast(1));
EXPECT_CALL(*participant_reader_listener, on_history_latency)
.Times(AtLeast(1));
EXPECT_CALL(*participant_reader_listener, on_subscriber_throughput)
.Times(AtLeast(1));

// match writer and reader on a dummy topic
match_endpoints(false, "string", "statisticsSmallTopic");
Expand Down Expand Up @@ -675,7 +686,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks)
EventKind::DATA_COUNT | EventKind::RESENT_DATAS |
EventKind::PUBLICATION_THROUGHPUT | EventKind::SAMPLE_DATAS));
EXPECT_TRUE(participant_->remove_statistics_listener(participant_reader_listener,
EventKind::ACKNACK_COUNT | EventKind::HISTORY2HISTORY_LATENCY));
EventKind::ACKNACK_COUNT | EventKind::HISTORY2HISTORY_LATENCY |
EventKind::SUBSCRIPTION_THROUGHPUT));
}

/*
Expand Down