Skip to content

Commit

Permalink
Synchronize TCP reception to wait for RTCP header (#1963)
Browse files Browse the repository at this point in the history
* Synchronize TCP reception to wait for RTCP header (#1957)

* Refs 9141. Add unit test for TCP reception of out of sequence header.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 9141. New mutex to protect asio::write calls on TCP.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 9141. Synchronize reception to RTCP.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Fix warning.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Do not show warning on EOF error.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Tests improvements.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Fix number of bytes to skip.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11562. Linters.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
(cherry picked from commit 5a23fc4)

# Conflicts:
#	test/unittest/transport/TCPv4Tests.cpp

* Fixing rebase conflicts.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Uncrustify

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
(cherry picked from commit 10aa9cf)
  • Loading branch information
mergify[bot] authored and mergify-bot committed May 20, 2021
1 parent 0bcbe14 commit 79bb333
Show file tree
Hide file tree
Showing 5 changed files with 647 additions and 396 deletions.
56 changes: 33 additions & 23 deletions include/fastdds/rtps/transport/TCPChannelResourceBasic.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,36 @@
#ifndef _FASTDDS_TCP_CHANNEL_RESOURCE_BASIC_
#define _FASTDDS_TCP_CHANNEL_RESOURCE_BASIC_

#include <mutex>
#include <asio.hpp>
#include <fastdds/rtps/transport/TCPChannelResource.h>

namespace eprosima{
namespace fastdds{
namespace rtps{
namespace eprosima {
namespace fastdds {
namespace rtps {

class TCPChannelResourceBasic : public TCPChannelResource
{
asio::io_service& service_;

std::mutex send_mutex_;
std::shared_ptr<asio::ip::tcp::socket> socket_;

public:

// Constructor called when trying to connect to a remote server
TCPChannelResourceBasic(
TCPTransportInterface* parent,
asio::io_service& service,
const fastrtps::rtps::Locator_t& locator,
uint32_t maxMsgSize);
TCPTransportInterface* parent,
asio::io_service& service,
const fastrtps::rtps::Locator_t& locator,
uint32_t maxMsgSize);

// Constructor called when local server accepted connection
TCPChannelResourceBasic(
TCPTransportInterface* parent,
asio::io_service& service,
std::shared_ptr<asio::ip::tcp::socket> socket,
uint32_t maxMsgSize);
TCPTransportInterface* parent,
asio::io_service& service,
std::shared_ptr<asio::ip::tcp::socket> socket,
uint32_t maxMsgSize);

virtual ~TCPChannelResourceBasic();

Expand All @@ -49,34 +54,39 @@ class TCPChannelResourceBasic : public TCPChannelResource
void disconnect() override;

uint32_t read(
fastrtps::rtps::octet* buffer,
std::size_t size,
asio::error_code& ec) override;
fastrtps::rtps::octet* buffer,
std::size_t size,
asio::error_code& ec) override;

size_t send(
const fastrtps::rtps::octet* header,
size_t header_size,
const fastrtps::rtps::octet* data,
size_t size,
asio::error_code& ec) override;
const fastrtps::rtps::octet* header,
size_t header_size,
const fastrtps::rtps::octet* data,
size_t size,
asio::error_code& ec) override;

asio::ip::tcp::endpoint remote_endpoint() const override;
asio::ip::tcp::endpoint local_endpoint() const override;

void set_options(const TCPTransportDescriptor* options) override;
void set_options(
const TCPTransportDescriptor* options) override;

void cancel() override;
void close() override;
void shutdown(asio::socket_base::shutdown_type what) override;
void shutdown(
asio::socket_base::shutdown_type what) override;

inline std::shared_ptr<asio::ip::tcp::socket> socket()
{
return socket_;
}

private:
TCPChannelResourceBasic(const TCPChannelResourceBasic&) = delete;
TCPChannelResourceBasic& operator=(const TCPChannelResourceBasic&) = delete;

TCPChannelResourceBasic(
const TCPChannelResourceBasic&) = delete;
TCPChannelResourceBasic& operator =(
const TCPChannelResourceBasic&) = delete;
};


Expand Down
64 changes: 42 additions & 22 deletions include/fastdds/rtps/transport/tcp/RTCPHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

#include <fastcdr/exceptions/BadParamException.h>

namespace eprosima{
namespace fastdds{
namespace rtps{
namespace eprosima {
namespace fastdds {
namespace rtps {

#define TCPHEADER_SIZE 14

Expand All @@ -37,7 +37,7 @@ struct TCPHeader
uint16_t logical_port;

TCPHeader()
: length(sizeof(TCPHeader))
: length(TCPHEADER_SIZE)
, crc(0)
, logical_port(0)
{
Expand Down Expand Up @@ -67,6 +67,7 @@ struct TCPHeader
{
return TCPHEADER_SIZE;
}

};

union TCPTransactionId
Expand All @@ -84,7 +85,7 @@ union TCPTransactionId
memcpy(ints, t.ints, 3 * sizeof(uint32_t));
}

TCPTransactionId& operator++()
TCPTransactionId& operator ++()
{
if (ints[0] == 0xffffffff)
{
Expand All @@ -111,65 +112,76 @@ union TCPTransactionId
return *this;
}

TCPTransactionId operator++(int)
TCPTransactionId operator ++(
int)
{
TCPTransactionId prev = *this;
++(*this);
return prev;
}


TCPTransactionId& operator=(const TCPTransactionId& t)
TCPTransactionId& operator =(
const TCPTransactionId& t)
{
memcpy(ints, t.ints, 3 * sizeof(uint32_t));
return *this;
}

TCPTransactionId& operator=(const fastrtps::rtps::octet* id)
TCPTransactionId& operator =(
const fastrtps::rtps::octet* id)
{
memcpy(octets, id, 12 * sizeof(fastrtps::rtps::octet));
return *this;
}

TCPTransactionId& operator=(const char* id)
TCPTransactionId& operator =(
const char* id)
{
memcpy(octets, id, 12 * sizeof(fastrtps::rtps::octet));
return *this;
}

TCPTransactionId& operator=(const uint32_t* id)
TCPTransactionId& operator =(
const uint32_t* id)
{
memcpy(ints, id, 3 * sizeof(uint32_t));
return *this;
}

TCPTransactionId& operator=(uint32_t id)
TCPTransactionId& operator =(
uint32_t id)
{
ints[0] = id;
ints[1] = 0;
ints[2] = 0;
return *this;
}

TCPTransactionId& operator=(uint64_t id)
TCPTransactionId& operator =(
uint64_t id)
{
memset(ints, 0, sizeof(uint32_t) * 3);
memcpy(ints, &id, sizeof(uint64_t));
return *this;
}

bool operator==(const TCPTransactionId& t) const
bool operator ==(
const TCPTransactionId& t) const
{
return memcmp(ints, t.ints, 3 * sizeof(uint32_t)) == 0;
}

bool operator<(const TCPTransactionId& t) const
bool operator <(
const TCPTransactionId& t) const
{
return memcmp(ints, t.ints, 3 * sizeof(uint32_t)) < 0;
}

};

inline std::ostream& operator<<(std::ostream& output,const TCPTransactionId& t)
inline std::ostream& operator <<(
std::ostream& output,
const TCPTransactionId& t)
{
bool printed = false; // Don't skip cases like 99 0 34
for (int i = 2; i >= 0; --i)
Expand Down Expand Up @@ -205,14 +217,16 @@ class TCPControlMsgHeader
TCPTransactionId transaction_id_; // 12 bytes

public:

TCPControlMsgHeader()
{
kind_ = static_cast<TCPCPMKind>(0x00);
flags_ = static_cast<fastrtps::rtps::octet>(0x00);
length_ = 0;
}

void kind(TCPCPMKind kind)
void kind(
TCPCPMKind kind)
{
kind_ = kind;
}
Expand All @@ -227,7 +241,8 @@ class TCPControlMsgHeader
return kind_;
}

void length(uint16_t length)
void length(
uint16_t length)
{
length_ = length;
}
Expand All @@ -242,7 +257,8 @@ class TCPControlMsgHeader
return length_;
}

void transaction_id(TCPTransactionId transaction_id)
void transaction_id(
TCPTransactionId transaction_id)
{
transaction_id_ = transaction_id;
}
Expand All @@ -269,7 +285,8 @@ class TCPControlMsgHeader
flags_ = e | p | r;
}

void endianess(fastrtps::rtps::Endianness_t endianess)
void endianess(
fastrtps::rtps::Endianness_t endianess)
{
// Endianess flag has inverse logic than Endianness_t :-/
if (endianess == fastrtps::rtps::Endianness_t::BIGEND)
Expand All @@ -282,7 +299,8 @@ class TCPControlMsgHeader
}
}

void payload(bool payload)
void payload(
bool payload)
{
if (payload)
{
Expand All @@ -294,7 +312,8 @@ class TCPControlMsgHeader
}
}

void requires_response(bool requires_response)
void requires_response(
bool requires_response)
{
if (requires_response)
{
Expand Down Expand Up @@ -325,6 +344,7 @@ class TCPControlMsgHeader
{
return 16;
}

};


Expand Down
43 changes: 22 additions & 21 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ void TCPChannelResourceBasic::connect(
, ip::tcp::endpoint
#else
, ip::tcp::resolver::iterator
#endif
#endif // if ASIO_VERSION >= 101200
)
{
if (!channel_weak_ptr.expired())
{
parent_->SocketConnected(channel_weak_ptr, ec);
}
}
{
if (!channel_weak_ptr.expired())
{
parent_->SocketConnected(channel_weak_ptr, ec);
}
}
);
}
catch (const std::system_error& error)
Expand All @@ -108,23 +108,23 @@ void TCPChannelResourceBasic::disconnect()
auto socket = socket_;

service_.post([&, socket]()
{
try
{
try
{
std::error_code ec;
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
socket->cancel();
std::error_code ec;
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
socket->cancel();

// This method was added on the version 1.12.0
// This method was added on the version 1.12.0
#if ASIO_VERSION >= 101200 && (!defined(_WIN32_WINNT) || _WIN32_WINNT >= 0x0603)
socket->release();
#endif
socket->close();
}
catch (std::exception&)
{
}
});
socket->release();
#endif // if ASIO_VERSION >= 101200 && (!defined(_WIN32_WINNT) || _WIN32_WINNT >= 0x0603)
socket->close();
}
catch (std::exception&)
{
}
});

}
}
Expand Down Expand Up @@ -155,6 +155,7 @@ size_t TCPChannelResourceBasic::send(

if (eConnecting < connection_status_)
{
std::lock_guard<std::mutex> send_guard(send_mutex_);
if (header_size > 0)
{
std::array<asio::const_buffer, 2> buffers;
Expand Down
Loading

0 comments on commit 79bb333

Please sign in to comment.