Skip to content

Commit

Permalink
Unique network flows (ros2#502)
Browse files Browse the repository at this point in the history
* Changes required to incorporate the network flows feature

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Update API names to match rmw updated ones

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Update to new options interface.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Solve windows linkage issues.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Remove unique flows support on publishers.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Update subscription creation to new Fast DDS interface

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Update to new locator getters interface.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Update to endpoints interface.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Fix linting

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Make linters happy

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Use updated rmw interface

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Uncrustify

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Include C++ header before others

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Use updated rmw interface to populate flow endpoint

Signed-off-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>

* Fixed unreferenced local variable warnings

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: Miguel Barro <miguelbarro@eprosima.com>
Co-authored-by: EduPonz <eduardoponz@eprosima.com>
Co-authored-by: Ananya Muddukrishna <ananya.x.muddukrishna@ericsson.com>
  • Loading branch information
4 people authored Apr 5, 2021
1 parent a7be807 commit ae4485e
Show file tree
Hide file tree
Showing 11 changed files with 402 additions and 3 deletions.
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ add_library(rmw_fastrtps_cpp
src/serialization_format.cpp
src/subscription.cpp
src/type_support_common.cpp
src/rmw_get_endpoint_network_flow.cpp
)
target_link_libraries(rmw_fastrtps_cpp
fastcdr fastrtps)
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ rmw_fastrtps_cpp::create_publisher(
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

if (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED ==
publisher_options->require_unique_network_flow_endpoints)
{
RMW_SET_ERROR_MSG("Unique network flow endpoints not supported on publishers");
return nullptr;
}

Participant * participant = participant_info->participant;
RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr);

Expand Down
46 changes: 46 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_get_endpoint_network_flow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw/get_network_flow_endpoints.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/types.h"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"

extern "C"
{
rmw_ret_t
rmw_publisher_get_network_flow_endpoints(
const rmw_publisher_t * publisher,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array)
{
return rmw_fastrtps_shared_cpp::__rmw_publisher_get_network_flow_endpoints(
publisher,
allocator,
network_flow_endpoint_array);
}

rmw_ret_t
rmw_subscription_get_network_flow_endpoints(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_network_flow_endpoints(
subscription,
allocator,
network_flow_endpoint_array);
}
} // extern "C"
36 changes: 35 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
using Domain = eprosima::fastrtps::Domain;
using Participant = eprosima::fastrtps::Participant;
using TopicDataType = eprosima::fastrtps::TopicDataType;
using PropertyPolicyHelper = eprosima::fastrtps::rtps::PropertyPolicyHelper;
using XMLProfileManager = eprosima::fastrtps::xmlparser::XMLProfileManager;

namespace rmw_fastrtps_cpp
Expand Down Expand Up @@ -169,7 +170,40 @@ create_subscription(
}
}

info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
eprosima::fastrtps::SubscriberAttributes originalParam = subscriberParam;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED:
// Unique network flow endpoints not required. We leave the decission to the XML profile.
break;

case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED:
// Ensure we request unique network flow endpoints
if (nullptr ==
PropertyPolicyHelper::find_property(
subscriberParam.properties,
"fastdds.unique_network_flows"))
{
subscriberParam.properties.properties().emplace_back("fastdds.unique_network_flows", "");
}
break;
}

info->subscriber_ = Domain::createSubscriber(
participant,
subscriberParam,
info->listener_);
if (!info->subscriber_ &&
(RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED ==
subscription_options->require_unique_network_flow_endpoints))
{
info->subscriber_ = Domain::createSubscriber(
participant,
originalParam,
info->listener_);
}
if (!info->subscriber_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions rmw_fastrtps_dynamic_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ add_library(rmw_fastrtps_dynamic_cpp
src/type_support_common.cpp
src/type_support_proxy.cpp
src/type_support_registry.cpp
src/rmw_get_endpoint_network_flow.cpp
)
target_link_libraries(rmw_fastrtps_dynamic_cpp
fastcdr fastrtps)
Expand Down
12 changes: 11 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

if (RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED ==
publisher_options->require_unique_network_flow_endpoints)
{
RMW_SET_ERROR_MSG("Unique network flow endpoints not supported on publishers");
return nullptr;
}

Participant * participant = participant_info->participant;
RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr);

Expand Down Expand Up @@ -193,7 +200,10 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
return nullptr;
}

info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_);
info->publisher_ = Domain::createPublisher(
participant,
publisherParam,
info->listener_);
if (!info->publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
return nullptr;
Expand Down
46 changes: 46 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_get_endpoint_network_flow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/types.h"
#include "rmw/get_network_flow_endpoints.h"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"

extern "C"
{
rmw_ret_t
rmw_publisher_get_network_flow_endpoints(
const rmw_publisher_t * publisher,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array)
{
return rmw_fastrtps_shared_cpp::__rmw_publisher_get_network_flow_endpoints(
publisher,
allocator,
network_flow_endpoint_array);
}

rmw_ret_t
rmw_subscription_get_network_flow_endpoints(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_network_flow_endpoints(
subscription,
allocator,
network_flow_endpoint_array);
}
} // extern "C"
36 changes: 35 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;
using Domain = eprosima::fastrtps::Domain;
using Participant = eprosima::fastrtps::Participant;
using PropertyPolicyHelper = eprosima::fastrtps::rtps::PropertyPolicyHelper;
using TopicDataType = eprosima::fastrtps::TopicDataType;
using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy;
using XMLProfileManager = eprosima::fastrtps::xmlparser::XMLProfileManager;
Expand Down Expand Up @@ -180,7 +181,40 @@ create_subscription(
return nullptr;
}

info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
eprosima::fastrtps::SubscriberAttributes originalParam = subscriberParam;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED:
// Unique network flow endpoints not required. We leave the decission to the XML profile.
break;

case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED:
// Ensure we request unique network flow endpoints
if (nullptr ==
PropertyPolicyHelper::find_property(
subscriberParam.properties,
"fastdds.unique_network_flows"))
{
subscriberParam.properties.properties().emplace_back("fastdds.unique_network_flows", "");
}
break;
}

info->subscriber_ = Domain::createSubscriber(
participant,
subscriberParam,
info->listener_);
if (!info->subscriber_ &&
(RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED ==
subscription_options->require_unique_network_flow_endpoints))
{
info->subscriber_ = Domain::createSubscriber(
participant,
originalParam,
info->listener_);
}
if (!info->subscriber_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
return nullptr;
Expand Down
1 change: 1 addition & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ add_library(rmw_fastrtps_shared_cpp
src/rmw_wait_set.cpp
src/subscription.cpp
src/TypeSupport_impl.cpp
src/rmw_get_endpoint_network_flow.cpp
)
target_include_directories(rmw_fastrtps_shared_cpp
PUBLIC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "rmw/topic_endpoint_info_array.h"
#include "rmw/types.h"
#include "rmw/names_and_types.h"
#include "rmw/network_flow_endpoint_array.h"

namespace rmw_fastrtps_shared_cpp
{
Expand Down Expand Up @@ -401,6 +402,20 @@ __rmw_qos_profile_check_compatible(
char * reason,
size_t reason_size);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_publisher_get_network_flow_endpoints(
const rmw_publisher_t * publisher,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_subscription_get_network_flow_endpoints(
const rmw_subscription_t * subscription,
rcutils_allocator_t * allocator,
rmw_network_flow_endpoint_array_t * network_flow_endpoint_array);

} // namespace rmw_fastrtps_shared_cpp

#endif // RMW_FASTRTPS_SHARED_CPP__RMW_COMMON_HPP_
Loading

0 comments on commit ae4485e

Please sign in to comment.