Skip to content

Commit

Permalink
Synchronize TCP reception to wait for RTCP header (#1957)
Browse files Browse the repository at this point in the history
* Refs 9141. Add unit test for TCP reception of out of sequence header.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 9141. New mutex to protect asio::write calls on TCP.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 9141. Synchronize reception to RTCP.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Fix warning.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Do not show warning on EOF error.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Tests improvements.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Fix number of bytes to skip.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany authored May 14, 2021
1 parent 6afc1e4 commit 5a23fc4
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 77 deletions.
1 change: 1 addition & 0 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ size_t TCPChannelResourceBasic::send(

if (eConnecting < connection_status_)
{
std::lock_guard<std::mutex> send_guard(send_mutex_);
if (header_size > 0)
{
std::array<asio::const_buffer, 2> buffers;
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef _FASTDDS_TCP_CHANNEL_RESOURCE_BASIC_
#define _FASTDDS_TCP_CHANNEL_RESOURCE_BASIC_

#include <mutex>
#include <asio.hpp>
#include <rtps/transport/TCPChannelResource.h>

Expand All @@ -25,6 +26,8 @@ namespace rtps {
class TCPChannelResourceBasic : public TCPChannelResource
{
asio::io_service& service_;

std::mutex send_mutex_;
std::shared_ptr<asio::ip::tcp::socket> socket_;

public:
Expand Down
194 changes: 119 additions & 75 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,68 @@ bool TCPTransportInterface::read_body(
return true;
}

bool receive_header(
std::shared_ptr<TCPChannelResource>& channel,
TCPHeader& tcp_header,
asio::error_code& ec)
{
// Cleanup header
octet* ptr = tcp_header.address();
memset(ptr, 0, sizeof(TCPHeader));

// Prepare read position
octet* read_pos = ptr;
size_t bytes_needed = 4;

// Wait for sync
while (bytes_needed > 0)
{
size_t bytes_read = channel->read(read_pos, bytes_needed, ec);
if (bytes_read > 0)
{
read_pos += bytes_read;
bytes_needed -= bytes_read;
if (0 == bytes_needed)
{
size_t skip = // Text Next possible match Skip to next match
(tcp_header.rtcp[0] != 'R') ? 1 : // X--- XRTCP 1
(tcp_header.rtcp[1] != 'T') ? 1 : // RX-- RRTCP 1
(tcp_header.rtcp[2] != 'C') ? 2 : // RTX- RTRTCP 2
(tcp_header.rtcp[3] != 'P') ? 3 : 0; // RTCX RTCRTCP 3

if (skip)
{
memmove(ptr, &ptr[skip], 4 - skip);
}

read_pos -= skip;
bytes_needed = skip;
}
}
else if (ec)
{
return false;
}
}

bytes_needed = TCPHeader::size() - 4;
while (bytes_needed > 0)
{
size_t bytes_read = channel->read(read_pos, bytes_needed, ec);
if (bytes_read > 0)
{
read_pos += bytes_read;
bytes_needed -= bytes_read;
}
else if (ec)
{
return false;
}
}

return true;
}

/**
* On TCP, we must receive the header (14 Bytes) and then,
* the rest of the message, whose length is on the header.
Expand All @@ -886,117 +948,99 @@ bool TCPTransportInterface::Receive(
TCPHeader tcp_header;
asio::error_code ec;

size_t bytes_received = channel->read(reinterpret_cast<octet*>(&tcp_header),
TCPHeader::size(), ec);
bool header_found = false;

remote_locator = channel->locator();
do
{
header_found = receive_header(channel, tcp_header, ec);
} while (!header_found && !ec);

if (bytes_received != TCPHeader::size())
if (ec)
{
if (bytes_received > 0)
{
logError(RTCP_MSG_IN, "Bad TCP header size: " << bytes_received << " (expected: : "
<< TCPHeader::size() << ")" << ec.message());
close_tcp_socket(channel);
}
else if (ec)
if (ec != asio::error::eof)
{
logWarning(DEBUG, "Error reading TCP header: " << ec.message());
close_tcp_socket(channel);
}

close_tcp_socket(channel);
success = false;
}
else
{
// Check RTPC Header
if (tcp_header.rtcp[0] != 'R'
|| tcp_header.rtcp[1] != 'T'
|| tcp_header.rtcp[2] != 'C'
|| tcp_header.rtcp[3] != 'P')
size_t body_size = tcp_header.length - static_cast<uint32_t>(TCPHeader::size());

if (body_size > receive_buffer_capacity)
{
logError(RTCP_MSG_IN, "Bad RTCP header identifier, closing connection.");
close_tcp_socket(channel);
logError(RTCP_MSG_IN, "Size of incoming TCP message is bigger than buffer capacity: "
<< static_cast<uint32_t>(body_size) << " vs. " << receive_buffer_capacity << ". "
<< "The full message will be dropped.");
success = false;
// Drop the message
size_t to_read = body_size;
size_t read_block = receive_buffer_capacity;
uint32_t readed;
while (read_block > 0)
{
read_body(receive_buffer, receive_buffer_capacity, &readed, channel,
read_block);
to_read -= readed;
read_block = (to_read >= receive_buffer_capacity) ? receive_buffer_capacity : to_read;
}
}
else
{
size_t body_size = tcp_header.length - static_cast<uint32_t>(TCPHeader::size());
logInfo(RTCP_MSG_IN, "Received RTCP MSG. Logical Port " << tcp_header.logical_port);
success = read_body(receive_buffer, receive_buffer_capacity, &receive_buffer_size,
channel, body_size);

if (body_size > receive_buffer_capacity)
if (success)
{
logError(RTCP_MSG_IN, "Size of incoming TCP message is bigger than buffer capacity: "
<< static_cast<uint32_t>(body_size) << " vs. " << receive_buffer_capacity << ". "
<< "The full message will be dropped.");
success = false;
// Drop the message
size_t to_read = body_size;
size_t read_block = receive_buffer_capacity;
uint32_t readed;
while (read_block > 0)
if (configuration()->check_crc
&& !check_crc(tcp_header, receive_buffer, receive_buffer_size))
{
read_body(receive_buffer, receive_buffer_capacity, &readed, channel,
read_block);
to_read -= readed;
read_block = (to_read >= receive_buffer_capacity) ? receive_buffer_capacity : to_read;
logWarning(RTCP_MSG_IN, "Bad TCP header CRC");
}
}
else
{
logInfo(RTCP_MSG_IN, "Received RTCP MSG. Logical Port " << tcp_header.logical_port);
success = read_body(receive_buffer, receive_buffer_capacity, &receive_buffer_size,
channel, body_size);

if (success)
if (tcp_header.logical_port == 0)
{
if (configuration()->check_crc
&& !check_crc(tcp_header, receive_buffer, receive_buffer_size))
std::shared_ptr<RTCPMessageManager> rtcp_message_manager;
if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status())

{
logWarning(RTCP_MSG_IN, "Bad TCP header CRC");
std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager = rtcp_manager.lock();
}

if (tcp_header.logical_port == 0)
if (rtcp_message_manager)
{
std::shared_ptr<RTCPMessageManager> rtcp_message_manager;
if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status())

{
std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager = rtcp_manager.lock();
}
// The channel is not going to be deleted because we lock it for reading.
ResponseCode responseCode = rtcp_message_manager->processRTCPMessage(
channel, receive_buffer, body_size);

if (rtcp_message_manager)
if (responseCode != RETCODE_OK)
{
// The channel is not going to be deleted because we lock it for reading.
ResponseCode responseCode = rtcp_message_manager->processRTCPMessage(
channel, receive_buffer, body_size);

if (responseCode != RETCODE_OK)
{
close_tcp_socket(channel);
}
success = false;

std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager.reset();
rtcp_message_manager_cv_.notify_one();
}
else
{
success = false;
close_tcp_socket(channel);
}
success = false;

std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager.reset();
rtcp_message_manager_cv_.notify_one();
}
else
{
IPLocator::setLogicalPort(remote_locator, tcp_header.logical_port);
logInfo(RTCP_MSG_IN, "[RECEIVE] From: " << remote_locator \
<< " - " << receive_buffer_size << " bytes.");
success = false;
close_tcp_socket(channel);
}

}
else
{
IPLocator::setLogicalPort(remote_locator, tcp_header.logical_port);
logInfo(RTCP_MSG_IN, "[RECEIVE] From: " << remote_locator \
<< " - " << receive_buffer_size << " bytes.");
}
// Error message already shown by read_body method.
}
// Error message already shown by read_body method.
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/tcp/RTCPHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct TCPHeader
uint16_t logical_port;

TCPHeader()
: length(sizeof(TCPHeader))
: length(TCPHEADER_SIZE)
, crc(0)
, logical_port(0)
{
Expand Down
Loading

0 comments on commit 5a23fc4

Please sign in to comment.