Skip to content

Commit

Permalink
Hotfix TCP sender resources creation (#3932)
Browse files Browse the repository at this point in the history
* Refs #19665: Check if casting from a ChainingSender instead of SenderResource to avoid failing the cast

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #19665: Regression test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #19665: Check if remote WAN  sender resource already exists

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #19655: Linter

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #19665: fix test mac vars init

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #19665: Apply Rev suggestions

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL authored Oct 26, 2023
1 parent 8126a51 commit e94deb8
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 3 deletions.
6 changes: 6 additions & 0 deletions include/fastrtps/utils/IPLocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ class IPLocator
RTPS_DllAPI static std::string toWanstring(
const Locator_t& locator);

//! This method is useful in the case of having a tcp client with an initial peer
//! pointing to a WAN locator, and receiving a locator with LAN and WAN
//! addresses (TCP Client from TCP Server)
RTPS_DllAPI static Locator_t WanToLanLocator(
const Locator_t& locator);

//! Sets locator's LAN ID (as in RTCP protocol)
RTPS_DllAPI static bool setLanID(
Locator_t& locator,
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/ChainingSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ class ChainingSenderResource : public fastrtps::rtps::SenderResource
};
}

fastrtps::rtps::SenderResource* lower_sender_cast()
{
fastrtps::rtps::SenderResource* lower_sender_cast = nullptr;

if (low_sender_resource_)
{
lower_sender_cast = static_cast<fastrtps::rtps::SenderResource*>(low_sender_resource_.get());
}

return lower_sender_cast;
}

