Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize TCP reception to wait for RTCP header. [11562] #1957

Merged
merged 9 commits into from
May 14, 2021
1 change: 1 addition & 0 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ size_t TCPChannelResourceBasic::send(

if (eConnecting < connection_status_)
{
std::lock_guard<std::mutex> send_guard(send_mutex_);
if (header_size > 0)
{
std::array<asio::const_buffer, 2> buffers;
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef _FASTDDS_TCP_CHANNEL_RESOURCE_BASIC_
#define _FASTDDS_TCP_CHANNEL_RESOURCE_BASIC_

#include <mutex>
#include <asio.hpp>
#include <rtps/transport/TCPChannelResource.h>

Expand All @@ -25,6 +26,8 @@ namespace rtps {
class TCPChannelResourceBasic : public TCPChannelResource
{
asio::io_service& service_;

std::mutex send_mutex_;
std::shared_ptr<asio::ip::tcp::socket> socket_;

public:
Expand Down
200 changes: 121 additions & 79 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,69 @@ bool TCPTransportInterface::read_body(
return true;
}

bool receive_header(
std::shared_ptr<TCPChannelResource>& channel,
TCPHeader& tcp_header,
asio::error_code& ec)
{
// Cleanup header
octet* ptr = tcp_header.address();
char* begin = (char*)ptr;
memset(ptr, 0, sizeof(TCPHeader));

// Prepare read position
octet* read_pos = ptr;
size_t bytes_needed = 4; // TCPHeader::size();

// Wait for sync
while (bytes_needed > 0)
{
size_t bytes_read = channel->read(read_pos, bytes_needed, ec);
if (bytes_read > 0)
{
read_pos += bytes_read;
bytes_needed -= bytes_read;
if (0 == bytes_needed)
{
size_t skip =
(tcp_header.rtcp[0] != 'R') ? 1 :
(tcp_header.rtcp[1] != 'T') ? 2 :
(tcp_header.rtcp[2] != 'C') ? 3 :
(tcp_header.rtcp[3] != 'P') ? 4 : 0;

if (skip && (skip < 4))
{
memmove(ptr, &ptr[skip], 4 - skip);
}

read_pos -= skip;
bytes_needed = skip;
}
}
else if (ec)
{
return 0;
}
}

bytes_needed = TCPHeader::size() - 4;
while (bytes_needed > 0)
{
size_t bytes_read = channel->read(read_pos, bytes_needed, ec);
if (bytes_read > 0)
{
read_pos += bytes_read;
bytes_needed -= bytes_read;
}
else if (ec)
{
return 0;
}
}

return TCPHeader::size();
}

/**
* On TCP, we must receive the header (14 Bytes) and then,
* the rest of the message, whose length is on the header.
Expand All @@ -886,117 +949,96 @@ bool TCPTransportInterface::Receive(
TCPHeader tcp_header;
asio::error_code ec;

size_t bytes_received = channel->read(reinterpret_cast<octet*>(&tcp_header),
TCPHeader::size(), ec);

remote_locator = channel->locator();

if (bytes_received != TCPHeader::size())
bool header_found = false;

do
{
if (bytes_received > 0)
{
logError(RTCP_MSG_IN, "Bad TCP header size: " << bytes_received << " (expected: : "
<< TCPHeader::size() << ")" << ec.message());
close_tcp_socket(channel);
}
else if (ec)
{
logWarning(DEBUG, "Error reading TCP header: " << ec.message());
close_tcp_socket(channel);
}
header_found = receive_header(channel, tcp_header, ec);
} while (!header_found && !ec);

if (ec)
{
logWarning(DEBUG, "Error reading TCP header: " << ec.message());
close_tcp_socket(channel);
success = false;
}
else
{
// Check RTPC Header
if (tcp_header.rtcp[0] != 'R'
|| tcp_header.rtcp[1] != 'T'
|| tcp_header.rtcp[2] != 'C'
|| tcp_header.rtcp[3] != 'P')
size_t body_size = tcp_header.length - static_cast<uint32_t>(TCPHeader::size());

if (body_size > receive_buffer_capacity)
{
logError(RTCP_MSG_IN, "Bad RTCP header identifier, closing connection.");
close_tcp_socket(channel);
logError(RTCP_MSG_IN, "Size of incoming TCP message is bigger than buffer capacity: "
<< static_cast<uint32_t>(body_size) << " vs. " << receive_buffer_capacity << ". "
<< "The full message will be dropped.");
success = false;
// Drop the message
size_t to_read = body_size;
size_t read_block = receive_buffer_capacity;
uint32_t readed;
while (read_block > 0)
{
read_body(receive_buffer, receive_buffer_capacity, &readed, channel,
read_block);
to_read -= readed;
read_block = (to_read >= receive_buffer_capacity) ? receive_buffer_capacity : to_read;
}
}
else
{
size_t body_size = tcp_header.length - static_cast<uint32_t>(TCPHeader::size());
logInfo(RTCP_MSG_IN, "Received RTCP MSG. Logical Port " << tcp_header.logical_port);
success = read_body(receive_buffer, receive_buffer_capacity, &receive_buffer_size,
channel, body_size);

if (body_size > receive_buffer_capacity)
if (success)
{
logError(RTCP_MSG_IN, "Size of incoming TCP message is bigger than buffer capacity: "
<< static_cast<uint32_t>(body_size) << " vs. " << receive_buffer_capacity << ". "
<< "The full message will be dropped.");
success = false;
// Drop the message
size_t to_read = body_size;
size_t read_block = receive_buffer_capacity;
uint32_t readed;
while (read_block > 0)
if (configuration()->check_crc
&& !check_crc(tcp_header, receive_buffer, receive_buffer_size))
{
read_body(receive_buffer, receive_buffer_capacity, &readed, channel,
read_block);
to_read -= readed;
read_block = (to_read >= receive_buffer_capacity) ? receive_buffer_capacity : to_read;
logWarning(RTCP_MSG_IN, "Bad TCP header CRC");
}
}
else
{
logInfo(RTCP_MSG_IN, "Received RTCP MSG. Logical Port " << tcp_header.logical_port);
success = read_body(receive_buffer, receive_buffer_capacity, &receive_buffer_size,
channel, body_size);

if (success)
if (tcp_header.logical_port == 0)
{
if (configuration()->check_crc
&& !check_crc(tcp_header, receive_buffer, receive_buffer_size))
std::shared_ptr<RTCPMessageManager> rtcp_message_manager;
if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status())

{
logWarning(RTCP_MSG_IN, "Bad TCP header CRC");
std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager = rtcp_manager.lock();
}

if (tcp_header.logical_port == 0)
if (rtcp_message_manager)
{
std::shared_ptr<RTCPMessageManager> rtcp_message_manager;
if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status())

{
std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager = rtcp_manager.lock();
}
// The channel is not going to be deleted because we lock it for reading.
ResponseCode responseCode = rtcp_message_manager->processRTCPMessage(
channel, receive_buffer, body_size);

if (rtcp_message_manager)
if (responseCode != RETCODE_OK)
{
// The channel is not going to be deleted because we lock it for reading.
ResponseCode responseCode = rtcp_message_manager->processRTCPMessage(
channel, receive_buffer, body_size);

if (responseCode != RETCODE_OK)
{
close_tcp_socket(channel);
}
success = false;

std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager.reset();
rtcp_message_manager_cv_.notify_one();
}
else
{
success = false;
close_tcp_socket(channel);
}
success = false;

std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_);
rtcp_message_manager.reset();
rtcp_message_manager_cv_.notify_one();
}
else
{
IPLocator::setLogicalPort(remote_locator, tcp_header.logical_port);
logInfo(RTCP_MSG_IN, "[RECEIVE] From: " << remote_locator \
<< " - " << receive_buffer_size << " bytes.");
success = false;
close_tcp_socket(channel);
}

}
else
{
IPLocator::setLogicalPort(remote_locator, tcp_header.logical_port);
logInfo(RTCP_MSG_IN, "[RECEIVE] From: " << remote_locator \
<< " - " << receive_buffer_size << " bytes.");
}
// Error message already shown by read_body method.
}
// Error message already shown by read_body method.
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/tcp/RTCPHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct TCPHeader
uint16_t logical_port;

TCPHeader()
: length(sizeof(TCPHeader))
: length(TCPHEADER_SIZE)
, crc(0)
, logical_port(0)
{
Expand Down
Loading