Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19665] Hotfix TCP sender resources creation #3932

Merged
merged 6 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/fastrtps/utils/IPLocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ class IPLocator
RTPS_DllAPI static std::string toWanstring(
const Locator_t& locator);

//! This method is useful in the case of having a tcp client with an initial peer
//! pointing to a WAN locator, and receiving a locator with LAN and WAN
//! addresses (TCP Client from TCP Server)
RTPS_DllAPI static Locator_t WanToLanLocator(
const Locator_t& locator);

//! Sets locator's LAN ID (as in RTCP protocol)
RTPS_DllAPI static bool setLanID(
Locator_t& locator,
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/ChainingSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ class ChainingSenderResource : public fastrtps::rtps::SenderResource
};
}

fastrtps::rtps::SenderResource* lower_sender_cast()
{
fastrtps::rtps::SenderResource* lower_sender_cast = nullptr;

if (low_sender_resource_)
{
lower_sender_cast = static_cast<fastrtps::rtps::SenderResource*>(low_sender_resource_.get());
}

return lower_sender_cast;
}

virtual ~ChainingSenderResource()
{
if (clean_up)
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <fastdds/rtps/common/LocatorsIterator.hpp>
#include <fastdds/rtps/transport/SenderResource.h>

#include <rtps/transport/ChainingSenderResource.hpp>
#include <rtps/transport/TCPChannelResource.h>
#include <rtps/transport/TCPTransportInterface.h>

Expand Down Expand Up @@ -75,6 +76,17 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
if (sender_resource->kind() == transport.kind())
{
returned_resource = dynamic_cast<TCPSenderResource*>(sender_resource);

//! May be chained
if (!returned_resource)
{
auto chaining_sender = dynamic_cast<ChainingSenderResource*>(sender_resource);

if (chaining_sender)
{
returned_resource = dynamic_cast<TCPSenderResource*>(chaining_sender->lower_sender_cast());
}
}
}

return returned_resource;
Expand Down
6 changes: 4 additions & 2 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,8 +624,10 @@ bool TCPTransportInterface::OpenOutputChannel(
{
TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, sender_resource.get());

//TODO Review with wan ip.
if (tcp_sender_resource && physical_locator == tcp_sender_resource->channel()->locator())
if (tcp_sender_resource && (physical_locator == tcp_sender_resource->channel()->locator() ||
(IPLocator::hasWan(locator) &&
IPLocator::WanToLanLocator(physical_locator) ==
tcp_sender_resource->channel()->locator())))
{
// Look for an existing channel that matches this physical locator
auto existing_channel = channel_resources_.find(physical_locator);
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/UDPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/transport/SenderResource.h>

#include <rtps/transport/ChainingSenderResource.hpp>
#include <rtps/transport/UDPTransportInterface.h>

namespace eprosima {
Expand Down Expand Up @@ -93,6 +94,17 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource
if (sender_resource->kind() == transport.kind())
{
returned_resource = dynamic_cast<UDPSenderResource*>(sender_resource);

//! May be chained
if (!returned_resource)
{
auto chaining_sender = dynamic_cast<ChainingSenderResource*>(sender_resource);

if (chaining_sender)
{
returned_resource = dynamic_cast<UDPSenderResource*>(chaining_sender->lower_sender_cast());
}
}
}

return returned_resource;
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <fastdds/rtps/transport/SenderResource.h>

#include <rtps/transport/ChainingSenderResource.hpp>
#include <rtps/transport/shared_mem/SharedMemTransport.h>

namespace eprosima {
Expand Down Expand Up @@ -67,6 +68,17 @@ class SharedMemSenderResource : public fastrtps::rtps::SenderResource
if (sender_resource->kind() == transport.kind())
{
returned_resource = dynamic_cast<SharedMemSenderResource*>(sender_resource);

//! May be chained
if (!returned_resource)
{
auto chaining_sender = dynamic_cast<ChainingSenderResource*>(sender_resource);

if (chaining_sender)
{
returned_resource = dynamic_cast<SharedMemSenderResource*>(chaining_sender->lower_sender_cast());
}
}
}

return returned_resource;
Expand Down
11 changes: 11 additions & 0 deletions src/cpp/utils/IPLocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,17 @@ std::string IPLocator::toWanstring(
return ss.str();
}

Locator_t IPLocator::WanToLanLocator(
const Locator_t& locator)
{
Locator_t out(locator);

std::memcpy(out.address + 12, out.address + 8, 4 * sizeof(octet));
std::memset(out.address + 8, 0, 4 * sizeof(octet));

return out;
}

bool IPLocator::setLanID(
Locator_t& locator,
const std::string& lanId)
Expand Down
120 changes: 119 additions & 1 deletion test/blackbox/common/BlackboxTestsTransportCustom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fastdds/rtps/transport/ChainingTransportDescriptor.h>
#include <fastdds/rtps/transport/ChainingTransport.h>
#include <fastdds/rtps/attributes/PropertyPolicy.h>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>

#include <gtest/gtest.h>

Expand All @@ -42,7 +43,6 @@ class TestChainingTransportDescriptor : public eprosima::fastdds::rtps::Chaining
eprosima::fastdds::rtps::TransportInterface* create_transport() const override;
};


const std::string test_property_name = "test_property";
const std::string test_property_value = "test_value";

Expand Down Expand Up @@ -188,3 +188,121 @@ TEST(ChainingTransportTests, basic_test)
ASSERT_TRUE(reader_receive_function_called);
ASSERT_TRUE(reader_send_function_called);
}

//! This is a regression test for Redmine #19665
//! A Participant with an initial peer (client) creates the correct
//! number of sender resources after discovering a participant with
//! a WAN listening address (TCP server)
TEST(ChainingTransportTests, tcp_client_server_with_wan_correct_sender_resources)
{
std::atomic<int> times_writer_init_function_called {0};
std::atomic<int> times_writer_receive_function_called{0};
std::atomic<int> times_writer_send_function_called{0};
std::atomic<int> times_reader_init_function_called{0};
std::atomic<int> times_reader_receive_function_called{0};
std::atomic<int> times_reader_send_function_called{0};

eprosima::fastrtps::rtps::PropertyPolicy test_property_policy;
test_property_policy.properties().push_back({test_property_name, test_property_value});

uint16_t port = static_cast<uint16_t>(GET_PID());

if (5000 > port)
{
port += 5000;
}

std::shared_ptr<eprosima::fastdds::rtps::TCPv4TransportDescriptor> reader_tcp_transport =
std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();

reader_tcp_transport->set_WAN_address("127.0.0.1");
reader_tcp_transport->listening_ports.push_back(port);

eprosima::fastrtps::rtps::LocatorList_t reader_locators;
eprosima::fastrtps::rtps::Locator_t reader_loc;
reader_loc.port = port;
IPLocator::setIPv4(reader_loc, "127.0.0.1");
reader_loc.kind = LOCATOR_KIND_TCPv4;
reader_locators.push_back(reader_loc);

std::shared_ptr<TestChainingTransportDescriptor> reader_transport =
std::make_shared<TestChainingTransportDescriptor>(reader_tcp_transport);
reader_transport->init_function_called = [&times_reader_init_function_called]()
{
times_reader_init_function_called.fetch_add(1);
};
reader_transport->receive_function_called = [&times_reader_receive_function_called]()
{
times_reader_receive_function_called.fetch_add(1);
};
reader_transport->send_function_called = [&times_reader_send_function_called]()
{
times_reader_send_function_called.fetch_add(1);
};

std::shared_ptr<eprosima::fastdds::rtps::TCPv4TransportDescriptor> writer_tcp_transport =
std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();

std::shared_ptr<TestChainingTransportDescriptor> writer_transport =
std::make_shared<TestChainingTransportDescriptor>(writer_tcp_transport);
writer_transport->init_function_called = [&times_writer_init_function_called]()
{
times_writer_init_function_called.fetch_add(1);
};
writer_transport->receive_function_called = [&times_writer_receive_function_called]()
{
times_writer_receive_function_called.fetch_add(1);
};
writer_transport->send_function_called = [&times_writer_send_function_called]()
{
times_writer_send_function_called.fetch_add(1);
};

PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

eprosima::fastrtps::rtps::LocatorList_t initial_peers;
initial_peers.push_back(reader_loc);

writer.disable_builtin_transport()
.add_user_transport_to_pparams(writer_transport)
.initial_peers(initial_peers)
.history_depth(10)
.property_policy(test_property_policy)
.init();

ASSERT_TRUE(writer.isInitialized());

reader.disable_builtin_transport()
.add_user_transport_to_pparams(reader_transport)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.property_policy(test_property_policy)
.metatraffic_unicast_locator_list(reader_locators)
.set_default_unicast_locators(reader_locators)
.init();

ASSERT_TRUE(reader.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_helloworld_data_generator(1);
reader.startReception(data);
writer.send(data);
ASSERT_TRUE(data.empty());
reader.block_for_all();

ASSERT_EQ(times_writer_init_function_called.load(), 1);
ASSERT_EQ(times_reader_init_function_called.load(), 1);
ASSERT_GE(times_writer_send_function_called.load(), 0);
ASSERT_GE(times_reader_receive_function_called.load(), 0);

//! If only 1 sender resource was created
//! Expect less than 30 calls in send/receive
//! including discovery phase calls and reception.
//! Else something is wrong, more than one sender resource
//! is being created
ASSERT_LE(times_writer_send_function_called.load(), 30);
richiware marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_LE(times_reader_receive_function_called.load(), 30);
}
Loading