virtual ~ChainingSenderResource()
{
if (clean_up)
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <fastdds/rtps/common/LocatorsIterator.hpp>
#include <fastdds/rtps/transport/SenderResource.h>

#include <rtps/transport/ChainingSenderResource.hpp>
#include <rtps/transport/TCPChannelResource.h>
#include <rtps/transport/TCPTransportInterface.h>

Expand Down Expand Up @@ -75,6 +76,17 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
if (sender_resource->kind() == transport.kind())
{
returned_resource = dynamic_cast<TCPSenderResource*>(sender_resource);

//! May be chained
if (!returned_resource)
{
auto chaining_sender = dynamic_cast<ChainingSenderResource*>(sender_resource);

if (chaining_sender)
{
returned_resource = dynamic_cast<TCPSenderResource*>(chaining_sender->lower_sender_cast());
}
}
}

return returned_resource;
Expand Down
6 changes: 4 additions & 2 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,8 +624,10 @@ bool TCPTransportInterface::OpenOutputChannel(
{
TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, sender_resource.get());

//TODO Review with wan ip.
if (tcp_sender_resource && physical_locator == tcp_sender_resource->channel()->locator())
if (tcp_sender_resource && (physical_locator == tcp_sender_resource->channel()->locator() ||
(IPLocator::hasWan(locator) &&
IPLocator::WanToLanLocator(physical_locator) ==
tcp_sender_resource->channel()->locator())))
{
// Look for an existing channel that matches this physical locator
auto existing_channel = channel_resources_.find(physical_locator);
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/UDPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/transport/SenderResource.h>

#include <rtps/transport/ChainingSenderResource.hpp>
#include <rtps/transport/UDPTransportInterface.h>

namespace eprosima {
Expand Down Expand Up @@ -93,6 +94,17 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource
if (sender_resource->kind() == transport.kind())
{
returned_resource = dynamic_cast<UDPSenderResource*>(sender_resource);

//! May be chained
if (!returned_resource)
{
auto chaining_sender = dynamic_cast<ChainingSenderResource*>(sender_resource);

if (chaining_sender)
{
returned_resource = dynamic_cast<UDPSenderResource*>(chaining_sender->lower_sender_cast());
}
}
}

return returned_resource;
Expand Down
12 changes: 12 additions & 0 deletions src/cpp/rtps/transport/shared_mem/SharedMemSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <fastdds/rtps/transport/SenderResource.h>

#include <rtps/transport/ChainingSenderResource.hpp>
#include <rtps/transport/shared_mem/SharedMemTransport.h>

namespace eprosima {
Expand Down Expand Up @@ -67,6 +68,17 @@ class SharedMemSenderResource : public fastrtps::rtps::SenderResource
if (sender_resource->kind() == transport.kind())
{
returned_resource = dynamic_cast<SharedMemSenderResource*>(sender_resource);

//! May be chained
if (!returned_resource)
{
auto chaining_sender = dynamic_cast<ChainingSenderResource*>(sender_resource);

if (chaining_sender)
{
returned_resource = dynamic_cast<SharedMemSenderResource*>(chaining_sender->lower_sender_cast());
}
}
}

return returned_resource;
Expand Down
11 changes: 11 additions & 0 deletions src/cpp/utils/IPLocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,17 @@ std::string IPLocator::toWanstring(
return ss.str();
}

Locator_t IPLocator::WanToLanLocator(
const Locator_t& locator)
{
Locator_t out(locator);

std::memcpy(out.address + 12, out.address + 8, 4 * sizeof(octet));
std::memset(out.address + 8, 0, 4 * sizeof(octet));

return out;
}

bool IPLocator::setLanID(
Locator_t& locator,
const std::string& lanId)
Expand Down
120 changes: 119 additions & 1 deletion test/blackbox/common/BlackboxTestsTransportCustom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <fastdds/rtps/transport/ChainingTransportDescriptor.h>
#include <fastdds/rtps/transport/ChainingTransport.h>
#include <fastdds/rtps/attributes/PropertyPolicy.h>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>

#include <gtest/gtest.h>

Expand All @@ -42,7 +43,6 @@ class TestChainingTransportDescriptor : public eprosima::fastdds::rtps::Chaining
eprosima::fastdds::rtps::TransportInterface* create_transport() const override;
};


const std::string test_property_name = "test_property";
const std::string test_property_value = "test_value";

Expand Down Expand Up @@ -188,3 +188,121 @@ TEST(ChainingTransportTests, basic_test)
ASSERT_TRUE(reader_receive_function_called);
ASSERT_TRUE(reader_send_function_called);
}

//! This is a regression test for Redmine #19665
//! A Participant with an initial peer (client) creates the correct
//! number of sender resources after discovering a participant with
//! a WAN listening address (TCP server)
TEST(ChainingTransportTests, tcp_client_server_with_wan_correct_sender_resources)
{
std::atomic<int> times_writer_init_function_called {0};
std::atomic<int> times_writer_receive_function_called{0};
std::atomic<int> times_writer_send_function_called{0};
std::atomic<int> times_reader_init_function_called{0};
std::atomic<int> times_reader_receive_function_called{0};
std::atomic<int> times_reader_send_function_called{0};

eprosima::fastrtps::rtps::PropertyPolicy test_property_policy;
test_property_policy.properties().push_back({test_property_name, test_property_value});

uint16_t port = static_cast<uint16_t>(GET_PID());

if (5000 > port)
{
port += 5000;
}

std::shared_ptr<eprosima::fastdds::rtps::TCPv4TransportDescriptor> reader_tcp_transport =
std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();

reader_tcp_transport->set_WAN_address("127.0.0.1");
reader_tcp_transport->listening_ports.push_back(port);

eprosima::fastrtps::rtps::LocatorList_t reader_locators;
eprosima::fastrtps::rtps::Locator_t reader_loc;
reader_loc.port = port;
IPLocator::setIPv4(reader_loc, "127.0.0.1");
reader_loc.kind = LOCATOR_KIND_TCPv4;
reader_locators.push_back(reader_loc);

std::shared_ptr<TestChainingTransportDescriptor> reader_transport =
std::make_shared<TestChainingTransportDescriptor>(reader_tcp_transport);
reader_transport->init_function_called = [&times_reader_init_function_called]()
{
times_reader_init_function_called.fetch_add(1);
};
reader_transport->receive_function_called = [&times_reader_receive_function_called]()
{
times_reader_receive_function_called.fetch_add(1);
};
reader_transport->send_function_called = [&times_reader_send_function_called]()
{
times_reader_send_function_called.fetch_add(1);
};

std::shared_ptr<eprosima::fastdds::rtps::TCPv4TransportDescriptor> writer_tcp_transport =
std::make_shared<eprosima::fastdds::rtps::TCPv4TransportDescriptor>();

std::shared_ptr<TestChainingTransportDescriptor> writer_transport =
std::make_shared<TestChainingTransportDescriptor>(writer_tcp_transport);
writer_transport->init_function_called = [&times_writer_init_function_called]()
{
times_writer_init_function_called.fetch_add(1);
};
writer_transport->receive_function_called = [&times_writer_receive_function_called]()
{
times_writer_receive_function_called.fetch_add(1);
};
writer_transport->send_function_called = [&times_writer_send_function_called]()
{
times_writer_send_function_called.fetch_add(1);
};

PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

eprosima::fastrtps::rtps::LocatorList_t initial_peers;
initial_peers.push_back(reader_loc);

writer.disable_builtin_transport()
.add_user_transport_to_pparams(writer_transport)
.initial_peers(initial_peers)
.history_depth(10)
.property_policy(test_property_policy)
.init();

ASSERT_TRUE(writer.isInitialized());

reader.disable_builtin_transport()
.add_user_transport_to_pparams(reader_transport)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.property_policy(test_property_policy)
.metatraffic_unicast_locator_list(reader_locators)
.set_default_unicast_locators(reader_locators)
.init();

ASSERT_TRUE(reader.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_helloworld_data_generator(1);
reader.startReception(data);
writer.send(data);
ASSERT_TRUE(data.empty());
reader.block_for_all();

ASSERT_EQ(times_writer_init_function_called.load(), 1);
ASSERT_EQ(times_reader_init_function_called.load(), 1);
ASSERT_GE(times_writer_send_function_called.load(), 0);
ASSERT_GE(times_reader_receive_function_called.load(), 0);

//! If only 1 sender resource was created
//! Expect less than 30 calls in send/receive
//! including discovery phase calls and reception.
//! Else something is wrong, more than one sender resource
//! is being created
ASSERT_LE(times_writer_send_function_called.load(), 30);
ASSERT_LE(times_reader_receive_function_called.load(), 30);
}

0 comments on commit e94deb8

Please sign in to comment.