Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for multithreaded execution #156

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions rclcpp/include/rclcpp/any_executable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define RCLCPP__ANY_EXECUTABLE_HPP_

#include <memory>
#include <mutex>

#include "rclcpp/macros.hpp"
#include "rclcpp/node.hpp"
Expand All @@ -36,15 +37,36 @@ struct AnyExecutable
RCLCPP_PUBLIC
virtual ~AnyExecutable();

bool is_one_field_set() const;

rclcpp::subscription::SubscriptionBase::ConstSharedPtr get_subscription() const;
rclcpp::subscription::SubscriptionBase::ConstSharedPtr get_subscription_intra_process() const;
rclcpp::timer::TimerBase::ConstSharedPtr get_timer() const;
rclcpp::service::ServiceBase::SharedPtr get_service() const;
rclcpp::client::ClientBase::SharedPtr get_client() const;
// These are used to keep the scope on the containing items
rclcpp::callback_group::CallbackGroup::SharedPtr get_callback_group() const;
rclcpp::node::Node::SharedPtr get_node() const;

void set_subscription(const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription);
void set_subscription_intra_process(const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription);
void set_timer(const rclcpp::timer::TimerBase::ConstSharedPtr timer);
void set_service(const rclcpp::service::ServiceBase::SharedPtr service);
void set_client(const rclcpp::client::ClientBase::SharedPtr client);
// These are used to keep the scope on the containing items
void set_callback_group(const rclcpp::callback_group::CallbackGroup::SharedPtr callback_group);
void set_node(const rclcpp::node::Node::SharedPtr node);

private:
// Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;
rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription_;
rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription_intra_process_;
rclcpp::timer::TimerBase::ConstSharedPtr timer_;
rclcpp::service::ServiceBase::SharedPtr service_;
rclcpp::client::ClientBase::SharedPtr client_;
// These are used to keep the scope on the containing items
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
rclcpp::node::Node::SharedPtr node;
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_;
rclcpp::node::Node::SharedPtr node_;
};

} // namespace executor
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class AnySubscriptionCallback
}

void dispatch(
std::shared_ptr<MessageT> message, const rmw_message_info_t & message_info)
std::shared_ptr<MessageT> message, const rmw_message_info_t & message_info) const
{
(void)message_info;
if (shared_ptr_callback_) {
Expand All @@ -179,7 +179,7 @@ class AnySubscriptionCallback
}

void dispatch_intra_process(
MessageUniquePtr & message, const rmw_message_info_t & message_info)
MessageUniquePtr & message, const rmw_message_info_t & message_info) const
{
(void)message_info;
if (shared_ptr_callback_) {
Expand Down
12 changes: 7 additions & 5 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <vector>

#include "rclcpp/any_executable.hpp"
Expand Down Expand Up @@ -207,21 +208,21 @@ class Executor
*/
RCLCPP_PUBLIC
void
execute_any_executable(AnyExecutable::SharedPtr any_exec);
execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec);

RCLCPP_PUBLIC
static void
execute_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription);
const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription);

RCLCPP_PUBLIC
static void
execute_intra_process_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription);
const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription);

RCLCPP_PUBLIC
static void
execute_timer(rclcpp::timer::TimerBase::SharedPtr timer);
execute_timer(const rclcpp::timer::TimerBase::ConstSharedPtr timer);

RCLCPP_PUBLIC
static void
Expand All @@ -244,7 +245,7 @@ class Executor
get_group_by_timer(rclcpp::timer::TimerBase::SharedPtr timer);

RCLCPP_PUBLIC
void
bool
get_next_timer(AnyExecutable::SharedPtr any_exec);

RCLCPP_PUBLIC
Expand All @@ -264,6 +265,7 @@ class Executor

/// Guard condition for signaling the rmw layer to wake up for special events.
rmw_guard_condition_t * interrupt_guard_condition_;
std::mutex trigger_guard_condition_mutex_;

