From 461c04dcbb33452cbb1d77bea4346d6bf1acd836 Mon Sep 17 00:00:00 2001 From: Chris Lalancette <clalancette@openrobotics.org> Date: Tue, 7 Dec 2021 11:12:59 -0500 Subject: [PATCH] Add in more of the necessary logic on the publish side. That is, make sure to deal with inter-process publishing properly. This requires us to introduce two copies of do_intra_process_and_return_shared(), one of which deals with the ROS Message type and the other that deals with the PublishedType. This is kind of wasteful, and we may get rid of this later on, but this works for now. Signed-off-by: Chris Lalancette <clalancette@openrobotics.org> --- rclcpp/include/rclcpp/publisher.hpp | 106 ++++++++++++++++++++-------- 1 file changed, 78 insertions(+), 28 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index f2dbba2846..f38295fda0 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -265,7 +265,7 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); + auto shared_msg = this->do_intra_process_publish_and_return_shared<T>(std::move(msg)); this->do_inter_process_publish(*shared_msg); } else { this->do_intra_process_publish(std::move(msg)); @@ -321,15 +321,29 @@ class Publisher : public PublisherBase > publish(std::unique_ptr<T, PublishedTypeDeleter> msg) { - // TODO(wjwwood): later update this to give the unique_ptr to the intra - // process manager and let it decide if it needs to be converted or not. - // For now, convert it unconditionally and pass it the ROSMessageType - // publish function specialization. - std::cout << "Publisher LALA" << std::endl; - // auto unique_ros_msg = this->create_ros_message_unique_ptr(); - // rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg); - //this->publish(std::move(unique_ros_msg)); - this->do_intra_process_publish(std::move(msg)); + if (!intra_process_is_enabled_) { + // If we aren't using intraprocess at all, do the type conversion + // immediately and publish interprocess. + auto unique_ros_msg = this->create_ros_message_unique_ptr(); + rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg); + this->do_inter_process_publish(*unique_ros_msg); + return; + } + // If an interprocess subscription exists, then the unique_ptr is promoted + // to a shared_ptr and published. + // This allows doing the intraprocess publish first and then doing the + // interprocess publish, resulting in lower publish-to-subscribe latency. + // It's not possible to do that with an unique_ptr, + // as do_intra_process_publish takes the ownership of the message. + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (inter_process_publish_needed) { + auto shared_msg = this->do_intra_process_publish_and_return_shared<T>(std::move(msg)); + // this->do_inter_process_publish(*shared_msg); + } else { + this->do_intra_process_publish(std::move(msg)); + } } /// Publish a message on the topic. @@ -350,26 +364,18 @@ class Publisher : public PublisherBase > publish(const T & msg) { - // TODO(wjwwood): later update this to give the unique_ptr to the intra - // process manager and let it decide if it needs to be converted or not. - // For now, convert it unconditionally and pass it the ROSMessageType - // publish function specialization. - - // Avoid allocating when not using intra process. + // Avoid double allocation when not using intra process. if (!intra_process_is_enabled_) { // Convert to the ROS message equivalent and publish it. ROSMessageType ros_msg; rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, ros_msg); // In this case we're not using intra process. - return this->do_inter_process_publish(ros_msg); + this->do_inter_process_publish(ros_msg); + return; } - // Otherwise we have to allocate memory in a unique_ptr, convert it, - // and pass it along. - // As the message is not const, a copy should be made. - // A shared_ptr<const MessageT> could also be constructed here. - auto unique_ros_msg = this->create_ros_message_unique_ptr(); - rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *unique_ros_msg); - this->publish(std::move(unique_ros_msg)); + // Otherwise we have to allocate memory in a unique_ptr and pass it along. + auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg); + this->publish(std::move(unique_msg)); } void @@ -494,8 +500,8 @@ class Publisher : public PublisherBase } } -void -do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> msg) + void + do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> msg) { auto ipm = weak_ipm_.lock(); if (!ipm) { @@ -513,7 +519,12 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms } - std::shared_ptr<const PublishedType> + template<typename T> + typename + std::enable_if_t< + rosidl_generator_traits::is_message<T>::value && + std::is_same<T, ROSMessageType>::value, std::shared_ptr<const ROSMessageType> + > do_intra_process_publish_and_return_shared( std::unique_ptr<PublishedType, PublishedTypeDeleter> msg) { @@ -533,6 +544,36 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms published_type_allocator_); } + template<typename T> + typename + std::enable_if_t< + rclcpp::TypeAdapter<MessageT>::is_specialized::value && + std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType> + > + do_intra_process_publish_and_return_shared( + std::unique_ptr<PublishedType, PublishedTypeDeleter> msg) + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + if (!msg) { + throw std::runtime_error("cannot publish msg which is a null pointer"); + } + + ipm->template do_intra_process_publish_and_return_shared<PublishedType, + AllocatorT>( + intra_process_publisher_id_, + std::move(msg), + published_type_allocator_); + + auto unique_ros_msg = this->create_ros_message_unique_ptr(); + rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg); + + return unique_ros_msg; + } + /// Return a new unique_ptr using the ROSMessageType of the publisher. std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> create_ros_message_unique_ptr() @@ -542,7 +583,7 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_); } - /// Duplicate a given ros message as a unique_ptr. + /// Duplicate a given ROS message as a unique_ptr. std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter> duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg) { @@ -551,6 +592,15 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_); } + /// Duplicate a given type adapted message as a unique_ptr. + std::unique_ptr<PublishedType, PublishedTypeDeleter> + duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg) + { + auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1); + PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg); + return std::unique_ptr<PublishedType, PublishedTypeDeleter>(ptr, published_type_deleter_); + } + /// Copy of original options passed during construction. /** * It is important to save a copy of this so that the rmw payload which it