diff --git a/include/fastrtps/utils/IPLocator.h b/include/fastrtps/utils/IPLocator.h index 35257c877d8..656da36f3e7 100644 --- a/include/fastrtps/utils/IPLocator.h +++ b/include/fastrtps/utils/IPLocator.h @@ -201,6 +201,12 @@ class IPLocator RTPS_DllAPI static std::string toWanstring( const Locator_t& locator); + //! This method is useful in the case of having a tcp client with an initial peer + //! pointing to a WAN locator, and receiving a locator with LAN and WAN + //! addresses (TCP Client from TCP Server) + RTPS_DllAPI static Locator_t WanToLanLocator( + const Locator_t& locator); + //! Sets locator's LAN ID (as in RTCP protocol) RTPS_DllAPI static bool setLanID( Locator_t& locator, diff --git a/src/cpp/rtps/transport/ChainingSenderResource.hpp b/src/cpp/rtps/transport/ChainingSenderResource.hpp index 23352ad7519..65973372cff 100644 --- a/src/cpp/rtps/transport/ChainingSenderResource.hpp +++ b/src/cpp/rtps/transport/ChainingSenderResource.hpp @@ -56,6 +56,18 @@ class ChainingSenderResource : public fastrtps::rtps::SenderResource }; } + fastrtps::rtps::SenderResource* lower_sender_cast() + { + fastrtps::rtps::SenderResource* lower_sender_cast = nullptr; + + if (low_sender_resource_) + { + lower_sender_cast = static_cast(low_sender_resource_.get()); + } + + return lower_sender_cast; + } + virtual ~ChainingSenderResource() { if (clean_up) diff --git a/src/cpp/rtps/transport/TCPSenderResource.hpp b/src/cpp/rtps/transport/TCPSenderResource.hpp index 086b0e9cb31..7347864c329 100644 --- a/src/cpp/rtps/transport/TCPSenderResource.hpp +++ b/src/cpp/rtps/transport/TCPSenderResource.hpp @@ -18,6 +18,11 @@ #include #include +<<<<<<< HEAD +======= +#include +#include +>>>>>>> e94deb85d (Hotfix TCP sender resources creation (#3932)) #include #include @@ -75,6 +80,17 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource if (sender_resource->kind() == transport.kind()) { returned_resource = dynamic_cast(sender_resource); + + //! May be chained + if (!returned_resource) + { + auto chaining_sender = dynamic_cast(sender_resource); + + if (chaining_sender) + { + returned_resource = dynamic_cast(chaining_sender->lower_sender_cast()); + } + } } return returned_resource; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 83448999e7f..26a03e3ecc8 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -611,8 +611,10 @@ bool TCPTransportInterface::OpenOutputChannel( { TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, sender_resource.get()); - //TODO Review with wan ip. - if (tcp_sender_resource && physical_locator == tcp_sender_resource->channel()->locator()) + if (tcp_sender_resource && (physical_locator == tcp_sender_resource->channel()->locator() || + (IPLocator::hasWan(locator) && + IPLocator::WanToLanLocator(physical_locator) == + tcp_sender_resource->channel()->locator()))) { // Look for an existing channel that matches this physical locator auto existing_channel = channel_resources_.find(physical_locator); diff --git a/src/cpp/rtps/transport/UDPSenderResource.hpp b/src/cpp/rtps/transport/UDPSenderResource.hpp index d3ea116d21e..6ed793c3f09 100644 --- a/src/cpp/rtps/transport/UDPSenderResource.hpp +++ b/src/cpp/rtps/transport/UDPSenderResource.hpp @@ -18,6 +18,7 @@ #include #include +#include #include namespace eprosima { @@ -93,6 +94,17 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource if (sender_resource->kind() == transport.kind()) { returned_resource = dynamic_cast(sender_resource); + + //! May be chained + if (!returned_resource) + { + auto chaining_sender = dynamic_cast(sender_resource); + + if (chaining_sender) + { + returned_resource = dynamic_cast(chaining_sender->lower_sender_cast()); + } + } } return returned_resource; diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemSenderResource.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemSenderResource.hpp index d6de74132f7..905baa8eb47 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemSenderResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemSenderResource.hpp @@ -15,7 +15,13 @@ #ifndef _FASTDDS_SHAREDMEM_SENDERRESOURCE_HPP_ #define _FASTDDS_SHAREDMEM_SENDERRESOURCE_HPP_ +<<<<<<< HEAD #include +======= +#include + +#include +>>>>>>> e94deb85d (Hotfix TCP sender resources creation (#3932)) #include namespace eprosima { @@ -66,6 +72,17 @@ class SharedMemSenderResource : public fastrtps::rtps::SenderResource if (sender_resource->kind() == transport.kind()) { returned_resource = dynamic_cast(sender_resource); + + //! May be chained + if (!returned_resource) + { + auto chaining_sender = dynamic_cast(sender_resource); + + if (chaining_sender) + { + returned_resource = dynamic_cast(chaining_sender->lower_sender_cast()); + } + } } return returned_resource; diff --git a/src/cpp/utils/IPLocator.cpp b/src/cpp/utils/IPLocator.cpp index 81227446065..6f84851e5a8 100644 --- a/src/cpp/utils/IPLocator.cpp +++ b/src/cpp/utils/IPLocator.cpp @@ -713,6 +713,17 @@ std::string IPLocator::toWanstring( return ss.str(); } +Locator_t IPLocator::WanToLanLocator( + const Locator_t& locator) +{ + Locator_t out(locator); + + std::memcpy(out.address + 12, out.address + 8, 4 * sizeof(octet)); + std::memset(out.address + 8, 0, 4 * sizeof(octet)); + + return out; +} + bool IPLocator::setLanID( Locator_t& locator, const std::string& lanId) diff --git a/test/blackbox/common/BlackboxTestsTransportCustom.cpp b/test/blackbox/common/BlackboxTestsTransportCustom.cpp index cb0aa8ea525..8b879bd7249 100644 --- a/test/blackbox/common/BlackboxTestsTransportCustom.cpp +++ b/test/blackbox/common/BlackboxTestsTransportCustom.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include @@ -42,7 +43,6 @@ class TestChainingTransportDescriptor : public eprosima::fastdds::rtps::Chaining eprosima::fastdds::rtps::TransportInterface* create_transport() const override; }; - const std::string test_property_name = "test_property"; const std::string test_property_value = "test_value"; @@ -188,3 +188,121 @@ TEST(ChainingTransportTests, basic_test) ASSERT_TRUE(reader_receive_function_called); ASSERT_TRUE(reader_send_function_called); } + +//! This is a regression test for Redmine #19665 +//! A Participant with an initial peer (client) creates the correct +//! number of sender resources after discovering a participant with +//! a WAN listening address (TCP server) +TEST(ChainingTransportTests, tcp_client_server_with_wan_correct_sender_resources) +{ + std::atomic times_writer_init_function_called {0}; + std::atomic times_writer_receive_function_called{0}; + std::atomic times_writer_send_function_called{0}; + std::atomic times_reader_init_function_called{0}; + std::atomic times_reader_receive_function_called{0}; + std::atomic times_reader_send_function_called{0}; + + eprosima::fastrtps::rtps::PropertyPolicy test_property_policy; + test_property_policy.properties().push_back({test_property_name, test_property_value}); + + uint16_t port = static_cast(GET_PID()); + + if (5000 > port) + { + port += 5000; + } + + std::shared_ptr reader_tcp_transport = + std::make_shared(); + + reader_tcp_transport->set_WAN_address("127.0.0.1"); + reader_tcp_transport->listening_ports.push_back(port); + + eprosima::fastrtps::rtps::LocatorList_t reader_locators; + eprosima::fastrtps::rtps::Locator_t reader_loc; + reader_loc.port = port; + IPLocator::setIPv4(reader_loc, "127.0.0.1"); + reader_loc.kind = LOCATOR_KIND_TCPv4; + reader_locators.push_back(reader_loc); + + std::shared_ptr reader_transport = + std::make_shared(reader_tcp_transport); + reader_transport->init_function_called = [×_reader_init_function_called]() + { + times_reader_init_function_called.fetch_add(1); + }; + reader_transport->receive_function_called = [×_reader_receive_function_called]() + { + times_reader_receive_function_called.fetch_add(1); + }; + reader_transport->send_function_called = [×_reader_send_function_called]() + { + times_reader_send_function_called.fetch_add(1); + }; + + std::shared_ptr writer_tcp_transport = + std::make_shared(); + + std::shared_ptr writer_transport = + std::make_shared(writer_tcp_transport); + writer_transport->init_function_called = [×_writer_init_function_called]() + { + times_writer_init_function_called.fetch_add(1); + }; + writer_transport->receive_function_called = [×_writer_receive_function_called]() + { + times_writer_receive_function_called.fetch_add(1); + }; + writer_transport->send_function_called = [×_writer_send_function_called]() + { + times_writer_send_function_called.fetch_add(1); + }; + + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + eprosima::fastrtps::rtps::LocatorList_t initial_peers; + initial_peers.push_back(reader_loc); + + writer.disable_builtin_transport() + .add_user_transport_to_pparams(writer_transport) + .initial_peers(initial_peers) + .history_depth(10) + .property_policy(test_property_policy) + .init(); + + ASSERT_TRUE(writer.isInitialized()); + + reader.disable_builtin_transport() + .add_user_transport_to_pparams(reader_transport) + .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .property_policy(test_property_policy) + .metatraffic_unicast_locator_list(reader_locators) + .set_default_unicast_locators(reader_locators) + .init(); + + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(1); + reader.startReception(data); + writer.send(data); + ASSERT_TRUE(data.empty()); + reader.block_for_all(); + + ASSERT_EQ(times_writer_init_function_called.load(), 1); + ASSERT_EQ(times_reader_init_function_called.load(), 1); + ASSERT_GE(times_writer_send_function_called.load(), 0); + ASSERT_GE(times_reader_receive_function_called.load(), 0); + + //! If only 1 sender resource was created + //! Expect less than 30 calls in send/receive + //! including discovery phase calls and reception. + //! Else something is wrong, more than one sender resource + //! is being created + ASSERT_LE(times_writer_send_function_called.load(), 30); + ASSERT_LE(times_reader_receive_function_called.load(), 30); +} \ No newline at end of file