Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10803] Collection of data and notification to listeners EDP_PACKETS #1948

Merged
merged 9 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions include/fastdds/rtps/common/EntityId_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,24 @@ struct RTPS_DllAPI EntityId_t

#endif // if !FASTDDS_IS_BIG_ENDIAN_TARGET

/*!
* @brief conversion to uint32_t
* @return uint32_t representation
*/
uint32_t to_uint32() const
{
uint32_t res = *reinterpret_cast<const uint32_t*>(value);

#if !FASTDDS_IS_BIG_ENDIAN_TARGET
res = ( res >> 24 ) |
(0x0000ff00 & ( res >> 8)) |
(0x00ff0000 & ( res << 8)) |
( res << 24 );
#endif // if !FASTDDS_IS_BIG_ENDIAN_TARGET

return res;
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
}

static EntityId_t unknown()
{
return EntityId_t();
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ class RTPSParticipantImpl
destination_locators_end,
msg->length);

// checkout if sender is a pdp endpoint
on_pdp_packet(
// checkout if sender is a discovery endpoint
on_discovery_packet(
sender_guid,
destination_locators_begin,
destination_locators_end);
Expand Down
24 changes: 24 additions & 0 deletions src/cpp/statistics/rtps/StatisticsBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,27 @@ void StatisticsParticipantImpl::on_pdp_packet(
listener->on_statistics_data(data);
});
}

void StatisticsParticipantImpl::on_edp_packet(
const uint32_t packages)
{
EntityCount notification;
notification.guid(to_statistics_type(get_guid()));

{
std::lock_guard<std::recursive_mutex> lock(get_statistics_mutex());
edp_counter_ += packages;
notification.count(edp_counter_);
}

// Perform the callbacks
Data data;
// note that the setter sets RESENT_DATAS by default
data.entity_count(notification);
data._d(EventKind::EDP_PACKETS);

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
{
listener->on_statistics_data(data);
});
}
101 changes: 71 additions & 30 deletions src/cpp/statistics/rtps/StatisticsBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class StatisticsParticipantImpl

// PDP_PACKETS ancillary
unsigned long long pdp_counter_ = {};
// EDP_PACKETS ancillary
unsigned long long edp_counter_ = {};

/*
* Retrieve the GUID_t from derived class
Expand Down Expand Up @@ -176,38 +178,41 @@ class StatisticsParticipantImpl
std::recursive_mutex& get_statistics_mutex();

/** Register a listener in participant RTPSWriter entities.
* @param listener, smart pointer to the listener interface to register
* @param guid, RTPSWriter identifier. If unknown the listener is registered in all enable ones
* @param listener smart pointer to the listener interface to register
* @param guid RTPSWriter identifier. If unknown the listener is registered in all enable ones
* @return true on success
*/
virtual bool register_in_writer(
std::shared_ptr<fastdds::statistics::IListener> listener,
GUID_t guid = GUID_t::unknown()) = 0;

/** Register a listener in participant RTPSReader entities.
* @param listener, smart pointer to the listener interface to register
* @param guid, RTPSReader identifier. If unknown the listener is registered in all enable ones
* @param listener smart pointer to the listener interface to register
* @param guid RTPSReader identifier. If unknown the listener is registered in all enable ones
* @return true on success
*/
virtual bool register_in_reader(
std::shared_ptr<fastdds::statistics::IListener> listener,
GUID_t guid = GUID_t::unknown()) = 0;

/** Unregister a listener in participant RTPSWriter entities.
* @param listener, smart pointer to the listener interface to unregister
* @param listener smart pointer to the listener interface to unregister
* @return true on success
*/
virtual bool unregister_in_writer(
std::shared_ptr<fastdds::statistics::IListener> listener) = 0;

/** Unregister a listener in participant RTPSReader entities.
* @param listener, smart pointer to the listener interface to unregister
* @param listener smart pointer to the listener interface to unregister
* @return true on success
*/
virtual bool unregister_in_reader(
std::shared_ptr<fastdds::statistics::IListener> listener) = 0;

// lambda function to traverse the listener collection
/** Auxiliary method to traverse the listener collection
* @param f functor to traverse the listener collection
* @return functor copy after traversal
*/
template<class Function>
Function for_each_listener(
Function f)
Expand All @@ -224,20 +229,26 @@ class StatisticsParticipantImpl
return f;
}

// returns if a mask statistics::EventKind may require participant writers update
/** Checks if callback events require writer specific callbacks
* @param mask callback events to be queried
* @return if a mask statistics::EventKind may require participant writers update
*/
bool are_writers_involved(
const uint32_t mask) const;

// returns if a mask statistics::EventKind may require participant readers update
/** Checks if callback events require reader specific callbacks
* @param mask callback events to be queried
* @return if a mask statistics::EventKind may require participant readers update
*/
bool are_readers_involved(
const uint32_t mask) const;

// TODO: methods for listeners callbacks

