Skip to content

Commit

Permalink
Collection of data and notification to listeners EDP_PACKETS (#1948)
Browse files Browse the repository at this point in the history
* [10803] Extend EntityId_t api

* [10803] EDP_PACKETS callback implementation

* [10803] linter pass

* [10803] EDP_PACKETS testing update

* [10803] EDP_PACKETS addressing reviewers comments.

* [10803] Addressing cunning reviewer's comments.

* [10803] doxygen

* [10803] Addressing reviewer's comments

* [11498] Rebase corrections
  • Loading branch information
MiguelBarro authored May 10, 2021
1 parent 8a1999b commit 19c50af
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 33 deletions.
18 changes: 18 additions & 0 deletions include/fastdds/rtps/common/EntityId_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,24 @@ struct RTPS_DllAPI EntityId_t

#endif // if !FASTDDS_IS_BIG_ENDIAN_TARGET

/*!
* @brief conversion to uint32_t
* @return uint32_t representation
*/
uint32_t to_uint32() const
{
uint32_t res = *reinterpret_cast<const uint32_t*>(value);

#if !FASTDDS_IS_BIG_ENDIAN_TARGET
res = ( res >> 24 ) |
(0x0000ff00 & ( res >> 8)) |
(0x00ff0000 & ( res << 8)) |
( res << 24 );
#endif // if !FASTDDS_IS_BIG_ENDIAN_TARGET

return res;
}

static EntityId_t unknown()
{
return EntityId_t();
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ class RTPSParticipantImpl
destination_locators_end,
msg->length);

// checkout if sender is a pdp endpoint
on_pdp_packet(
// checkout if sender is a discovery endpoint
on_discovery_packet(
sender_guid,
destination_locators_begin,
destination_locators_end);
Expand Down
24 changes: 24 additions & 0 deletions src/cpp/statistics/rtps/StatisticsBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,27 @@ void StatisticsParticipantImpl::on_pdp_packet(
listener->on_statistics_data(data);
});
}

void StatisticsParticipantImpl::on_edp_packet(
const uint32_t packages)
{
EntityCount notification;
notification.guid(to_statistics_type(get_guid()));

{
std::lock_guard<std::recursive_mutex> lock(get_statistics_mutex());
edp_counter_ += packages;
notification.count(edp_counter_);
}

// Perform the callbacks
Data data;
// note that the setter sets RESENT_DATAS by default
data.entity_count(notification);
data._d(EventKind::EDP_PACKETS);

for_each_listener([&data](const std::shared_ptr<IListener>& listener)
{
listener->on_statistics_data(data);
});
}
101 changes: 71 additions & 30 deletions src/cpp/statistics/rtps/StatisticsBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class StatisticsParticipantImpl

// PDP_PACKETS ancillary
unsigned long long pdp_counter_ = {};
// EDP_PACKETS ancillary
unsigned long long edp_counter_ = {};

/*
* Retrieve the GUID_t from derived class
Expand Down Expand Up @@ -176,38 +178,41 @@ class StatisticsParticipantImpl
std::recursive_mutex& get_statistics_mutex();

/** Register a listener in participant RTPSWriter entities.
* @param listener, smart pointer to the listener interface to register
* @param guid, RTPSWriter identifier. If unknown the listener is registered in all enable ones
* @param listener smart pointer to the listener interface to register
* @param guid RTPSWriter identifier. If unknown the listener is registered in all enable ones
* @return true on success
*/
virtual bool register_in_writer(
std::shared_ptr<fastdds::statistics::IListener> listener,
GUID_t guid = GUID_t::unknown()) = 0;

/** Register a listener in participant RTPSReader entities.
* @param listener, smart pointer to the listener interface to register
* @param guid, RTPSReader identifier. If unknown the listener is registered in all enable ones
* @param listener smart pointer to the listener interface to register
* @param guid RTPSReader identifier. If unknown the listener is registered in all enable ones
* @return true on success
*/
virtual bool register_in_reader(
std::shared_ptr<fastdds::statistics::IListener> listener,
GUID_t guid = GUID_t::unknown()) = 0;

