From d9dac634efd6a80a7b8e16dd5f0fdeb875d2d163 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 21 May 2021 08:35:56 +0200 Subject: [PATCH] Synchronize TCP reception to wait for RTCP header (#1980) * Synchronize TCP reception to wait for RTCP header (#1957) * Refs 9141. Add unit test for TCP reception of out of sequence header. Signed-off-by: Miguel Company * Refs 9141. New mutex to protect asio::write calls on TCP. Signed-off-by: Miguel Company * Refs 9141. Synchronize reception to RTCP. Signed-off-by: Miguel Company * Refs 11562. Linters. Signed-off-by: Miguel Company * Refs 11562. Fix warning. Signed-off-by: Miguel Company * Refs 11562. Do not show warning on EOF error. Signed-off-by: Miguel Company * Refs 11562. Tests improvements. Signed-off-by: Miguel Company * Refs 11562. Fix number of bytes to skip. Signed-off-by: Miguel Company * Refs 11562. Linters. Signed-off-by: Miguel Company (cherry picked from commit 5a23fc4ba508db4f2f6a9debfc3b8b66c1492e46) # Conflicts: # test/unittest/transport/TCPv4Tests.cpp * Fixing rebase conflicts. Signed-off-by: Miguel Company * Uncrustify Signed-off-by: Miguel Company Co-authored-by: Miguel Company (cherry picked from commit 10aa9cf525471aecfa24feef88dd55d83d064cbe) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .../rtps/transport/TCPChannelResourceBasic.h | 56 +- .../fastdds/rtps/transport/tcp/RTCPHeader.h | 64 +- .../transport/TCPChannelResourceBasic.cpp | 43 +- .../rtps/transport/TCPTransportInterface.cpp | 194 +++-- test/unittest/transport/TCPv4Tests.cpp | 686 +++++++++++------- 5 files changed, 647 insertions(+), 396 deletions(-) diff --git a/include/fastdds/rtps/transport/TCPChannelResourceBasic.h b/include/fastdds/rtps/transport/TCPChannelResourceBasic.h index dead090496a..3c4f56f4aab 100644 --- a/include/fastdds/rtps/transport/TCPChannelResourceBasic.h +++ b/include/fastdds/rtps/transport/TCPChannelResourceBasic.h @@ -15,31 +15,36 @@ #ifndef _FASTDDS_TCP_CHANNEL_RESOURCE_BASIC_ #define _FASTDDS_TCP_CHANNEL_RESOURCE_BASIC_ +#include #include #include -namespace eprosima{ -namespace fastdds{ -namespace rtps{ +namespace eprosima { +namespace fastdds { +namespace rtps { class TCPChannelResourceBasic : public TCPChannelResource { asio::io_service& service_; + + std::mutex send_mutex_; std::shared_ptr socket_; + public: + // Constructor called when trying to connect to a remote server TCPChannelResourceBasic( - TCPTransportInterface* parent, - asio::io_service& service, - const fastrtps::rtps::Locator_t& locator, - uint32_t maxMsgSize); + TCPTransportInterface* parent, + asio::io_service& service, + const fastrtps::rtps::Locator_t& locator, + uint32_t maxMsgSize); // Constructor called when local server accepted connection TCPChannelResourceBasic( - TCPTransportInterface* parent, - asio::io_service& service, - std::shared_ptr socket, - uint32_t maxMsgSize); + TCPTransportInterface* parent, + asio::io_service& service, + std::shared_ptr socket, + uint32_t maxMsgSize); virtual ~TCPChannelResourceBasic(); @@ -49,25 +54,27 @@ class TCPChannelResourceBasic : public TCPChannelResource void disconnect() override; uint32_t read( - fastrtps::rtps::octet* buffer, - std::size_t size, - asio::error_code& ec) override; + fastrtps::rtps::octet* buffer, + std::size_t size, + asio::error_code& ec) override; size_t send( - const fastrtps::rtps::octet* header, - size_t header_size, - const fastrtps::rtps::octet* data, - size_t size, - asio::error_code& ec) override; + const fastrtps::rtps::octet* header, + size_t header_size, + const fastrtps::rtps::octet* data, + size_t size, + asio::error_code& ec) override; asio::ip::tcp::endpoint remote_endpoint() const override; asio::ip::tcp::endpoint local_endpoint() const override; - void set_options(const TCPTransportDescriptor* options) override; + void set_options( + const TCPTransportDescriptor* options) override; void cancel() override; void close() override; - void shutdown(asio::socket_base::shutdown_type what) override; + void shutdown( + asio::socket_base::shutdown_type what) override; inline std::shared_ptr socket() { @@ -75,8 +82,11 @@ class TCPChannelResourceBasic : public TCPChannelResource } private: - TCPChannelResourceBasic(const TCPChannelResourceBasic&) = delete; - TCPChannelResourceBasic& operator=(const TCPChannelResourceBasic&) = delete; + + TCPChannelResourceBasic( + const TCPChannelResourceBasic&) = delete; + TCPChannelResourceBasic& operator =( + const TCPChannelResourceBasic&) = delete; }; diff --git a/include/fastdds/rtps/transport/tcp/RTCPHeader.h b/include/fastdds/rtps/transport/tcp/RTCPHeader.h index 932cc3a6592..edf0eeb887b 100644 --- a/include/fastdds/rtps/transport/tcp/RTCPHeader.h +++ b/include/fastdds/rtps/transport/tcp/RTCPHeader.h @@ -22,9 +22,9 @@ #include -namespace eprosima{ -namespace fastdds{ -namespace rtps{ +namespace eprosima { +namespace fastdds { +namespace rtps { #define TCPHEADER_SIZE 14 @@ -37,7 +37,7 @@ struct TCPHeader uint16_t logical_port; TCPHeader() - : length(sizeof(TCPHeader)) + : length(TCPHEADER_SIZE) , crc(0) , logical_port(0) { @@ -67,6 +67,7 @@ struct TCPHeader { return TCPHEADER_SIZE; } + }; union TCPTransactionId @@ -84,7 +85,7 @@ union TCPTransactionId memcpy(ints, t.ints, 3 * sizeof(uint32_t)); } - TCPTransactionId& operator++() + TCPTransactionId& operator ++() { if (ints[0] == 0xffffffff) { @@ -111,39 +112,44 @@ union TCPTransactionId return *this; } - TCPTransactionId operator++(int) + TCPTransactionId operator ++( + int) { TCPTransactionId prev = *this; ++(*this); return prev; } - - TCPTransactionId& operator=(const TCPTransactionId& t) + TCPTransactionId& operator =( + const TCPTransactionId& t) { memcpy(ints, t.ints, 3 * sizeof(uint32_t)); return *this; } - TCPTransactionId& operator=(const fastrtps::rtps::octet* id) + TCPTransactionId& operator =( + const fastrtps::rtps::octet* id) { memcpy(octets, id, 12 * sizeof(fastrtps::rtps::octet)); return *this; } - TCPTransactionId& operator=(const char* id) + TCPTransactionId& operator =( + const char* id) { memcpy(octets, id, 12 * sizeof(fastrtps::rtps::octet)); return *this; } - TCPTransactionId& operator=(const uint32_t* id) + TCPTransactionId& operator =( + const uint32_t* id) { memcpy(ints, id, 3 * sizeof(uint32_t)); return *this; } - TCPTransactionId& operator=(uint32_t id) + TCPTransactionId& operator =( + uint32_t id) { ints[0] = id; ints[1] = 0; @@ -151,25 +157,31 @@ union TCPTransactionId return *this; } - TCPTransactionId& operator=(uint64_t id) + TCPTransactionId& operator =( + uint64_t id) { memset(ints, 0, sizeof(uint32_t) * 3); memcpy(ints, &id, sizeof(uint64_t)); return *this; } - bool operator==(const TCPTransactionId& t) const + bool operator ==( + const TCPTransactionId& t) const { return memcmp(ints, t.ints, 3 * sizeof(uint32_t)) == 0; } - bool operator<(const TCPTransactionId& t) const + bool operator <( + const TCPTransactionId& t) const { return memcmp(ints, t.ints, 3 * sizeof(uint32_t)) < 0; } + }; -inline std::ostream& operator<<(std::ostream& output,const TCPTransactionId& t) +inline std::ostream& operator <<( + std::ostream& output, + const TCPTransactionId& t) { bool printed = false; // Don't skip cases like 99 0 34 for (int i = 2; i >= 0; --i) @@ -205,6 +217,7 @@ class TCPControlMsgHeader TCPTransactionId transaction_id_; // 12 bytes public: + TCPControlMsgHeader() { kind_ = static_cast(0x00); @@ -212,7 +225,8 @@ class TCPControlMsgHeader length_ = 0; } - void kind(TCPCPMKind kind) + void kind( + TCPCPMKind kind) { kind_ = kind; } @@ -227,7 +241,8 @@ class TCPControlMsgHeader return kind_; } - void length(uint16_t length) + void length( + uint16_t length) { length_ = length; } @@ -242,7 +257,8 @@ class TCPControlMsgHeader return length_; } - void transaction_id(TCPTransactionId transaction_id) + void transaction_id( + TCPTransactionId transaction_id) { transaction_id_ = transaction_id; } @@ -269,7 +285,8 @@ class TCPControlMsgHeader flags_ = e | p | r; } - void endianess(fastrtps::rtps::Endianness_t endianess) + void endianess( + fastrtps::rtps::Endianness_t endianess) { // Endianess flag has inverse logic than Endianness_t :-/ if (endianess == fastrtps::rtps::Endianness_t::BIGEND) @@ -282,7 +299,8 @@ class TCPControlMsgHeader } } - void payload(bool payload) + void payload( + bool payload) { if (payload) { @@ -294,7 +312,8 @@ class TCPControlMsgHeader } } - void requires_response(bool requires_response) + void requires_response( + bool requires_response) { if (requires_response) { @@ -325,6 +344,7 @@ class TCPControlMsgHeader { return 16; } + }; diff --git a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp index 187089f8319..c4a2019de28 100644 --- a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp +++ b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp @@ -84,14 +84,14 @@ void TCPChannelResourceBasic::connect( , ip::tcp::endpoint #else , ip::tcp::resolver::iterator -#endif +#endif // if ASIO_VERSION >= 101200 ) - { - if (!channel_weak_ptr.expired()) - { - parent_->SocketConnected(channel_weak_ptr, ec); - } - } + { + if (!channel_weak_ptr.expired()) + { + parent_->SocketConnected(channel_weak_ptr, ec); + } + } ); } catch (const std::system_error& error) @@ -108,23 +108,23 @@ void TCPChannelResourceBasic::disconnect() auto socket = socket_; service_.post([&, socket]() + { + try { - try - { - std::error_code ec; - socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec); - socket->cancel(); + std::error_code ec; + socket->shutdown(asio::ip::tcp::socket::shutdown_both, ec); + socket->cancel(); - // This method was added on the version 1.12.0 + // This method was added on the version 1.12.0 #if ASIO_VERSION >= 101200 && (!defined(_WIN32_WINNT) || _WIN32_WINNT >= 0x0603) - socket->release(); -#endif - socket->close(); - } - catch (std::exception&) - { - } - }); + socket->release(); +#endif // if ASIO_VERSION >= 101200 && (!defined(_WIN32_WINNT) || _WIN32_WINNT >= 0x0603) + socket->close(); + } + catch (std::exception&) + { + } + }); } } @@ -155,6 +155,7 @@ size_t TCPChannelResourceBasic::send( if (eConnecting < connection_status_) { + std::lock_guard send_guard(send_mutex_); if (header_size > 0) { std::array buffers; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 5899cb65f66..53533b515cd 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -838,6 +838,68 @@ bool TCPTransportInterface::read_body( return true; } +bool receive_header( + std::shared_ptr& channel, + TCPHeader& tcp_header, + asio::error_code& ec) +{ + // Cleanup header + octet* ptr = tcp_header.address(); + memset(ptr, 0, sizeof(TCPHeader)); + + // Prepare read position + octet* read_pos = ptr; + size_t bytes_needed = 4; + + // Wait for sync + while (bytes_needed > 0) + { + size_t bytes_read = channel->read(read_pos, bytes_needed, ec); + if (bytes_read > 0) + { + read_pos += bytes_read; + bytes_needed -= bytes_read; + if (0 == bytes_needed) + { + size_t skip = // Text Next possible match Skip to next match + (tcp_header.rtcp[0] != 'R') ? 1 : // X--- XRTCP 1 + (tcp_header.rtcp[1] != 'T') ? 1 : // RX-- RRTCP 1 + (tcp_header.rtcp[2] != 'C') ? 2 : // RTX- RTRTCP 2 + (tcp_header.rtcp[3] != 'P') ? 3 : 0; // RTCX RTCRTCP 3 + + if (skip) + { + memmove(ptr, &ptr[skip], 4 - skip); + } + + read_pos -= skip; + bytes_needed = skip; + } + } + else if (ec) + { + return false; + } + } + + bytes_needed = TCPHeader::size() - 4; + while (bytes_needed > 0) + { + size_t bytes_read = channel->read(read_pos, bytes_needed, ec); + if (bytes_read > 0) + { + read_pos += bytes_read; + bytes_needed -= bytes_read; + } + else if (ec) + { + return false; + } + } + + return true; +} + /** * On TCP, we must receive the header (14 Bytes) and then, * the rest of the message, whose length is on the header. @@ -863,117 +925,99 @@ bool TCPTransportInterface::Receive( TCPHeader tcp_header; asio::error_code ec; - size_t bytes_received = channel->read(reinterpret_cast(&tcp_header), - TCPHeader::size(), ec); + bool header_found = false; - remote_locator = channel->locator(); + do + { + header_found = receive_header(channel, tcp_header, ec); + } while (!header_found && !ec); - if (bytes_received != TCPHeader::size()) + if (ec) { - if (bytes_received > 0) - { - logError(RTCP_MSG_IN, "Bad TCP header size: " << bytes_received << " (expected: : " - << TCPHeader::size() << ")" << ec.message()); - close_tcp_socket(channel); - } - else if (ec) + if (ec != asio::error::eof) { logWarning(DEBUG, "Error reading TCP header: " << ec.message()); - close_tcp_socket(channel); } - + close_tcp_socket(channel); success = false; } else { - // Check RTPC Header - if (tcp_header.rtcp[0] != 'R' - || tcp_header.rtcp[1] != 'T' - || tcp_header.rtcp[2] != 'C' - || tcp_header.rtcp[3] != 'P') + size_t body_size = tcp_header.length - static_cast(TCPHeader::size()); + + if (body_size > receive_buffer_capacity) { - logError(RTCP_MSG_IN, "Bad RTCP header identifier, closing connection."); - close_tcp_socket(channel); + logError(RTCP_MSG_IN, "Size of incoming TCP message is bigger than buffer capacity: " + << static_cast(body_size) << " vs. " << receive_buffer_capacity << ". " + << "The full message will be dropped."); success = false; + // Drop the message + size_t to_read = body_size; + size_t read_block = receive_buffer_capacity; + uint32_t readed; + while (read_block > 0) + { + read_body(receive_buffer, receive_buffer_capacity, &readed, channel, + read_block); + to_read -= readed; + read_block = (to_read >= receive_buffer_capacity) ? receive_buffer_capacity : to_read; + } } else { - size_t body_size = tcp_header.length - static_cast(TCPHeader::size()); + logInfo(RTCP_MSG_IN, "Received RTCP MSG. Logical Port " << tcp_header.logical_port); + success = read_body(receive_buffer, receive_buffer_capacity, &receive_buffer_size, + channel, body_size); - if (body_size > receive_buffer_capacity) + if (success) { - logError(RTCP_MSG_IN, "Size of incoming TCP message is bigger than buffer capacity: " - << static_cast(body_size) << " vs. " << receive_buffer_capacity << ". " - << "The full message will be dropped."); - success = false; - // Drop the message - size_t to_read = body_size; - size_t read_block = receive_buffer_capacity; - uint32_t readed; - while (read_block > 0) + if (configuration()->check_crc + && !check_crc(tcp_header, receive_buffer, receive_buffer_size)) { - read_body(receive_buffer, receive_buffer_capacity, &readed, channel, - read_block); - to_read -= readed; - read_block = (to_read >= receive_buffer_capacity) ? receive_buffer_capacity : to_read; + logWarning(RTCP_MSG_IN, "Bad TCP header CRC"); } - } - else - { - logInfo(RTCP_MSG_IN, "Received RTCP MSG. Logical Port " << tcp_header.logical_port); - success = read_body(receive_buffer, receive_buffer_capacity, &receive_buffer_size, - channel, body_size); - if (success) + if (tcp_header.logical_port == 0) { - if (configuration()->check_crc - && !check_crc(tcp_header, receive_buffer, receive_buffer_size)) + std::shared_ptr rtcp_message_manager; + if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status()) + { - logWarning(RTCP_MSG_IN, "Bad TCP header CRC"); + std::unique_lock lock(rtcp_message_manager_mutex_); + rtcp_message_manager = rtcp_manager.lock(); } - if (tcp_header.logical_port == 0) + if (rtcp_message_manager) { - std::shared_ptr rtcp_message_manager; - if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status()) - - { - std::unique_lock lock(rtcp_message_manager_mutex_); - rtcp_message_manager = rtcp_manager.lock(); - } + // The channel is not going to be deleted because we lock it for reading. + ResponseCode responseCode = rtcp_message_manager->processRTCPMessage( + channel, receive_buffer, body_size); - if (rtcp_message_manager) + if (responseCode != RETCODE_OK) { - // The channel is not going to be deleted because we lock it for reading. - ResponseCode responseCode = rtcp_message_manager->processRTCPMessage( - channel, receive_buffer, body_size); - - if (responseCode != RETCODE_OK) - { - close_tcp_socket(channel); - } - success = false; - - std::unique_lock lock(rtcp_message_manager_mutex_); - rtcp_message_manager.reset(); - rtcp_message_manager_cv_.notify_one(); - } - else - { - success = false; close_tcp_socket(channel); } + success = false; + std::unique_lock lock(rtcp_message_manager_mutex_); + rtcp_message_manager.reset(); + rtcp_message_manager_cv_.notify_one(); } else { - IPLocator::setLogicalPort(remote_locator, tcp_header.logical_port); - logInfo(RTCP_MSG_IN, "[RECEIVE] From: " << remote_locator \ - << " - " << receive_buffer_size << " bytes."); + success = false; + close_tcp_socket(channel); } + + } + else + { + IPLocator::setLogicalPort(remote_locator, tcp_header.logical_port); + logInfo(RTCP_MSG_IN, "[RECEIVE] From: " << remote_locator \ + << " - " << receive_buffer_size << " bytes."); } - // Error message already shown by read_body method. } + // Error message already shown by read_body method. } } } diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index a54950c8a50..3f9c9ee2aa3 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -20,6 +20,7 @@ #include #include #include "../../../src/cpp/rtps/transport/TCPSenderResource.hpp" +#include #include #include @@ -28,23 +29,25 @@ using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; +using TCPHeader = eprosima::fastdds::rtps::TCPHeader; #if defined(_WIN32) #define GET_PID _getpid #else #define GET_PID getpid -#endif +#endif // if defined(_WIN32) static uint16_t g_default_port = 0; static uint16_t g_output_port = 0; static uint16_t g_input_port = 0; static std::string g_test_wan_address = "88.88.88.88"; -uint16_t get_port(uint16_t offset) +uint16_t get_port( + uint16_t offset) { uint16_t port = static_cast(GET_PID()); - if(offset > port) + if (offset > port) { port += offset; } @@ -52,26 +55,27 @@ uint16_t get_port(uint16_t offset) return port; } -class TCPv4Tests: public ::testing::Test +class TCPv4Tests : public ::testing::Test { - public: - TCPv4Tests() - { - eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); - HELPER_SetDescriptorDefaults(); - } +public: - ~TCPv4Tests() - { - eprosima::fastdds::dds::Log::KillThread(); - } + TCPv4Tests() + { + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); + HELPER_SetDescriptorDefaults(); + } + + ~TCPv4Tests() + { + eprosima::fastdds::dds::Log::KillThread(); + } - void HELPER_SetDescriptorDefaults(); + void HELPER_SetDescriptorDefaults(); - TCPv4TransportDescriptor descriptor; - TCPv4TransportDescriptor descriptorOnlyOutput; - std::unique_ptr senderThread; - std::unique_ptr receiverThread; + TCPv4TransportDescriptor descriptor; + TCPv4TransportDescriptor descriptorOnlyOutput; + std::unique_ptr senderThread; + std::unique_ptr receiverThread; }; TEST_F(TCPv4Tests, locators_with_kind_1_supported) @@ -191,43 +195,45 @@ TEST_F(TCPv4Tests, send_and_receive_between_ports) IPLocator::setLogicalPort(outputLocator, 7410); MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; Semaphore sem; std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; msg_recv->setCallback(recCallback); auto sendThreadFunction = [&]() - { - bool sent = false; - while (!sent) - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + { + bool sent = false; + while (!sent) + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - }; + sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_TRUE(sent); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); senderThread->join(); sem.wait(); } -#endif +#endif // ifndef __APPLE__ TEST_F(TCPv4Tests, send_is_rejected_if_buffer_size_is_bigger_to_size_specified_in_descriptor) { @@ -256,7 +262,7 @@ TEST_F(TCPv4Tests, send_is_rejected_if_buffer_size_is_bigger_to_size_specified_i // Then std::vector receiveBufferWrongSize(descriptor.sendBufferSize + 1); ASSERT_FALSE(send_resource_list.at(0)->send(receiveBufferWrongSize.data(), (uint32_t)receiveBufferWrongSize.size(), - &destination_begin, &destination_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100)))); + &destination_begin, &destination_end, (std::chrono::steady_clock::now() + std::chrono::microseconds(100)))); } TEST_F(TCPv4Tests, RemoteToMainLocal_simply_strips_out_address_leaving_IP_ANY) @@ -268,7 +274,7 @@ TEST_F(TCPv4Tests, RemoteToMainLocal_simply_strips_out_address_leaving_IP_ANY) Locator_t remote_locator; remote_locator.kind = LOCATOR_KIND_TCPv4; remote_locator.port = g_default_port; - IPLocator::setIPv4(remote_locator, 222,222,222,222); + IPLocator::setIPv4(remote_locator, 222, 222, 222, 222); // When Locator_t mainLocalLocator = transportUnderTest.RemoteToMainLocal(remote_locator); @@ -306,23 +312,23 @@ TEST_F(TCPv4Tests, send_to_wrong_interface) outputChannelLocator.port = g_output_port; outputChannelLocator.kind = LOCATOR_KIND_TCPv4; IPLocator::setLogicalPort(outputChannelLocator, 7400); - IPLocator::setIPv4(outputChannelLocator, 127,0,0,1); // Loopback + IPLocator::setIPv4(outputChannelLocator, 127, 0, 0, 1); // Loopback SendResourceList send_resource_list; ASSERT_TRUE(transportUnderTest.OpenOutputChannel(send_resource_list, outputChannelLocator)); ASSERT_FALSE(send_resource_list.empty()); //Sending through a different IP will NOT work, except 0.0.0.0 Locator_t wrongLocator(outputChannelLocator); - IPLocator::setIPv4(wrongLocator, 111,111,111,111); + IPLocator::setIPv4(wrongLocator, 111, 111, 111, 111); LocatorList_t locator_list; locator_list.push_back(wrongLocator); Locators wrong_begin(locator_list.begin()); Locators wrong_end(locator_list.end()); - std::vector message = { 'H','e','l','l','o' }; + std::vector message = { 'H', 'e', 'l', 'l', 'o' }; ASSERT_FALSE(send_resource_list.at(0)->send(message.data(), (uint32_t)message.size(), &wrong_begin, &wrong_end, - (std::chrono::steady_clock::now()+ std::chrono::microseconds(100)))); + (std::chrono::steady_clock::now() + std::chrono::microseconds(100)))); } TEST_F(TCPv4Tests, send_to_blocked_interface) @@ -343,16 +349,16 @@ TEST_F(TCPv4Tests, send_to_blocked_interface) //Sending through a different IP will NOT work, except 0.0.0.0 Locator_t wrongLocator(outputChannelLocator); IPLocator::setIPv4(wrongLocator, 111, 111 - , 111, 111); + , 111, 111); LocatorList_t locator_list; locator_list.push_back(wrongLocator); Locators wrong_begin(locator_list.begin()); Locators wrong_end(locator_list.end()); - std::vector message = { 'H','e','l','l','o' }; + std::vector message = { 'H', 'e', 'l', 'l', 'o' }; ASSERT_FALSE(send_resource_list.at(0)->send(message.data(), (uint32_t)message.size(), &wrong_begin, &wrong_end, - (std::chrono::steady_clock::now()+ std::chrono::microseconds(100)))); + (std::chrono::steady_clock::now() + std::chrono::microseconds(100)))); } #ifndef __APPLE__ @@ -397,7 +403,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_allowed_interfaces_ports) LocatorList_t locator_list; locator_list.push_back(inputLocator); - + Locator_t outputLocator; outputLocator.kind = LOCATOR_KIND_TCPv4; outputLocator.set_address(locator); @@ -406,40 +412,44 @@ TEST_F(TCPv4Tests, send_and_receive_between_allowed_interfaces_ports) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; bool bOk = false; std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - bOk = true; - }; + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + bOk = true; + }; msg_recv->setCallback(recCallback); bool bFinish(false); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); - - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - while (!bFinish && !sent) - { - Locators input_begin2(locator_list.begin()); - Locators input_end2(locator_list.end()); - - sent = send_resource_list.at(0)->send(message, 5, &input_begin2, &input_end2, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); + + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + while (!bFinish && !sent) + { + Locators input_begin2(locator_list.begin()); + Locators input_end2(locator_list.end()); + + sent = + send_resource_list.at(0)->send(message, 5, &input_begin2, &input_end2, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_TRUE(sent); + //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::seconds(10)); @@ -490,7 +500,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_client_verifies) LocatorList_t locator_list; locator_list.push_back(inputLocator); - + Locator_t outputLocator; outputLocator.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); @@ -499,40 +509,44 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_client_verifies) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; Semaphore sem; std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; msg_recv->setCallback(recCallback); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - while (!sent) - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + while (!sent) + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; + sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_TRUE(sent); + //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -586,7 +600,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_server_verifies) LocatorList_t locator_list; locator_list.push_back(inputLocator); - + Locator_t outputLocator; outputLocator.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); @@ -595,40 +609,44 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_server_verifies) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; Semaphore sem; std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; msg_recv->setCallback(recCallback); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - while (!sent) - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + while (!sent) + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; + sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_TRUE(sent); + //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -651,7 +669,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports) recvDescriptor.tls_config.cert_chain_file = "mainpubcert.pem"; recvDescriptor.tls_config.private_key_file = "mainpubkey.pem"; recvDescriptor.tls_config.verify_file = "maincacert.pem"; - // Server doesn't accept clients without certs + // Server doesn't accept clients without certs recvDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER | TLSVerifyMode::VERIFY_FAIL_IF_NO_PEER_CERT; recvDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); recvDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); @@ -693,40 +711,44 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; Semaphore sem; std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; msg_recv->setCallback(recCallback); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - while (!sent) - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + while (!sent) + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; + sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_TRUE(sent); + //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -749,7 +771,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports_untrusted) recvDescriptor.tls_config.cert_chain_file = "mainpubcert.pem"; recvDescriptor.tls_config.private_key_file = "mainpubkey.pem"; recvDescriptor.tls_config.verify_file = "ca.pem"; // This CA doesn't know about these certificates - // Server doesn't accept clients without certs + // Server doesn't accept clients without certs recvDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_FAIL_IF_NO_PEER_CERT; recvDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); recvDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); @@ -782,7 +804,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports_untrusted) LocatorList_t locator_list; locator_list.push_back(inputLocator); - + Locator_t outputLocator; outputLocator.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); @@ -791,44 +813,48 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports_untrusted) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; Semaphore sem; std::function recCallback = [&]() - { - ASSERT_TRUE(false); - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; + { + ASSERT_TRUE(false); + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; msg_recv->setCallback(recCallback); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - int count = 0; - while (!sent && count < 30) - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + int count = 0; + while (!sent && count < 30) + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - ++count; - } - EXPECT_FALSE(sent); - sem.post(); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; + sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ++count; + } + EXPECT_FALSE(sent); + sem.post(); + //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -884,7 +910,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_1) LocatorList_t locator_list; locator_list.push_back(inputLocator); - + Locator_t outputLocator; outputLocator.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); @@ -893,39 +919,43 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_1) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; Semaphore sem; std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; msg_recv->setCallback(recCallback); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - while (!sent) - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + while (!sent) + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - }; + sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_TRUE(sent); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -934,8 +964,8 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_1) } } /* -TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) -{ + TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) + { eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; @@ -1009,7 +1039,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) { sent = sendTransportUnderTest2.send(message, 5, outputLocator, inputLocator); std::this_thread::sleep_for(std::chrono::milliseconds(100)); - ++count; + ++count; } EXPECT_FALSE(sent); sem.post(); @@ -1021,8 +1051,8 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) sem.wait(); } ASSERT_TRUE(sendTransportUnderTest2.CloseOutputChannel(outputLocator)); -} -*/ + } + */ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server) { @@ -1037,7 +1067,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server) recvDescriptor.tls_config.password = "testkey"; recvDescriptor.tls_config.cert_chain_file = "mainpubcert.pem"; recvDescriptor.tls_config.private_key_file = "mainpubkey.pem"; - // Server doesn't accept clients without certs + // Server doesn't accept clients without certs recvDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; recvDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); recvDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); @@ -1067,7 +1097,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server) LocatorList_t locator_list; locator_list.push_back(inputLocator); - + Locator_t outputLocator; outputLocator.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); @@ -1076,43 +1106,47 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; Semaphore sem; std::function recCallback = [&]() - { - ASSERT_TRUE(false); - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; + { + ASSERT_TRUE(false); + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; msg_recv->setCallback(recCallback); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - int count = 0; - while (!sent && count < 30) - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); - sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - ++count; - } - EXPECT_FALSE(sent); - sem.post(); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + int count = 0; + while (!sent && count < 30) + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); + sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + ++count; + } + EXPECT_FALSE(sent); + sem.post(); + //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); @@ -1148,7 +1182,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_allowed_localhost_interfaces_ports) LocatorList_t locator_list; locator_list.push_back(inputLocator); - + Locator_t outputLocator; outputLocator.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); @@ -1157,40 +1191,44 @@ TEST_F(TCPv4Tests, send_and_receive_between_allowed_localhost_interfaces_ports) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; bool bOk = false; std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - bOk = true; - }; + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + bOk = true; + }; msg_recv->setCallback(recCallback); bool bFinish(false); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - while (!bFinish && !sent) - { - Locators input_begin2(locator_list.begin()); - Locators input_end2(locator_list.end()); + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + while (!bFinish && !sent) + { + Locators input_begin2(locator_list.begin()); + Locators input_end2(locator_list.end()); - sent = send_resource_list.at(0)->send(message, 5, &input_begin2, &input_end2, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; + sent = + send_resource_list.at(0)->send(message, 5, &input_begin2, &input_end2, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_TRUE(sent); + //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::seconds(10)); @@ -1241,7 +1279,7 @@ TEST_F(TCPv4Tests, send_and_receive_between_blocked_interfaces_ports) LocatorList_t locator_list; locator_list.push_back(inputLocator); - + Locator_t outputLocator; outputLocator.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); @@ -1250,40 +1288,44 @@ TEST_F(TCPv4Tests, send_and_receive_between_blocked_interfaces_ports) { MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + MockMessageReceiver* msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); SendResourceList send_resource_list; ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(send_resource_list, outputLocator)); ASSERT_FALSE(send_resource_list.empty()); - octet message[5] = { 'H','e','l','l','o' }; + octet message[5] = { 'H', 'e', 'l', 'l', 'o' }; bool bOk = false; std::function recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - bOk = true; - }; + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + bOk = true; + }; msg_recv->setCallback(recCallback); bool bFinished(false); auto sendThreadFunction = [&]() - { - Locators input_begin(locator_list.begin()); - Locators input_end(locator_list.end()); - - bool sent = send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - while (!bFinished && !sent) - { - Locators input_begin2(locator_list.begin()); - Locators input_end2(locator_list.end()); - - sent = send_resource_list.at(0)->send(message, 5, &input_begin2, &input_end2, (std::chrono::steady_clock::now()+ std::chrono::microseconds(100))); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_FALSE(sent); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; + { + Locators input_begin(locator_list.begin()); + Locators input_end(locator_list.end()); + + bool sent = + send_resource_list.at(0)->send(message, 5, &input_begin, &input_end, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + while (!bFinished && !sent) + { + Locators input_begin2(locator_list.begin()); + Locators input_end2(locator_list.end()); + + sent = + send_resource_list.at(0)->send(message, 5, &input_begin2, &input_end2, + (std::chrono::steady_clock::now() + std::chrono::microseconds(100))); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + EXPECT_FALSE(sent); + //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); + }; senderThread.reset(new std::thread(sendThreadFunction)); std::this_thread::sleep_for(std::chrono::seconds(10)); @@ -1295,7 +1337,139 @@ TEST_F(TCPv4Tests, send_and_receive_between_blocked_interfaces_ports) } } -#endif +#endif // ifndef __APPLE__ + +TEST_F(TCPv4Tests, receive_unordered_data) +{ + constexpr uint16_t logical_port = 7410; + constexpr uint32_t num_bytes_1 = 3; + constexpr uint32_t num_bytes_2 = 13; + const char* bad_headers[] = + { + "-RTC", "-RT", "-R", + "-RRTC", "-RRT", "-RR", + "-RTRTC", "-RTRT", "-RTR", + "-RTCRTC", "-RTCRT", "-RTCR" + }; + + struct Receiver : public TransportReceiverInterface + { + std::array num_received{ 0, 0, 0 }; + + void OnDataReceived( + const octet* data, + const uint32_t size, + const Locator_t& local_locator, + const Locator_t& remote_locator) override + { + static_cast(data); + static_cast(local_locator); + static_cast(remote_locator); + + std::cout << "Received " << size << " bytes: " << std::hex << uint32_t(data[0]) << std::dec << std::endl; + + switch (size) + { + case num_bytes_1: + num_received[0]++; + break; + case num_bytes_2: + num_received[1]++; + break; + default: + num_received[2]++; + break; + } + } + + }; + + Receiver receiver; + + TCPv4TransportDescriptor test_descriptor = descriptor; + test_descriptor.check_crc = false; + TCPv4Transport uut(test_descriptor); + ASSERT_TRUE(uut.init()) << "Failed to initialize transport. Port " << g_default_port << " may be in use"; + + Locator_t input_locator; + input_locator.kind = LOCATOR_KIND_TCPv4; + input_locator.port = g_default_port; + IPLocator::setIPv4(input_locator, 127, 0, 0, 1); + IPLocator::setLogicalPort(input_locator, logical_port); + + EXPECT_TRUE(uut.OpenInputChannel(input_locator, &receiver, 0xFFFF)); + + // Let acceptor to be open + std::this_thread::sleep_for(std::chrono::seconds(1)); + + asio::error_code ec; + asio::io_context ctx; + + asio::ip::tcp::socket sender(ctx); + asio::ip::tcp::endpoint destination; + destination.port(g_default_port); + destination.address(asio::ip::address::from_string("127.0.0.1")); + sender.connect(destination, ec); + ASSERT_TRUE(!ec) << ec; + + std::array bytes_1{ 0 }; + std::array bytes_2{ 0 }; + + TCPHeader h1; + h1.logical_port = logical_port; + h1.length += num_bytes_1; + + TCPHeader h2; + h2.logical_port = logical_port; + h2.length += num_bytes_2; + + std::array expected_number{ 0, 0, 0 }; + + auto send_first = [&]() + { + expected_number[0]++; + bytes_1[0]++; + EXPECT_EQ(TCPHeader::size(), asio::write(sender, asio::buffer(&h1, TCPHeader::size()), ec)); + EXPECT_EQ(num_bytes_1, asio::write(sender, asio::buffer(bytes_1.data(), bytes_1.size()), ec)); + }; + + // Send first synchronized + send_first(); + + // Send non-matching RTCP headers + for (const char* header : bad_headers) + { + asio::write(sender, asio::buffer(header, strlen(header) - 1), ec); + } + + // Send first prepended with bad headers + for (const char* header : bad_headers) + { + asio::write(sender, asio::buffer(header, strlen(header) - 1), ec); + send_first(); + } + + // Interleave headers and data (only first will arrive) + expected_number[0]++; + EXPECT_EQ(TCPHeader::size(), asio::write(sender, asio::buffer(&h1, TCPHeader::size()), ec)); + EXPECT_EQ(TCPHeader::size(), asio::write(sender, asio::buffer(&h2, TCPHeader::size()), ec)); + EXPECT_EQ(num_bytes_1, asio::write(sender, asio::buffer(bytes_1.data(), bytes_1.size()), ec)); + EXPECT_EQ(num_bytes_2, asio::write(sender, asio::buffer(bytes_2.data(), bytes_2.size()), ec)); + + // Send second without interleaving + expected_number[1]++; + EXPECT_EQ(TCPHeader::size(), asio::write(sender, asio::buffer(&h2, TCPHeader::size()), ec)); + EXPECT_EQ(num_bytes_2, asio::write(sender, asio::buffer(bytes_2.data(), bytes_2.size()), ec)); + + // Wait for data to be received + std::this_thread::sleep_for(std::chrono::seconds(1)); + + EXPECT_TRUE(!sender.close(ec)); + + EXPECT_EQ(expected_number, receiver.num_received); + + EXPECT_TRUE(uut.CloseInputChannel(input_locator)); +} void TCPv4Tests::HELPER_SetDescriptorDefaults() { @@ -1303,7 +1477,9 @@ void TCPv4Tests::HELPER_SetDescriptorDefaults() descriptor.set_WAN_address(g_test_wan_address); } -int main(int argc, char **argv) +int main( + int argc, + char** argv) { eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Info); g_default_port = get_port(4000);