Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collection of data and notification to listeners RTPS_PACKETS_LOST [10795] #1956

Merged
merged 20 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1cff7df
Refs 11450. Added OutputTrafficManager.
MiguelCompany May 11, 2021
2dcf6ff
Refs 11450. Added OutputTrafficManager to UDP transport.
MiguelCompany May 11, 2021
f72d730
Refs 11450. Added OutputTrafficManager to TCP transport.
MiguelCompany May 12, 2021
4a4990a
Refs 10795. Sepparate processing of timestamp and sequence.
MiguelCompany May 12, 2021
dfbf25f
Refs 10795. Loss accountance and notification.
MiguelCompany May 12, 2021
57c6bf1
Refs 10795. Send destination locator on statistics submessage.
MiguelCompany May 12, 2021
ad86b42
Refs 10795. Test transport updates sequencing info on packet drops.
MiguelCompany May 12, 2021
5f55cd9
Refs 11450. Explicitly use boolean expression for carry operations.
MiguelCompany May 13, 2021
c0cf150
Refs 10795. Added RTPS_LOST callback to RTPSStatisticsTests.
MiguelCompany May 13, 2021
38410b0
Refs 10795. Linter.
MiguelCompany May 13, 2021
1282b17
Refs 11562. Added test for unordered packets.
MiguelCompany May 17, 2021
7f0b26b
Refs 11562. Avoid processing datagrams in the past.
MiguelCompany May 17, 2021
a1db13e
Refs 11562. Account for full message size.
MiguelCompany May 17, 2021
8b042d6
Refs 11562. Linters.
MiguelCompany May 18, 2021
36f1c16
Refs 11562. Avoid unused var warning.
MiguelCompany May 18, 2021
b2530b3
Refs 11562. Avoid unused parameter warning.
MiguelCompany May 18, 2021
e0f5d41
Refs 11562. Prevent side effects on length accounting.
MiguelCompany May 18, 2021
e291178
Refs 11562. Correct naming of private vars.
MiguelCompany May 18, 2021
0e33ab7
Refs 11562. Use correct length on write_small_sample.
MiguelCompany May 18, 2021
e24587d
Refs 11562. Fix asio include on ListenerTests.
MiguelCompany May 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/cpp/rtps/messages/MessageReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,8 @@ void MessageReceiver::notify_network_statistics(

// Keep track of current position, so we can restore it later.
auto initial_pos = msg->pos;
while (msg->pos < msg->length)
auto msg_length = msg->length;
while (msg->pos < msg_length)
{
SubmessageHeader_t header;
if (!readSubmessageHeader(msg, &header))
Expand All @@ -1350,14 +1351,15 @@ void MessageReceiver::notify_network_statistics(
{
// Check submessage validity
if ((statistics_submessage_data_length != header.submessageLength) ||
((msg->pos + header.submessageLength) > msg->length))
((msg->pos + header.submessageLength) > msg_length))
{
break;
}

StatisticsSubmessageData data;
read_statistics_submessage(msg, data);
participant_->on_network_statistics(source_guid_prefix_, source_locator, reception_locator, data);
participant_->on_network_statistics(
source_guid_prefix_, source_locator, reception_locator, data, msg_length);
break;
}

Expand Down
5 changes: 3 additions & 2 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ bool TCPTransportInterface::OpenOutputChannel(
tcp_sender_resource->channel()->add_logical_port(logical_port, rtcp_message_manager_.get());
}

statistics_info_.add_entry(locator);
return true;
}
}
Expand Down Expand Up @@ -670,6 +671,7 @@ bool TCPTransportInterface::OpenOutputChannel(
channel->connect(channel_resources_[physical_locator]);
}

