Skip to content

Commit

Permalink
TCP Client&Server Participant Decision-Making (#4277)
Browse files Browse the repository at this point in the history
* Refs #20180: Spelling fix

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Create function to check pending_logical_ports

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Add new TCP Connection type

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Add new method to enable TCP channel after connection in LARGE DATA

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Refactor OpenOutputChannel method

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Include LARGE DATA logic in SocketAccepted method

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Include LARGE DATA logic in processBindConnectionRequest method

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Add new TCP connection and methods SECURITY

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Add SECURITY support to OpenOutputChannel

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Include LARGE DATA logic in SecureSocketAccepted method

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Bugfix setting environment variable in tests

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Add multicast IPv6 to LARGE_DATA locator

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Add TCP test in LARGE_DATA Topology

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Clean code & Uncrustify

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Modified participant populated test

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20180: Add new api to update channels

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Refactor OpenOutputChannel with new channel map

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Refactor bind_socket with new channel map

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Refactor SocketAccepted with new channel map (Revert to old behavior)

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Refactor CloseOutputChannel to support cases with new channel map

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Update Secure channel logic with new channel map

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Use same IPv6 as defaultMetatrafficMulticast

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Refactor using locator inside sender resource instead of channel

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Data races associated

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Apply suggestions to tests

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20180: Add OpenLogicalPortRequest scenario before Bind Response is processed

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180. Remove unnecessary methods.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20180. Early exit when logical_port is 0.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20180. Refactor to avoid lambda.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20180. Minor changes.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20180. Simplify with WanToLanLocator.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20180: Apply suggestions

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Revert "Refs #20180: Add new TCP Connection type"

This reverts commit 4b4eb4153553cfdbf007896ff524d3e904f722c9.

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Remove unnecessary lock

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Revert "Refs #20180: Add new TCP connection and methods SECURITY"

This reverts commit c69d92e56dcfab3c3c7287145a7a93e57947d3cb.

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180: Fix test (file descriptors limit)

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20180: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20180. Fix port comparison.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

Co-authored-by: Carlos Ferreira González <carlosferreira@eprosima.com>

* Refs #20180: Fix double lock and update old test with current behavior

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20180: Apply suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20180: Fix windows test build

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20180: Apply test suggestions

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20180: Bugfix with cv in receiver_resources_ destruction

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>

* Refs #20180. Delete participant on failing ParticipantTests.ParticipantCreationWithBuiltinTransport

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #20180. Do not use non_blocking_send on TCP transport for LARGE_DATA.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

---------

Signed-off-by: cferreiragonz <carlosferreira@eprosima.com>
Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: Jesus Perez <jesusperez@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
  • Loading branch information
3 people authored Feb 14, 2024
1 parent ec3cdcd commit 709b140
Show file tree
Hide file tree
Showing 19 changed files with 549 additions and 124 deletions.
2 changes: 2 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,7 @@ const DataWriterQos& DataWriterImpl::get_qos() const
ReturnCode_t DataWriterImpl::set_listener(
DataWriterListener* listener)
{
std::lock_guard<std::mutex> scoped_lock(listener_mutex_);
listener_ = listener;
return ReturnCode_t::RETCODE_OK;
}
Expand Down Expand Up @@ -1974,6 +1975,7 @@ bool DataWriterImpl::can_qos_be_updated(
DataWriterListener* DataWriterImpl::get_listener_for(
const StatusMask& status)
{
std::lock_guard<std::mutex> scoped_lock(listener_mutex_);
if (listener_ != nullptr &&
user_datawriter_->get_status_mask().is_active(status))
{
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
//! DataWriterListener
DataWriterListener* listener_ = nullptr;

//! Mutex to protect listener_
std::mutex listener_mutex_;

//!History
DataWriterHistory history_;

Expand Down
3 changes: 1 addition & 2 deletions src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ static void setup_transports_large_data(

auto tcp_transport = create_tcpv4_transport(att);
att.userTransports.push_back(tcp_transport);
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");

Locator_t tcp_loc;
tcp_loc.kind = LOCATOR_KIND_TCPv4;
Expand Down Expand Up @@ -245,7 +244,6 @@ static void setup_transports_large_datav6(

auto tcp_transport = create_tcpv6_transport(att);
att.userTransports.push_back(tcp_transport);
att.properties.properties().emplace_back("fastdds.tcp_transport.non_blocking_send", "true");

Locator_t tcp_loc;
tcp_loc.kind = LOCATOR_KIND_TCPv6;
Expand All @@ -263,6 +261,7 @@ static void setup_transports_large_datav6(
{
Locator_t pdp_locator;
pdp_locator.kind = LOCATOR_KIND_UDPv6;
IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1");
att.builtin.metatrafficMulticastLocatorList.push_back(pdp_locator);
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/cpp/rtps/transport/TCPChannelResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ ResponseCode TCPChannelResource::process_bind_request(
if (connection_status_.compare_exchange_strong(expected, eConnectionStatus::eEstablished))
{
locator_ = IPLocator::toPhysicalLocator(locator);
EPROSIMA_LOG_INFO(RTCP_MSG, "Connection Stablished");
EPROSIMA_LOG_INFO(RTCP_MSG, "Connection Established");
return RETCODE_OK;
}
else if (expected == eConnectionStatus::eEstablished)
Expand Down Expand Up @@ -139,9 +139,7 @@ void TCPChannelResource::add_logical_port(
pending_logical_output_ports_.emplace_back(port);
if (connection_established())
{
scopedLock.unlock();
TCPTransactionId id = rtcp_manager->sendOpenLogicalPortRequest(this, port);
scopedLock.lock();
negotiating_logical_ports_[id] = port;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/transport/TCPChannelResourceBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ void TCPChannelResourceBasic::disconnect()
{
if (eConnecting < change_status(eConnectionStatus::eDisconnected) && alive())
{
std::lock_guard<std::mutex> read_lock(read_mutex_);
auto socket = socket_;

std::error_code ec;
Expand Down
14 changes: 7 additions & 7 deletions src/cpp/rtps/transport/TCPSenderResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource

TCPSenderResource(
TCPTransportInterface& transport,
std::shared_ptr<TCPChannelResource>& channel)
eprosima::fastrtps::rtps::Locator_t& locator)
: fastrtps::rtps::SenderResource(transport.kind())
, channel_(channel)
, locator_(locator)
{
// Implementation functions are bound to the right transport parameters
clean_up = [this, &transport]()
{
transport.CloseOutputChannel(channel_);
transport.CloseOutputChannel(locator_);
};

send_lambda_ = [this, &transport](
Expand All @@ -49,7 +49,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
fastrtps::rtps::LocatorsIterator* destination_locators_end,
const std::chrono::steady_clock::time_point&) -> bool
{
return transport.send(data, dataSize, channel_, destination_locators_begin,
return transport.send(data, dataSize, locator_, destination_locators_begin,
destination_locators_end);
};
}
Expand All @@ -62,9 +62,9 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
}
}

std::shared_ptr<TCPChannelResource>& channel()
fastrtps::rtps::Locator_t& locator()
{
return channel_;
return locator_;
}

static TCPSenderResource* cast(
Expand Down Expand Up @@ -102,7 +102,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource
TCPSenderResource& operator =(
const SenderResource&) = delete;

std::shared_ptr<TCPChannelResource> channel_;
fastrtps::rtps::Locator_t locator_;
};

} // namespace rtps
Expand Down
Loading

0 comments on commit 709b140

Please sign in to comment.