diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 1c2259f8922..5bfc9177b2d 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -44,6 +44,8 @@ #include <fastrtps/types/TypeObjectFactory.h> #include <fastrtps/types/DynamicPubSubType.h> +#include <fastdds/rtps/common/LocatorList.hpp> + #include <fastrtps/utils/TimeConversion.h> #include <fastrtps/utils/IPLocator.h> #include "fastrtps/utils/shared_mutex.hpp" @@ -1239,6 +1241,21 @@ bool PDP::remove_remote_participant( this->mp_mutex->lock(); + // Delete from sender resource list (TCP only) + LocatorList_t remote_participant_locators; + for (auto& remote_participant_default_locator : pdata->default_locators.unicast) + { + remote_participant_locators.push_back(remote_participant_default_locator); + } + for (auto& remote_participant_metatraffic_locator : pdata->metatraffic_locators.unicast) + { + remote_participant_locators.push_back(remote_participant_metatraffic_locator); + } + if (!remote_participant_locators.empty()) + { + mp_RTPSParticipant->update_removed_participant(remote_participant_locators); + } + // Return reader proxy objects to pool for (auto pit : *pdata->m_readers) { @@ -1266,6 +1283,7 @@ bool PDP::remove_remote_participant( participant_proxies_pool_.push_back(pdata); this->mp_mutex->unlock(); + return true; } diff --git a/src/cpp/rtps/network/NetworkFactory.cpp b/src/cpp/rtps/network/NetworkFactory.cpp index 2208827a1ea..1773038cfa4 100644 --- a/src/cpp/rtps/network/NetworkFactory.cpp +++ b/src/cpp/rtps/network/NetworkFactory.cpp @@ -18,12 +18,13 @@ #include <utility> #include <fastdds/rtps/common/Guid.h> +#include <fastdds/rtps/common/LocatorList.hpp> #include <fastdds/rtps/participant/RTPSParticipant.h> #include <fastdds/rtps/transport/TransportDescriptorInterface.h> #include <fastrtps/utils/IPFinder.h> #include <fastrtps/utils/IPLocator.h> -#include <rtps/transport/UDPv4Transport.h> +#include <rtps/transport/TCPTransportInterface.h> using namespace std; using namespace eprosima::fastdds::rtps; @@ -471,6 +472,24 @@ void NetworkFactory::update_network_interfaces() } } +void NetworkFactory::remove_participant_associated_send_resources( + SendResourceList& send_resource_list, + const LocatorList_t& remote_participant_locators, + const LocatorList_t& participant_initial_peers) const +{ + for (auto& transport : mRegisteredTransports) + { + TCPTransportInterface* tcp_transport = dynamic_cast<TCPTransportInterface*>(transport.get()); + if (tcp_transport) + { + tcp_transport->CloseOutputChannel( + send_resource_list, + remote_participant_locators, + participant_initial_peers); + } + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/network/NetworkFactory.h b/src/cpp/rtps/network/NetworkFactory.h index 53e2df5fd1c..1a679b66754 100644 --- a/src/cpp/rtps/network/NetworkFactory.h +++ b/src/cpp/rtps/network/NetworkFactory.h @@ -19,6 +19,7 @@ #include <memory> #include <fastdds/rtps/common/Locator.h> +#include <fastdds/rtps/common/LocatorList.hpp> #include <fastdds/rtps/common/LocatorSelector.hpp> #include <fastdds/rtps/messages/MessageReceiver.h> #include <fastdds/rtps/transport/SenderResource.h> @@ -246,6 +247,18 @@ class NetworkFactory */ void update_network_interfaces(); + /** + * Remove the given participants from the send resource list + * + * @param send_resource_list List of send resources associated to the local participant. + * @param remote_participant_locators List of locators associated to the remote participant. + * @param participant_initial_peers List of locators of the initial peers of the local participant. + */ + void remove_participant_associated_send_resources( + fastdds::rtps::SendResourceList& send_resource_list, + const LocatorList_t& remote_participant_locators, + const LocatorList_t& participant_initial_peers) const; + private: std::vector<std::unique_ptr<fastdds::rtps::TransportInterface>> mRegisteredTransports; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 9165f1cb95c..da7b2da2476 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -51,6 +51,8 @@ #include <fastdds/rtps/writer/StatelessPersistentWriter.h> #include <fastdds/rtps/writer/StatefulPersistentWriter.h> +#include <fastdds/rtps/common/LocatorList.hpp> + #include <fastrtps/utils/IPFinder.h> #include <fastrtps/utils/Semaphore.h> #include <fastrtps/xmlparser/XMLProfileManager.h> @@ -2982,6 +2984,19 @@ bool RTPSParticipantImpl::should_match_local_endpoints( return should_match_local_endpoints; } +void RTPSParticipantImpl::update_removed_participant( + const LocatorList_t& remote_participant_locators) +{ + if (!remote_participant_locators.empty()) + { + std::lock_guard<std::timed_mutex> guard(m_send_resources_mutex_); + m_network_Factory.remove_participant_associated_send_resources( + send_resource_list_, + remote_participant_locators, + m_att.builtin.initialPeersList); + } +} + } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 644437861f0..951fa092948 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -40,6 +40,7 @@ #include <fastdds/rtps/builtin/data/ReaderProxyData.h> #include <fastdds/rtps/builtin/data/WriterProxyData.h> #include <fastdds/rtps/common/Guid.h> +#include <fastdds/rtps/common/LocatorList.hpp> #include <fastdds/rtps/history/IChangePool.h> #include <fastdds/rtps/history/IPayloadPool.h> #include <fastdds/rtps/messages/MessageReceiver.h> @@ -1266,6 +1267,14 @@ class RTPSParticipantImpl return match_local_endpoints_; } + /** + * Method called on participant removal with the set of locators associated to the participant. + * + * @param remote_participant_locators Set of locators associated to the participant removed. + */ + void update_removed_participant( + const LocatorList_t& remote_participant_locators); + }; } // namespace rtps } /* namespace rtps */ diff --git a/src/cpp/rtps/transport/TCPSenderResource.hpp b/src/cpp/rtps/transport/TCPSenderResource.hpp index 416ea94adfd..6a6bcc46392 100644 --- a/src/cpp/rtps/transport/TCPSenderResource.hpp +++ b/src/cpp/rtps/transport/TCPSenderResource.hpp @@ -39,7 +39,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource // Implementation functions are bound to the right transport parameters clean_up = [this, &transport]() { - transport.CloseOutputChannel(locator_); + transport.SenderResourceHasBeenClosed(locator_); }; send_lambda_ = [this, &transport]( @@ -68,7 +68,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource } static TCPSenderResource* cast( - TransportInterface& transport, + const TransportInterface& transport, SenderResource* sender_resource) { TCPSenderResource* returned_resource = nullptr; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 75980fd33fa..f848f932bfe 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -19,6 +19,7 @@ #include <chrono> #include <cstring> #include <map> +#include <set> #include <memory> #include <mutex> #include <string> @@ -276,6 +277,19 @@ Locator TCPTransportInterface::remote_endpoint_to_locator( return locator; } +Locator TCPTransportInterface::local_endpoint_to_locator( + const std::shared_ptr<TCPChannelResource>& channel) const +{ + Locator locator; + asio::error_code ec; + endpoint_to_locator(channel->local_endpoint(ec), locator); + if (ec) + { + LOCATOR_INVALID(locator); + } + return locator; +} + void TCPTransportInterface::bind_socket( std::shared_ptr<TCPChannelResource>& channel) { @@ -634,11 +648,24 @@ bool TCPTransportInterface::transform_remote_locator( return false; } -void TCPTransportInterface::CloseOutputChannel( +void TCPTransportInterface::SenderResourceHasBeenClosed( fastrtps::rtps::Locator_t& locator) { - locator.set_Invalid_Address(); - locator.port = 0; + // The TCPSendResource associated channel cannot be removed from the channel_resources_ map. On transport's destruction + // this map is consulted to send the unbind requests. If not sending it, the other participant wouldn't disconnect the + // socket and keep a connection status of eEstablished. This would prevent new connect calls since it thinks it's already + // connected. + // If moving this unbind send with the respective channel disconnection to this point, the following problem arises: + // If receiving a SenderResourceHasBeenClosed call after receiving an unbinding message from a remote participant (our participant + // isn't disconnecting but we want to erase this send resource), the channel cannot be disconnected here since the listening thread has + // taken the read mutex (permanently waiting at read asio layer). This mutex is also needed to disconnect the socket (deadlock). + // Socket disconnection should always be done in the listening thread (or in the transport cleanup, when receiver resources have + // already been destroyed and the listening thread had consequently finished). + // An assert() clause finding the respective channel resource cannot be made since in LARGE DATA scenario, where the PDP discovery is done + // via UDP, a server's send resource can be created with without any associated channel resource until receiving a connection request from + // the client. + // The send resource locator is invalidated to prevent further use of associated channel. + LOCATOR_INVALID(locator); } bool TCPTransportInterface::CloseInputChannel( @@ -1187,7 +1214,6 @@ bool TCPTransportInterface::Receive( { std::shared_ptr<RTCPMessageManager> rtcp_message_manager; if (TCPChannelResource::eConnectionStatus::eDisconnected != channel->connection_status()) - { std::unique_lock<std::mutex> lock(rtcp_message_manager_mutex_); rtcp_message_manager = rtcp_manager.lock(); @@ -1436,10 +1462,8 @@ void TCPTransportInterface::SocketAccepted( create_listening_thread(channel); EPROSIMA_LOG_INFO(RTCP, "Accepted connection (local: " - << channel->local_endpoint().address() << ":" - << channel->local_endpoint().port() << "), remote: " - << channel->remote_endpoint().address() << ":" - << channel->remote_endpoint().port() << ")"); + << local_endpoint_to_locator(channel) << ", remote: " + << remote_endpoint_to_locator(channel) << ")"); } else { @@ -1481,10 +1505,8 @@ void TCPTransportInterface::SecureSocketAccepted( create_listening_thread(secure_channel); EPROSIMA_LOG_INFO(RTCP, " Accepted connection (local: " - << socket->lowest_layer().local_endpoint().address() << ":" - << socket->lowest_layer().local_endpoint().port() << "), remote: " - << socket->lowest_layer().remote_endpoint().address() << ":" - << socket->lowest_layer().remote_endpoint().port() << ")"); + << local_endpoint_to_locator(secure_channel) << ", remote: " + << remote_endpoint_to_locator(secure_channel) << ")"); } else { @@ -1837,6 +1859,60 @@ void TCPTransportInterface::fill_local_physical_port( } } +void TCPTransportInterface::CloseOutputChannel( + SendResourceList& send_resource_list, + const LocatorList& remote_participant_locators, + const LocatorList& participant_initial_peers) const +{ + // Since send resources handle physical locators, we need to convert the remote participant locators to physical + std::set<Locator> remote_participant_physical_locators; + for (const Locator& remote_participant_locator : remote_participant_locators) + { + remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(remote_participant_locator)); + + // Also add the WANtoLANLocator ([0][WAN] address) if the remote locator is a WAN locator. In WAN scenario, + //initial peer can also work with the WANtoLANLocator of the remote participant. + if (IPLocator::hasWan(remote_participant_locator)) + { + remote_participant_physical_locators.insert(IPLocator::toPhysicalLocator(IPLocator::WanToLanLocator( + remote_participant_locator))); + } + } + + // Exlude initial peers. + for (const auto& initial_peer : participant_initial_peers) + { + if (std::find(remote_participant_physical_locators.begin(), remote_participant_physical_locators.end(), + IPLocator::toPhysicalLocator(initial_peer)) != remote_participant_physical_locators.end()) + { + remote_participant_physical_locators.erase(IPLocator::toPhysicalLocator(initial_peer)); + } + } + + for (const auto& remote_participant_physical_locator : remote_participant_physical_locators) + { + if (!IsLocatorSupported(remote_participant_physical_locator)) + { + continue; + } + // Remove send resources for the associated remote participant locator + for (auto it = send_resource_list.begin(); it != send_resource_list.end();) + { + TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, it->get()); + + if (tcp_sender_resource) + { + if (tcp_sender_resource->locator() == remote_participant_physical_locator) + { + it = send_resource_list.erase(it); + continue; + } + } + ++it; + } + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index bf5217c0b2f..47d1f29409a 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -183,6 +183,12 @@ class TCPTransportInterface : public TransportInterface Locator remote_endpoint_to_locator( const std::shared_ptr<TCPChannelResource>& channel) const; + /** + * Converts a local endpoint to a locator if possible. Otherwise, it sets an invalid locator. + */ + Locator local_endpoint_to_locator( + const std::shared_ptr<TCPChannelResource>& channel) const; + /** * Shutdown method to close the connections of the transports. */ @@ -228,7 +234,7 @@ class TCPTransportInterface : public TransportInterface const Locator&) override; //! Resets the locator bound to the sender resource. - void CloseOutputChannel( + void SenderResourceHasBeenClosed( fastrtps::rtps::Locator_t& locator); //! Reports whether Locators correspond to the same port. @@ -460,6 +466,18 @@ class TCPTransportInterface : public TransportInterface void fill_local_physical_port( Locator& locator) const; + /** + * Close the output channel associated to the given remote participant but if its locators belong to the + * given list of initial peers. + * + * @param send_resource_list List of send resources associated to the local participant. + * @param remote_participant_locators Set of locators associated to the remote participant. + * @param participant_initial_peers List of locators associated to the initial peers of the local participant. + */ + void CloseOutputChannel( + SendResourceList& send_resource_list, + const LocatorList& remote_participant_locators, + const LocatorList& participant_initial_peers) const; }; } // namespace rtps diff --git a/src/cpp/rtps/transport/UDPSenderResource.hpp b/src/cpp/rtps/transport/UDPSenderResource.hpp index 4db78236c72..70165141f80 100644 --- a/src/cpp/rtps/transport/UDPSenderResource.hpp +++ b/src/cpp/rtps/transport/UDPSenderResource.hpp @@ -43,7 +43,7 @@ class UDPSenderResource : public fastrtps::rtps::SenderResource // Implementation functions are bound to the right transport parameters clean_up = [this, &transport]() { - transport.CloseOutputChannel(socket_); + transport.SenderResourceHasBeenClosed(socket_); }; send_lambda_ = [this, &transport]( diff --git a/src/cpp/rtps/transport/UDPTransportInterface.cpp b/src/cpp/rtps/transport/UDPTransportInterface.cpp index 820056e26b3..5c5feaa400c 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.cpp +++ b/src/cpp/rtps/transport/UDPTransportInterface.cpp @@ -102,7 +102,7 @@ bool UDPTransportInterface::CloseInputChannel( return true; } -void UDPTransportInterface::CloseOutputChannel( +void UDPTransportInterface::SenderResourceHasBeenClosed( eProsimaUDPSocket& socket) { socket.cancel(); diff --git a/src/cpp/rtps/transport/UDPTransportInterface.h b/src/cpp/rtps/transport/UDPTransportInterface.h index 287aa5b32b8..655ab4aad85 100644 --- a/src/cpp/rtps/transport/UDPTransportInterface.h +++ b/src/cpp/rtps/transport/UDPTransportInterface.h @@ -48,7 +48,7 @@ class UDPTransportInterface : public TransportInterface const Locator&) override; //! Removes all outbound sockets on the given port. - void CloseOutputChannel( + void SenderResourceHasBeenClosed( eProsimaUDPSocket& socket); //! Reports whether Locators correspond to the same port. diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index d7ed5b9d1fe..53197968c52 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -27,6 +27,7 @@ #include "../api/dds-pim/TCPReqRepHelloWorldReplier.hpp" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" +#include "DatagramInjectionTransport.hpp" using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -929,6 +930,296 @@ TEST_P(TransportTCP, multiple_listening_ports) delete server; } +// Test TCP send resource cleaning. This test matches a server with a client and then releases the +// client resources. After PDP unbind message, the server removes the client +// from the send resource list. +TEST_P(TransportTCP, send_resource_cleanup) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + + std::unique_ptr<PubSubWriter<HelloWorldPubSubType>> client(new PubSubWriter<HelloWorldPubSubType>(TEST_TOPIC_NAME)); + std::unique_ptr<PubSubWriter<HelloWorldPubSubType>> udp_participant(new PubSubWriter<HelloWorldPubSubType>( + TEST_TOPIC_NAME)); + std::unique_ptr<PubSubReader<HelloWorldPubSubType>> server(new PubSubReader<HelloWorldPubSubType>(TEST_TOPIC_NAME)); + + // Server + // Create a server with two transports, one of which uses a DatagramInjectionTransportDescriptor + // which heritates from ChainingTransportDescriptor. The low level transport of this chaining transport will be UDP. + // This will allow us to get send_resource_list_ from the server participant when UDP transport gets its OpenOutputChannel() + // method called. This should happen after TCP transports connection is established. We can then see how many TCP send + // resources exist. + // For the cleanup test we follow that same procedure. Firstly we destroy both participants and then instantiate a new + // UDP participant. The send resource list will get updated with no TCP send resource. + // __________________________________________________________ _____________________ + // | Server | | Client | + // | | | | + // | SendResourceList | | | + // | | | | | + // | Empty | | | + // | | | | | + // | | - TCPv4 init() | | | + // | | | | | + // | | - ChainingTransport(UDP) init() | | | + // | | | | | + // | 1 TCP <------------------------------------------------- TCPv4 init() | + // | | | | | + // | 1 TCP + 1 UDP <------------------------------------------------- UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | | | | | + // | Empty <-------------------------------------------------- clean transports | + // | | | | | + // | 1 UDP - ChainingTransport(UDP) <------------------------ UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 0 get_send_resource_list() | | | + // |__________________________________________________________| |_____________________| + // + uint16_t server_port = 10000; + test_transport_->add_listener_port(server_port); + auto low_level_transport = std::make_shared<UDPv4TransportDescriptor>(); + auto server_chaining_transport = std::make_shared<DatagramInjectionTransportDescriptor>(low_level_transport); + server->disable_builtin_transport().add_user_transport_to_pparams(test_transport_).add_user_transport_to_pparams( + server_chaining_transport).init(); + ASSERT_TRUE(server->isInitialized()); + + // Client + auto initialize_client = [&](PubSubWriter<HelloWorldPubSubType>* client) + { + std::shared_ptr<TCPTransportDescriptor> client_transport; + Locator_t initialPeerLocator; + if (use_ipv6) + { + client_transport = std::make_shared<TCPv6TransportDescriptor>(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + client_transport = std::make_shared<TCPv4TransportDescriptor>(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + client->disable_builtin_transport().add_user_transport_to_pparams(client_transport); + initialPeerLocator.port = server_port; + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + client->initial_peers(initial_peer_list); + client->init(); + }; + auto initialize_udp_participant = [&](PubSubWriter<HelloWorldPubSubType>* udp_participant) + { + auto udp_participant_transport = std::make_shared<UDPv4TransportDescriptor>(); + udp_participant->disable_builtin_transport().add_user_transport_to_pparams(udp_participant_transport); + udp_participant->init(); + }; + initialize_client(client.get()); + ASSERT_TRUE(client->isInitialized()); + + // Wait for discovery. OpenOutputChannel() is called. We create a udp participant after to guarantee + // that the TCP participants have been mutually discovered when OpenOutputChannel() is called. + server->wait_discovery(std::chrono::seconds(0), 1); + client->wait_discovery(1, std::chrono::seconds(0)); + + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 2); + udp_participant->wait_discovery(1, std::chrono::seconds(0)); + + // We can only update the senders when OpenOutputChannel() is called. If the send resource + // is deleted later, senders obtained from get_send_resource_list() won't have changed. + auto send_resource_list = server_chaining_transport->get_send_resource_list(); + auto tcp_send_resources = [](const std::set<SenderResource*>& send_resource_list) -> size_t + { + size_t tcp_send_resources = 0; + for (auto& sender_resource : send_resource_list) + { + if (sender_resource->kind() == LOCATOR_KIND_TCPv4 || sender_resource->kind() == LOCATOR_KIND_TCPv6) + { + tcp_send_resources++; + } + } + return tcp_send_resources; + }; + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + + // Release TCP client resources. + client.reset(); + udp_participant.reset(); + + // Wait for undiscovery. + server->wait_writer_undiscovery(); + + // Create new udp client. + udp_participant.reset(new PubSubWriter<HelloWorldPubSubType>(TEST_TOPIC_NAME)); + + // Wait for discovery. OpenOutputChannel() is called and we can update the senders. + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 1); + udp_participant->wait_discovery(1, std::chrono::seconds(0)); + + // Check that the send_resource_list has size 0. This means that the send resource + // for the client has been removed. + send_resource_list = server_chaining_transport->get_send_resource_list(); + EXPECT_EQ(tcp_send_resources(send_resource_list), 0); + send_resource_list.clear(); +} + +// Test TCP send resource cleaning. In this case, since the send resource has been created from an initial_peer, +// the send resource should not be removed. +TEST_P(TransportTCP, send_resource_cleanup_initial_peer) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + using eprosima::fastdds::rtps::DatagramInjectionTransportDescriptor; + + std::unique_ptr<PubSubWriter<HelloWorldPubSubType>> client(new PubSubWriter<HelloWorldPubSubType>(TEST_TOPIC_NAME)); + std::unique_ptr<PubSubReader<HelloWorldPubSubType>> udp_participant(new PubSubReader<HelloWorldPubSubType>( + TEST_TOPIC_NAME)); + std::unique_ptr<PubSubReader<HelloWorldPubSubType>> server(new PubSubReader<HelloWorldPubSubType>(TEST_TOPIC_NAME)); + + // Client + // Create a client with two transports, one of which uses a DatagramInjectionTransportDescriptor + // which heritates from ChainingTransportDescriptor. This will allow us to get send_resource_list_ + // from the client participant when its transport gets its OpenOutputChannel() method called. + + // __________________________________________________________ _____________________ + // | Server | | Client | + // | | | | + // | SendResourceList | | | + // | | | | | + // | Empty | | | + // | | | | | + // | | - TCPv4 init() | | | + // | | | | | + // | | - ChainingTransport(UDP) init() | | | + // | | | | | + // | 1 TCP <------------------------------------------------- TCPv4 init() | + // | | | | | + // | 1 TCP + 1 UDP <------------------------------------------------- UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | | | | | + // | 1 TCP (initial peer) <-------------------------------------------------- clean transports | + // | | | | | + // | 1 TCP + 1 UDP - ChainingTransport(UDP) <------------------------ UDPv4 init() | + // | | | | | + // | | - ChainingTransport-> | | | + // | TCP SendResources == 1 get_send_resource_list() | | | + // | (initial peer) | | | + // |__________________________________________________________| |_____________________| + // + + uint16_t server_port = 10000; + LocatorList_t initial_peer_list; + Locator_t initialPeerLocator; + if (use_ipv6) + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + initialPeerLocator.port = server_port; + initial_peer_list.push_back(initialPeerLocator); + client->initial_peers(initial_peer_list); + + auto low_level_transport = std::make_shared<UDPv4TransportDescriptor>(); + auto client_chaining_transport = std::make_shared<DatagramInjectionTransportDescriptor>(low_level_transport); + client->disable_builtin_transport().add_user_transport_to_pparams(test_transport_).add_user_transport_to_pparams( + client_chaining_transport).init(); + ASSERT_TRUE(client->isInitialized()); + + // Server + auto initialize_server = [&](PubSubReader<HelloWorldPubSubType>* server) + { + std::shared_ptr<TCPTransportDescriptor> server_transport; + if (use_ipv6) + { + server_transport = std::make_shared<TCPv6TransportDescriptor>(); + } + else + { + server_transport = std::make_shared<TCPv4TransportDescriptor>(); + } + server_transport->add_listener_port(server_port); + server->disable_builtin_transport().add_user_transport_to_pparams(server_transport); + server->init(); + }; + auto initialize_udp_participant = [&](PubSubReader<HelloWorldPubSubType>* udp_participant) + { + auto udp_participant_transport = std::make_shared<UDPv4TransportDescriptor>(); + udp_participant->disable_builtin_transport().add_user_transport_to_pparams(udp_participant_transport); + udp_participant->init(); + }; + initialize_server(server.get()); + ASSERT_TRUE(server->isInitialized()); + + // Wait for discovery. OpenOutputChannel() is called. We create a udp participant after to guarantee + // that the TCP participants have been mutually discovered when OpenOutputChannel() is called. + client->wait_discovery(1, std::chrono::seconds(0)); + server->wait_discovery(std::chrono::seconds(0), 1); + + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + client->wait_discovery(2, std::chrono::seconds(0)); + udp_participant->wait_discovery(std::chrono::seconds(0), 1); + + // We can only update the senders when OpenOutputChannel() is called. If the send resource + // is deleted later, senders obtained from get_send_resource_list() won't have changed. + auto send_resource_list = client_chaining_transport->get_send_resource_list(); + auto tcp_send_resources = [](const std::set<SenderResource*>& send_resource_list) -> size_t + { + size_t tcp_send_resources = 0; + for (auto& sender_resource : send_resource_list) + { + if (sender_resource->kind() == LOCATOR_KIND_TCPv4 || sender_resource->kind() == LOCATOR_KIND_TCPv6) + { + tcp_send_resources++; + } + } + return tcp_send_resources; + }; + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + + // Release TCP client resources. + server.reset(); + udp_participant.reset(); + + // Wait for undiscovery. + client->wait_reader_undiscovery(); + + // Create new client instances. + udp_participant.reset(new PubSubReader<HelloWorldPubSubType>(TEST_TOPIC_NAME)); + + // Wait for discovery. OpenOutputChannel() is called and we can update the senders. + initialize_udp_participant(udp_participant.get()); + ASSERT_TRUE(udp_participant->isInitialized()); + client->wait_discovery(1, std::chrono::seconds(0)); + udp_participant->wait_discovery(std::chrono::seconds(0), 1); + + // Check that the send_resource_list has size 1. This means that the send resource + // for the first client hasn't been removed because it was created from an initial_peer. + send_resource_list = client_chaining_transport->get_send_resource_list(); + EXPECT_EQ(tcp_send_resources(send_resource_list), 1); + send_resource_list.clear(); + + // If relaunching the server, the client should connect again. + server.reset(new PubSubReader<HelloWorldPubSubType>(TEST_TOPIC_NAME)); + initialize_server(server.get()); + ASSERT_TRUE(server->isInitialized()); + server->wait_discovery(std::chrono::seconds(0), 1); + client->wait_discovery(2, std::chrono::seconds(0)); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/blackbox/common/DatagramInjectionTransport.cpp b/test/blackbox/common/DatagramInjectionTransport.cpp index a4b74f43b3f..3b1259a0de0 100644 --- a/test/blackbox/common/DatagramInjectionTransport.cpp +++ b/test/blackbox/common/DatagramInjectionTransport.cpp @@ -42,6 +42,24 @@ std::set<TransportReceiverInterface*> DatagramInjectionTransportDescriptor::get_ return receivers_; } +void DatagramInjectionTransportDescriptor::update_send_resource_list( + const SendResourceList& send_resource_list) +{ + std::lock_guard<std::mutex> guard(mtx_); + + send_resource_list_.clear(); + for (const auto& resource : send_resource_list) + { + send_resource_list_.insert(resource.get()); + } +} + +std::set<SenderResource*> DatagramInjectionTransportDescriptor::get_send_resource_list() +{ + std::lock_guard<std::mutex> guard(mtx_); + return send_resource_list_; +} + } // namespace rtps } // namespace fastdds } // namespace eprosima diff --git a/test/blackbox/common/DatagramInjectionTransport.hpp b/test/blackbox/common/DatagramInjectionTransport.hpp index 3cfff45d6c9..aa1cd4b0873 100644 --- a/test/blackbox/common/DatagramInjectionTransport.hpp +++ b/test/blackbox/common/DatagramInjectionTransport.hpp @@ -17,6 +17,9 @@ #include <fastdds/rtps/transport/ChainingTransport.h> #include <fastdds/rtps/transport/ChainingTransportDescriptor.h> +#include <fastdds/rtps/transport/SenderResource.h> + +using SenderResource = eprosima::fastrtps::rtps::SenderResource; namespace eprosima { namespace fastdds { @@ -37,10 +40,16 @@ class DatagramInjectionTransportDescriptor : public ChainingTransportDescriptor std::set<TransportReceiverInterface*> get_receivers(); + void update_send_resource_list( + const SendResourceList& send_resource_list); + + std::set<SenderResource*> get_send_resource_list(); + private: std::mutex mtx_; std::set<TransportReceiverInterface*> receivers_; + std::set<SenderResource*> send_resource_list_; }; class DatagramInjectionTransport : public ChainingTransport @@ -60,23 +69,25 @@ class DatagramInjectionTransport : public ChainingTransport } bool send( - eprosima::fastrtps::rtps::SenderResource* /*low_sender_resource*/, - const eprosima::fastrtps::rtps::octet* /*send_buffer*/, - uint32_t /*send_buffer_size*/, - eprosima::fastrtps::rtps::LocatorsIterator* /*destination_locators_begin*/, - eprosima::fastrtps::rtps::LocatorsIterator* /*destination_locators_end*/, - const std::chrono::steady_clock::time_point& /*timeout*/) override + eprosima::fastrtps::rtps::SenderResource* low_sender_resource, + const eprosima::fastrtps::rtps::octet* send_buffer, + uint32_t send_buffer_size, + eprosima::fastrtps::rtps::LocatorsIterator* destination_locators_begin, + eprosima::fastrtps::rtps::LocatorsIterator* destination_locators_end, + const std::chrono::steady_clock::time_point& timeout) override { - return true; + return low_sender_resource->send(send_buffer, send_buffer_size, destination_locators_begin, + destination_locators_end, timeout); } void receive( - TransportReceiverInterface* /*next_receiver*/, - const eprosima::fastrtps::rtps::octet* /*receive_buffer*/, - uint32_t /*receive_buffer_size*/, - const eprosima::fastrtps::rtps::Locator_t& /*local_locator*/, - const eprosima::fastrtps::rtps::Locator_t& /*remote_locator*/) override + TransportReceiverInterface* next_receiver, + const eprosima::fastrtps::rtps::octet* receive_buffer, + uint32_t receive_buffer_size, + const eprosima::fastrtps::rtps::Locator_t& local_locator, + const eprosima::fastrtps::rtps::Locator_t& remote_locator) override { + next_receiver->OnDataReceived(receive_buffer, receive_buffer_size, local_locator, remote_locator); } bool OpenInputChannel( @@ -92,6 +103,15 @@ class DatagramInjectionTransport : public ChainingTransport return ret_val; } + bool OpenOutputChannel( + SendResourceList& send_resource_list, + const Locator& loc) override + { + bool ret_val = ChainingTransport::OpenOutputChannel(send_resource_list, loc); + parent_->update_send_resource_list(send_resource_list); + return ret_val; + } + private: DatagramInjectionTransportDescriptor* parent_ = nullptr; diff --git a/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h b/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h index 003306a10bb..0e541a7e2a6 100644 --- a/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h +++ b/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h @@ -26,6 +26,7 @@ #include <fastrtps/rtps/reader/RTPSReader.h> #include <fastrtps/rtps/resources/ResourceEvent.h> #include <fastrtps/rtps/writer/RTPSWriter.h> +#include <fastdds/rtps/common/LocatorList.hpp> #if HAVE_SECURITY #include <rtps/security/SecurityManager.h> @@ -331,6 +332,8 @@ class RTPSParticipantImpl MOCK_METHOD(bool, ignore_participant, (const GuidPrefix_t&)); + MOCK_METHOD(bool, update_removed_participant, (rtps::LocatorList_t&)); + private: MockParticipantListener listener_; diff --git a/test/unittest/rtps/discovery/CMakeLists.txt b/test/unittest/rtps/discovery/CMakeLists.txt index d188e24b969..52fcaf6541a 100644 --- a/test/unittest/rtps/discovery/CMakeLists.txt +++ b/test/unittest/rtps/discovery/CMakeLists.txt @@ -101,6 +101,27 @@ endif() gtest_discover_tests(EdpTests) #PDP TESTS + +set(TCPTransportInterface_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp + ) +if(TLS_FOUND) + set(TCPTransportInterface_SOURCE + ${TCPTransportInterface_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorSecure.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp + ) +endif() + set(PDPTESTS_SOURCE PDPTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/builtin/discovery/participant/PDP.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp @@ -148,6 +169,7 @@ set(PDPTESTS_SOURCE PDPTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp + ${TCPTransportInterface_SOURCE} ) add_executable(PDPTests ${PDPTESTS_SOURCE}) @@ -185,7 +207,8 @@ target_include_directories(PDPTests PRIVATE target_link_libraries(PDPTests foonathan_memory GTest::gmock - ${CMAKE_DL_LIBS}) + ${CMAKE_DL_LIBS} + $<$<BOOL:${TLS_FOUND}>:OpenSSL::SSL$<SEMICOLON>OpenSSL::Crypto>) if(QNX) target_link_libraries(PDPTests socket) endif() @@ -196,4 +219,3 @@ else() endif() gtest_discover_tests(PDPTests) - diff --git a/test/unittest/rtps/security/CMakeLists.txt b/test/unittest/rtps/security/CMakeLists.txt index 0858655f941..0dc80d27d0a 100644 --- a/test/unittest/rtps/security/CMakeLists.txt +++ b/test/unittest/rtps/security/CMakeLists.txt @@ -40,6 +40,7 @@ set(SOURCES_SECURITY_TEST_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/security/exceptions/SecurityException.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/TimedConditionVariable.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/security/cryptography/AESGCMGMAC_Types.cpp ${PROJECT_SOURCE_DIR}/test/mock/rtps/SecurityPluginFactory/rtps/security/SecurityPluginFactory.cpp ) diff --git a/test/unittest/statistics/rtps/CMakeLists.txt b/test/unittest/statistics/rtps/CMakeLists.txt index 5ad9316a8de..5bbb555cd27 100644 --- a/test/unittest/statistics/rtps/CMakeLists.txt +++ b/test/unittest/statistics/rtps/CMakeLists.txt @@ -43,6 +43,26 @@ target_include_directories(RTPSStatisticsTests PRIVATE target_link_libraries(RTPSStatisticsTests fastrtps fastcdr GTest::gtest GTest::gmock) gtest_discover_tests(RTPSStatisticsTests) +set(TCPTransportInterface_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp + ) +if(TLS_FOUND) + set(TCPTransportInterface_SOURCE + ${TCPTransportInterface_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorSecure.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp + ) +endif() + set(STATISTICS_RTPS_MONITORSERVICETESTS_SOURCE MonitorServiceTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/publisher/qos/WriterQos.cpp @@ -105,6 +125,7 @@ set(STATISTICS_RTPS_MONITORSERVICETESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/statistics/types/monitorservice_typesv1.cxx ${PROJECT_SOURCE_DIR}/src/cpp/statistics/types/types.cxx ${PROJECT_SOURCE_DIR}/src/cpp/statistics/types/typesv1.cxx + ${TCPTransportInterface_SOURCE} ) add_executable(MonitorServiceTests ${STATISTICS_RTPS_MONITORSERVICETESTS_SOURCE}) @@ -141,11 +162,15 @@ target_include_directories(MonitorServiceTests PRIVATE ${Asio_INCLUDE_DIR} ) -target_link_libraries(MonitorServiceTests fastcdr GTest::gtest GTest::gmock) +target_link_libraries(MonitorServiceTests + fastcdr + GTest::gtest + GTest::gmock + $<$<BOOL:${TLS_FOUND}>:OpenSSL::SSL$<SEMICOLON>OpenSSL::Crypto>) + gtest_discover_tests(MonitorServiceTests) if(QNX) target_link_libraries(RTPSStatisticsTests socket) target_link_libraries(MonitorServiceTests socket) endif() - diff --git a/test/unittest/transport/CMakeLists.txt b/test/unittest/transport/CMakeLists.txt index 1d2d1684663..05f83080b4b 100644 --- a/test/unittest/transport/CMakeLists.txt +++ b/test/unittest/transport/CMakeLists.txt @@ -46,6 +46,24 @@ if(TLS_FOUND) # ${CMAKE_CURRENT_BINARY_DIR}/permissions_helloworld.smime COPYONLY) endif() +set(TCPTransportInterface_SOURCE + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/tcp/TCPControlMessage.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptor.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResource.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp + ) +if(TLS_FOUND) + set(TCPTransportInterface_SOURCE + ${TCPTransportInterface_SOURCE} + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPAcceptorSecure.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceSecure.cpp + ) +endif() + set(UDPV4TESTS_SOURCE UDPv4Tests.cpp mock/MockReceiverResource.cpp @@ -63,6 +81,7 @@ set(UDPV4TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp + ${TCPTransportInterface_SOURCE} ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp @@ -86,6 +105,7 @@ set(UDPV6TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv6Transport.cpp + ${TCPTransportInterface_SOURCE} ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp @@ -119,9 +139,6 @@ set(TCPV4TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPv4Transport.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -165,9 +182,6 @@ set(TCPV6TESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPTransportInterface.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/TCPv6Transport.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -184,32 +198,6 @@ if(TLS_FOUND) ) endif() -set(TEST_UDPV4TESTS_SOURCE - test_UDPv4Tests.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/ParameterList.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/OStreamConsumer.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutConsumer.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/StdoutErrConsumer.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/PropertyPolicy.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/ThreadSettings.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/common/Time_t.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/flowcontrol/ThroughputControllerDescriptor.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/messages/RTPSMessageCreator.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/network/NetworkFactory.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/ChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/test_UDPv4Transport.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/utils/SystemInfo.cpp - ) - set(SHAREDMEMTESTS_SOURCE SharedMemTests.cpp mock/MockReceiverResource.cpp @@ -226,9 +214,7 @@ set(SHAREDMEMTESTS_SOURCE ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/PortBasedTransportDescriptor.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/shared_mem/SharedMemTransportDescriptor.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPChannelResource.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPTransportInterface.cpp - ${PROJECT_SOURCE_DIR}/src/cpp/rtps/transport/UDPv4Transport.cpp + ${TCPTransportInterface_SOURCE} ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPFinder.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/IPLocator.cpp ${PROJECT_SOURCE_DIR}/src/cpp/utils/md5.cpp @@ -278,7 +264,8 @@ target_include_directories(UDPv4Tests PRIVATE target_link_libraries(UDPv4Tests fastcdr GTest::gtest - ${MOCKS}) + ${MOCKS} + $<$<BOOL:${TLS_FOUND}>:OpenSSL::SSL$<SEMICOLON>OpenSSL::Crypto>) if(QNX) target_link_libraries(UDPv4Tests socket) endif() @@ -287,37 +274,6 @@ if(MSVC OR MSVC_IDE) endif() gtest_discover_tests(UDPv4Tests) -add_executable(test_UDPv4Tests ${TEST_UDPV4TESTS_SOURCE}) -target_compile_definitions(test_UDPv4Tests PRIVATE - BOOST_ASIO_STANDALONE - ASIO_STANDALONE - $<$<AND:$<NOT:$<BOOL:${WIN32}>>,$<STREQUAL:"${CMAKE_BUILD_TYPE}","Debug">>:__DEBUG> - $<$<BOOL:${INTERNAL_DEBUG}>:__INTERNALDEBUG> # Internal debug activated. - ) -target_include_directories(test_UDPv4Tests PRIVATE - ${Asio_INCLUDE_DIR} - ${PROJECT_SOURCE_DIR}/test/mock/rtps/ParticipantProxyData - ${PROJECT_SOURCE_DIR}/test/mock/dds/QosPolicies - ${PROJECT_SOURCE_DIR}/test/mock/rtps/MessageReceiver - ${PROJECT_SOURCE_DIR}/test/mock/rtps/ReceiverResource - ${PROJECT_SOURCE_DIR}/include ${PROJECT_BINARY_DIR}/include - ${PROJECT_SOURCE_DIR}/src/cpp - $<$<BOOL:${ANDROID}>:${ANDROID_IFADDRS_INCLUDE_DIR}> - ) -target_link_libraries(test_UDPv4Tests - fastcdr - GTest::gtest - ${MOCKS}) -if(QNX) - target_link_libraries(test_UDPv4Tests socket) -endif() -if(MSVC OR MSVC_IDE) - target_link_libraries(test_UDPv4Tests ${PRIVACY} iphlpapi Shlwapi) -else() - target_link_libraries(test_UDPv4Tests ${PRIVACY}) -endif() -gtest_discover_tests(test_UDPv4Tests) - ########################## # IPv6 tests ########################## @@ -344,7 +300,8 @@ if(NOT DISABLE_UDPV6_TESTS) target_link_libraries(UDPv6Tests fastcdr GTest::gtest - ${MOCKS}) + ${MOCKS} + $<$<BOOL:${TLS_FOUND}>:OpenSSL::SSL$<SEMICOLON>OpenSSL::Crypto>) if(QNX) target_link_libraries(UDPv6Tests socket) endif() diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index a4445c49b08..e6481da3418 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -21,6 +21,7 @@ #include "mock/MockTCPChannelResource.h" #include "mock/MockTCPv4Transport.h" #include <fastdds/dds/log/Log.hpp> +#include <fastdds/rtps/common/LocatorList.hpp> #include <fastrtps/transport/TCPv4TransportDescriptor.h> #include <fastrtps/utils/Semaphore.h> #include <fastrtps/utils/IPFinder.h> @@ -1113,96 +1114,8 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_1) sem.wait(); } } -/* - TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) - { - eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); - - using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; - using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; - using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole; - TCPv4TransportDescriptor recvDescriptor; - recvDescriptor.add_listener_port(g_default_port + 1); - recvDescriptor.apply_security = true; - recvDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT; - //recvDescriptor.tls_config.password = "testkey"; - //recvDescriptor.tls_config.password = "test"; - //recvDescriptor.tls_config.cert_chain_file = "mainpubcert.pem"; - //recvDescriptor.tls_config.private_key_file = "mainpubkey.pem"; - recvDescriptor.tls_config.verify_file = "maincacert.pem"; // This CA only know about mainsub certificates - //recvDescriptor.tls_config.verify_file = "ca.pem"; - // Server doesn't accept clients without certs - recvDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_FAIL_IF_NO_PEER_CERT | TLSVerifyMode::VERIFY_PEER; - recvDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - TCPv4Transport receiveTransportUnderTest(recvDescriptor); - receiveTransportUnderTest.init(); - - Locator_t inputLocator; - inputLocator.kind = LOCATOR_KIND_TCPv4; - inputLocator.port = g_default_port + 1; - IPLocator::setIPv4(inputLocator, 127, 0, 0, 1); - IPLocator::setLogicalPort(inputLocator, 7410); - - Locator_t outputLocator; - outputLocator.kind = LOCATOR_KIND_TCPv4; - IPLocator::setIPv4(outputLocator, 127, 0, 0, 1); - outputLocator.port = g_default_port + 1; - IPLocator::setLogicalPort(outputLocator, 7410); - - TCPv4TransportDescriptor sendDescriptor2; - sendDescriptor2.apply_security = true; - sendDescriptor2.tls_config.handshake_role = TLSHSRole::SERVER; - sendDescriptor2.tls_config.password = "test"; - sendDescriptor2.tls_config.cert_chain_file = "server.pem"; - sendDescriptor2.tls_config.private_key_file = "server.pem"; - //sendDescriptor2.tls_config.password = "testkey"; - //sendDescriptor2.tls_config.cert_chain_file = "mainsubcert.pem"; - //sendDescriptor2.tls_config.private_key_file = "mainsubkey.pem"; - sendDescriptor2.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; - sendDescriptor2.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - TCPv4Transport sendTransportUnderTest2(sendDescriptor2); - sendTransportUnderTest2.init(); - - { - MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast<MockMessageReceiver*>(receiver.CreateMessageReceiver()); - ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); - - ASSERT_TRUE(sendTransportUnderTest2.OpenOutputChannel(outputLocator)); - octet message[5] = { 'H','e','l','l','o' }; - - Semaphore sem; - std::function<void()> recCallback = [&]() - { - EXPECT_FALSE(true); // Should not receive - sem.post(); - }; - - msg_recv->setCallback(recCallback); - - auto sendThreadFunction = [&]() - { - bool sent = sendTransportUnderTest2.send(message, 5, outputLocator, inputLocator); - int count = 0; - while (!sent && count < 30) - { - sent = sendTransportUnderTest2.send(message, 5, outputLocator, inputLocator); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - ++count; - } - EXPECT_FALSE(sent); - sem.post(); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - } - ASSERT_TRUE(sendTransportUnderTest2.CloseOutputChannel(outputLocator)); - } - */ +// TODO(eduponz): TEST_F(TCPv4Tests, send_and_receive_between_secure_clients_2) TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server) { @@ -2145,6 +2058,96 @@ TEST_F(TCPv4Tests, opening_output_channel_with_same_locator_as_local_listening_p ASSERT_EQ(send_resource_list.size(), 2u); } +// This test verifies that the send resource list is correctly cleaned both in LAN and WAN cases. +TEST_F(TCPv4Tests, remove_from_send_resource_list) +{ + // Three scenarios are considered: LAN, WAN1 and WAN2 + // LAN: The remote locator is in the same LAN as the local locator + // WAN1: The remote locator is in a different LAN than the local locator, and initial peers have LAN and WAN remote addresses. + // WAN2: The remote locator is in a different LAN than the local locator, and initial peers have WANtoLANLocator ([0][WAN] address). + std::vector<std::string> test_cases = { + "LAN", + "WAN1", + "WAN2" + }; + + for (const std::string& test_case : test_cases) + { + TCPv4TransportDescriptor send_descriptor; + + MockTCPv4Transport send_transport_under_test(send_descriptor); + send_transport_under_test.init(); + + Locator_t discovery_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port, discovery_locator); + IPLocator::setLogicalPort(discovery_locator, 7410); + + Locator_t initial_peer_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port + 1, initial_peer_locator); + IPLocator::setLogicalPort(initial_peer_locator, 7410); + LocatorList_t initial_peer_list; + + if (test_case == "WAN1" || test_case == "WAN2") + { + IPLocator::setWan(discovery_locator, g_test_wan_address); + IPLocator::setWan(initial_peer_locator, g_test_wan_address); + + if (test_case == "WAN2") + { + initial_peer_locator = IPLocator::WanToLanLocator(initial_peer_locator); + } + } + + initial_peer_list.push_back(initial_peer_locator); + + SendResourceList send_resource_list; + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, discovery_locator)); + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, initial_peer_locator)); + ASSERT_EQ(send_resource_list.size(), 2u); + + // Using a wrong locator should not remove the channel resource + LocatorList_t wrong_remote_participant_physical_locators; + Locator_t wrong_output_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv4, "127.0.0.1", g_default_port + 2, wrong_output_locator); + IPLocator::setLogicalPort(wrong_output_locator, 7410); + + if (test_case == "WAN1" || test_case == "WAN2") + { + IPLocator::setWan(wrong_output_locator, g_test_wan_address); + } + wrong_remote_participant_physical_locators.push_back(wrong_output_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + wrong_remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 2); + + // Using the correct locator should remove the channel resource + LocatorList_t remote_participant_physical_locators; + remote_participant_physical_locators.push_back(discovery_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1); + + // Using the initial peer locator should not remove the channel resource + remote_participant_physical_locators.clear(); + if (test_case == "WAN2") + { + // In WAN2, the remote_participant_physical_locators are the real Locators, not the WANtoLANLocators. + IPLocator::setIPv4(initial_peer_locator, "127.0.0.1"); + IPLocator::setWan(initial_peer_locator, g_test_wan_address); + } + remote_participant_physical_locators.push_back(initial_peer_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1); + } +} + void TCPv4Tests::HELPER_SetDescriptorDefaults() { descriptor.add_listener_port(g_default_port); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index b86f0286031..35104541227 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -20,6 +20,7 @@ #include <fastdds/dds/log/Log.hpp> #include <fastdds/rtps/attributes/RTPSParticipantAttributes.h> +#include <fastdds/rtps/common/LocatorList.hpp> #include <fastrtps/transport/TCPv6TransportDescriptor.h> #include <fastrtps/utils/IPLocator.h> #include <fastrtps/utils/Semaphore.h> @@ -143,9 +144,9 @@ TEST_F(TCPv6Tests, opening_and_closing_output_channel) ASSERT_FALSE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); ASSERT_TRUE (transportUnderTest.OpenOutputChannel(genericOutputChannelLocator)); ASSERT_TRUE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); - ASSERT_TRUE (transportUnderTest.CloseOutputChannel(genericOutputChannelLocator)); + ASSERT_TRUE (transportUnderTest.SenderResourceHasBeenClosed(genericOutputChannelLocator)); ASSERT_FALSE (transportUnderTest.IsOutputChannelOpen(genericOutputChannelLocator)); - ASSERT_FALSE (transportUnderTest.CloseOutputChannel(genericOutputChannelLocator)); + ASSERT_FALSE (transportUnderTest.SenderResourceHasBeenClosed(genericOutputChannelLocator)); */ } @@ -496,186 +497,64 @@ TEST_F(TCPv6Tests, opening_output_channel_with_same_locator_as_local_listening_p ASSERT_EQ(send_resource_list.size(), 2u); } -/* - TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) - { - eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); - - using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; - using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; - - TCPv6TransportDescriptor recvDescriptor; - recvDescriptor.add_listener_port(g_default_port); - recvDescriptor.apply_security = true; - recvDescriptor.tls_config.password = "testkey"; - recvDescriptor.tls_config.cert_chain_file = "mainpubcert.pem"; - recvDescriptor.tls_config.private_key_file = "mainpubkey.pem"; - recvDescriptor.tls_config.verify_file = "maincacert.pem"; - // Server doesn't accept clients without certs - recvDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER | TLSVerifyMode::VERIFY_FAIL_IF_NO_PEER_CERT; - recvDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - recvDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); - recvDescriptor.tls_config.add_option(TLSOptions::NO_COMPRESSION); - recvDescriptor.tls_config.add_option(TLSOptions::NO_SSLV2); - recvDescriptor.tls_config.add_option(TLSOptions::NO_SSLV3); - TCPv6Transport receiveTransportUnderTest(recvDescriptor); - receiveTransportUnderTest.init(); - - TCPv6TransportDescriptor sendDescriptor; - sendDescriptor.apply_security = true; - sendDescriptor.tls_config.password = "testkey"; - sendDescriptor.tls_config.cert_chain_file = "mainsubcert.pem"; - sendDescriptor.tls_config.private_key_file = "mainsubkey.pem"; - sendDescriptor.tls_config.verify_file = "maincacert.pem"; - sendDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; - sendDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); - sendDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); - sendDescriptor.tls_config.add_option(TLSOptions::NO_COMPRESSION); - sendDescriptor.tls_config.add_option(TLSOptions::NO_SSLV2); - sendDescriptor.tls_config.add_option(TLSOptions::NO_SSLV3); - TCPv6Transport sendTransportUnderTest(sendDescriptor); - sendTransportUnderTest.init(); - - Locator_t inputLocator; - inputLocator.kind = LOCATOR_KIND_TCPv6; - inputLocator.port = g_default_port; - IPLocator::setIPv4(inputLocator, "::1"); - IPLocator::setLogicalPort(inputLocator, 7410); - - Locator_t outputLocator; - outputLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv4(outputLocator, "::1"); - outputLocator.port = g_default_port; - IPLocator::setLogicalPort(outputLocator, 7410); - - { - MockReceiverResource receiver(receiveTransportUnderTest, inputLocator); - MockMessageReceiver *msg_recv = dynamic_cast<MockMessageReceiver*>(receiver.CreateMessageReceiver()); - ASSERT_TRUE(receiveTransportUnderTest.IsInputChannelOpen(inputLocator)); - - ASSERT_TRUE(sendTransportUnderTest.OpenOutputChannel(outputLocator)); - octet message[5] = { 'H','e','l','l','o' }; - - Semaphore sem; - std::function<void()> recCallback = [&]() - { - EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); - sem.post(); - }; - - msg_recv->setCallback(recCallback); - - auto sendThreadFunction = [&]() - { - bool sent = sendTransportUnderTest.send(message, 5, outputLocator, inputLocator); - while (!sent) - { - sent = sendTransportUnderTest.send(message, 5, outputLocator, inputLocator); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - EXPECT_TRUE(sent); - //EXPECT_TRUE(transportUnderTest.send(message, 5, outputLocator, inputLocator)); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - } - ASSERT_TRUE(sendTransportUnderTest.CloseOutputChannel(outputLocator)); - } - */ -// TODO SKIP AT THIS MOMENT -/* - TEST_F(TCPv6Tests, send_and_receive_between_ports) - { - descriptor.listening_ports.push_back(g_default_port); - TCPv6Transport transportUnderTest(descriptor); - transportUnderTest.init(); - - Locator_t localLocator; - localLocator.port = g_default_port; - localLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(localLocator, "::1"); - - Locator_t outputChannelLocator; - outputChannelLocator = g_default_port; - outputChannelLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(outputChannelLocator, "::1"); - - MockReceiverResource receiver(transportUnderTest, localLocator); - MockMessageReceiver *msg_recv = dynamic_cast<MockMessageReceiver*>(receiver.CreateMessageReceiver()); - - ASSERT_TRUE(transportUnderTest.OpenOutputChannel(outputChannelLocator)); // Includes loopback - ASSERT_TRUE(transportUnderTest.IsInputChannelOpen(localLocator)); - octet message[5] = { 'H','e','l','l','o' }; - - std::this_thread::sleep_for(std::chrono::seconds(5)); - - Semaphore sem; - std::function<void()> recCallback = [&]() - { - EXPECT_EQ(memcmp(message,msg_recv->data,5), 0); - sem.post(); - }; - - msg_recv->setCallback(recCallback); +// This test verifies that the send resource list is correctly cleaned and the channel resource is removed +// from the channel_resources_map. +TEST_F(TCPv6Tests, remove_from_send_resource_list) +{ + TCPv6TransportDescriptor send_descriptor; + MockTCPv6Transport send_transport_under_test(send_descriptor); + send_transport_under_test.init(); - auto sendThreadFunction = [&]() - { - EXPECT_TRUE(transportUnderTest.send(message, 5, outputChannelLocator, localLocator)); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(outputChannelLocator)); - } - - TEST_F(TCPv6Tests, send_to_loopback) - { - TCPv6Transport transportUnderTest(descriptor); - transportUnderTest.init(); + Locator_t output_locator_1; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port, output_locator_1); + IPLocator::setLogicalPort(output_locator_1, 7410); - Locator_t multicastLocator; - multicastLocator.set_port(g_default_port); - multicastLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(multicastLocator, 0xff31, 0, 0, 0, 0, 0, 0, 0); + Locator_t output_locator_2; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port + 1, output_locator_2); + IPLocator::setLogicalPort(output_locator_2, 7410); - Locator_t outputChannelLocator; - outputChannelLocator.set_port(g_default_port + 1); - outputChannelLocator.kind = LOCATOR_KIND_TCPv6; - IPLocator::setIPv6(outputChannelLocator, 0,0,0,0,0,0,0,1); // Loopback + LocatorList_t initial_peer_list; + initial_peer_list.push_back(output_locator_2); - MockReceiverResource receiver(transportUnderTest, multicastLocator); - MockMessageReceiver *msg_recv = dynamic_cast<MockMessageReceiver*>(receiver.CreateMessageReceiver()); + SendResourceList send_resource_list; + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, output_locator_1)); + ASSERT_TRUE(send_transport_under_test.OpenOutputChannel(send_resource_list, output_locator_2)); + ASSERT_EQ(send_resource_list.size(), 2u); - ASSERT_TRUE(transportUnderTest.OpenOutputChannel(outputChannelLocator)); - ASSERT_TRUE(transportUnderTest.IsInputChannelOpen(multicastLocator)); - octet message[5] = { 'H','e','l','l','o' }; + // Using a wrong locator should not remove the channel resource + LocatorList_t wrong_remote_participant_physical_locators; + Locator_t wrong_output_locator; + IPLocator::createLocator(LOCATOR_KIND_TCPv6, "::1", g_default_port + 2, wrong_output_locator); + IPLocator::setLogicalPort(wrong_output_locator, 7410); + wrong_remote_participant_physical_locators.push_back(wrong_output_locator); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + wrong_remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 2u); - Semaphore sem; - std::function<void()> recCallback = [&]() - { - EXPECT_EQ(memcmp(message,msg_recv->data,5), 0); - sem.post(); - }; + // Using the correct locator should remove the channel resource + LocatorList_t remote_participant_physical_locators; + remote_participant_physical_locators.push_back(output_locator_1); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1u); + + // Using the initial peer locator should not remove the channel resource + remote_participant_physical_locators.clear(); + remote_participant_physical_locators.push_back(output_locator_2); + send_transport_under_test.CloseOutputChannel( + send_resource_list, + remote_participant_physical_locators, + initial_peer_list); + ASSERT_EQ(send_resource_list.size(), 1u); +} - msg_recv->setCallback(recCallback); +// TODO: TEST_F(TCPv6Tests, send_and_receive_between_both_secure_ports) +// TODO: TEST_F(TCPv6Tests, send_and_receive_between_ports) - auto sendThreadFunction = [&]() - { - EXPECT_TRUE(transportUnderTest.send(message, 5, outputChannelLocator, multicastLocator)); - }; - - senderThread.reset(new std::thread(sendThreadFunction)); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - senderThread->join(); - sem.wait(); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(outputChannelLocator)); - } - */ #endif // ifndef __APPLE__ void TCPv6Tests::HELPER_SetDescriptorDefaults() diff --git a/test/unittest/transport/test_UDPv4Tests.cpp b/test/unittest/transport/test_UDPv4Tests.cpp deleted file mode 100644 index 512095ac7fd..00000000000 --- a/test/unittest/transport/test_UDPv4Tests.cpp +++ /dev/null @@ -1,270 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include <thread> -#include <memory> -#include <string> - -#include <gtest/gtest.h> -#include <fastdds/dds/log/Log.hpp> -#include <fastrtps/transport/test_UDPv4TransportDescriptor.h> -#include <fastrtps/rtps/common/CDRMessage_t.h> -#include <fastrtps/rtps/messages/RTPSMessageCreator.h> -#include <fastrtps/qos/ParameterTypes.h> -#include <fastrtps/utils/IPLocator.h> -#include <rtps/transport/test_UDPv4Transport.h> - -#if defined(_WIN32) -#define GET_PID _getpid -#else -#define GET_PID getpid -#endif // if defined(_WIN32) - -using IPLocator = eprosima::fastrtps::rtps::IPLocator; -using test_UDPv4Transport = eprosima::fastdds::rtps::test_UDPv4Transport; - -static uint16_t g_default_port = 0; - -uint16_t get_port() -{ - uint16_t port = static_cast<uint16_t>(GET_PID()); - - if (4000 > port) - { - port += 4000; - } - - return port; -} - -using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; - -class test_UDPv4Tests : public ::testing::Test -{ -public: - - test_UDPv4Tests() - { - HELPER_SetDescriptorDefaults(); - } - - ~test_UDPv4Tests() - { - eprosima::fastdds::dds::Log::KillThread(); - } - - void HELPER_SetDescriptorDefaults(); - void HELPER_WarmUpOutput( - test_UDPv4Transport& transport); - void HELPER_FillDataMessage( - CDRMessage_t& message, - SequenceNumber_t sequenceNumber); - void HELPER_FillAckNackMessage( - CDRMessage_t& message); - void HELPER_FillHeartbeatMessage( - CDRMessage_t& message); - - test_UDPv4TransportDescriptor descriptor; - std::unique_ptr<std::thread> senderThread; - std::unique_ptr<std::thread> receiverThread; -}; - -/* - TEST_F(test_UDPv4Tests, DATA_messages_dropped) - { - // Given - descriptor.dropDataMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillDataMessage(testDataMessage, SequenceNumber_t()); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, ACKNACK_messages_dropped) - { - // Given - descriptor.dropAckNackMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, HEARTBEAT_messages_dropped) - { - // Given - descriptor.dropHeartbeatMessagesPercentage = 100; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillHeartbeatMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, Dropping_by_random_chance) - { - // Given - descriptor.percentageOfMessagesToDrop = 100; // To avoid a non-deterministic test - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(3u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, dropping_by_sequence_number) - { - // Given - std::vector<SequenceNumber_t> sequenceNumbersToDrop(1); - sequenceNumbersToDrop.back().low = 1; - - descriptor.sequenceNumberDataMessagesToDrop = sequenceNumbersToDrop; - test_UDPv4Transport transportUnderTest(descriptor); - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillDataMessage(testDataMessage, sequenceNumbersToDrop.back()); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(1u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - TEST_F(test_UDPv4Tests, No_drops_when_unrequested) - { - // Given - descriptor.dropHeartbeatMessagesPercentage = 100; - descriptor.dropDataMessagesPercentage = 100; - descriptor.granularMode = false; - - test_UDPv4Transport transportUnderTest(descriptor); // Default, no drops - transportUnderTest.init(); - CDRMessage_t testDataMessage; - HELPER_FillAckNackMessage(testDataMessage); - HELPER_WarmUpOutput(transportUnderTest); - Locator_t locator; - locator.port = g_default_port; - locator.kind = LOCATOR_KIND_UDPv4; - IPLocator::setIPv4(locator, 239, 255, 1, 4); - - // Then - ASSERT_TRUE(transportUnderTest.send(testDataMessage.buffer, testDataMessage.length, locator, locator)); - ASSERT_EQ(0u, test_UDPv4Transport::test_UDPv4Transport_DropLog.size()); - ASSERT_TRUE(transportUnderTest.CloseOutputChannel(locator)); - } - - void test_UDPv4Tests::HELPER_SetDescriptorDefaults() - { - descriptor.sendBufferSize = 80; - descriptor.receiveBufferSize = 80; - descriptor.dropDataMessagesPercentage = 0; - descriptor.dropDataFragMessagesPercentage = 0; - descriptor.dropAckNackMessagesPercentage = 0; - descriptor.dropHeartbeatMessagesPercentage = 0; - descriptor.percentageOfMessagesToDrop = 0; - descriptor.dropLogLength = 10; - descriptor.granularMode = false; - } - - void test_UDPv4Tests::HELPER_WarmUpOutput(test_UDPv4Transport& transport) - { - Locator_t outputChannelLocator; - outputChannelLocator.port = g_default_port; - outputChannelLocator.kind = LOCATOR_KIND_UDPv4; - ASSERT_TRUE(transport.OpenOutputChannel(outputChannelLocator)); - } - */ - -void test_UDPv4Tests::HELPER_FillDataMessage( - CDRMessage_t& message, - SequenceNumber_t sequenceNumber) -{ - GuidPrefix_t prefix; - TopicKind_t topic = WITH_KEY; - EntityId_t entityID; - CacheChange_t change; - change.sequenceNumber = sequenceNumber; // Here is where the SN propagates from - RTPSMessageCreator::addMessageData(&message, prefix, &change, topic, entityID, false, nullptr); -} - -void test_UDPv4Tests::HELPER_FillAckNackMessage( - CDRMessage_t& message) -{ - GuidPrefix_t prefix; - EntityId_t entityID; - SequenceNumberSet_t set; - RTPSMessageCreator::addMessageAcknack(&message, prefix, prefix, entityID, entityID, set, 0, false); -} - -void test_UDPv4Tests::HELPER_FillHeartbeatMessage( - CDRMessage_t& message) -{ - GuidPrefix_t prefix; - EntityId_t entityID; - SequenceNumber_t sn1; - SequenceNumber_t sn2; - RTPSMessageCreator::addMessageHeartbeat(&message, prefix, entityID, entityID, sn1, sn2, 0, false, false); -} - -int main( - int argc, - char** argv) -{ - g_default_port = get_port(); - - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}