statistics_info_.add_entry(locator);
success = true;
channel->add_logical_port(logical_port, rtcp_message_manager_.get());
send_resource_list.emplace_back(
Expand Down Expand Up @@ -1156,8 +1158,7 @@ bool TCPTransportInterface::send(
if (channel->is_logical_port_opened(logical_port))
{
TCPHeader tcp_header;
StatisticsSubmessageData::Sequence seq;
set_statistics_submessage_from_transport(send_buffer, send_buffer_size, seq);
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
fill_rtcp_header(tcp_header, send_buffer, send_buffer_size, logical_port);

{
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <asio/ssl.hpp>
#endif // if TLS_FOUND

#include <statistics/rtps/messages/OutputTrafficManager.hpp>

#include <asio.hpp>
#include <thread>
Expand Down Expand Up @@ -95,6 +96,8 @@ class TCPTransportInterface : public TransportInterface

std::map<Locator, std::shared_ptr<TCPAcceptor>> acceptors_;

eprosima::fastdds::statistics::rtps::OutputTrafficManager statistics_info_;

TCPTransportInterface(
int32_t transport_kind);

Expand Down
19 changes: 3 additions & 16 deletions src/cpp/rtps/transport/UDPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,6 @@ using PortParameters = fastrtps::rtps::PortParameters;
using SenderResource = fastrtps::rtps::SenderResource;
using Log = fastdds::dds::Log;

struct MultiUniLocatorsLinkage
{
MultiUniLocatorsLinkage(
LocatorList&& m,
LocatorList&& u)
: multicast(std::move(m))
, unicast(std::move(u))
{
}

LocatorList multicast;
LocatorList unicast;
};

UDPTransportDescriptor::UDPTransportDescriptor()
: SocketTransportDescriptor(s_maximumMessageSize, s_maximumInitialPeersRange)
, m_output_udp_socket(0)
Expand Down Expand Up @@ -286,6 +272,7 @@ bool UDPTransportInterface::OpenOutputChannel(

if (udp_sender_resource)
{
statistics_info_.add_entry(locator);
return true;
}
}
Expand Down Expand Up @@ -409,6 +396,7 @@ bool UDPTransportInterface::OpenOutputChannel(
return false;
}

statistics_info_.add_entry(locator);
return true;
}

Expand Down Expand Up @@ -530,8 +518,7 @@ bool UDPTransportInterface::send(
#endif // ifndef _WIN32

asio::error_code ec;
StatisticsSubmessageData::Sequence seq;
set_statistics_submessage_from_transport(send_buffer, send_buffer_size, seq);
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
bytesSent = getSocketPtr(socket)->send_to(asio::buffer(send_buffer,
send_buffer_size), destinationEndpoint, 0, ec);
if (!!ec)
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/transport/UDPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <fastdds/rtps/transport/TransportInterface.h>
#include <fastdds/rtps/transport/UDPTransportDescriptor.h>
#include <fastrtps/utils/IPFinder.h>

#include <rtps/transport/UDPChannelResource.h>
#include <statistics/rtps/messages/OutputTrafficManager.hpp>

#include <vector>
#include <memory>
Expand Down Expand Up @@ -170,6 +172,7 @@ class UDPTransportInterface : public TransportInterface

uint32_t mSendBufferSize;
uint32_t mReceiveBufferSize;
eprosima::fastdds::statistics::rtps::OutputTrafficManager statistics_info_;

UDPTransportInterface(
int32_t transport_kind);
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/transport/test_UDPv4Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ bool test_UDPv4Transport::send(
{
if (packet_should_drop(send_buffer, send_buffer_size))
{
statistics_info_.set_statistics_message_data(remote_locator, send_buffer, send_buffer_size);
log_drop(send_buffer, send_buffer_size);
return true;
}
Expand Down
127 changes: 122 additions & 5 deletions src/cpp/statistics/rtps/StatisticsBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,44 @@ namespace eprosima {
namespace fastdds {
namespace statistics {

static void add_bytes(
Entity2LocatorTraffic& traffic,
const rtps::StatisticsSubmessageData::Sequence& distance)
{
uint64_t count = traffic.packet_count();
int16_t high = traffic.byte_magnitude_order();
uint64_t low = traffic.byte_count();

count += distance.sequence;
high += distance.bytes_high;
low += distance.bytes;
high += (low < traffic.byte_count());

traffic.packet_count(count);
traffic.byte_magnitude_order(high);
traffic.byte_count(low);
}

static void sub_bytes(
Entity2LocatorTraffic& traffic,
uint64_t bytes)
{
uint64_t count = traffic.packet_count();
int16_t high = traffic.byte_magnitude_order();
uint64_t low = traffic.byte_count();

if (count > 0)
{
count--;
low -= bytes;
high -= (low > traffic.byte_count());

traffic.packet_count(count);
traffic.byte_magnitude_order(high);
traffic.byte_count(low);
}
}

detail::Locator_s to_statistics_type(
fastrtps::rtps::Locator_t locator)
{
Expand Down Expand Up @@ -292,16 +330,25 @@ void StatisticsParticipantImpl::on_network_statistics(
const fastrtps::rtps::GuidPrefix_t& source_participant,
const fastrtps::rtps::Locator_t& source_locator,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData& data)
const rtps::StatisticsSubmessageData& data,
uint64_t datagram_size)
{
static_cast<void>(source_participant);
static_cast<void>(reception_locator);
process_network_timestamp(source_locator, data.destination, data.ts);
process_network_sequence(source_participant, data.destination, data.seq, datagram_size);
}

void StatisticsParticipantImpl::process_network_timestamp(
const fastrtps::rtps::Locator_t& source_locator,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData::TimeStamp& ts)
{
using namespace eprosima::fastrtps::rtps;

Time_t ts(data.ts.seconds, data.ts.fraction);
Time_t source_ts(ts.seconds, ts.fraction);
Time_t current_ts;
Time_t::now(current_ts);
auto latency = static_cast<float>((current_ts - ts).to_ns());
auto latency = static_cast<float>((current_ts - source_ts).to_ns());

Locator2LocatorData notification;
notification.src_locator(to_statistics_type(source_locator));
Expand All @@ -319,6 +366,76 @@ void StatisticsParticipantImpl::on_network_statistics(
});
}

void StatisticsParticipantImpl::process_network_sequence(
const fastrtps::rtps::GuidPrefix_t& source_participant,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData::Sequence& seq,
uint64_t datagram_size)
{
lost_traffic_key key(source_participant, reception_locator);
bool should_notify = false;
Entity2LocatorTraffic notification;

{
std::lock_guard<std::recursive_mutex> lock(get_statistics_mutex());
lost_traffic_value& value = lost_traffic_[key];

if (value.first_sequence > seq.sequence)
{
// Datagrams before the first received one are ignored
return;
}

if (value.first_sequence == 0)
{
// This is the first time we receive a statistics sequence from source_participant on reception_locator
GUID_t guid(source_participant, ENTITYID_RTPSParticipant);
value.data.src_guid(to_statistics_type(guid));
value.data.dst_locator(to_statistics_type(reception_locator));
value.first_sequence = seq.sequence;
}
else
{
// We shouldn't receive the same sequence twice
assert(seq.sequence != value.seq_data.sequence);
// Detect discontinuity. We will only notify in that case
should_notify = seq.sequence != (value.seq_data.sequence + 1);
if (should_notify)
{
if (seq.sequence > value.seq_data.sequence)
{
// Received sequence is higher, data has been lost
add_bytes(value.data, rtps::StatisticsSubmessageData::Sequence::distance(value.seq_data, seq));
}

// We should never count the current received datagram
sub_bytes(value.data, datagram_size);

notification = value.data;
}
}

if (seq.sequence > value.seq_data.sequence)
{
value.seq_data = seq;
}
}

if (should_notify)
{
// Perform the callbacks
Data data;
// note that the setter sets RTPS_SENT by default
data.entity2locator_traffic(notification);
data._d(EventKind::RTPS_LOST);

for_each_listener([&data](const Key& listener)
{
listener->on_statistics_data(data);
});
}
}

void StatisticsParticipantImpl::on_rtps_sent(
const fastrtps::rtps::Locator_t& loc,
unsigned long payload_size)
Expand All @@ -334,7 +451,7 @@ void StatisticsParticipantImpl::on_rtps_sent(
{
std::lock_guard<std::recursive_mutex> lock(get_statistics_mutex());

auto& val = traffic[loc];
auto& val = traffic_[loc];
notification.packet_count(++val.packet_count);
notification.byte_count(val.byte_count += payload_size);
notification.byte_magnitude_order((int16_t)floor(log10(float(val.byte_count))));
Expand Down
44 changes: 41 additions & 3 deletions src/cpp/statistics/rtps/StatisticsBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,17 @@ class StatisticsParticipantImpl
unsigned long long byte_count = {};
};

std::map<fastrtps::rtps::Locator_t, rtps_sent_data> traffic;
std::map<fastrtps::rtps::Locator_t, rtps_sent_data> traffic_;

// RTPS_LOST ancillary
using lost_traffic_key = std::pair<fastrtps::rtps::GuidPrefix_t, fastrtps::rtps::Locator_t>;
struct lost_traffic_value
{
uint64_t first_sequence = 0;
Entity2LocatorTraffic data{};
rtps::StatisticsSubmessageData::Sequence seq_data{};
};
std::map<lost_traffic_key, lost_traffic_value> lost_traffic_;

// PDP_PACKETS ancillary
unsigned long long pdp_counter_ = {};
Expand Down Expand Up @@ -255,12 +265,38 @@ class StatisticsParticipantImpl
* @param [in] source_locator Locator indicating the sending address.
* @param [in] reception_locator Locator indicating the listening address.
* @param [in] data Statistics submessage received.
* @param [in] datagram_size The size in bytes of the received datagram.
*/
void on_network_statistics(
const fastrtps::rtps::GuidPrefix_t& source_participant,
const fastrtps::rtps::Locator_t& source_locator,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData& data);
const rtps::StatisticsSubmessageData& data,
uint64_t datagram_size);

/*
* Process a received statistics submessage timestamp, informing of network latency.
* @param [in] source_locator Locator indicating the sending address.
* @param [in] reception_locator Locator indicating the listening address.
* @param [in] ts The timestamp of the statistics submessage received.
*/
void process_network_timestamp(
const fastrtps::rtps::Locator_t& source_locator,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData::TimeStamp& ts);

/*
* Process a received statistics submessage sequence, informing of network loss.
* @param [in] source_participant GUID prefix of the participant sending the message.
* @param [in] reception_locator Locator indicating the listening address.
* @param [in] seq The sequencing info ot the statistics submessage received.
* @param [in] datagram_size The size in bytes of the received datagram.
*/
void process_network_sequence(
const fastrtps::rtps::GuidPrefix_t& source_participant,
const fastrtps::rtps::Locator_t& reception_locator,
const rtps::StatisticsSubmessageData::Sequence& seq,
uint64_t datagram_size);

/*
* Report a message that is sent by the participant
Expand Down Expand Up @@ -415,12 +451,14 @@ class StatisticsParticipantImpl
* @param [in] Locator indicating the sending address.
* @param [in] Locator indicating the listening address.
* @param [in] Statistics submessage received.
* @param [in] The size in bytes of the received datagram.
*/
inline void on_network_statistics(
const fastrtps::rtps::GuidPrefix_t&,
const fastrtps::rtps::Locator_t&,
const fastrtps::rtps::Locator_t&,
const rtps::StatisticsSubmessageData&)
const rtps::StatisticsSubmessageData&,
uint64_t)
{
}

Expand Down
Loading