Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20120] TCPSendResources cleanup #4300

Merged
merged 27 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7d5b079
Refs #20120: Remove unused include
jepemi Jan 25, 2024
58f6308
Refs #20120: TCP event call
jepemi Jan 25, 2024
4c44fe4
Refs #20120: Sanitize transport
jepemi Jan 25, 2024
21b3037
Refs #20120: Added tests and minor fixes
jepemi Jan 26, 2024
493f3a6
Refs #20120: Extended doxygen description and added to versions.md
jepemi Jan 26, 2024
89a0fc2
Refs #20120: Uncrustify
jepemi Jan 26, 2024
e3df98c
Refs #20120: Add missing header
jepemi Jan 29, 2024
9ea1d8f
Refs #20120: Fix tests
jepemi Feb 1, 2024
d32ceff
Refs #20120: Uncrustify
jepemi Feb 1, 2024
6cd60e5
Refs #20120: After client-server decision making rebase, not working
jepemi Feb 9, 2024
24f32f6
Refs #20120: Update
jepemi Feb 15, 2024
c9d5caf
Refs #20120: Fix for chaining-transports
jepemi Feb 16, 2024
26130ef
Refs #20120: Add new channel connection status and tests
jepemi Feb 19, 2024
d59be2b
Refs #20120: PR refactor, timed event deleted. cleanup on pdp unbinding
jepemi Feb 20, 2024
6169f41
Refs #20120: Uncrustify
jepemi Feb 21, 2024
8abe004
Refs #20120: Add unittests
jepemi Feb 21, 2024
b6b4cbd
Refs #20120: Fix deadlock
jepemi Feb 22, 2024
f54ea87
Refs #20120: Fix unittest
jepemi Feb 22, 2024
099384b
Refs #20120: Fix asio throwing exceptions
jepemi Feb 22, 2024
deeca26
Refs #20120: Unnittest untab
jepemi Feb 22, 2024
4c6cde2
Refs #20120: Apply suggestions
jepemi Mar 1, 2024
5216d94
Refs #20120: Uncrustify
jepemi Mar 1, 2024
be4991d
Refs #20120: Consider wan case + associated tests
jepemi Mar 1, 2024
04e9ee5
Refs #20120: Remove versions.md update
jepemi Mar 1, 2024
98d0d36
Refs #20120: Fix rebasing wrong deletion
jepemi Mar 4, 2024
8e19c6e
Refs #20120: Delete assert clause
jepemi Mar 4, 2024
b479be8
Refs #20120: Apply suggestions
EduPonz Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refs #20120: Uncrustify
Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
  • Loading branch information
jepemi authored and EduPonz committed Mar 5, 2024
commit 89a0fc284a64120be3c8a8d570ead98577d5064a
8 changes: 4 additions & 4 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ RTPSParticipantImpl::RTPSParticipantImpl(
else
{
sanitize_transports_timer_.reset(new TimedEvent(mp_event_thr, [&]()
{
return sanitize_transports();
}, SANITIZE_TRANSPORTS_INTERVAL_MS));
{
return sanitize_transports();
}, SANITIZE_TRANSPORTS_INTERVAL_MS));

sanitize_transports_timer_->restart_timer();
}

Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ class RTPSParticipantImpl
//! Timed event to check the liveliness of the SendResources.
std::unique_ptr<fastrtps::rtps::TimedEvent> sanitize_transports_timer_;
static constexpr uint16_t SANITIZE_TRANSPORTS_INTERVAL_MS = 7000;

//! Timed event callback to sanitize the transports.
bool sanitize_transports();

Expand Down
11 changes: 7 additions & 4 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1841,11 +1841,14 @@ bool TCPTransportInterface::sanitize_transport(
SendResourceList& send_resource_list) const
{
// Remove send resources with disconnected channels
for (auto it = send_resource_list.begin(); it != send_resource_list.end();) {

for (auto it = send_resource_list.begin(); it != send_resource_list.end();)
{

TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get());

if(tcp_sender_resource && tcp_sender_resource->channel()->connection_status() == TCPChannelResource::eConnectionStatus::eDisconnected)

if (tcp_sender_resource &&
tcp_sender_resource->channel()->connection_status() ==
TCPChannelResource::eConnectionStatus::eDisconnected)
{
it = send_resource_list.erase(it);
}
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ class TCPTransportInterface : public TransportInterface
Locator& locator) const;

/**
* Check and update send resource list.
* Check and update send resource list.
*/
bool sanitize_transport(
SendResourceList& send_resource_list) const;
Expand Down
143 changes: 143 additions & 0 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,149 @@ TEST_P(TransportTCP, multiple_listening_ports)
// Release TCP client and server resources.
delete client_1;
delete client_2;
}

