Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in more of the necessary logic on the publish side. #2

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 78 additions & 28 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class Publisher : public PublisherBase
get_subscription_count() > get_intra_process_subscription_count();

if (inter_process_publish_needed) {
auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
auto shared_msg = this->do_intra_process_publish_and_return_shared<T>(std::move(msg));
this->do_inter_process_publish(*shared_msg);
} else {
this->do_intra_process_publish(std::move(msg));
Expand Down Expand Up @@ -321,15 +321,29 @@ class Publisher : public PublisherBase
>
publish(std::unique_ptr<T, PublishedTypeDeleter> msg)
{
// TODO(wjwwood): later update this to give the unique_ptr to the intra
// process manager and let it decide if it needs to be converted or not.
// For now, convert it unconditionally and pass it the ROSMessageType
// publish function specialization.
std::cout << "Publisher LALA" << std::endl;
// auto unique_ros_msg = this->create_ros_message_unique_ptr();
// rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);
//this->publish(std::move(unique_ros_msg));
this->do_intra_process_publish(std::move(msg));
if (!intra_process_is_enabled_) {
// If we aren't using intraprocess at all, do the type conversion
// immediately and publish interprocess.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);
this->do_inter_process_publish(*unique_ros_msg);
return;
}
// If an interprocess subscription exists, then the unique_ptr is promoted
// to a shared_ptr and published.
// This allows doing the intraprocess publish first and then doing the
// interprocess publish, resulting in lower publish-to-subscribe latency.
// It's not possible to do that with an unique_ptr,
// as do_intra_process_publish takes the ownership of the message.
bool inter_process_publish_needed =
get_subscription_count() > get_intra_process_subscription_count();

if (inter_process_publish_needed) {
auto shared_msg = this->do_intra_process_publish_and_return_shared<T>(std::move(msg));
// this->do_inter_process_publish(*shared_msg);
} else {
this->do_intra_process_publish(std::move(msg));
}
}

/// Publish a message on the topic.
Expand All @@ -350,26 +364,18 @@ class Publisher : public PublisherBase
>
publish(const T & msg)
{
// TODO(wjwwood): later update this to give the unique_ptr to the intra
// process manager and let it decide if it needs to be converted or not.
// For now, convert it unconditionally and pass it the ROSMessageType
// publish function specialization.

// Avoid allocating when not using intra process.
// Avoid double allocation when not using intra process.
if (!intra_process_is_enabled_) {
// Convert to the ROS message equivalent and publish it.
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, ros_msg);
// In this case we're not using intra process.
return this->do_inter_process_publish(ros_msg);
this->do_inter_process_publish(ros_msg);
return;
}
// Otherwise we have to allocate memory in a unique_ptr, convert it,
// and pass it along.
// As the message is not const, a copy should be made.
// A shared_ptr<const MessageT> could also be constructed here.
auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(msg, *unique_ros_msg);
this->publish(std::move(unique_ros_msg));
// Otherwise we have to allocate memory in a unique_ptr and pass it along.
auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg);
this->publish(std::move(unique_msg));
}

void
Expand Down Expand Up @@ -494,8 +500,8 @@ class Publisher : public PublisherBase
}
}

void
do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
void
do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
Expand All @@ -513,7 +519,12 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms
}


std::shared_ptr<const PublishedType>
template<typename T>
typename
std::enable_if_t<
rosidl_generator_traits::is_message<T>::value &&
std::is_same<T, ROSMessageType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
Expand All @@ -533,6 +544,36 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms
published_type_allocator_);
}

template<typename T>
typename
std::enable_if_t<
rclcpp::TypeAdapter<MessageT>::is_specialized::value &&
std::is_same<T, PublishedType>::value, std::shared_ptr<const ROSMessageType>
>
do_intra_process_publish_and_return_shared(
std::unique_ptr<PublishedType, PublishedTypeDeleter> msg)
{
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
if (!msg) {
throw std::runtime_error("cannot publish msg which is a null pointer");
}

ipm->template do_intra_process_publish_and_return_shared<PublishedType,
AllocatorT>(
intra_process_publisher_id_,
std::move(msg),
published_type_allocator_);

auto unique_ros_msg = this->create_ros_message_unique_ptr();
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*msg, *unique_ros_msg);

return unique_ros_msg;
}

/// Return a new unique_ptr using the ROSMessageType of the publisher.
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
create_ros_message_unique_ptr()
Expand All @@ -542,7 +583,7 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}

/// Duplicate a given ros message as a unique_ptr.
/// Duplicate a given ROS message as a unique_ptr.
std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>
duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg)
{
Expand All @@ -551,6 +592,15 @@ do_intra_process_publish(std::unique_ptr<PublishedType, PublishedTypeDeleter> ms
return std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, ros_message_type_deleter_);
}

/// Duplicate a given type adapted message as a unique_ptr.
std::unique_ptr<PublishedType, PublishedTypeDeleter>
duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg)
{
auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1);
PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg);
return std::unique_ptr<PublishedType, PublishedTypeDeleter>(ptr, published_type_deleter_);
}

/// Copy of original options passed during construction.
/**
* It is important to save a copy of this so that the rmw payload which it
Expand Down