/*
* Report a message that is sent by the participant
* @param loc, destination
* @param payload_size, size of the current message
* @param loc destination
* @param payload_size size of the current message
*/
void on_rtps_sent(
const fastrtps::rtps::Locator_t& loc,
Expand All @@ -246,9 +257,9 @@ class StatisticsParticipantImpl
/*
* Report a message that is sent by the participant
* @param sender_guid GUID of the entity producing the message
* @param destination_locators_begin, start of locators range
* @param destination_locators_end, end of locators range
* @param payload_size, size of the current message
* @param destination_locators_begin start of locators range
* @param destination_locators_end end of locators range
* @param payload_size size of the current message
*/
template<class LocatorIteratorT>
void on_rtps_send(
Expand All @@ -270,7 +281,7 @@ class StatisticsParticipantImpl

/*
* Report that a new entity is discovered
* @param id, discovered entity GUID_t
* @param id discovered entity GUID_t
*/
void on_entity_discovery(
const GUID_t& id);
Expand All @@ -283,21 +294,50 @@ class StatisticsParticipantImpl
const uint32_t packages);

/*
* Report PDP message exchange.
* We filtered the non-pdp traffic here to minimize presence of statistics code in endpoints implementation.
* Auxiliary method to report EDP message exchange.
* @param packages number of edp packages sent
*/
void on_edp_packet(
const uint32_t packages);

/*
* Report discovery protocols message exchange.
* We filtered the non discovery traffic here to minimize presence of statistics code in endpoints implementation.
* @param sender_guid GUID_t to filter
* @param destination_locators_begin, start of locators range
* @param destination_locators_end, end of locators range
* @param destination_locators_begin start of locators range
* @param destination_locators_end end of locators range
*/
template<class LocatorIteratorT>
void on_pdp_packet(
void on_discovery_packet(
const GUID_t& sender_guid,
const LocatorIteratorT& destination_locators_begin,
const LocatorIteratorT& destination_locators_end)
{
if (sender_guid.entityId == fastrtps::rtps::c_EntityId_SPDPWriter
&& destination_locators_begin != destination_locators_end)
if ( destination_locators_begin != destination_locators_end )
{
void (StatisticsParticipantImpl::* discovery_callback)(
const uint32_t) = nullptr;

switch (sender_guid.entityId.to_uint32())
{
case ENTITYID_SPDP_BUILTIN_RTPSParticipant_WRITER:
case ENTITYID_SPDP_BUILTIN_RTPSParticipant_READER:
discovery_callback = &StatisticsParticipantImpl::on_pdp_packet;
break;
case ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER:
case ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER:
case ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER:
case ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER:
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
case ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER:
case ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER:
case ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER:
case ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER:
discovery_callback = &StatisticsParticipantImpl::on_edp_packet;
break;
default:
return; // ignore
}

uint32_t packages = 0;
auto it = destination_locators_begin;
while (it != destination_locators_end)
Expand All @@ -306,7 +346,7 @@ class StatisticsParticipantImpl
++packages;
}

on_pdp_packet(packages);
(this->*discovery_callback)(packages);
}
}

Expand Down Expand Up @@ -352,9 +392,10 @@ class StatisticsParticipantImpl

/*
* Report a message that is sent by the participant
* @param destination_locators_begin, start of locators range
* @param destination_locators_end, end of locators range
* @param payload_size, size of the current message
* @param participant identity
* @param start of locators range
* @param end of locators range
* @param size of the current message
*/
template<class LocatorIteratorT>
inline void on_rtps_send(
Expand All @@ -367,22 +408,22 @@ class StatisticsParticipantImpl

/*
* Report that a new entity is discovered
* @param id, discovered entity GUID_t
* @param discovered entity GUID_t
*/
inline void on_entity_discovery(
const fastrtps::rtps::GUID_t&)
{
}

/*
* Report PDP message exchange.
* We filtered the non-pdp traffic here to minimize presence of statistics code in endpoints implementation.
* Report discovery protocols message exchange.
* We filtered the non discovery traffic here to minimize presence of statistics code in endpoints implementation.
* @param GUID_t to filter
* @param start of locators range
* @param end of locators range
*/
template<class LocatorIteratorT>
inline void on_pdp_packet(
inline void on_discovery_packet(
const fastrtps::rtps::GUID_t&,
const LocatorIteratorT&,
const LocatorIteratorT&)
Expand Down
10 changes: 9 additions & 1 deletion test/unittest/statistics/rtps/RTPSStatisticsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ struct MockListener : IListener
case PDP_PACKETS:
on_pdp_packets(data.entity_count());
break;
case EDP_PACKETS:
on_edp_packets(data.entity_count());
break;
case SAMPLE_DATAS:
on_sample_datas(data.sample_identity_count());
break;
Expand All @@ -108,6 +111,7 @@ struct MockListener : IListener
MOCK_METHOD1(on_nackfrag_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_entity_discovery, void(const eprosima::fastdds::statistics::DiscoveryTime&));
MOCK_METHOD1(on_pdp_packets, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_edp_packets, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_sample_datas, void(const eprosima::fastdds::statistics::SampleIdentityCount&));
MOCK_METHOD1(on_unexpected_kind, void(eprosima::fastdds::statistics::EventKind));
};
Expand Down Expand Up @@ -806,7 +810,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_discovery_callbacks)
// create the listener and set expectations
auto participant_listener = make_shared<MockListener>();
ASSERT_TRUE(participant_->add_statistics_listener(participant_listener,
EventKind::DISCOVERED_ENTITY | EventKind::PDP_PACKETS));
EventKind::DISCOVERED_ENTITY | EventKind::PDP_PACKETS | EventKind::EDP_PACKETS));

// check callbacks on data exchange
atomic_int callbacks(0);
Expand All @@ -819,6 +823,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_discovery_callbacks)
.Times(AtLeast(5));
EXPECT_CALL(*participant_listener, on_pdp_packets)
.Times(AtLeast(1));
EXPECT_CALL(*participant_listener, on_edp_packets)
.Times(AtLeast(1));

// create local endpoints
uint16_t length = 255;
Expand Down Expand Up @@ -921,6 +927,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_avoid_empty_resent_callbacks)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_data_count)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_sample_datas)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_resent_count)
.Times(0); // never called

Expand Down