/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
Expand Down
5 changes: 4 additions & 1 deletion rclcpp/include/rclcpp/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ class IntraProcessManager
uint64_t intra_process_publisher_id,
std::unique_ptr<MessageT, Deleter> & message)
{
std::lock_guard<std::mutex> lock(mutex_);
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
uint64_t message_seq = 0;
Expand Down Expand Up @@ -303,6 +304,7 @@ class IntraProcessManager
uint64_t requesting_subscriptions_intra_process_id,
std::unique_ptr<MessageT, Deleter> & message)
{
std::lock_guard<std::mutex> lock(mutex_);
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
message = nullptr;
Expand Down Expand Up @@ -331,14 +333,15 @@ class IntraProcessManager
/// Return true if the given rmw_gid_t matches any stored Publishers.
RCLCPP_PUBLIC
bool
matches_any_publishers(const rmw_gid_t * id) const;
matches_any_publishers(const rmw_gid_t * id);

private:
RCLCPP_PUBLIC
static uint64_t
get_next_unique_id();

IntraProcessManagerStateBase::SharedPtr state_;
std::mutex mutex_;
};

} // namespace intra_process_manager
Expand Down
44 changes: 26 additions & 18 deletions rclcpp/include/rclcpp/intra_process_manager_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -90,20 +91,19 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
{
subscriptions_[id] = subscription;
subscription_ids_by_topic_[subscription->get_topic_name()].insert(id);
}

void
remove_subscription(uint64_t intra_process_subscription_id)
{
subscriptions_.erase(intra_process_subscription_id);
for (auto & pair : subscription_ids_by_topic_) {
pair.second.erase(intra_process_subscription_id);
}
// Iterate over all publisher infos and all stored subscription id's and
// remove references to this subscription's id.
for (auto & publisher_pair : publishers_) {
std::lock_guard<std::mutex> lock(publisher_pair.second.target_subscriptions_mutex_);
for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
sub_pair.second.erase(intra_process_subscription_id);
}
Expand All @@ -123,7 +123,10 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
publishers_[id].sequence_number.store(0);

publishers_[id].buffer = mrb;
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
{
std::lock_guard<std::mutex> lock(publishers_[id].target_subscriptions_mutex_);
publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
}
}

void
Expand All @@ -138,6 +141,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
uint64_t intra_process_publisher_id,
uint64_t & message_seq)
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
Expand All @@ -152,6 +156,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
void
store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
Expand All @@ -165,16 +170,19 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
// Figure out what subscriptions should receive the message.
auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
// Store the list for later comparison.
info.target_subscriptions_by_message_sequence[message_seq].clear();
std::copy(
destined_subscriptions.begin(), destined_subscriptions.end(),
// Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
std::inserter(
info.target_subscriptions_by_message_sequence[message_seq],
// This ends up only being a hint to std::set, could also be .begin().
info.target_subscriptions_by_message_sequence[message_seq].end()
)
);
{
std::lock_guard<std::mutex> lock(info.target_subscriptions_mutex_);
info.target_subscriptions_by_message_sequence[message_seq].clear();
std::copy(
destined_subscriptions.begin(), destined_subscriptions.end(),
// Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
std::inserter(
info.target_subscriptions_by_message_sequence[message_seq],
// This ends up only being a hint to std::set, could also be .begin().
info.target_subscriptions_by_message_sequence[message_seq].end()
)
);
}
}

