Skip to content

Commit

Permalink
Collection of data and notification to listeners RTPS_PACKETS_LOST (#…
Browse files Browse the repository at this point in the history
…1956)

* Refs 11450. Added OutputTrafficManager.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11450. Added OutputTrafficManager to UDP transport.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11450. Added OutputTrafficManager to TCP transport.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10795. Sepparate processing of timestamp and sequence.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10795. Loss accountance and notification.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10795. Send destination locator on statistics submessage.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10795. Test transport updates sequencing info on packet drops.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11450. Explicitly use boolean expression for carry operations.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10795. Added RTPS_LOST callback to RTPSStatisticsTests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10795. Linter.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Added test for unordered packets.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Avoid processing datagrams in the past.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Account for full message size.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Avoid unused var warning.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Avoid unused parameter warning.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Prevent side effects on length accounting.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Correct naming of private vars.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Use correct length on write_small_sample.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Fix asio include on ListenerTests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany authored May 18, 2021
1 parent 836e0d3 commit cc7be2f
Show file tree
Hide file tree
Showing 12 changed files with 468 additions and 38 deletions.
8 changes: 5 additions & 3 deletions src/cpp/rtps/messages/MessageReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,8 @@ void MessageReceiver::notify_network_statistics(

// Keep track of current position, so we can restore it later.
auto initial_pos = msg->pos;
while (msg->pos < msg->length)
auto msg_length = msg->length;
while (msg->pos < msg_length)
{
SubmessageHeader_t header;
if (!readSubmessageHeader(msg, &header))
Expand All @@ -1350,14 +1351,15 @@ void MessageReceiver::notify_network_statistics(
{
// Check submessage validity
if ((statistics_submessage_data_length != header.submessageLength) ||
((msg->pos + header.submessageLength) > msg->length))
((msg->pos + header.submessageLength) > msg_length))
{
break;
}

StatisticsSubmessageData data;
read_statistics_submessage(msg, data);
participant_->on_network_statistics(source_guid_prefix_, source_locator, reception_locator, data);
participant_->on_network_statistics(
source_guid_prefix_, source_locator, reception_locator, data, msg_length);
break;
}

Expand Down
5 changes: 3 additions & 2 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ bool TCPTransportInterface::OpenOutputChannel(
tcp_sender_resource->channel()->add_logical_port(logical_port, rtcp_message_manager_.get());
}

statistics_info_.add_entry(locator);
return true;
}
}
Expand Down Expand Up @@ -670,6 +671,7 @@ bool TCPTransportInterface::OpenOutputChannel(
channel->connect(channel_resources_[physical_locator]);
}