// Test TCPv4 transport sanitizer. This test matches a server with a client, then releases the
// client resources and reinstatiates it. The it waits for the sanitizer to remove the first client
// from the send resource list. Finnally, it checks that the the send_resource_list has size 1.
TEST(TransportTCP, TCPv4_transport_sanitizer)
{
using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor;

PubSubWriter<HelloWorldPubSubType>* client = new PubSubWriter<HelloWorldPubSubType>(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType>* server = new PubSubReader<HelloWorldPubSubType>(TEST_TOPIC_NAME);

// Server
// Create a server with a DatagramInjectionTransportDescriptor which heritates from
// ChainingTransportDescriptor. This will allow us to get send_resource_list_ from the
// server participant when its transport gets its OpenOutputChannel() method called.
uint16_t server_port = 10000;
auto low_level_server_transport = std::make_shared<TCPv4TransportDescriptor>();
low_level_server_transport->add_listener_port(server_port);
auto server_transport = std::make_shared<DatagramInjectionTransportDescriptor>(low_level_server_transport);
server->disable_builtin_transport().add_user_transport_to_pparams(server_transport).init();
ASSERT_TRUE(server->isInitialized());

// Client
auto initialize_client = [&]() -> PubSubWriter<HelloWorldPubSubType>*
{
auto client_transport = std::make_shared<TCPv4TransportDescriptor>();
client->disable_builtin_transport().add_user_transport_to_pparams(client_transport);
Locator_t initialPeerLocator;
initialPeerLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv6(initialPeerLocator, "::1");
initialPeerLocator.port = server_port;
LocatorList_t initial_peer_list;
initial_peer_list.push_back(initialPeerLocator);
client->initial_peers(initial_peer_list);
client->init();
return client;
};
client = initialize_client();
ASSERT_TRUE(client->isInitialized());

// Wait for discovery. OpenOutputChannel() is called.
client->wait_discovery();
server->wait_discovery();

// We can only update the senders when OpenOutputChannel() is called. If the sanitizer deletes
// the send resource, senders obtained from get_send_resource_list() won't have changed.
auto send_resource_list = server_transport->get_send_resource_list();
ASSERT_TRUE(send_resource_list.size() == 1);

// Release TCP client resources.
delete client;

// Wait for sanitizer to remove client from send_resource_list.
std::this_thread::sleep_for(std::chrono::milliseconds(7000));

client = new PubSubWriter<HelloWorldPubSubType>(TEST_TOPIC_NAME);
client = initialize_client();
ASSERT_TRUE(client->isInitialized());

// Wait for discovery. OpenOutputChannel() is called and we can update the senders.
client->wait_discovery();
server->wait_discovery();

// Check that the send_resource_list has size 1. This means that the sanitizer has removed the
// send resource for the first client and now has a send resource for the second client.
send_resource_list = server_transport->get_send_resource_list();
ASSERT_TRUE(send_resource_list.size() == 1);

// Release TCP client and server resources.
delete client;
delete server;
}

// Test TCPv6 transport sanitizer. This test matches a server with a client, then releases the
// client resources and reinstatiates it. The it waits for the sanitizer to remove the first client
// from the send resource list. Finnally, it checks that the the send_resource_list has size 1.
TEST(TransportTCP, TCPv6_transport_sanitizer)
{
using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor;

PubSubWriter<HelloWorldPubSubType>* client = new PubSubWriter<HelloWorldPubSubType>(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType>* server = new PubSubReader<HelloWorldPubSubType>(TEST_TOPIC_NAME);

// Server
// Create a server with a DatagramInjectionTransportDescriptor which heritates from
// ChainingTransportDescriptor. This will allow us to get send_resource_list_ from the
// server participant when its transport gets its OpenOutputChannel() method called.
uint16_t server_port = 10000;
auto low_level_server_transport = std::make_shared<TCPv4TransportDescriptor>();
low_level_server_transport->add_listener_port(server_port);
auto server_transport = std::make_shared<DatagramInjectionTransportDescriptor>(low_level_server_transport);
server->disable_builtin_transport().add_user_transport_to_pparams(server_transport).init();
ASSERT_TRUE(server->isInitialized());

// Client
auto initialize_client = [&]() -> PubSubWriter<HelloWorldPubSubType>*
{
auto client_transport = std::make_shared<TCPv4TransportDescriptor>();
client->disable_builtin_transport().add_user_transport_to_pparams(client_transport);
Locator_t initialPeerLocator;
initialPeerLocator.kind = LOCATOR_KIND_TCPv4;
IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1);
initialPeerLocator.port = server_port;
LocatorList_t initial_peer_list;
initial_peer_list.push_back(initialPeerLocator);
client->initial_peers(initial_peer_list);
client->init();
return client;
};
client = initialize_client();
ASSERT_TRUE(client->isInitialized());

// Wait for discovery. OpenOutputChannel() is called.
client->wait_discovery();
server->wait_discovery();

// We can only update the senders when OpenOutputChannel() is called. If the sanitizer deletes
// the send resource, senders obtained from get_send_resource_list() won't have changed.
auto send_resource_list = server_transport->get_send_resource_list();
ASSERT_TRUE(send_resource_list.size() == 1);

// Release TCP client resources.
delete client;

// Wait for sanitizer to remove client from send_resource_list.
std::this_thread::sleep_for(std::chrono::milliseconds(7000));

client = new PubSubWriter<HelloWorldPubSubType>(TEST_TOPIC_NAME);
client = initialize_client();
ASSERT_TRUE(client->isInitialized());

// Wait for discovery. OpenOutputChannel() is called and we can update the senders.
client->wait_discovery();
server->wait_discovery();

// Check that the send_resource_list has size 1. This means that the sanitizer has removed the
// send resource for the first client and now has a send resource for the second client.
send_resource_list = server_transport->get_send_resource_list();
ASSERT_TRUE(send_resource_list.size() == 1);

// Release TCP client and server resources.
delete client;
delete server;
}

Expand Down
3 changes: 2 additions & 1 deletion test/blackbox/common/DatagramInjectionTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ void DatagramInjectionTransportDescriptor::update_send_resource_list(
std::lock_guard<std::mutex> guard(mtx_);

send_resource_list_.clear();
for (const auto& resource : send_resource_list) {
for (const auto& resource : send_resource_list)
{
send_resource_list_.insert(resource.get());
}
}
Expand Down