/** Unregister a listener in participant RTPSWriter entities.
* @param listener, smart pointer to the listener interface to unregister
* @param listener smart pointer to the listener interface to unregister
* @return true on success
*/
virtual bool unregister_in_writer(
std::shared_ptr<fastdds::statistics::IListener> listener) = 0;

/** Unregister a listener in participant RTPSReader entities.
* @param listener, smart pointer to the listener interface to unregister
* @param listener smart pointer to the listener interface to unregister
* @return true on success
*/
virtual bool unregister_in_reader(
std::shared_ptr<fastdds::statistics::IListener> listener) = 0;

// lambda function to traverse the listener collection
/** Auxiliary method to traverse the listener collection
* @param f functor to traverse the listener collection
* @return functor copy after traversal
*/
template<class Function>
Function for_each_listener(
Function f)
Expand All @@ -224,20 +229,26 @@ class StatisticsParticipantImpl
return f;
}

// returns if a mask statistics::EventKind may require participant writers update
/** Checks if callback events require writer specific callbacks
* @param mask callback events to be queried
* @return if a mask statistics::EventKind may require participant writers update
*/
bool are_writers_involved(
const uint32_t mask) const;

// returns if a mask statistics::EventKind may require participant readers update
/** Checks if callback events require reader specific callbacks
* @param mask callback events to be queried
* @return if a mask statistics::EventKind may require participant readers update
*/
bool are_readers_involved(
const uint32_t mask) const;

// TODO: methods for listeners callbacks