statistics_info_.add_entry(locator);
success = true;
channel->add_logical_port(logical_port, rtcp_message_manager_.get());
send_resource_list.emplace_back(
Expand Down Expand Up @@ -1156,8 +1158,7 @@ bool TCPTransportInterface::send(
if (channel->is_logical_port_opened(logical_port))
{
TCPHeader tcp_header;
StatisticsSubmessageData::Sequence seq;
set_statistics_submessage_from_transport(send_buffer, send_buffer_size, seq);
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
fill_rtcp_header(tcp_header, send_buffer, send_buffer_size, logical_port);

{
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <asio/ssl.hpp>
#endif // if TLS_FOUND

#include <statistics/rtps/messages/OutputTrafficManager.hpp>

#include <asio.hpp>
#include <thread>
Expand Down Expand Up @@ -95,6 +96,8 @@ class TCPTransportInterface : public TransportInterface

std::map<Locator, std::shared_ptr<TCPAcceptor>> acceptors_;

eprosima::fastdds::statistics::rtps::OutputTrafficManager statistics_info_;

TCPTransportInterface(
int32_t transport_kind);

Expand Down
19 changes: 3 additions & 16 deletions src/cpp/rtps/transport/UDPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,6 @@ using PortParameters = fastrtps::rtps::PortParameters;
using SenderResource = fastrtps::rtps::SenderResource;
using Log = fastdds::dds::Log;

struct MultiUniLocatorsLinkage
{
MultiUniLocatorsLinkage(
LocatorList&& m,
LocatorList&& u)
: multicast(std::move(m))
, unicast(std::move(u))
{
}

LocatorList multicast;
LocatorList unicast;
};

UDPTransportDescriptor::UDPTransportDescriptor()
: SocketTransportDescriptor(s_maximumMessageSize, s_maximumInitialPeersRange)
, m_output_udp_socket(0)
Expand Down Expand Up @@ -286,6 +272,7 @@ bool UDPTransportInterface::OpenOutputChannel(

if (udp_sender_resource)
{
statistics_info_.add_entry(locator);
return true;
}
}
Expand Down Expand Up @@ -409,6 +396,7 @@ bool UDPTransportInterface::OpenOutputChannel(
return false;
}

statistics_info_.add_entry(locator);
return true;
}

Expand Down Expand Up @@ -530,8 +518,7 @@ bool UDPTransportInterface::send(
#endif // ifndef _WIN32

asio::error_code ec;
StatisticsSubmessageData::Sequence seq;
set_statistics_submessage_from_transport(send_buffer, send_buffer_size, seq);
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
bytesSent = getSocketPtr(socket)->send_to(asio::buffer(send_buffer,
send_buffer_size), destinationEndpoint, 0, ec);
if (!!ec)
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/transport/UDPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <fastdds/rtps/transport/TransportInterface.h>
#include <fastdds/rtps/transport/UDPTransportDescriptor.h>
#include <fastrtps/utils/IPFinder.h>

#include <rtps/transport/UDPChannelResource.h>
#include <statistics/rtps/messages/OutputTrafficManager.hpp>

#include <vector>
#include <memory>
Expand Down Expand Up @@ -170,6 +172,7 @@ class UDPTransportInterface : public TransportInterface

uint32_t mSendBufferSize;
uint32_t mReceiveBufferSize;
eprosima::fastdds::statistics::rtps::OutputTrafficManager statistics_info_;

UDPTransportInterface(
int32_t transport_kind);
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/transport/test_UDPv4Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ bool test_UDPv4Transport::send(
{
if (packet_should_drop(send_buffer, send_buffer_size))
{
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
log_drop(send_buffer, send_buffer_size);
return true;
}
Expand Down
127 changes: 122 additions & 5 deletions src/cpp/statistics/rtps/StatisticsBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,44 @@ namespace eprosima {
namespace fastdds {
namespace statistics {

static void add_bytes(
Entity2LocatorTraffic& traffic,
const rtps::StatisticsSubmessageData::Sequence& distance)
{
uint64_t count = traffic.packet_count();
int16_t high = traffic.byte_magnitude_order();
uint64_t low = traffic.byte_count();

count += distance.sequence;
high += distance.bytes_high;
low += distance.bytes;
high += (low < traffic.byte_count());

traffic.packet_count(count);
traffic.byte_magnitude_order(high);
traffic.byte_count(low);
}

static void sub_bytes(
Entity2LocatorTraffic& traffic,
uint64_t bytes)
{
uint64_t count = traffic.packet_count();
int16_t high = traffic.byte_magnitude_order();
uint64_t low = traffic.byte_count();

if (count > 0)
{
count--;
low -= bytes;
high -= (low > traffic.byte_count());

traffic.packet_count(count);
traffic.byte_magnitude_order(high);
traffic.byte_count(low);
}
}

detail::Locator_s to_statistics_type(
fastrtps::rtps::Locator_t locator)
{
Expand Down Expand Up @@ -292,16 +330,25 @@ void StatisticsParticipantImpl::on_network_statistics(
const fastrtps::rtps::GuidPrefix_t& source_participant,
const fastrtps::rtps::Locator_t& source_locator,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData& data)
const rtps::StatisticsSubmessageData& data,
uint64_t datagram_size)
{
static_cast<void>(source_participant);
static_cast<void>(reception_locator);
process_network_timestamp(source_locator, data.destination, data.ts);
process_network_sequence(source_participant, data.destination, data.seq, datagram_size);
}

void StatisticsParticipantImpl::process_network_timestamp(
const fastrtps::rtps::Locator_t& source_locator,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData::TimeStamp& ts)
{
using namespace eprosima::fastrtps::rtps;

Time_t ts(data.ts.seconds, data.ts.fraction);
Time_t source_ts(ts.seconds, ts.fraction);
Time_t current_ts;
Time_t::now(current_ts);
auto latency = static_cast<float>((current_ts - ts).to_ns());
auto latency = static_cast<float>((current_ts - source_ts).to_ns());

Locator2LocatorData notification;
notification.src_locator(to_statistics_type(source_locator));
Expand All @@ -319,6 +366,76 @@ void StatisticsParticipantImpl::on_network_statistics(
});
}

void StatisticsParticipantImpl::process_network_sequence(
const fastrtps::rtps::GuidPrefix_t& source_participant,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData::Sequence& seq,
uint64_t datagram_size)
{
lost_traffic_key key(source_participant, reception_locator);
bool should_notify = false;
Entity2LocatorTraffic notification;

{
std::lock_guard<std::recursive_mutex> lock(get_statistics_mutex());
lost_traffic_value& value = lost_traffic_[key];

if (value.first_sequence > seq.sequence)
{
// Datagrams before the first received one are ignored
return;
}

if (value.first_sequence == 0)
{
// This is the first time we receive a statistics sequence from source_participant on reception_locator
GUID_t guid(source_participant, ENTITYID_RTPSParticipant);
value.data.src_guid(to_statistics_type(guid));
value.data.dst_locator(to_statistics_type(reception_locator));
value.first_sequence = seq.sequence;
}
else
{
// We shouldn't receive the same sequence twice
assert(seq.sequence != value.seq_data.sequence);
// Detect discontinuity. We will only notify in that case
should_notify = seq.sequence != (value.seq_data.sequence + 1);
if (should_notify)
{
if (seq.sequence > value.seq_data.sequence)
{
// Received sequence is higher, data has been lost
add_bytes(value.data, rtps::StatisticsSubmessageData::Sequence::distance(value.seq_data, seq));
}

// We should never count the current received datagram
sub_bytes(value.data, datagram_size);

notification = value.data;
}
}

if (seq.sequence > value.seq_data.sequence)
{
value.seq_data = seq;
}
}

if (should_notify)
{
// Perform the callbacks
Data data;
// note that the setter sets RTPS_SENT by default
data.entity2locator_traffic(notification);
data._d(EventKind::RTPS_LOST);

for_each_listener([&data](const Key& listener)
{
listener->on_statistics_data(data);
});
}
}

void StatisticsParticipantImpl::on_rtps_sent(
const fastrtps::rtps::Locator_t& loc,
unsigned long payload_size)
Expand All @@ -334,7 +451,7 @@ void StatisticsParticipantImpl::on_rtps_sent(
{
std::lock_guard<std::recursive_mutex> lock(get_statistics_mutex());

auto& val = traffic[loc];
auto& val = traffic_[loc];
notification.packet_count(++val.packet_count);
notification.byte_count(val.byte_count += payload_size);
notification.byte_magnitude_order((int16_t)floor(log10(float(val.byte_count))));
Expand Down
44 changes: 41 additions & 3 deletions src/cpp/statistics/rtps/StatisticsBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,17 @@ class StatisticsParticipantImpl
unsigned long long byte_count = {};
};

std::map<fastrtps::rtps::Locator_t, rtps_sent_data> traffic;
std::map<fastrtps::rtps::Locator_t, rtps_sent_data> traffic_;

// RTPS_LOST ancillary
using lost_traffic_key = std::pair<fastrtps::rtps::GuidPrefix_t, fastrtps::rtps::Locator_t>;
struct lost_traffic_value
{
uint64_t first_sequence = 0;
Entity2LocatorTraffic data{};
rtps::StatisticsSubmessageData::Sequence seq_data{};
};
std::map<lost_traffic_key, lost_traffic_value> lost_traffic_;

// PDP_PACKETS ancillary
unsigned long long pdp_counter_ = {};
Expand Down Expand Up @@ -255,12 +265,38 @@ class StatisticsParticipantImpl
* @param [in] source_locator Locator indicating the sending address.
* @param [in] reception_locator Locator indicating the listening address.
* @param [in] data Statistics submessage received.
* @param [in] datagram_size The size in bytes of the received datagram.
*/
void on_network_statistics(
const fastrtps::rtps::GuidPrefix_t& source_participant,
const fastrtps::rtps::Locator_t& source_locator,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData& data);
const rtps::StatisticsSubmessageData& data,
uint64_t datagram_size);

/*
* Process a received statistics submessage timestamp, informing of network latency.
* @param [in] source_locator Locator indicating the sending address.
* @param [in] reception_locator Locator indicating the listening address.
* @param [in] ts The timestamp of the statistics submessage received.
*/
void process_network_timestamp(
const fastrtps::rtps::Locator_t& source_locator,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData::TimeStamp& ts);

/*
* Process a received statistics submessage sequence, informing of network loss.
* @param [in] source_participant GUID prefix of the participant sending the message.
* @param [in] reception_locator Locator indicating the listening address.
* @param [in] seq The sequencing info ot the statistics submessage received.
* @param [in] datagram_size The size in bytes of the received datagram.
*/
void process_network_sequence(
const fastrtps::rtps::GuidPrefix_t& source_participant,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData::Sequence& seq,
uint64_t datagram_size);

/*
* Report a message that is sent by the participant
Expand Down Expand Up @@ -415,12 +451,14 @@ class StatisticsParticipantImpl
* @param [in] Locator indicating the sending address.
* @param [in] Locator indicating the listening address.
* @param [in] Statistics submessage received.
* @param [in] The size in bytes of the received datagram.
*/
inline void on_network_statistics(
const fastrtps::rtps::GuidPrefix_t&,
const fastrtps::rtps::Locator_t&,
const fastrtps::rtps::Locator_t&,
const rtps::StatisticsSubmessageData&)
const rtps::StatisticsSubmessageData&,
uint64_t)
{
}

Expand Down
Loading

0 comments on commit cc7be2f

Please sign in to comment.