diff --git a/src/cpp/rtps/messages/MessageReceiver.cpp b/src/cpp/rtps/messages/MessageReceiver.cpp index 585c43eb595..e61fbeae358 100644 --- a/src/cpp/rtps/messages/MessageReceiver.cpp +++ b/src/cpp/rtps/messages/MessageReceiver.cpp @@ -1338,7 +1338,8 @@ void MessageReceiver::notify_network_statistics( // Keep track of current position, so we can restore it later. auto initial_pos = msg->pos; - while (msg->pos < msg->length) + auto msg_length = msg->length; + while (msg->pos < msg_length) { SubmessageHeader_t header; if (!readSubmessageHeader(msg, &header)) @@ -1350,14 +1351,15 @@ void MessageReceiver::notify_network_statistics( { // Check submessage validity if ((statistics_submessage_data_length != header.submessageLength) || - ((msg->pos + header.submessageLength) > msg->length)) + ((msg->pos + header.submessageLength) > msg_length)) { break; } StatisticsSubmessageData data; read_statistics_submessage(msg, data); - participant_->on_network_statistics(source_guid_prefix_, source_locator, reception_locator, data); + participant_->on_network_statistics( + source_guid_prefix_, source_locator, reception_locator, data, msg_length); break; } diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 194bebb9624..d325905c2b5 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -613,6 +613,7 @@ bool TCPTransportInterface::OpenOutputChannel( tcp_sender_resource->channel()->add_logical_port(logical_port, rtcp_message_manager_.get()); } + statistics_info_.add_entry(locator); return true; } } @@ -670,6 +671,7 @@ bool TCPTransportInterface::OpenOutputChannel( channel->connect(channel_resources_[physical_locator]); } + statistics_info_.add_entry(locator); success = true; channel->add_logical_port(logical_port, rtcp_message_manager_.get()); send_resource_list.emplace_back( @@ -1156,8 +1158,7 @@ bool TCPTransportInterface::send( if (channel->is_logical_port_opened(logical_port)) { TCPHeader tcp_header; - StatisticsSubmessageData::Sequence seq; - set_statistics_submessage_from_transport(send_buffer, send_buffer_size, seq); + statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size); fill_rtcp_header(tcp_header, send_buffer, send_buffer_size, logical_port); { diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index e652e49cafb..c16d6a22044 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -26,6 +26,7 @@ #include #endif // if TLS_FOUND +#include #include #include @@ -95,6 +96,8 @@ class TCPTransportInterface : public TransportInterface std::map> acceptors_; + eprosima::fastdds::statistics::rtps::OutputTrafficManager statistics_info_; + TCPTransportInterface( int32_t transport_kind); diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 4ba7d059240..781ab3e81af 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -42,20 +42,6 @@ using PortParameters = fastrtps::rtps::PortParameters; using SenderResource = fastrtps::rtps::SenderResource; using Log = fastdds::dds::Log; -struct MultiUniLocatorsLinkage -{ - MultiUniLocatorsLinkage( - LocatorList&& m, - LocatorList&& u) - : multicast(std::move(m)) - , unicast(std::move(u)) - { - } - - LocatorList multicast; - LocatorList unicast; -}; - UDPTransportDescriptor::UDPTransportDescriptor() : SocketTransportDescriptor(s_maximumMessageSize, s_maximumInitialPeersRange) , m_output_udp_socket(0) @@ -286,6 +272,7 @@ bool UDPTransportInterface::OpenOutputChannel( if (udp_sender_resource) { + statistics_info_.add_entry(locator); return true; } } @@ -409,6 +396,7 @@ bool UDPTransportInterface::OpenOutputChannel( return false; } + statistics_info_.add_entry(locator); return true; } @@ -530,8 +518,7 @@ bool UDPTransportInterface::send( #endif // ifndef _WIN32 asio::error_code ec; - StatisticsSubmessageData::Sequence seq; - set_statistics_submessage_from_transport(send_buffer, send_buffer_size, seq); + statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size); bytesSent = getSocketPtr(socket)->send_to(asio::buffer(send_buffer, send_buffer_size), destinationEndpoint, 0, ec); if (!!ec) diff --git a/src/cpp/rtps/transport/UDPTransportInterface.h b/src/cpp/rtps/transport/UDPTransportInterface.h index b66a6d5f21d..aeabef8416c 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.h +++ b/src/cpp/rtps/transport/UDPTransportInterface.h @@ -21,7 +21,9 @@ #include #include #include + #include +#include #include #include @@ -170,6 +172,7 @@ class UDPTransportInterface : public TransportInterface uint32_t mSendBufferSize; uint32_t mReceiveBufferSize; + eprosima::fastdds::statistics::rtps::OutputTrafficManager statistics_info_; UDPTransportInterface( int32_t transport_kind); diff --git a/src/cpp/rtps/transport/test_UDPv4Transport.cpp b/src/cpp/rtps/transport/test_UDPv4Transport.cpp index 910332e6176..dfec9a32070 100644 --- a/src/cpp/rtps/transport/test_UDPv4Transport.cpp +++ b/src/cpp/rtps/transport/test_UDPv4Transport.cpp @@ -201,6 +201,7 @@ bool test_UDPv4Transport::send( { if (packet_should_drop(send_buffer, send_buffer_size)) { + statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size); log_drop(send_buffer, send_buffer_size); return true; } diff --git a/src/cpp/statistics/rtps/StatisticsBase.cpp b/src/cpp/statistics/rtps/StatisticsBase.cpp index df61448e2a4..3734578996b 100644 --- a/src/cpp/statistics/rtps/StatisticsBase.cpp +++ b/src/cpp/statistics/rtps/StatisticsBase.cpp @@ -31,6 +31,44 @@ namespace eprosima { namespace fastdds { namespace statistics { +static void add_bytes( + Entity2LocatorTraffic& traffic, + const rtps::StatisticsSubmessageData::Sequence& distance) +{ + uint64_t count = traffic.packet_count(); + int16_t high = traffic.byte_magnitude_order(); + uint64_t low = traffic.byte_count(); + + count += distance.sequence; + high += distance.bytes_high; + low += distance.bytes; + high += (low < traffic.byte_count()); + + traffic.packet_count(count); + traffic.byte_magnitude_order(high); + traffic.byte_count(low); +} + +static void sub_bytes( + Entity2LocatorTraffic& traffic, + uint64_t bytes) +{ + uint64_t count = traffic.packet_count(); + int16_t high = traffic.byte_magnitude_order(); + uint64_t low = traffic.byte_count(); + + if (count > 0) + { + count--; + low -= bytes; + high -= (low > traffic.byte_count()); + + traffic.packet_count(count); + traffic.byte_magnitude_order(high); + traffic.byte_count(low); + } +} + detail::Locator_s to_statistics_type( fastrtps::rtps::Locator_t locator) { @@ -292,16 +330,25 @@ void StatisticsParticipantImpl::on_network_statistics( const fastrtps::rtps::GuidPrefix_t& source_participant, const fastrtps::rtps::Locator_t& source_locator, const fastrtps::rtps::Locator_t& reception_locator, - const rtps::StatisticsSubmessageData& data) + const rtps::StatisticsSubmessageData& data, + uint64_t datagram_size) { - static_cast(source_participant); + static_cast(reception_locator); + process_network_timestamp(source_locator, data.destination, data.ts); + process_network_sequence(source_participant, data.destination, data.seq, datagram_size); +} +void StatisticsParticipantImpl::process_network_timestamp( + const fastrtps::rtps::Locator_t& source_locator, + const fastrtps::rtps::Locator_t& reception_locator, + const rtps::StatisticsSubmessageData::TimeStamp& ts) +{ using namespace eprosima::fastrtps::rtps; - Time_t ts(data.ts.seconds, data.ts.fraction); + Time_t source_ts(ts.seconds, ts.fraction); Time_t current_ts; Time_t::now(current_ts); - auto latency = static_cast((current_ts - ts).to_ns()); + auto latency = static_cast((current_ts - source_ts).to_ns()); Locator2LocatorData notification; notification.src_locator(to_statistics_type(source_locator)); @@ -319,6 +366,76 @@ void StatisticsParticipantImpl::on_network_statistics( }); } +void StatisticsParticipantImpl::process_network_sequence( + const fastrtps::rtps::GuidPrefix_t& source_participant, + const fastrtps::rtps::Locator_t& reception_locator, + const rtps::StatisticsSubmessageData::Sequence& seq, + uint64_t datagram_size) +{ + lost_traffic_key key(source_participant, reception_locator); + bool should_notify = false; + Entity2LocatorTraffic notification; + + { + std::lock_guard lock(get_statistics_mutex()); + lost_traffic_value& value = lost_traffic_[key]; + + if (value.first_sequence > seq.sequence) + { + // Datagrams before the first received one are ignored + return; + } + + if (value.first_sequence == 0) + { + // This is the first time we receive a statistics sequence from source_participant on reception_locator + GUID_t guid(source_participant, ENTITYID_RTPSParticipant); + value.data.src_guid(to_statistics_type(guid)); + value.data.dst_locator(to_statistics_type(reception_locator)); + value.first_sequence = seq.sequence; + } + else + { + // We shouldn't receive the same sequence twice + assert(seq.sequence != value.seq_data.sequence); + // Detect discontinuity. We will only notify in that case + should_notify = seq.sequence != (value.seq_data.sequence + 1); + if (should_notify) + { + if (seq.sequence > value.seq_data.sequence) + { + // Received sequence is higher, data has been lost + add_bytes(value.data, rtps::StatisticsSubmessageData::Sequence::distance(value.seq_data, seq)); + } + + // We should never count the current received datagram + sub_bytes(value.data, datagram_size); + + notification = value.data; + } + } + + if (seq.sequence > value.seq_data.sequence) + { + value.seq_data = seq; + } + } + + if (should_notify) + { + // Perform the callbacks + Data data; + // note that the setter sets RTPS_SENT by default + data.entity2locator_traffic(notification); + data._d(EventKind::RTPS_LOST); + + for_each_listener([&data](const Key& listener) + { + listener->on_statistics_data(data); + }); + } +} + void StatisticsParticipantImpl::on_rtps_sent( const fastrtps::rtps::Locator_t& loc, unsigned long payload_size) @@ -334,7 +451,7 @@ void StatisticsParticipantImpl::on_rtps_sent( { std::lock_guard lock(get_statistics_mutex()); - auto& val = traffic[loc]; + auto& val = traffic_[loc]; notification.packet_count(++val.packet_count); notification.byte_count(val.byte_count += payload_size); notification.byte_magnitude_order((int16_t)floor(log10(float(val.byte_count)))); diff --git a/src/cpp/statistics/rtps/StatisticsBase.hpp b/src/cpp/statistics/rtps/StatisticsBase.hpp index 07ca77346b2..32b5dc7b8f6 100644 --- a/src/cpp/statistics/rtps/StatisticsBase.hpp +++ b/src/cpp/statistics/rtps/StatisticsBase.hpp @@ -113,7 +113,17 @@ class StatisticsParticipantImpl unsigned long long byte_count = {}; }; - std::map traffic; + std::map traffic_; + + // RTPS_LOST ancillary + using lost_traffic_key = std::pair; + struct lost_traffic_value + { + uint64_t first_sequence = 0; + Entity2LocatorTraffic data{}; + rtps::StatisticsSubmessageData::Sequence seq_data{}; + }; + std::map lost_traffic_; // PDP_PACKETS ancillary unsigned long long pdp_counter_ = {}; @@ -255,12 +265,38 @@ class StatisticsParticipantImpl * @param [in] source_locator Locator indicating the sending address. * @param [in] reception_locator Locator indicating the listening address. * @param [in] data Statistics submessage received. + * @param [in] datagram_size The size in bytes of the received datagram. */ void on_network_statistics( const fastrtps::rtps::GuidPrefix_t& source_participant, const fastrtps::rtps::Locator_t& source_locator, const fastrtps::rtps::Locator_t& reception_locator, - const rtps::StatisticsSubmessageData& data); + const rtps::StatisticsSubmessageData& data, + uint64_t datagram_size); + + /* + * Process a received statistics submessage timestamp, informing of network latency. + * @param [in] source_locator Locator indicating the sending address. + * @param [in] reception_locator Locator indicating the listening address. + * @param [in] ts The timestamp of the statistics submessage received. + */ + void process_network_timestamp( + const fastrtps::rtps::Locator_t& source_locator, + const fastrtps::rtps::Locator_t& reception_locator, + const rtps::StatisticsSubmessageData::TimeStamp& ts); + + /* + * Process a received statistics submessage sequence, informing of network loss. + * @param [in] source_participant GUID prefix of the participant sending the message. + * @param [in] reception_locator Locator indicating the listening address. + * @param [in] seq The sequencing info ot the statistics submessage received. + * @param [in] datagram_size The size in bytes of the received datagram. + */ + void process_network_sequence( + const fastrtps::rtps::GuidPrefix_t& source_participant, + const fastrtps::rtps::Locator_t& reception_locator, + const rtps::StatisticsSubmessageData::Sequence& seq, + uint64_t datagram_size); /* * Report a message that is sent by the participant @@ -415,12 +451,14 @@ class StatisticsParticipantImpl * @param [in] Locator indicating the sending address. * @param [in] Locator indicating the listening address. * @param [in] Statistics submessage received. + * @param [in] The size in bytes of the received datagram. */ inline void on_network_statistics( const fastrtps::rtps::GuidPrefix_t&, const fastrtps::rtps::Locator_t&, const fastrtps::rtps::Locator_t&, - const rtps::StatisticsSubmessageData&) + const rtps::StatisticsSubmessageData&, + uint64_t) { } diff --git a/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp b/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp new file mode 100644 index 00000000000..175f82c4992 --- /dev/null +++ b/src/cpp/statistics/rtps/messages/OutputTrafficManager.hpp @@ -0,0 +1,110 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file OutputTrafficManager.hpp + */ + +#ifndef _STATISTICS_RTPS_MESSAGES_OUTPUTTRAFFICMANAGER_HPP_ +#define _STATISTICS_RTPS_MESSAGES_OUTPUTTRAFFICMANAGER_HPP_ + +#include +#include +#include +#include + +#include + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace statistics { +namespace rtps { + +/** + * A container for output locators sequencing information. + * @note This is non-thread-safe class. + */ +class OutputTrafficManager +{ +public: + + /** + * Adds an output locator to the collection. + * If the locator is already present in the collection, this method is a no-op. + * @param locator The locator for which sequencing information should be kept. + */ + inline void add_entry( + const eprosima::fastrtps::rtps::Locator_t& locator) + { + static_cast(locator); + +#ifdef FASTDDS_STATISTICS + auto search = [locator](const entry_type& entry) -> bool + { + return locator == entry.first; + }; + auto it = std::find_if(collection_.cbegin(), collection_.cend(), search); + if (it == collection_.cend()) + { + collection_.emplace_back(locator, value_type{}); + } +#endif // FASTDDS_STATISTICS + } + + /** + * + */ + inline void set_statistics_message_data( + const eprosima::fastrtps::rtps::Locator_t& locator, + const eprosima::fastrtps::rtps::octet* send_buffer, + uint32_t send_buffer_size) + { + static_cast(locator); + static_cast(send_buffer); + static_cast(send_buffer_size); + +#ifdef FASTDDS_STATISTICS + auto search = [locator](const entry_type& entry) -> bool + { + return locator == entry.first; + }; + auto it = std::find_if(collection_.begin(), collection_.end(), search); + assert(it != collection_.end()); + set_statistics_submessage_from_transport(locator, send_buffer, send_buffer_size, it->second); +#endif // FASTDDS_STATISTICS + } + +#ifdef FASTDDS_STATISTICS + +private: + + using key_type = eprosima::fastrtps::rtps::Locator_t; + using value_type = StatisticsSubmessageData::Sequence; + using entry_type = std::pair; + + std::list collection_; +#endif // FASTDDS_STATISTICS + +}; + +} // namespace rtps +} // namespace statistics +} // namespace fastdds +} // namespace eprosima + +#endif // _STATISTICS_RTPS_MESSAGES_OUTPUTTRAFFICMANAGER_HPP_ diff --git a/src/cpp/statistics/rtps/messages/RTPSStatisticsMessages.hpp b/src/cpp/statistics/rtps/messages/RTPSStatisticsMessages.hpp index 73aa100cf48..1691db146b2 100644 --- a/src/cpp/statistics/rtps/messages/RTPSStatisticsMessages.hpp +++ b/src/cpp/statistics/rtps/messages/RTPSStatisticsMessages.hpp @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -52,15 +53,31 @@ struct StatisticsSubmessageData { sequence++; auto new_bytes = bytes + message_size; - if (new_bytes < bytes) - { - bytes_high++; - } + bytes_high += (new_bytes < bytes); bytes = new_bytes; } + static Sequence distance( + const Sequence& from, + const Sequence& to) + { + // Check to >= from + assert(to.sequence >= from.sequence); + assert(to.bytes_high >= from.bytes_high); + assert((to.bytes_high > from.bytes_high) || (to.bytes >= from.bytes)); + + Sequence ret; + ret.sequence = to.sequence - from.sequence; + ret.bytes_high = to.bytes_high - from.bytes_high; + ret.bytes = to.bytes - from.bytes; + ret.bytes_high -= (ret.bytes > to.bytes); + + return ret; + } + }; + eprosima::fastrtps::rtps::Locator_t destination; TimeStamp ts{}; Sequence seq{}; }; @@ -113,6 +130,7 @@ inline void read_statistics_submessage( // Read all fields using namespace eprosima::fastrtps::rtps; + CDRMessage::readLocator(msg, &data.destination); CDRMessage::readInt32(msg, &data.ts.seconds); CDRMessage::readUInt32(msg, &data.ts.fraction); CDRMessage::readUInt64(msg, &data.seq.sequence); @@ -130,6 +148,9 @@ inline uint32_t get_statistics_message_pos( const eprosima::fastrtps::rtps::octet* send_buffer, uint32_t send_buffer_size) { + // Only used on an assert + static_cast(send_buffer); + // Message should contain RTPS header and statistic submessage assert(statistics_submessage_length + RTPSMESSAGE_HEADER_SIZE <= send_buffer_size); @@ -143,10 +164,12 @@ inline uint32_t get_statistics_message_pos( #endif // FASTDDS_STATISTICS inline void set_statistics_submessage_from_transport( + const eprosima::fastrtps::rtps::Locator_t& destination, const eprosima::fastrtps::rtps::octet* send_buffer, uint32_t send_buffer_size, StatisticsSubmessageData::Sequence& sequence) { + static_cast(destination); static_cast(send_buffer); static_cast(send_buffer_size); static_cast(sequence); @@ -167,6 +190,7 @@ inline void set_statistics_submessage_from_transport( Time_t ts; Time_t::now(ts); + submessage->destination = destination; submessage->ts.seconds = ts.seconds(); submessage->ts.fraction = ts.fraction(); submessage->seq.sequence = sequence.sequence; diff --git a/test/unittest/dds/status/CMakeLists.txt b/test/unittest/dds/status/CMakeLists.txt index 0a1802b9d18..14c593a8e7c 100644 --- a/test/unittest/dds/status/CMakeLists.txt +++ b/test/unittest/dds/status/CMakeLists.txt @@ -124,11 +124,15 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER)) add_executable(ListenerTests ${LISTENERTESTS_SOURCE}) target_compile_definitions(ListenerTests PRIVATE FASTRTPS_NO_LIB + BOOST_ASIO_STANDALONE + ASIO_STANDALONE + $<$,$>,$>>:ASIO_DISABLE_STD_STRING_VIEW> $<$>,$>:__DEBUG> $<$:__INTERNALDEBUG> # Internal debug activated. ) target_include_directories(ListenerTests PRIVATE ${GTEST_INCLUDE_DIRS} ${GMOCK_INCLUDE_DIRS} + ${Asio_INCLUDE_DIR} ${PROJECT_SOURCE_DIR}/test/mock/rtps/DataSharingPayloadPool ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSReader ${PROJECT_SOURCE_DIR}/test/mock/rtps/RTPSWriter diff --git a/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp b/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp index b5c46f3c70e..b3dd32d3405 100644 --- a/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp +++ b/test/unittest/statistics/rtps/RTPSStatisticsTests.cpp @@ -69,6 +69,9 @@ struct MockListener : IListener case RTPS_SENT: on_rtps_sent(data.entity2locator_traffic()); break; + case RTPS_LOST: + on_rtps_lost(data.entity2locator_traffic()); + break; case NETWORK_LATENCY: on_network_latency(data.locator2locator_data()); break; @@ -116,6 +119,7 @@ struct MockListener : IListener MOCK_METHOD1(on_history_latency, void(const eprosima::fastdds::statistics::WriterReaderData&)); MOCK_METHOD1(on_rtps_sent, void(const eprosima::fastdds::statistics::Entity2LocatorTraffic&)); + MOCK_METHOD1(on_rtps_lost, void(const eprosima::fastdds::statistics::Entity2LocatorTraffic&)); MOCK_METHOD1(on_network_latency, void(const eprosima::fastdds::statistics::Locator2LocatorData&)); MOCK_METHOD1(on_heartbeat_count, void(const eprosima::fastdds::statistics::EntityCount&)); MOCK_METHOD1(on_acknack_count, void(const eprosima::fastdds::statistics::EntityCount&)); @@ -259,6 +263,12 @@ class RTPSStatisticsTestsImpl r_att.endpoint.reliabilityKind = reliability_qos; r_att.endpoint.durabilityKind = durability_qos; + // Setting localhost as the only locator ensures that DATA submessages will be sent only once. + Locator_t local_locator; + IPLocator::setIPv4(local_locator, 127, 0, 0, 1); + r_att.endpoint.unicastLocatorList.clear(); + r_att.endpoint.unicastLocatorList.push_back(local_locator); + reader_ = RTPSDomain::createRTPSReader(participant_, r_att, reader_history_); } @@ -381,8 +391,9 @@ class RTPSStatisticsTestsImpl ASSERT_NE(nullptr, writer_change); std::string str("https://github.com/eProsima/Fast-DDS.git"); - memcpy(writer_change->serializedPayload.data, str.c_str(), str.length()); - writer_change->serializedPayload.length = (uint32_t)str.length(); + uint32_t change_length = std::min(length, static_cast(str.length())); + memcpy(writer_change->serializedPayload.data, str.c_str(), change_length); + writer_change->serializedPayload.length = change_length; ASSERT_TRUE(writer_history_->add_change(writer_change)); } @@ -535,6 +546,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_management) /* * This test checks RTPSParticipant, RTPSWriter and RTPSReader statistics module related APIs. * - RTPS_SENT callbacks are performed + * - RTPS_LOST callbacks are performed + * - NETWORK_LATENCY callbacks are performed * - HISTORY2HISTORY_LATENCY callbacks are performed * - DATA_COUNT callbacks are performed for DATA submessages * - RESENT_DATAS callbacks are performed for DATA submessages demanded by the readers @@ -591,7 +604,7 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) // participant specific callbacks auto participant_listener = make_shared(); ASSERT_TRUE(participant_->add_statistics_listener(participant_listener, - EventKind::RTPS_SENT | EventKind::NETWORK_LATENCY)); + EventKind::RTPS_SENT | EventKind::NETWORK_LATENCY | EventKind::RTPS_LOST)); // writer callbacks through participant listener auto participant_writer_listener = make_shared(); @@ -613,9 +626,11 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) auto reader_listener = make_shared(); ASSERT_TRUE(reader_->add_statistics_listener(reader_listener)); - // we must received the RTPS_SENT and NETWORK_LATENCY notifications + // we must received the RTPS_SENT, RTPS_LOST and NETWORK_LATENCY notifications EXPECT_CALL(*participant_listener, on_rtps_sent) .Times(AtLeast(1)); + EXPECT_CALL(*participant_listener, on_rtps_lost) + .Times(AtLeast(1)); EXPECT_CALL(*participant_listener, on_network_latency) .Times(AtLeast(1)); @@ -681,7 +696,8 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_listener_callbacks) EXPECT_TRUE(writer_->remove_statistics_listener(writer_listener)); EXPECT_TRUE(reader_->remove_statistics_listener(reader_listener)); - EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, EventKind::RTPS_SENT)); + EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, + EventKind::RTPS_SENT | EventKind::NETWORK_LATENCY | EventKind::RTPS_LOST)); EXPECT_TRUE(participant_->remove_statistics_listener(participant_writer_listener, EventKind::DATA_COUNT | EventKind::RESENT_DATAS | EventKind::PUBLICATION_THROUGHPUT | EventKind::SAMPLE_DATAS)); @@ -1013,6 +1029,130 @@ TEST_F(RTPSStatisticsTests, statistics_rpts_avoid_empty_resent_callbacks) EXPECT_TRUE(writer_->remove_statistics_listener(writer_listener)); } +/* + * This test checks additional cases for RTPS_LOST when RTPS datagrams are received unordered + */ +TEST_F(RTPSStatisticsTests, statistics_rpts_unordered_datagrams) +{ + using namespace ::testing; + using namespace fastrtps; + using namespace fastrtps::rtps; + using namespace std; + + using test_UDPv4Transport = eprosima::fastdds::rtps::test_UDPv4Transport; + + constexpr uint16_t num_messages = 10; + constexpr std::array message_order { + 2, 5, 1, 3, 4, 7, 8, 6, 9, 0 + }; + uint64_t lost_count_notified[] = + { + // seq received description notify + // 2 first sequence NO + // 5 3 & 4 lost 2 + 2, + // 1 before first NO + // 3 3 recovered 1 + 1, + // 4 4 recovered 0 + 0, + // 7 6 lost 1 + 1, + // 8 in sequence NO + // 6 6 recovered 0 + 0 + // 9 in sequence NO + // 0 before first NO + }; + + // A filter to add the first `num_messages` user DATA_FRAG into `user_data` + test_UDPv4Transport::test_UDPv4Transport_DropLogLength = num_messages; + set_transport_filter( + DATA_FRAG, + [](fastrtps::rtps::CDRMessage_t& msg)-> bool + { + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.3 Data Submessage + EntityId_t readerID, writerID; + + msg.pos += 2; // flags + msg.pos += 2; // octets to inline quos + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + + // restore buffer pos + msg.pos = old_pos; + + // Let non-user traffic pass + return (writerID.value[3] & 0xC0) == 0; + }); + + uint16_t length = 100; + create_endpoints(num_messages * length, BEST_EFFORT); + match_endpoints(false, "string", "statisticsSmallTopic"); + + // This will send `num_messages` DATA_FRAG nessages, which will be backed up on `user_data` + for (uint16_t i = 0; i < num_messages; ++i) + { + write_large_sample(num_messages * length, length); + } + + // create the listener and set expectations + auto participant_listener = make_shared(); + ASSERT_TRUE(participant_->add_statistics_listener(participant_listener, EventKind::RTPS_LOST)); + + std::vector lost_callback_data; + auto callback_action = [&lost_callback_data](const Entity2LocatorTraffic& data) -> void + { + const Locator_t& loc = *(reinterpret_cast(&data.dst_locator())); + if (IPLocator::isLocal(loc)) + { + std::cout << "RTPS_LOST " << data.packet_count() << std::endl; + lost_callback_data.push_back(data); + } + }; + EXPECT_CALL(*participant_listener, on_rtps_lost).Times(AtLeast(1)).WillRepeatedly(callback_action); + + // Calculate destination where datagrams should be sent to + const Locator_t& locator = *(reader_->getAttributes().unicastLocatorList.begin()); + auto locator_ip = IPLocator::getIPv4(locator); + auto locator_port = IPLocator::getPhysicalPort(locator); + asio::ip::address_v4::bytes_type asio_ip{ { locator_ip[0], locator_ip[1], locator_ip[2], locator_ip[3] } }; + asio::ip::udp::endpoint destination(asio::ip::address_v4(asio_ip), locator_port); + + // Prepare sending socket + asio::error_code ec; + asio::io_context ctx; + asio::ip::udp::socket sender(ctx); + sender.open(asio::ip::udp::v4()); + + // Send messages in different order + for (size_t idx : message_order) + { + const std::vector& msg = test_UDPv4Transport::test_UDPv4Transport_DropLog[idx]; + EXPECT_EQ(msg.size(), sender.send_to(asio::buffer(msg.data(), msg.size()), destination, 0, ec)) << ec; + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // release the listener + EXPECT_TRUE(participant_->remove_statistics_listener(participant_listener, EventKind::RTPS_LOST)); + + // Check reported callbacks + EXPECT_EQ(sizeof(lost_count_notified) / sizeof(lost_count_notified[0]), lost_callback_data.size()); + for (size_t i = 0; i < lost_callback_data.size(); ++i) + { + EXPECT_EQ(lost_count_notified[i], lost_callback_data[i].packet_count()); + } + + // Last reported callback should be 0, as all packets have been sent + const Entity2LocatorTraffic& last_lost_data = lost_callback_data.back(); + EXPECT_EQ(0u, last_lost_data.packet_count()); + EXPECT_EQ(0u, last_lost_data.byte_count()); + EXPECT_EQ(0, last_lost_data.byte_magnitude_order()); +} + } // namespace rtps } // namespace statistics } // namespace fastdds