diff --git a/rmw_fastrtps_cpp/CMakeLists.txt b/rmw_fastrtps_cpp/CMakeLists.txt index 3b376b9cc..35ef66d0e 100644 --- a/rmw_fastrtps_cpp/CMakeLists.txt +++ b/rmw_fastrtps_cpp/CMakeLists.txt @@ -89,6 +89,7 @@ add_library(rmw_fastrtps_cpp src/serialization_format.cpp src/subscription.cpp src/type_support_common.cpp + src/rmw_get_endpoint_network_flow.cpp ) target_link_libraries(rmw_fastrtps_cpp fastcdr fastrtps) diff --git a/rmw_fastrtps_cpp/src/publisher.cpp b/rmw_fastrtps_cpp/src/publisher.cpp index 845cd825d..41df66259 100644 --- a/rmw_fastrtps_cpp/src/publisher.cpp +++ b/rmw_fastrtps_cpp/src/publisher.cpp @@ -81,6 +81,13 @@ rmw_fastrtps_cpp::create_publisher( } RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr); + if (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED == + publisher_options->require_unique_network_flow_endpoints) + { + RMW_SET_ERROR_MSG("Unique network flow endpoints not supported on publishers"); + return nullptr; + } + Participant * participant = participant_info->participant; RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr); diff --git a/rmw_fastrtps_cpp/src/rmw_get_endpoint_network_flow.cpp b/rmw_fastrtps_cpp/src/rmw_get_endpoint_network_flow.cpp new file mode 100644 index 000000000..371c43d88 --- /dev/null +++ b/rmw_fastrtps_cpp/src/rmw_get_endpoint_network_flow.cpp @@ -0,0 +1,46 @@ +// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw/get_network_flow_endpoints.h" +#include "rmw/error_handling.h" +#include "rmw/rmw.h" +#include "rmw/types.h" +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" + +extern "C" +{ +rmw_ret_t +rmw_publisher_get_network_flow_endpoints( + const rmw_publisher_t * publisher, + rcutils_allocator_t * allocator, + rmw_network_flow_endpoint_array_t * network_flow_endpoint_array) +{ + return rmw_fastrtps_shared_cpp::__rmw_publisher_get_network_flow_endpoints( + publisher, + allocator, + network_flow_endpoint_array); +} + +rmw_ret_t +rmw_subscription_get_network_flow_endpoints( + const rmw_subscription_t * subscription, + rcutils_allocator_t * allocator, + rmw_network_flow_endpoint_array_t * network_flow_endpoint_array) +{ + return rmw_fastrtps_shared_cpp::__rmw_subscription_get_network_flow_endpoints( + subscription, + allocator, + network_flow_endpoint_array); +} +} // extern "C" diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index c8f69488e..30cbf6fb0 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -46,6 +46,7 @@ using Domain = eprosima::fastrtps::Domain; using Participant = eprosima::fastrtps::Participant; using TopicDataType = eprosima::fastrtps::TopicDataType; +using PropertyPolicyHelper = eprosima::fastrtps::rtps::PropertyPolicyHelper; using XMLProfileManager = eprosima::fastrtps::xmlparser::XMLProfileManager; namespace rmw_fastrtps_cpp @@ -169,7 +170,40 @@ create_subscription( } } - info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_); + eprosima::fastrtps::SubscriberAttributes originalParam = subscriberParam; + switch (subscription_options->require_unique_network_flow_endpoints) { + default: + case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT: + case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED: + // Unique network flow endpoints not required. We leave the decission to the XML profile. + break; + + case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED: + case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED: + // Ensure we request unique network flow endpoints + if (nullptr == + PropertyPolicyHelper::find_property( + subscriberParam.properties, + "fastdds.unique_network_flows")) + { + subscriberParam.properties.properties().emplace_back("fastdds.unique_network_flows", ""); + } + break; + } + + info->subscriber_ = Domain::createSubscriber( + participant, + subscriberParam, + info->listener_); + if (!info->subscriber_ && + (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED == + subscription_options->require_unique_network_flow_endpoints)) + { + info->subscriber_ = Domain::createSubscriber( + participant, + originalParam, + info->listener_); + } if (!info->subscriber_) { RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber"); return nullptr; diff --git a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt index ec43fe20f..f91079b4d 100644 --- a/rmw_fastrtps_dynamic_cpp/CMakeLists.txt +++ b/rmw_fastrtps_dynamic_cpp/CMakeLists.txt @@ -94,6 +94,7 @@ add_library(rmw_fastrtps_dynamic_cpp src/type_support_common.cpp src/type_support_proxy.cpp src/type_support_registry.cpp + src/rmw_get_endpoint_network_flow.cpp ) target_link_libraries(rmw_fastrtps_dynamic_cpp fastcdr fastrtps) diff --git a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp index f6cfde1a1..899b05a9e 100644 --- a/rmw_fastrtps_dynamic_cpp/src/publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/publisher.cpp @@ -81,6 +81,13 @@ rmw_fastrtps_dynamic_cpp::create_publisher( } RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr); + if (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED == + publisher_options->require_unique_network_flow_endpoints) + { + RMW_SET_ERROR_MSG("Unique network flow endpoints not supported on publishers"); + return nullptr; + } + Participant * participant = participant_info->participant; RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr); @@ -193,7 +200,10 @@ rmw_fastrtps_dynamic_cpp::create_publisher( return nullptr; } - info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_); + info->publisher_ = Domain::createPublisher( + participant, + publisherParam, + info->listener_); if (!info->publisher_) { RMW_SET_ERROR_MSG("create_publisher() could not create publisher"); return nullptr; diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_get_endpoint_network_flow.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_get_endpoint_network_flow.cpp new file mode 100644 index 000000000..7740818e7 --- /dev/null +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_get_endpoint_network_flow.cpp @@ -0,0 +1,46 @@ +// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw/error_handling.h" +#include "rmw/rmw.h" +#include "rmw/types.h" +#include "rmw/get_network_flow_endpoints.h" +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" + +extern "C" +{ +rmw_ret_t +rmw_publisher_get_network_flow_endpoints( + const rmw_publisher_t * publisher, + rcutils_allocator_t * allocator, + rmw_network_flow_endpoint_array_t * network_flow_endpoint_array) +{ + return rmw_fastrtps_shared_cpp::__rmw_publisher_get_network_flow_endpoints( + publisher, + allocator, + network_flow_endpoint_array); +} + +rmw_ret_t +rmw_subscription_get_network_flow_endpoints( + const rmw_subscription_t * subscription, + rcutils_allocator_t * allocator, + rmw_network_flow_endpoint_array_t * network_flow_endpoint_array) +{ + return rmw_fastrtps_shared_cpp::__rmw_subscription_get_network_flow_endpoints( + subscription, + allocator, + network_flow_endpoint_array); +} +} // extern "C" diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 156fc6ece..41091471e 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -44,6 +44,7 @@ using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport; using Domain = eprosima::fastrtps::Domain; using Participant = eprosima::fastrtps::Participant; +using PropertyPolicyHelper = eprosima::fastrtps::rtps::PropertyPolicyHelper; using TopicDataType = eprosima::fastrtps::TopicDataType; using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy; using XMLProfileManager = eprosima::fastrtps::xmlparser::XMLProfileManager; @@ -180,7 +181,40 @@ create_subscription( return nullptr; } - info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_); + eprosima::fastrtps::SubscriberAttributes originalParam = subscriberParam; + switch (subscription_options->require_unique_network_flow_endpoints) { + default: + case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT: + case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED: + // Unique network flow endpoints not required. We leave the decission to the XML profile. + break; + + case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED: + case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED: + // Ensure we request unique network flow endpoints + if (nullptr == + PropertyPolicyHelper::find_property( + subscriberParam.properties, + "fastdds.unique_network_flows")) + { + subscriberParam.properties.properties().emplace_back("fastdds.unique_network_flows", ""); + } + break; + } + + info->subscriber_ = Domain::createSubscriber( + participant, + subscriberParam, + info->listener_); + if (!info->subscriber_ && + (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED == + subscription_options->require_unique_network_flow_endpoints)) + { + info->subscriber_ = Domain::createSubscriber( + participant, + originalParam, + info->listener_); + } if (!info->subscriber_) { RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber"); return nullptr; diff --git a/rmw_fastrtps_shared_cpp/CMakeLists.txt b/rmw_fastrtps_shared_cpp/CMakeLists.txt index 71c39d2e6..5bf283fc9 100644 --- a/rmw_fastrtps_shared_cpp/CMakeLists.txt +++ b/rmw_fastrtps_shared_cpp/CMakeLists.txt @@ -86,6 +86,7 @@ add_library(rmw_fastrtps_shared_cpp src/rmw_wait_set.cpp src/subscription.cpp src/TypeSupport_impl.cpp + src/rmw_get_endpoint_network_flow.cpp ) target_include_directories(rmw_fastrtps_shared_cpp PUBLIC diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 1a9a0148b..082e5629a 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -23,6 +23,7 @@ #include "rmw/topic_endpoint_info_array.h" #include "rmw/types.h" #include "rmw/names_and_types.h" +#include "rmw/network_flow_endpoint_array.h" namespace rmw_fastrtps_shared_cpp { @@ -401,6 +402,20 @@ __rmw_qos_profile_check_compatible( char * reason, size_t reason_size); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_publisher_get_network_flow_endpoints( + const rmw_publisher_t * publisher, + rcutils_allocator_t * allocator, + rmw_network_flow_endpoint_array_t * network_flow_endpoint_array); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_subscription_get_network_flow_endpoints( + const rmw_subscription_t * subscription, + rcutils_allocator_t * allocator, + rmw_network_flow_endpoint_array_t * network_flow_endpoint_array); + } // namespace rmw_fastrtps_shared_cpp #endif // RMW_FASTRTPS_SHARED_CPP__RMW_COMMON_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_get_endpoint_network_flow.cpp b/rmw_fastrtps_shared_cpp/src/rmw_get_endpoint_network_flow.cpp new file mode 100644 index 000000000..f227eaf7d --- /dev/null +++ b/rmw_fastrtps_shared_cpp/src/rmw_get_endpoint_network_flow.cpp @@ -0,0 +1,204 @@ +// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rmw/get_network_flow_endpoints.h" +#include "rmw/error_handling.h" +#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp" +#include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp" +#include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "fastrtps/utils/IPLocator.h" + + +namespace rmw_fastrtps_shared_cpp +{ + +using Locator_t = eprosima::fastrtps::rtps::Locator_t; +using LocatorList_t = eprosima::fastrtps::rtps::LocatorList_t; +using IPLocator = eprosima::fastrtps::rtps::IPLocator; + +rmw_ret_t fill_network_flow_endpoint(rmw_network_flow_endpoint_t *, const Locator_t &); + +rmw_ret_t +__rmw_publisher_get_network_flow_endpoints( + const rmw_publisher_t * publisher, + rcutils_allocator_t * allocator, + rmw_network_flow_endpoint_array_t * network_flow_endpoint_array) +{ + rmw_ret_t res = RMW_RET_OK; + + // Retrieve the sender locators + CustomPublisherInfo * data = + static_cast(publisher->data); + LocatorList_t locators; + data->publisher_->get_sending_locators(locators); + + if (locators.empty()) { + return res; + } + + // It must be a non-initialized array + if (RMW_RET_OK != + (res = rmw_network_flow_endpoint_array_check_zero(network_flow_endpoint_array))) + { + return res; + } + + // Allocate + if (RMW_RET_OK != + (res = rmw_network_flow_endpoint_array_init( + network_flow_endpoint_array, + locators.size(), + allocator))) + { + return res; + } + + // Translate the locators, on error reset the array + try { + auto rmw_nf = network_flow_endpoint_array->network_flow_endpoint; + for (const Locator_t & loc : locators) { + if (RMW_RET_OK != + (res = fill_network_flow_endpoint(rmw_nf++, loc))) + { + throw res; + } + } + } catch (rmw_ret_t) { + // clear the array + rmw_network_flow_endpoint_array_fini( + network_flow_endpoint_array); + + // set error message + RMW_SET_ERROR_MSG("Failed to compose network_flow_endpoint_array"); + } + + return res; +} + +rmw_ret_t +__rmw_subscription_get_network_flow_endpoints( + const rmw_subscription_t * subscription, + rcutils_allocator_t * allocator, + rmw_network_flow_endpoint_array_t * network_flow_endpoint_array) +{ + rmw_ret_t res = RMW_RET_OK; + + // Retrieve the listener locators + CustomSubscriberInfo * data = + static_cast(subscription->data); + LocatorList_t locators; + data->subscriber_->get_listening_locators(locators); + + if (locators.empty()) { + return res; + } + + // It must be a non-initialized array + if (RMW_RET_OK != + (res = rmw_network_flow_endpoint_array_check_zero(network_flow_endpoint_array))) + { + return res; + } + + // Allocate + if (RMW_RET_OK != + (res = rmw_network_flow_endpoint_array_init( + network_flow_endpoint_array, + locators.size(), + allocator))) + { + return res; + } + + // Translate the locators, on error reset the array + try { + auto rmw_nf = network_flow_endpoint_array->network_flow_endpoint; + for (const Locator_t & loc : locators) { + if (RMW_RET_OK != + (res = fill_network_flow_endpoint(rmw_nf++, loc))) + { + throw res; + } + } + } catch (rmw_ret_t) { + // clear the array + rmw_network_flow_endpoint_array_fini( + network_flow_endpoint_array); + + // set error message + RMW_SET_ERROR_MSG("Failed to compose network_flow_endpoint_array"); + } + + return res; +} + +// Ancillary translation methods +rmw_transport_protocol_t +get_transport_protocol(const Locator_t & loc) +{ + if (loc.kind & (LOCATOR_KIND_UDPv4 | LOCATOR_KIND_UDPv6)) { + return RMW_TRANSPORT_PROTOCOL_UDP; + } else if (loc.kind & (LOCATOR_KIND_TCPv4 | LOCATOR_KIND_TCPv6)) { + return RMW_TRANSPORT_PROTOCOL_TCP; + } + + return RMW_TRANSPORT_PROTOCOL_UNKNOWN; +} + +rmw_internet_protocol_t +get_internet_protocol(const Locator_t & loc) +{ + if (loc.kind & (LOCATOR_KIND_UDPv4 | LOCATOR_KIND_TCPv4)) { + return RMW_INTERNET_PROTOCOL_IPV4; + } else if (loc.kind & (LOCATOR_KIND_TCPv6 | LOCATOR_KIND_UDPv6)) { + return RMW_INTERNET_PROTOCOL_IPV6; + } + + return RMW_INTERNET_PROTOCOL_UNKNOWN; +} + +rmw_ret_t +fill_network_flow_endpoint( + rmw_network_flow_endpoint_t * network_flow_endpoint, + const Locator_t & locator) +{ + rmw_ret_t res = RMW_RET_OK; + + // Translate transport protocol + network_flow_endpoint->transport_protocol = get_transport_protocol(locator); + + // Translate internet protocol + network_flow_endpoint->internet_protocol = get_internet_protocol(locator); + + // Set the port + network_flow_endpoint->transport_port = IPLocator::getPhysicalPort(locator); + + // Set the address + std::string address = IPLocator::ip_to_string(locator); + + if (RMW_RET_OK != + (res = rmw_network_flow_endpoint_set_internet_address( + network_flow_endpoint, + address.c_str(), + address.length()))) + { + return res; + } + + return res; +} + +} // namespace rmw_fastrtps_shared_cpp