mapped_ring_buffer::MappedRingBufferBase::SharedPtr
Expand All @@ -184,6 +192,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
size_t & size
)
{
std::lock_guard<std::mutex> lock(mutex_);
PublisherInfo * info;
{
auto it = publishers_.find(intra_process_publisher_id);
Expand All @@ -196,6 +205,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
// Figure out how many subscriptions are left.
AllocSet * target_subs;
{
std::lock_guard<std::mutex> lock(info->target_subscriptions_mutex_);
auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
if (it == info->target_subscriptions_by_message_sequence.end()) {
// Message is no longer being stored by this publisher.
Expand Down Expand Up @@ -239,14 +249,9 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;

using AllocSet = std::set<uint64_t, std::less<uint64_t>, RebindAlloc<uint64_t>>;
using SubscriptionMap = std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr,
std::hash<uint64_t>, std::equal_to<uint64_t>,
RebindAlloc<std::pair<const uint64_t, subscription::SubscriptionBase::WeakPtr>>>;
using IDTopicMap = std::map<std::string, AllocSet,
std::less<std::string>, RebindAlloc<std::pair<std::string, AllocSet>>>;

SubscriptionMap subscriptions_;

IDTopicMap subscription_ids_by_topic_;

struct PublisherInfo
Expand All @@ -263,13 +268,16 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
std::hash<uint64_t>, std::equal_to<uint64_t>,
RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
TargetSubscriptionsMap target_subscriptions_by_message_sequence;
std::mutex target_subscriptions_mutex_;
};

using PublisherMap = std::unordered_map<uint64_t, PublisherInfo,
std::hash<uint64_t>, std::equal_to<uint64_t>,
RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;

PublisherMap publishers_;

std::mutex mutex_;
};

RCLCPP_PUBLIC
Expand Down
4 changes: 3 additions & 1 deletion rclcpp/include/rclcpp/macros.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
RCLCPP_WEAK_PTR_DEFINITIONS(__VA_ARGS__) \
__RCLCPP_UNIQUE_PTR_ALIAS(__VA_ARGS__)

#define __RCLCPP_SHARED_PTR_ALIAS(...) using SharedPtr = std::shared_ptr<__VA_ARGS__>;
#define __RCLCPP_SHARED_PTR_ALIAS(...)\
using SharedPtr = std::shared_ptr<__VA_ARGS__>; \
using ConstSharedPtr = std::shared_ptr<const __VA_ARGS__>;

#define __RCLCPP_MAKE_SHARED_DEFINITION(...) \
template<typename ... Args> \
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/include/rclcpp/mapped_ring_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

#include "rclcpp/allocator/allocator_common.hpp"
Expand Down Expand Up @@ -97,6 +98,7 @@ class MappedRingBuffer : public MappedRingBufferBase
void
get_copy_at_key(uint64_t key, ElemUniquePtr & value)
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = get_iterator_of_key(key);
value = nullptr;
if (it != elements_.end() && it->in_use) {
Expand Down Expand Up @@ -128,6 +130,7 @@ class MappedRingBuffer : public MappedRingBufferBase
void
get_ownership_at_key(uint64_t key, ElemUniquePtr & value)
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = get_iterator_of_key(key);
value = nullptr;
if (it != elements_.end() && it->in_use) {
Expand Down Expand Up @@ -155,6 +158,7 @@ class MappedRingBuffer : public MappedRingBufferBase
void
pop_at_key(uint64_t key, ElemUniquePtr & value)
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = get_iterator_of_key(key);
value = nullptr;
if (it != elements_.end() && it->in_use) {
Expand All @@ -177,6 +181,7 @@ class MappedRingBuffer : public MappedRingBufferBase
bool
push_and_replace(uint64_t key, ElemUniquePtr & value)
{
std::lock_guard<std::mutex> lock(mutex_);
bool did_replace = elements_[head_].in_use;
elements_[head_].key = key;
elements_[head_].value.swap(value);
Expand All @@ -196,6 +201,7 @@ class MappedRingBuffer : public MappedRingBufferBase
bool
has_key(uint64_t key)
{
std::lock_guard<std::mutex> lock(mutex_);
return elements_.end() != get_iterator_of_key(key);
}

Expand Down Expand Up @@ -225,6 +231,7 @@ class MappedRingBuffer : public MappedRingBufferBase
std::vector<element, VectorAlloc> elements_;
size_t head_;
std::shared_ptr<ElemAlloc> allocator_;
std::mutex mutex_;
};

} // namespace mapped_ring_buffer
Expand Down
10 changes: 5 additions & 5 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ class RCLCPP_PUBLIC MemoryStrategy
// \return Shared pointer to the fresh executable.
virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0;

virtual void
virtual bool
get_next_subscription(rclcpp::executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes) = 0;

virtual void
virtual bool
get_next_service(rclcpp::executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes) = 0;

virtual void
virtual bool
get_next_client(rclcpp::executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes) = 0;

static rclcpp::subscription::SubscriptionBase::SharedPtr
static rclcpp::subscription::SubscriptionBase::ConstSharedPtr
get_subscription_by_handle(void * subscriber_handle,
const WeakNodeVector & weak_nodes);

Expand All @@ -87,7 +87,7 @@ class RCLCPP_PUBLIC MemoryStrategy

static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription,
const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription,
const WeakNodeVector & weak_nodes);

static rclcpp::callback_group::CallbackGroup::SharedPtr
Expand Down
Loading