Skip to content

Commit

Permalink
Fix subscription throughput data generation (#2050)
Browse files Browse the repository at this point in the history
* Refs #11964: Failing test with SUBSCRIPTION_THROUGHPUT_TOPIC

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #11964: SUBSCRIPTION_THROUGHPUT data generation  fixed in Readers

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #11964: add new datakind topics to test

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #11964: add warning comments

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #11964: uncrustify

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #11964: remove sporioous comments

Signed-off-by: jparisu <javierparis@eprosima.com>

* Fix comment

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
jparisu and MiguelCompany authored Jul 8, 2021
1 parent dc19fe3 commit 7d52421
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 29 deletions.
5 changes: 4 additions & 1 deletion src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,8 @@ bool StatefulReader::change_received(
// inside the call to mp_history->received_change
if (mp_history->received_change(a_change, unknown_missing_changes_up_to))
{
auto payload_length = a_change->serializedPayload.length;

Time_t::now(a_change->receptionTimestamp);
GUID_t proxGUID = prox->guid();

Expand All @@ -851,10 +853,11 @@ bool StatefulReader::change_received(
ret = prox->received_change_set(a_change->sequenceNumber);
}

// WARNING! This method could destroy a_change
NotifyChanges(prox);

// statistics callback
on_subscribe_throughput(a_change->serializedPayload.length);
on_subscribe_throughput(payload_length);

return ret;
}
Expand Down
7 changes: 5 additions & 2 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,14 @@ bool StatelessReader::matched_writer_is_matched(
bool StatelessReader::change_received(
CacheChange_t* change)
{
// Only make visible the change if there is not other with bigger sequence number.
// Only make the change visible if there is not another with a bigger sequence number.
// TODO Revisar si no hay que incluirlo.
if (!thereIsUpperRecordOf(change->writerGUID, change->sequenceNumber))
{
if (mp_history->received_change(change, 0))
{
auto payload_length = change->serializedPayload.length;

Time_t::now(change->receptionTimestamp);
update_last_notified(change->writerGUID, change->sequenceNumber);
++total_unread_;
Expand All @@ -252,13 +254,14 @@ bool StatelessReader::change_received(

if (getListener() != nullptr)
{
// WARNING! This method could destroy the change
getListener()->onNewCacheChangeAdded(this, change);
}

new_notification_cv_.notify_all();

// statistics callback
on_subscribe_throughput(change->serializedPayload.length);
on_subscribe_throughput(payload_length);

return true;
}
Expand Down
101 changes: 75 additions & 26 deletions test/blackbox/common/DDSBlackboxTestsStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <list>
#include <string>
#include <thread>
#include <tuple>

#include <gtest/gtest.h>

Expand Down Expand Up @@ -136,6 +137,7 @@ TEST(DDSStatistics, simple_statistics_datareaders)
data_reader.wait_discovery();
data_writer.wait_discovery();

// Get Participants and Subscribers from pub and sub
auto w_participant = data_writer.getParticipant();
ASSERT_NE(nullptr, w_participant);

Expand All @@ -145,18 +147,6 @@ TEST(DDSStatistics, simple_statistics_datareaders)
auto w_subscriber = w_participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
ASSERT_NE(nullptr, w_subscriber);

// Enable DataWriter related statistics
auto data_stats_reader = enable_statistics(w_statistics_participant, w_subscriber, statistics::DATA_COUNT_TOPIC);
ASSERT_NE(nullptr, data_stats_reader);

// Enable DomainParticipant related statistics
auto rtps_stats_reader = enable_statistics(w_statistics_participant, w_subscriber, statistics::RTPS_SENT_TOPIC);
ASSERT_NE(nullptr, rtps_stats_reader);

auto physical_data_reader = enable_statistics(w_statistics_participant, w_subscriber,
statistics::PHYSICAL_DATA_TOPIC);
ASSERT_NE(nullptr, physical_data_reader);

auto r_subscriber = const_cast<Subscriber*>(data_reader.get_native_reader().get_subscriber());
ASSERT_NE(nullptr, r_subscriber);

Expand All @@ -166,9 +156,50 @@ TEST(DDSStatistics, simple_statistics_datareaders)
auto r_statistics_participant = statistics::dds::DomainParticipant::narrow(r_participant);
ASSERT_NE(nullptr, r_statistics_participant);

// Enable DataReader related statistics
auto ack_stats_reader = enable_statistics(r_statistics_participant, r_subscriber, statistics::ACKNACK_COUNT_TOPIC);
ASSERT_NE(nullptr, ack_stats_reader);
// TODO: some topics get stuck in infinite loop in an error (generally if they are included twice):
// [SUBSCRIBER Error] Change not found on this key, something is wrong -> Function remove_change_sub
// These topics are commented in test params
// TODO: some topics could be used in both participants, but they lead to the same error

// Create parameters to iterate over every Statistics kind
// The test is separated between the statistics retrieved by a DataWriter or a DataReader
std::vector<std::tuple<std::string, std::string, std::size_t>> writer_statistics_kinds = {
{"DATA_COUNT_TOPIC", statistics::DATA_COUNT_TOPIC, num_samples},
{"RTPS_SENT_TOPIC", statistics::RTPS_SENT_TOPIC, num_samples},
{"NETWORK_LATENCY_TOPIC", statistics::NETWORK_LATENCY_TOPIC, num_samples},
{"PUBLICATION_THROUGHPUT_TOPIC", statistics::PUBLICATION_THROUGHPUT_TOPIC, num_samples},
{"HEARTBEAT_COUNT_TOPIC", statistics::HEARTBEAT_COUNT_TOPIC, num_samples},
{"SAMPLE_DATAS_TOPIC", statistics::SAMPLE_DATAS_TOPIC, num_samples},
{"DISCOVERY_TOPIC", statistics::DISCOVERY_TOPIC, 1},
{"PDP_PACKETS_TOPIC", statistics::PDP_PACKETS_TOPIC, 1},
{"EDP_PACKETS_TOPIC", statistics::EDP_PACKETS_TOPIC, 1},
{"PHYSICAL_DATA_TOPIC", statistics::PHYSICAL_DATA_TOPIC, 1}
};

std::vector<std::tuple<std::string, std::string, std::size_t>> reader_statistics_kinds = {
{"HISTORY_LATENCY_TOPIC", statistics::HISTORY_LATENCY_TOPIC, num_samples},
{"SUBSCRIPTION_THROUGHPUT_TOPIC", statistics::SUBSCRIPTION_THROUGHPUT_TOPIC, num_samples},
{"ACKNACK_COUNT_TOPIC", statistics::ACKNACK_COUNT_TOPIC, 1},
// {"PHYSICAL_DATA_TOPIC", statistics::PHYSICAL_DATA_TOPIC, 1}
};

std::vector<DataReader*> readers_datawriter;
std::vector<DataReader*> readers_datareader;

// Enable Statistics Readers
for (auto kind : writer_statistics_kinds)
{
auto new_reader = enable_statistics(w_statistics_participant, w_subscriber, std::get<1>(kind));
ASSERT_NE(nullptr, new_reader);
readers_datawriter.push_back(new_reader);
}

for (auto kind : reader_statistics_kinds)
{
auto new_reader = enable_statistics(r_statistics_participant, r_subscriber, std::get<1>(kind));
ASSERT_NE(nullptr, new_reader);
readers_datareader.push_back(new_reader);
}

// Perform communication
data_reader.startReception(data);
Expand All @@ -177,19 +208,37 @@ TEST(DDSStatistics, simple_statistics_datareaders)
data_reader.block_for_all();
EXPECT_TRUE(data_writer.waitForAllAcked(std::chrono::seconds(10)));

wait_statistics(data_stats_reader, num_samples, "DATA_COUNT_TOPIC", 10u);
disable_statistics(w_statistics_participant, w_subscriber, data_stats_reader, statistics::DATA_COUNT_TOPIC);

wait_statistics(ack_stats_reader, 1, "ACKNACK_COUNT_TOPIC", 10u);
disable_statistics(r_statistics_participant, r_subscriber, ack_stats_reader, statistics::ACKNACK_COUNT_TOPIC);

wait_statistics(rtps_stats_reader, num_samples, "RTPS_SENT_TOPIC", 10u);
disable_statistics(w_statistics_participant, w_subscriber, rtps_stats_reader, statistics::RTPS_SENT_TOPIC);

wait_statistics(physical_data_reader, 1, "PHYSICAL_DATA_TOPIC", 10u);
disable_statistics(w_statistics_participant, w_subscriber, physical_data_reader, statistics::PHYSICAL_DATA_TOPIC);
// Check that messages have been received
for (std::size_t i = 0; i < readers_datawriter.size(); ++i)
{
wait_statistics(
readers_datawriter[i],
std::get<2>(writer_statistics_kinds[i]),
std::get<0>(writer_statistics_kinds[i]).c_str(),
10u);
disable_statistics(
w_statistics_participant,
w_subscriber,
readers_datawriter[i],
std::get<1>(writer_statistics_kinds[i]));
}

for (std::size_t i = 0; i < readers_datareader.size(); ++i)
{
wait_statistics(
readers_datareader[i],
std::get<2>(reader_statistics_kinds[i]),
std::get<0>(reader_statistics_kinds[i]).c_str(),
10u);
disable_statistics(
r_statistics_participant,
r_subscriber,
readers_datareader[i],
std::get<1>(reader_statistics_kinds[i]));
}

w_participant->delete_subscriber(w_subscriber);
w_participant->delete_subscriber(r_subscriber);

#endif // FASTDDS_STATISTICS
}
Expand Down

0 comments on commit 7d52421

Please sign in to comment.