/*
* Report a message that is sent by the participant
* @param loc, destination
* @param payload_size, size of the current message
* @param loc destination
* @param payload_size size of the current message
*/
void on_rtps_sent(
const fastrtps::rtps::Locator_t& loc,
Expand All @@ -246,9 +257,9 @@ class StatisticsParticipantImpl
/*
* Report a message that is sent by the participant
* @param sender_guid GUID of the entity producing the message
* @param destination_locators_begin, start of locators range
* @param destination_locators_end, end of locators range
* @param payload_size, size of the current message
* @param destination_locators_begin start of locators range
* @param destination_locators_end end of locators range
* @param payload_size size of the current message
*/
template<class LocatorIteratorT>
void on_rtps_send(
Expand All @@ -270,7 +281,7 @@ class StatisticsParticipantImpl

/*
* Report that a new entity is discovered
* @param id, discovered entity GUID_t
* @param id discovered entity GUID_t
*/
void on_entity_discovery(
const GUID_t& id);
Expand All @@ -283,21 +294,50 @@ class StatisticsParticipantImpl
const uint32_t packages);

/*
* Report PDP message exchange.
* We filtered the non-pdp traffic here to minimize presence of statistics code in endpoints implementation.
* Auxiliary method to report EDP message exchange.
* @param packages number of edp packages sent
*/
void on_edp_packet(
const uint32_t packages);

/*
* Report discovery protocols message exchange.
* We filtered the non discovery traffic here to minimize presence of statistics code in endpoints implementation.
* @param sender_guid GUID_t to filter
* @param destination_locators_begin, start of locators range
* @param destination_locators_end, end of locators range
* @param destination_locators_begin start of locators range
* @param destination_locators_end end of locators range
*/
template<class LocatorIteratorT>
void on_pdp_packet(
void on_discovery_packet(
const GUID_t& sender_guid,
const LocatorIteratorT& destination_locators_begin,
const LocatorIteratorT& destination_locators_end)
{
if (sender_guid.entityId == fastrtps::rtps::c_EntityId_SPDPWriter
&& destination_locators_begin != destination_locators_end)
if ( destination_locators_begin != destination_locators_end )
{
void (StatisticsParticipantImpl::* discovery_callback)(
const uint32_t) = nullptr;

switch (sender_guid.entityId.to_uint32())
{
case ENTITYID_SPDP_BUILTIN_RTPSParticipant_WRITER:
case ENTITYID_SPDP_BUILTIN_RTPSParticipant_READER:
discovery_callback = &StatisticsParticipantImpl::on_pdp_packet;
break;
case ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER:
case ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER:
case ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER:
case ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER:
case ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER:
case ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER:
case ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER:
case ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER:
discovery_callback = &StatisticsParticipantImpl::on_edp_packet;
break;
default:
return; // ignore
}

uint32_t packages = 0;
auto it = destination_locators_begin;
while (it != destination_locators_end)
Expand All @@ -306,7 +346,7 @@ class StatisticsParticipantImpl
++packages;
}

on_pdp_packet(packages);
(this->*discovery_callback)(packages);
}
}

Expand Down Expand Up @@ -352,9 +392,10 @@ class StatisticsParticipantImpl

/*
* Report a message that is sent by the participant
* @param destination_locators_begin, start of locators range
* @param destination_locators_end, end of locators range
* @param payload_size, size of the current message
* @param participant identity
* @param start of locators range
* @param end of locators range
* @param size of the current message
*/
template<class LocatorIteratorT>
inline void on_rtps_send(
Expand All @@ -367,22 +408,22 @@ class StatisticsParticipantImpl

/*
* Report that a new entity is discovered
* @param id, discovered entity GUID_t
* @param discovered entity GUID_t
*/
inline void on_entity_discovery(
const fastrtps::rtps::GUID_t&)
{
}

/*
* Report PDP message exchange.
* We filtered the non-pdp traffic here to minimize presence of statistics code in endpoints implementation.
* Report discovery protocols message exchange.
* We filtered the non discovery traffic here to minimize presence of statistics code in endpoints implementation.
* @param GUID_t to filter
* @param start of locators range
* @param end of locators range
*/
template<class LocatorIteratorT>
inline void on_pdp_packet(
inline void on_discovery_packet(
const fastrtps::rtps::GUID_t&,
const LocatorIteratorT&,
const LocatorIteratorT&)
Expand Down
10 changes: 9 additions & 1 deletion test/unittest/statistics/rtps/RTPSStatisticsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ struct MockListener : IListener
case PDP_PACKETS:
on_pdp_packets(data.entity_count());
break;
case EDP_PACKETS:
on_edp_packets(data.entity_count());
break;
case SAMPLE_DATAS:
on_sample_datas(data.sample_identity_count());
break;
Expand All @@ -108,6 +111,7 @@ struct MockListener : IListener
MOCK_METHOD1(on_nackfrag_count, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_entity_discovery, void(const eprosima::fastdds::statistics::DiscoveryTime&));
MOCK_METHOD1(on_pdp_packets, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_edp_packets, void(const eprosima::fastdds::statistics::EntityCount&));
MOCK_METHOD1(on_sample_datas, void(const eprosima::fastdds::statistics::SampleIdentityCount&));
MOCK_METHOD1(on_unexpected_kind, void(eprosima::fastdds::statistics::EventKind));
};
Expand Down Expand Up @@ -806,7 +810,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_discovery_callbacks)
// create the listener and set expectations
auto participant_listener = make_shared<MockListener>();
ASSERT_TRUE(participant_->add_statistics_listener(participant_listener,
EventKind::DISCOVERED_ENTITY | EventKind::PDP_PACKETS));
EventKind::DISCOVERED_ENTITY | EventKind::PDP_PACKETS | EventKind::EDP_PACKETS));

// check callbacks on data exchange
atomic_int callbacks(0);
Expand All @@ -819,6 +823,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_discovery_callbacks)
.Times(AtLeast(5));
EXPECT_CALL(*participant_listener, on_pdp_packets)
.Times(AtLeast(1));
EXPECT_CALL(*participant_listener, on_edp_packets)
.Times(AtLeast(1));

// create local endpoints
uint16_t length = 255;
Expand Down Expand Up @@ -921,6 +927,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_avoid_empty_resent_callbacks)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_data_count)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_sample_datas)
.Times(AtLeast(1));
EXPECT_CALL(*writer_listener, on_resent_count)
.Times(0); // never called

Expand Down

0 comments on commit 19c50af

Please sign in to comment.