diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 2fd9a2d60e2..14c9ccde6f1 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -258,21 +258,6 @@ void TCPTransportInterface::bind_socket( auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel); assert(it_remove != unbound_channel_resources_.end()); unbound_channel_resources_.erase(it_remove); - - unbound_lock.unlock(); - - // Look for an existing channel that matches this physical locator - auto existing_channel = channel_resources_.find(channel->locator()); - // If the channel exists, check if the channel reference wait until it finishes its tasks - if (existing_channel != channel_resources_.end()) - { - // Disconnect the old channel - existing_channel->second->disconnect(); - scopedLock.unlock(); - existing_channel->second->clear(); - scopedLock.lock(); - } - channel_resources_[channel->locator()] = channel; } @@ -693,6 +678,9 @@ bool TCPTransportInterface::OpenOutputChannel( if (existing_channel != channel_resources_.end() && existing_channel->second != tcp_sender_resource->channel()) { + // Disconnect the old channel + tcp_sender_resource->channel()->disconnect(); + tcp_sender_resource->channel()->clear(); // Update sender resource with new channel tcp_sender_resource->channel() = existing_channel->second; } @@ -929,17 +917,10 @@ void TCPTransportInterface::perform_listen_operation( { TransportReceiverInterface* receiver = it->second.first; ReceiverInUseCV* receiver_in_use = it->second.second; - receiver_in_use->cv.wait(scopedLock, [&]() - { - return receiver_in_use->in_use == false; - }); - if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status()) - { - receiver_in_use->in_use = true; - scopedLock.unlock(); - receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator); - scopedLock.lock(); - } + receiver_in_use->in_use = true; + scopedLock.unlock(); + receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator); + scopedLock.lock(); receiver_in_use->in_use = false; receiver_in_use->cv.notify_one(); } diff --git a/test/blackbox/common/BlackboxTestsTransportCustom.cpp b/test/blackbox/common/BlackboxTestsTransportCustom.cpp index 7efc7747e3b..5944d53d702 100644 --- a/test/blackbox/common/BlackboxTestsTransportCustom.cpp +++ b/test/blackbox/common/BlackboxTestsTransportCustom.cpp @@ -14,15 +14,17 @@ #include "BlackboxTests.hpp" -#include "PubSubReader.hpp" -#include "PubSubWriter.hpp" +#include + +#include #include #include #include #include -#include +#include "PubSubReader.hpp" +#include "PubSubWriter.hpp" using BuiltinTransports = eprosima::fastdds::rtps::BuiltinTransports; @@ -114,6 +116,181 @@ eprosima::fastdds::rtps::TransportInterface* TestChainingTransportDescriptor::cr return new TestChainingTransport(*this); } +class BuiltinTransportsTest +{ +public: + + static void test_xml( + const std::string& profiles_file, + const std::string& participant_profile) + { + run_test(profiles_file, participant_profile, "", BuiltinTransports::NONE); + } + + static void test_env( + const std::string& env_var_value) + { + if (env_var_value == "NONE") + { +#ifdef _WIN32 + _putenv_s(env_var_name_.c_str(), env_var_value.c_str()); +#else + setenv(env_var_name_.c_str(), env_var_value.c_str(), 1); +#endif // _WIN32 + + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.init(); + ASSERT_FALSE(writer.isInitialized()); + + reader.init(); + ASSERT_FALSE(reader.isInitialized()); + + } + else + { + run_test("", "", env_var_value, BuiltinTransports::NONE); + } + } + + static void test_api( + const BuiltinTransports& builtin_transports) + { + if (builtin_transports == BuiltinTransports::NONE) + { + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.setup_transports(builtin_transports).init(); + ASSERT_FALSE(writer.isInitialized()); + + reader.setup_transports(builtin_transports).init(); + ASSERT_FALSE(reader.isInitialized()); + } + else + { + run_test("", "", "", builtin_transports); + } + } + +private: + + static void run_test( + const std::string& profiles_file, + const std::string& participant_profile, + const std::string& env_var_value, + const BuiltinTransports& builtin_transports) + { + enum class BuiltinTransportsTestCase : uint8_t + { + NONE, + XML, + ENV, + API + }; + + BuiltinTransportsTestCase test_case = BuiltinTransportsTestCase::NONE; + + /* Validate input */ + if (profiles_file != "") + { + ASSERT_NE(participant_profile, ""); + ASSERT_EQ(builtin_transports, BuiltinTransports::NONE); + ASSERT_EQ(env_var_value, ""); + test_case = BuiltinTransportsTestCase::XML; + } + else if (env_var_value != "") + { + ASSERT_EQ(profiles_file, ""); + ASSERT_EQ(participant_profile, ""); + ASSERT_EQ(builtin_transports, BuiltinTransports::NONE); + test_case = BuiltinTransportsTestCase::ENV; + } + else if (builtin_transports != BuiltinTransports::NONE) + { + ASSERT_EQ(profiles_file, ""); + ASSERT_EQ(participant_profile, ""); + ASSERT_EQ(env_var_value, ""); + test_case = BuiltinTransportsTestCase::API; + } + + ASSERT_NE(test_case, BuiltinTransportsTestCase::NONE); + + /* Test configuration */ + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + // Reliable keep all to wait of all acked as end condition + writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS); + + reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS); + + // Builtin transport configuration according to test_case + switch (test_case) + { + case BuiltinTransportsTestCase::XML: + { + writer.set_xml_filename(profiles_file); + writer.set_participant_profile(participant_profile); + + reader.set_xml_filename(profiles_file); + reader.set_participant_profile(participant_profile); + break; + } + case BuiltinTransportsTestCase::ENV: + { +#ifdef _WIN32 + _putenv_s(env_var_name_.c_str(), env_var_name_.c_str()); +#else + setenv(env_var_name_.c_str(), env_var_name_.c_str(), 1); +#endif // _WIN32 + break; + } + case BuiltinTransportsTestCase::API: + { + writer.setup_transports(builtin_transports); + reader.setup_transports(builtin_transports); + break; + } + default: + { + FAIL(); + } + } + + /* Run test */ + // Init writer + writer.init(); + ASSERT_TRUE(writer.isInitialized()); + + // Init reader + reader.init(); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(); + reader.wait_discovery(); + + // Send data + auto data = default_helloworld_data_generator(); + reader.startReception(data); + writer.send(data); + ASSERT_TRUE(data.empty()); + + // Wait for reception acknowledgement + reader.block_for_all(); + EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3))); + } + + static const std::string env_var_name_; +}; + +// Static const member of non-integral types cannot be in-class initialized +const std::string BuiltinTransportsTest::env_var_name_ = "FASTDDS_BUILTIN_TRANSPORTS"; + TEST(ChainingTransportTests, basic_test) { bool writer_init_function_called = false; @@ -309,189 +486,128 @@ TEST(ChainingTransportTests, tcp_client_server_with_wan_correct_sender_resources ASSERT_LE(times_reader_receive_function_called.load(), 30); } -TEST(ChainingTransportTests, builtin_transports_basic_test) +TEST(ChainingTransportTests, builtin_transports_api_none) { - std::vector bt_list; - bt_list.push_back(BuiltinTransports::DEFAULT); - bt_list.push_back(BuiltinTransports::DEFAULTv6); - bt_list.push_back(BuiltinTransports::SHM); - bt_list.push_back(BuiltinTransports::UDPv4); - bt_list.push_back(BuiltinTransports::UDPv6); - bt_list.push_back(BuiltinTransports::LARGE_DATA); -#ifndef __APPLE__ - bt_list.push_back(BuiltinTransports::LARGE_DATAv6); -#endif // __APPLE__ - - for (auto test_transport : bt_list) - { - { - PubSubWriter writer(TEST_TOPIC_NAME); - PubSubReader reader(TEST_TOPIC_NAME); - - writer.setup_transports(test_transport) - .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) - .init(); - - ASSERT_TRUE(writer.isInitialized()); - - reader.setup_transports(test_transport) - .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) - .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) - .init(); - - ASSERT_TRUE(reader.isInitialized()); - - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); - - auto data = default_helloworld_data_generator(); - // size_t num_messages = data.size(); - reader.startReception(data); - writer.send(data); - ASSERT_TRUE(data.empty()); - reader.block_for_all(); - - // Check reception - // reader.wait_for_all_received(std::chrono::seconds(3), num_messages); - EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3))); - } - } - - PubSubWriter writer(TEST_TOPIC_NAME); - PubSubReader reader(TEST_TOPIC_NAME); + BuiltinTransportsTest::test_api(BuiltinTransports::NONE); +} - BuiltinTransports test_transport = BuiltinTransports::NONE; +TEST(ChainingTransportTests, builtin_transports_api_default) +{ + BuiltinTransportsTest::test_api(BuiltinTransports::DEFAULT); +} - writer.setup_transports(test_transport) - .init(); +TEST(ChainingTransportTests, builtin_transports_api_defaultv6) +{ + BuiltinTransportsTest::test_api(BuiltinTransports::DEFAULTv6); +} - ASSERT_FALSE(writer.isInitialized()); +TEST(ChainingTransportTests, builtin_transports_api_shm) +{ + BuiltinTransportsTest::test_api(BuiltinTransports::SHM); +} - reader.setup_transports(test_transport) - .init(); +TEST(ChainingTransportTests, builtin_transports_api_udpv4) +{ + BuiltinTransportsTest::test_api(BuiltinTransports::UDPv4); +} - ASSERT_FALSE(reader.isInitialized()); +TEST(ChainingTransportTests, builtin_transports_api_udpv6) +{ + BuiltinTransportsTest::test_api(BuiltinTransports::UDPv6); } -TEST(ChainingTransportTests, builtin_transports_env_var_test) +TEST(ChainingTransportTests, builtin_transports_api_large_data) { - const std::string env_var_name("FASTDDS_BUILTIN_TRANSPORTS"); - - std::vector bt_list; - bt_list.push_back("DEFAULT"); - bt_list.push_back("DEFAULTv6"); - bt_list.push_back("SHM"); - bt_list.push_back("UDPv4"); - bt_list.push_back("UDPv6"); - bt_list.push_back("LARGE_DATA"); + BuiltinTransportsTest::test_api(BuiltinTransports::LARGE_DATA); +} + #ifndef __APPLE__ - bt_list.push_back("LARGE_DATAv6"); +TEST(ChainingTransportTests, builtin_transports_api_large_datav6) +{ + BuiltinTransportsTest::test_api(BuiltinTransports::LARGE_DATAv6); +} #endif // __APPLE__ - for (auto test_transport : bt_list) - { - { -#ifdef _WIN32 - _putenv_s(env_var_name.c_str(), test_transport.c_str()); -#else - setenv(env_var_name.c_str(), test_transport.c_str(), 1); -#endif // _WIN32 - - PubSubWriter writer(TEST_TOPIC_NAME); - PubSubReader reader(TEST_TOPIC_NAME); - - writer.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) - .init(); - - ASSERT_TRUE(writer.isInitialized()); - - reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) - .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) - .init(); - - ASSERT_TRUE(reader.isInitialized()); - - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); - - auto data = default_helloworld_data_generator(); - // size_t num_messages = data.size(); - reader.startReception(data); - writer.send(data); - ASSERT_TRUE(data.empty()); - reader.block_for_all(); - - // Check reception - EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3))); - } - } - - std::string value("NONE"); -#ifdef _WIN32 - _putenv_s(env_var_name.c_str(), value.c_str()); -#else - setenv(env_var_name.c_str(), value.c_str(), 1); -#endif // _WIN32 +TEST(ChainingTransportTests, builtin_transports_env_none) +{ + BuiltinTransportsTest::test_env("NONE"); +} - PubSubWriter writer(TEST_TOPIC_NAME); - PubSubReader reader(TEST_TOPIC_NAME); +TEST(ChainingTransportTests, builtin_transports_env_default) +{ + BuiltinTransportsTest::test_env("DEFAULT"); +} - writer.init(); +TEST(ChainingTransportTests, builtin_transports_env_defaultv6) +{ + BuiltinTransportsTest::test_env("DEFAULTv6"); +} - ASSERT_FALSE(writer.isInitialized()); +TEST(ChainingTransportTests, builtin_transports_env_shm) +{ + BuiltinTransportsTest::test_env("SHM"); +} - reader.init(); +TEST(ChainingTransportTests, builtin_transports_env_udpv4) +{ + BuiltinTransportsTest::test_env("UDPv4"); +} - ASSERT_FALSE(reader.isInitialized()); +TEST(ChainingTransportTests, builtin_transports_env_udpv6) +{ + BuiltinTransportsTest::test_env("UDPv6"); } -TEST(ChainingTransportTests, builtin_transports_xml_test) +TEST(ChainingTransportTests, builtin_transports_env_large_data) { - std::vector bt_list; - bt_list.push_back("participant_none"); - bt_list.push_back("participant_default"); - bt_list.push_back("participant_defaultv6"); - bt_list.push_back("participant_shm"); - bt_list.push_back("participant_udp"); - bt_list.push_back("participant_udpv6"); - bt_list.push_back("participant_largedata"); + BuiltinTransportsTest::test_env("LARGE_DATA"); +} + #ifndef __APPLE__ - bt_list.push_back("participant_largedatav6"); +TEST(ChainingTransportTests, builtin_transports_env_large_datav6) +{ + BuiltinTransportsTest::test_env("LARGE_DATAv6"); +} #endif // __APPLE__ - for (auto test_transport : bt_list) - { - { - PubSubWriter writer(TEST_TOPIC_NAME); - PubSubReader reader(TEST_TOPIC_NAME); +TEST(ChainingTransportTests, builtin_transports_xml_none) +{ + BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_none"); +} - writer.set_xml_filename("builtin_transports_profile.xml"); - writer.set_participant_profile(test_transport); - writer.init(); +TEST(ChainingTransportTests, builtin_transports_xml_default) +{ + BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_default"); +} - ASSERT_TRUE(writer.isInitialized()); +TEST(ChainingTransportTests, builtin_transports_xml_defaultv6) +{ + BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_defaultv6"); +} - reader.set_xml_filename("builtin_transports_profile.xml"); - reader.set_participant_profile(test_transport); - reader.init(); +TEST(ChainingTransportTests, builtin_transports_xml_shm) +{ + BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_shm"); +} - ASSERT_TRUE(reader.isInitialized()); +TEST(ChainingTransportTests, builtin_transports_xml_udpv4) +{ + BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_udp"); +} - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); +TEST(ChainingTransportTests, builtin_transports_xml_udpv6) +{ + BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_udpv6"); +} - auto data = default_helloworld_data_generator(); - // size_t num_messages = data.size(); - reader.startReception(data); - writer.send(data); - ASSERT_TRUE(data.empty()); - reader.block_for_all(); +TEST(ChainingTransportTests, builtin_transports_xml_large_data) +{ + BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_largedata"); +} - // Check reception - EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3))); - } - } +#ifndef __APPLE__ +TEST(ChainingTransportTests, builtin_transports_xml_large_datav6) +{ + BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_largedatav6"); } +#endif // __APPLE__ diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 672071390ac..0c065246ef7 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -611,10 +611,8 @@ TEST_P(TransportTCP, TCPv6_copy) EXPECT_EQ(tcpv6_transport_copy, tcpv6_transport); } -// Test connection is successfully restablished after dropping and relaunching a TCP client (requester), -// when the server's listening thread for the old client hasn't processed all its messages. +// Test connection is successfully restablished after dropping and relaunching a TCP client (requester) // Issue -> https://github.com/eProsima/Fast-DDS/issues/2409 -// Issue -> https://github.com/eProsima/Fast-DDS/issues/4026 TEST(TransportTCP, Client_reconnection) { TCPReqRepHelloWorldReplier* replier; @@ -622,7 +620,7 @@ TEST(TransportTCP, Client_reconnection) const uint16_t nmsgs = 5; replier = new TCPReqRepHelloWorldReplier; - replier->init(1, 0, global_port, 0, nullptr, true); + replier->init(1, 0, global_port); ASSERT_TRUE(replier->isInitialized()); diff --git a/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp b/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp index d5c5d07e370..30776253694 100644 --- a/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp +++ b/test/blackbox/common/TCPReqRepHelloWorldReplier.cpp @@ -65,8 +65,7 @@ void TCPReqRepHelloWorldReplier::init( int domainId, uint16_t listeningPort, uint32_t maxInitialPeer, - const char* certs_folder, - bool use_busy_listener) + const char* certs_folder) { ParticipantAttributes pattr; pattr.domainId = domainId; @@ -133,14 +132,9 @@ void TCPReqRepHelloWorldReplier::init( puattr.topic.topicDataType = type_.getName(); puattr.topic.topicName = "HelloWorldTopicReply"; configPublisher("Reply"); - if (use_busy_listener) - { - reply_listener_.use_busy_listener(true); - } reply_publisher_ = Domain::createPublisher(participant_, puattr, &reply_listener_); ASSERT_NE(reply_publisher_, nullptr); - initialized_ = true; } diff --git a/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp b/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp index 3c76fa16581..4dfceeea3c5 100644 --- a/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp +++ b/test/blackbox/common/TCPReqRepHelloWorldReplier.hpp @@ -28,7 +28,6 @@ #include #include -#include #include #include #include @@ -92,7 +91,6 @@ class TCPReqRepHelloWorldReplier RequestListener( TCPReqRepHelloWorldReplier& replier) : replier_(replier) - , use_busy_listener_(false) { } @@ -108,16 +106,6 @@ class TCPReqRepHelloWorldReplier { replier_.matched(); } - else if (info.status == eprosima::fastrtps::rtps::REMOVED_MATCHING && use_busy_listener_) - { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - } - - void use_busy_listener( - const bool& value) - { - use_busy_listener_ = value; } private: @@ -126,7 +114,6 @@ class TCPReqRepHelloWorldReplier const RequestListener&) = delete; TCPReqRepHelloWorldReplier& replier_; - bool use_busy_listener_; } reply_listener_; @@ -138,8 +125,7 @@ class TCPReqRepHelloWorldReplier int domainId, uint16_t listeningPort, uint32_t maxInitialPeer = 0, - const char* certs_folder = nullptr, - bool use_busy_listener = false); + const char* certs_folder = nullptr); bool isInitialized() const { return initialized_; diff --git a/test/blackbox/xfail_tests.cmake b/test/blackbox/xfail_tests.cmake index fd2e111c56b..8679750f523 100644 --- a/test/blackbox/xfail_tests.cmake +++ b/test/blackbox/xfail_tests.cmake @@ -9,6 +9,17 @@ set(XFAIL_DDS_PIM_TESTS LivelinessQos/LivelinessQos.ThreeWriters_ThreeReaders/Transport LivelinessQos/LivelinessQos.TwoWriters_OneReader_ManualByParticipant/Intraprocess PersistenceLargeData/PersistenceLargeData.PubSubAsReliablePubPersistentWithFrag/Transport + + ########################################################## + # These test are here because of TCP instabilities + ########################################################## + ChainingTransportTests.builtin_transports_api_large_data + ChainingTransportTests.builtin_transports_api_large_datav6 + ChainingTransportTests.builtin_transports_env_large_data + ChainingTransportTests.builtin_transports_env_large_datav6 + ChainingTransportTests.builtin_transports_xml_large_data + ChainingTransportTests.builtin_transports_xml_large_datav6 + ########################################################## ) set_tests_properties(${XFAIL_DDS_PIM_TESTS}