From a4cd14968efdf03607fc6b8154162f6aa744ecc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ferreira=20Gonz=C3=A1lez?= Date: Wed, 11 Dec 2024 16:51:31 +0100 Subject: [PATCH] Fix unique network flows with TCP transports (#5461) * Refs #22055: Add regression tests Signed-off-by: cferreiragonz * Refs #22055: Fix unique flows for TCP Signed-off-by: cferreiragonz * Refs #22055: Fix tests Signed-off-by: cferreiragonz --------- Signed-off-by: cferreiragonz (cherry picked from commit 81cdb10a9076adb7262d450ccec3f47bf29f67da) --- .../rtps/participant/RTPSParticipantImpl.cpp | 10 +- .../api/dds-pim/PubSubParticipant.hpp | 7 + .../common/BlackboxTestsTransportTCP.cpp | 131 ++++++++++++++++++ 3 files changed, 147 insertions(+), 1 deletion(-) diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 0c240518bbc..a0b48ad210f 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -2001,7 +2001,15 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint( // Set port on unicast locators for (Locator_t& loc : attributes.unicastLocatorList) { - loc.port = port; + // Set logical port only TCP locators + if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind) + { + IPLocator::setLogicalPort(loc, port); + } + else + { + loc.port = port; + } } // Try creating receiver resources diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index bc4afb9800a..304b17b3b62 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -684,6 +684,13 @@ class PubSubParticipant return false; } + PubSubParticipant& initial_peers( + const eprosima::fastdds::rtps::LocatorList& initial_peers) + { + participant_qos_.wire_protocol().builtin.initialPeersList = initial_peers; + return *this; + } + PubSubParticipant& pub_property_policy( const eprosima::fastrtps::rtps::PropertyPolicy property_policy) { diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index a7a3edad7a3..dd50ee8807a 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -25,6 +25,7 @@ #include "../api/dds-pim/TCPReqRepHelloWorldRequester.hpp" #include "../api/dds-pim/TCPReqRepHelloWorldReplier.hpp" +#include "PubSubParticipant.hpp" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" #include "DatagramInjectionTransport.hpp" @@ -1385,6 +1386,136 @@ TEST_P(TransportTCP, TCP_initial_peers_connection) p3.block_for_all(); } +TEST_P(TransportTCP, tcp_unique_network_flows_init) +{ + // TCP Writer creation should fail as feature is not implemented for writers + { + PubSubWriter writer(TEST_TOPIC_NAME); + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + + test_transport_->add_listener_port(global_port); + writer.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + writer.entity_property_policy(properties).init(); + + EXPECT_FALSE(writer.isInitialized()); + } + + // Two readers on the same participant not requesting unique flows should give the same logical port and same physical port + { + PubSubParticipant participant(0, 2, 0, 0); + + participant.sub_topic_name(TEST_TOPIC_NAME); + + participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + ASSERT_TRUE(participant.init_subscriber(1)); + + LocatorList_t locators; + LocatorList_t locators2; + + participant.get_native_reader(0).get_listening_locators(locators); + participant.get_native_reader(1).get_listening_locators(locators2); + + EXPECT_TRUE(locators == locators2); + // LocatorList size depends on the number of interfaces. Different address but same port. + ASSERT_GT(locators.size(), 0); + ASSERT_GT(locators2.size(), 0); + auto locator1 = locators.begin(); + auto locator2 = locators2.begin(); + EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2)); + EXPECT_EQ(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2)); + } + + // Two TCP readers on the same participant requesting unique flows should give different logical ports but same physical port + { + PubSubParticipant participant(0, 2, 0, 0); + + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties); + + participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + ASSERT_TRUE(participant.init_participant()); + ASSERT_TRUE(participant.init_subscriber(0)); + ASSERT_TRUE(participant.init_subscriber(1)); + + LocatorList_t locators; + LocatorList_t locators2; + + participant.get_native_reader(0).get_listening_locators(locators); + participant.get_native_reader(1).get_listening_locators(locators2); + + EXPECT_FALSE(locators == locators2); + // LocatorList size depends on the number of interfaces. Different address but same port. + ASSERT_GT(locators.size(), 0); + ASSERT_GT(locators2.size(), 0); + auto locator1 = locators.begin(); + auto locator2 = locators2.begin(); + EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2)); + EXPECT_NE(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2)); + } +} + +TEST_P(TransportTCP, tcp_unique_network_flows_communication) +{ + PubSubParticipant readers(0, 2, 0, 2); + PubSubWriter writer(TEST_TOPIC_NAME); + + PropertyPolicy properties; + properties.properties().emplace_back("fastdds.unique_network_flows", ""); + readers.disable_builtin_transport().add_user_transport_to_pparams(test_transport_); + + eprosima::fastdds::rtps::Locator_t initial_peer_locator; + if (use_ipv6) + { + initial_peer_locator.kind = LOCATOR_KIND_TCPv6; + eprosima::fastdds::rtps::IPLocator::setIPv6(initial_peer_locator, "::1"); + } + else + { + initial_peer_locator.kind = LOCATOR_KIND_TCPv4; + eprosima::fastdds::rtps::IPLocator::setIPv4(initial_peer_locator, "127.0.0.1"); + } + eprosima::fastdds::rtps::IPLocator::setPhysicalPort(initial_peer_locator, global_port); + eprosima::fastdds::rtps::LocatorList_t initial_peer_list; + initial_peer_list.push_back(initial_peer_locator); + + readers.sub_topic_name(TEST_TOPIC_NAME) + .sub_property_policy(properties) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .initial_peers(initial_peer_list); + + ASSERT_TRUE(readers.init_participant()); + ASSERT_TRUE(readers.init_subscriber(0)); + ASSERT_TRUE(readers.init_subscriber(1)); + + test_transport_->add_listener_port(global_port); + writer.disable_builtin_transport() + .add_user_transport_to_pparams(test_transport_) + .reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .history_depth(100); + + writer.init(); + ASSERT_TRUE(writer.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + readers.sub_wait_discovery(); + + // Send data + auto data = default_helloworld_data_generator(); + writer.send(data); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block until readers have acknowledged all samples. + EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30))); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else