From 998bcb260b33bbf8631857dde811dca364849282 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Fri, 13 Nov 2015 18:18:08 -0800 Subject: [PATCH 1/8] fix multithreaded error --- .../executors/multi_threaded_executor.hpp | 1 + .../rclcpp/intra_process_manager_state.hpp | 7 ------- .../executors/multi_threaded_executor.cpp | 19 ++++++++++++++----- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 30ca1b045d..cd5cbfb1e3 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -62,6 +62,7 @@ class MultiThreadedExecutor : public executor::Executor std::mutex wait_mutex_; size_t number_of_threads_; std::unordered_map thread_number_by_thread_id_; + std::vector thread_executables; }; } // namespace multi_threaded_executor diff --git a/rclcpp/include/rclcpp/intra_process_manager_state.hpp b/rclcpp/include/rclcpp/intra_process_manager_state.hpp index 0649505f02..fddaca4c15 100644 --- a/rclcpp/include/rclcpp/intra_process_manager_state.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager_state.hpp @@ -90,14 +90,12 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) { - subscriptions_[id] = subscription; subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); } void remove_subscription(uint64_t intra_process_subscription_id) { - subscriptions_.erase(intra_process_subscription_id); for (auto & pair : subscription_ids_by_topic_) { pair.second.erase(intra_process_subscription_id); } @@ -239,14 +237,9 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase using RebindAlloc = typename std::allocator_traits::template rebind_alloc; using AllocSet = std::set, RebindAlloc>; - using SubscriptionMap = std::unordered_map, std::equal_to, - RebindAlloc>>; using IDTopicMap = std::map, RebindAlloc>>; - SubscriptionMap subscriptions_; - IDTopicMap subscription_ids_by_topic_; struct PublisherInfo diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 63d555ff12..e4f37beba8 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -14,6 +14,7 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" +#include #include #include #include @@ -31,6 +32,7 @@ MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStra if (number_of_threads_ == 0) { number_of_threads_ = 1; } + thread_executables.resize(number_of_threads_); } MultiThreadedExecutor::~MultiThreadedExecutor() {} @@ -45,10 +47,10 @@ MultiThreadedExecutor::spin() std::vector threads; { std::lock_guard wait_lock(wait_mutex_); - size_t thread_id = 1; - for (size_t i = number_of_threads_; i > 0; --i) { + //for (size_t i = number_of_threads_-1; i >= 0; --i) { + for(size_t i = 0; i < number_of_threads_; ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id++); + auto func = std::bind(&MultiThreadedExecutor::run, this, i); threads.emplace_back(func); } } @@ -74,8 +76,15 @@ MultiThreadedExecutor::run(size_t this_thread_number) if (!rclcpp::utilities::ok() || !spinning.load()) { return; } - any_exec = get_next_executable(); + assert(this_thread_number < thread_executables.size()); + thread_executables[this_thread_number] = get_next_executable(); + } + + execute_any_executable(thread_executables[this_thread_number]); + + { + std::lock_guard wait_lock(wait_mutex_); + thread_executables[this_thread_number] = nullptr; } - execute_any_executable(any_exec); } } From 4dc7e6720ddca1dc54066c90b98bc79318a7cf29 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Mon, 16 Nov 2015 10:37:49 -0800 Subject: [PATCH 2/8] explicitly construct thread vector --- rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index e4f37beba8..28665fec32 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -32,7 +32,7 @@ MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStra if (number_of_threads_ == 0) { number_of_threads_ = 1; } - thread_executables.resize(number_of_threads_); + thread_executables.resize(number_of_threads_, nullptr); } MultiThreadedExecutor::~MultiThreadedExecutor() {} @@ -47,7 +47,6 @@ MultiThreadedExecutor::spin() std::vector threads; { std::lock_guard wait_lock(wait_mutex_); - //for (size_t i = number_of_threads_-1; i >= 0; --i) { for(size_t i = 0; i < number_of_threads_; ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); auto func = std::bind(&MultiThreadedExecutor::run, this, i); From 10f4b98312add50b2c335bc47af9b408a1a1cd8b Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Mon, 16 Nov 2015 18:01:29 -0800 Subject: [PATCH 3/8] ospl intraprocess is still very buggy --- .../rclcpp/any_subscription_callback.hpp | 4 +- rclcpp/include/rclcpp/executor.hpp | 10 ++--- .../include/rclcpp/intra_process_manager.hpp | 3 ++ .../rclcpp/intra_process_manager_state.hpp | 37 +++++++++++++------ rclcpp/include/rclcpp/macros.hpp | 4 +- rclcpp/include/rclcpp/mapped_ring_buffer.hpp | 7 ++++ .../strategies/allocator_memory_strategy.hpp | 2 + rclcpp/include/rclcpp/subscription.hpp | 16 ++++---- rclcpp/src/rclcpp/executor.cpp | 34 ++++++++--------- .../executors/multi_threaded_executor.cpp | 9 +---- 10 files changed, 72 insertions(+), 54 deletions(-) diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index 0e657ee468..5dddf59db9 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -154,7 +154,7 @@ class AnySubscriptionCallback } void dispatch( - std::shared_ptr message, const rmw_message_info_t & message_info) + std::shared_ptr message, const rmw_message_info_t & message_info) const { (void)message_info; if (shared_ptr_callback_) { @@ -179,7 +179,7 @@ class AnySubscriptionCallback } void dispatch_intra_process( - MessageUniquePtr & message, const rmw_message_info_t & message_info) + MessageUniquePtr & message, const rmw_message_info_t & message_info) const { (void)message_info; if (shared_ptr_callback_) { diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 1e6bd95cfc..b54c5136d5 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -207,21 +207,21 @@ class Executor */ RCLCPP_PUBLIC void - execute_any_executable(AnyExecutable::SharedPtr any_exec); + execute_any_executable(AnyExecutable::ConstSharedPtr any_exec) const; RCLCPP_PUBLIC static void execute_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription); + rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription); RCLCPP_PUBLIC static void execute_intra_process_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription); + rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription); RCLCPP_PUBLIC static void - execute_timer(rclcpp::timer::TimerBase::SharedPtr timer); + execute_timer(rclcpp::timer::TimerBase::ConstSharedPtr timer); RCLCPP_PUBLIC static void @@ -263,7 +263,7 @@ class Executor std::atomic_bool spinning; /// Guard condition for signaling the rmw layer to wake up for special events. - rmw_guard_condition_t * interrupt_guard_condition_; + std::atomic interrupt_guard_condition_; /// The memory strategy: an interface for handling user-defined memory allocation strategies. memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 96c682dc88..b13bc39004 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -239,6 +239,7 @@ class IntraProcessManager uint64_t intra_process_publisher_id, std::unique_ptr & message) { + std::lock_guard lock(mutex_); using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; uint64_t message_seq = 0; @@ -303,6 +304,7 @@ class IntraProcessManager uint64_t requesting_subscriptions_intra_process_id, std::unique_ptr & message) { + std::lock_guard lock(mutex_); using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; using TypedMRB = mapped_ring_buffer::MappedRingBuffer; message = nullptr; @@ -339,6 +341,7 @@ class IntraProcessManager get_next_unique_id(); IntraProcessManagerStateBase::SharedPtr state_; + std::mutex mutex_; }; } // namespace intra_process_manager diff --git a/rclcpp/include/rclcpp/intra_process_manager_state.hpp b/rclcpp/include/rclcpp/intra_process_manager_state.hpp index fddaca4c15..c2e0e83b7a 100644 --- a/rclcpp/include/rclcpp/intra_process_manager_state.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager_state.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -102,6 +103,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase // Iterate over all publisher infos and all stored subscription id's and // remove references to this subscription's id. for (auto & publisher_pair : publishers_) { + std::lock_guard lock(publisher_pair.second.target_subscriptions_mutex_); for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { sub_pair.second.erase(intra_process_subscription_id); } @@ -121,7 +123,10 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase publishers_[id].sequence_number.store(0); publishers_[id].buffer = mrb; - publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + { + std::lock_guard lock(publishers_[id].target_subscriptions_mutex_); + publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + } } void @@ -136,6 +141,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase uint64_t intra_process_publisher_id, uint64_t & message_seq) { + std::lock_guard lock(mutex_); auto it = publishers_.find(intra_process_publisher_id); if (it == publishers_.end()) { throw std::runtime_error("store_intra_process_message called with invalid publisher id"); @@ -150,6 +156,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) { + std::lock_guard lock(mutex_); auto it = publishers_.find(intra_process_publisher_id); if (it == publishers_.end()) { throw std::runtime_error("store_intra_process_message called with invalid publisher id"); @@ -163,16 +170,19 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase // Figure out what subscriptions should receive the message. auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()]; // Store the list for later comparison. - info.target_subscriptions_by_message_sequence[message_seq].clear(); - std::copy( - destined_subscriptions.begin(), destined_subscriptions.end(), - // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] - std::inserter( - info.target_subscriptions_by_message_sequence[message_seq], - // This ends up only being a hint to std::set, could also be .begin(). - info.target_subscriptions_by_message_sequence[message_seq].end() - ) - ); + { + std::lock_guard lock(info.target_subscriptions_mutex_); + info.target_subscriptions_by_message_sequence[message_seq].clear(); + std::copy( + destined_subscriptions.begin(), destined_subscriptions.end(), + // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] + std::inserter( + info.target_subscriptions_by_message_sequence[message_seq], + // This ends up only being a hint to std::set, could also be .begin(). + info.target_subscriptions_by_message_sequence[message_seq].end() + ) + ); + } } mapped_ring_buffer::MappedRingBufferBase::SharedPtr @@ -182,6 +192,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase size_t & size ) { + std::lock_guard lock(mutex_); PublisherInfo * info; { auto it = publishers_.find(intra_process_publisher_id); @@ -194,6 +205,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase // Figure out how many subscriptions are left. AllocSet * target_subs; { + std::lock_guard lock(info->target_subscriptions_mutex_); auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); if (it == info->target_subscriptions_by_message_sequence.end()) { // Message is no longer being stored by this publisher. @@ -256,6 +268,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase std::hash, std::equal_to, RebindAlloc>>; TargetSubscriptionsMap target_subscriptions_by_message_sequence; + std::mutex target_subscriptions_mutex_; }; using PublisherMap = std::unordered_map>>; PublisherMap publishers_; + + std::mutex mutex_; }; RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/macros.hpp b/rclcpp/include/rclcpp/macros.hpp index b6cc88853a..6fa09972ed 100644 --- a/rclcpp/include/rclcpp/macros.hpp +++ b/rclcpp/include/rclcpp/macros.hpp @@ -47,7 +47,9 @@ RCLCPP_WEAK_PTR_DEFINITIONS(__VA_ARGS__) \ __RCLCPP_UNIQUE_PTR_ALIAS(__VA_ARGS__) -#define __RCLCPP_SHARED_PTR_ALIAS(...) using SharedPtr = std::shared_ptr<__VA_ARGS__>; +#define __RCLCPP_SHARED_PTR_ALIAS(...)\ + using SharedPtr = std::shared_ptr<__VA_ARGS__>; \ + using ConstSharedPtr = std::shared_ptr; #define __RCLCPP_MAKE_SHARED_DEFINITION(...) \ template \ diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index ce66ceb91e..48b792d19a 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include "rclcpp/allocator/allocator_common.hpp" @@ -97,6 +98,7 @@ class MappedRingBuffer : public MappedRingBufferBase void get_copy_at_key(uint64_t key, ElemUniquePtr & value) { + std::lock_guard lock(mutex_); auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { @@ -128,6 +130,7 @@ class MappedRingBuffer : public MappedRingBufferBase void get_ownership_at_key(uint64_t key, ElemUniquePtr & value) { + std::lock_guard lock(mutex_); auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { @@ -155,6 +158,7 @@ class MappedRingBuffer : public MappedRingBufferBase void pop_at_key(uint64_t key, ElemUniquePtr & value) { + std::lock_guard lock(mutex_); auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { @@ -177,6 +181,7 @@ class MappedRingBuffer : public MappedRingBufferBase bool push_and_replace(uint64_t key, ElemUniquePtr & value) { + std::lock_guard lock(mutex_); bool did_replace = elements_[head_].in_use; elements_[head_].key = key; elements_[head_].value.swap(value); @@ -196,6 +201,7 @@ class MappedRingBuffer : public MappedRingBufferBase bool has_key(uint64_t key) { + std::lock_guard lock(mutex_); return elements_.end() != get_iterator_of_key(key); } @@ -225,6 +231,7 @@ class MappedRingBuffer : public MappedRingBufferBase std::vector elements_; size_t head_; std::shared_ptr allocator_; + std::mutex mutex_; }; } // namespace mapped_ring_buffer diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 6d14acae03..fb42da33ba 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ +#include #include #include @@ -173,6 +174,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) { + assert(any_exec); auto it = subscriber_handles_.begin(); while (it != subscriber_handles_.end()) { auto subscription = get_subscription_by_handle(*it, weak_nodes); diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 0044b205c3..a15e3f1169 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -82,23 +82,23 @@ class SubscriptionBase /// Borrow a new message. // \return Shared pointer to the fresh message. virtual std::shared_ptr - create_message() = 0; + create_message() const = 0; /// Check if we need to handle the message, and execute the callback if we do. /** * \param[in] message Shared pointer to the message to handle. * \param[in] message_info Metadata associated with this message. */ virtual void - handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) = 0; + handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) const = 0; /// Return the message borrowed in create_message. // \param[in] Shared pointer to the returned message. virtual void - return_message(std::shared_ptr & message) = 0; + return_message(std::shared_ptr & message) const = 0; virtual void handle_intra_process_message( rcl_interfaces::msg::IntraProcessMessage & ipm, - const rmw_message_info_t & message_info) = 0; + const rmw_message_info_t & message_info) const = 0; protected: rmw_subscription_t * intra_process_subscription_handle_; @@ -168,7 +168,7 @@ class Subscription : public SubscriptionBase { message_memory_strategy_ = message_memory_strategy; } - std::shared_ptr create_message() + std::shared_ptr create_message() const { /* The default message memory strategy provides a dynamically allocated message on each call to * create_message, though alternative memory strategies that re-use a preallocated message may be @@ -177,7 +177,7 @@ class Subscription : public SubscriptionBase return message_memory_strategy_->borrow_message(); } - void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) + void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) const { if (matches_any_intra_process_publishers_) { if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) { @@ -190,7 +190,7 @@ class Subscription : public SubscriptionBase any_callback_.dispatch(typed_message, message_info); } - void return_message(std::shared_ptr & message) + void return_message(std::shared_ptr & message) const { auto typed_message = std::static_pointer_cast(message); message_memory_strategy_->return_message(typed_message); @@ -198,7 +198,7 @@ class Subscription : public SubscriptionBase void handle_intra_process_message( rcl_interfaces::msg::IntraProcessMessage & ipm, - const rmw_message_info_t & message_info) + const rmw_message_info_t & message_info) const { if (!get_intra_process_message_callback_) { // throw std::runtime_error( diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 7a39c0b9ab..de1d9ddf1f 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -31,7 +31,7 @@ Executor::~Executor() { // Try to deallocate the interrupt guard condition. if (interrupt_guard_condition_ != nullptr) { - rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); + rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_.load()); if (status != RMW_RET_OK) { fprintf(stderr, "[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe()); @@ -53,7 +53,7 @@ Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) weak_nodes_.push_back(node_ptr); if (notify) { // Interrupt waiting to handle new node - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_.load()); if (status != RMW_RET_OK) { throw std::runtime_error(rmw_get_error_string_safe()); } @@ -80,7 +80,7 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) if (notify) { // If the node was matched and removed, interrupt waiting if (node_removed) { - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_.load()); if (status != RMW_RET_OK) { throw std::runtime_error(rmw_get_error_string_safe()); } @@ -137,7 +137,7 @@ void Executor::cancel() { spinning.store(false); - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_.load()); if (status != RMW_RET_OK) { throw std::runtime_error(rmw_get_error_string_safe()); } @@ -153,31 +153,27 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr } void -Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec) +Executor::execute_any_executable(AnyExecutable::ConstSharedPtr any_exec) const { if (!any_exec || !spinning.load()) { return; } if (any_exec->timer) { execute_timer(any_exec->timer); - } - if (any_exec->subscription) { + } else if (any_exec->subscription) { execute_subscription(any_exec->subscription); - } - if (any_exec->subscription_intra_process) { + } else if (any_exec->subscription_intra_process) { execute_intra_process_subscription(any_exec->subscription_intra_process); - } - if (any_exec->service) { + } else if (any_exec->service) { execute_service(any_exec->service); - } - if (any_exec->client) { + } else if (any_exec->client) { execute_client(any_exec->client); } // Reset the callback_group, regardless of type any_exec->callback_group->can_be_taken_from().store(true); // Wake the wait, because it may need to be recalculated or work that // was previously blocked is now available. - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_.load()); if (status != RMW_RET_OK) { throw std::runtime_error(rmw_get_error_string_safe()); } @@ -185,7 +181,7 @@ Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec) void Executor::execute_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription) + rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription) { std::shared_ptr message = subscription->create_message(); bool taken = false; @@ -208,7 +204,7 @@ Executor::execute_subscription( void Executor::execute_intra_process_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription) + rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription) { rcl_interfaces::msg::IntraProcessMessage ipm; bool taken = false; @@ -232,7 +228,7 @@ Executor::execute_intra_process_subscription( void Executor::execute_timer( - rclcpp::timer::TimerBase::SharedPtr timer) + rclcpp::timer::TimerBase::ConstSharedPtr timer) { timer->execute_callback(); } @@ -337,7 +333,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) rclcpp::utilities::get_global_sigint_guard_condition()->data; // Put the executor's guard condition in guard_condition_handles.guard_conditions[1] = \ - interrupt_guard_condition_->data; + interrupt_guard_condition_.load()->data; rmw_time_t * wait_timeout = NULL; rmw_time_t rmw_timeout; @@ -537,7 +533,7 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout) callback_group::CallbackGroupType::MutuallyExclusive) { // It should not have been taken otherwise - assert(any_exec->callback_group->can_be_taken_from().load()); + //assert(any_exec->callback_group->can_be_taken_from().load()); // Set to false to indicate something is being run from this group // This is reset to true either when the any_exec is executed or when the // any_exec is destructued diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 28665fec32..5198803be9 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -47,8 +47,7 @@ MultiThreadedExecutor::spin() std::vector threads; { std::lock_guard wait_lock(wait_mutex_); - for(size_t i = 0; i < number_of_threads_; ++i) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + for (size_t i = 0; i < number_of_threads_; ++i) { auto func = std::bind(&MultiThreadedExecutor::run, this, i); threads.emplace_back(func); } @@ -75,15 +74,9 @@ MultiThreadedExecutor::run(size_t this_thread_number) if (!rclcpp::utilities::ok() || !spinning.load()) { return; } - assert(this_thread_number < thread_executables.size()); thread_executables[this_thread_number] = get_next_executable(); } execute_any_executable(thread_executables[this_thread_number]); - - { - std::lock_guard wait_lock(wait_mutex_); - thread_executables[this_thread_number] = nullptr; - } } } From 2ea80daaba20be514b8ac858af1cb95f67e96ea2 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Tue, 17 Nov 2015 16:39:58 -0800 Subject: [PATCH 4/8] messy state --- rclcpp/include/rclcpp/any_executable.hpp | 36 ++++-- rclcpp/include/rclcpp/executor.hpp | 14 ++- .../executors/multi_threaded_executor.hpp | 1 - .../include/rclcpp/intra_process_manager.hpp | 2 +- rclcpp/include/rclcpp/memory_strategy.hpp | 8 +- .../strategies/allocator_memory_strategy.hpp | 38 +++--- rclcpp/src/rclcpp/any_executable.cpp | 108 ++++++++++++++++-- rclcpp/src/rclcpp/executor.cpp | 85 ++++++++------ .../executors/multi_threaded_executor.cpp | 26 ++++- rclcpp/src/rclcpp/intra_process_manager.cpp | 3 +- rclcpp/src/rclcpp/memory_strategy.cpp | 4 +- 11 files changed, 234 insertions(+), 91 deletions(-) diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp index 656b2c7d34..429bfb8334 100644 --- a/rclcpp/include/rclcpp/any_executable.hpp +++ b/rclcpp/include/rclcpp/any_executable.hpp @@ -16,6 +16,7 @@ #define RCLCPP__ANY_EXECUTABLE_HPP_ #include +#include #include "rclcpp/macros.hpp" #include "rclcpp/node.hpp" @@ -36,15 +37,36 @@ struct AnyExecutable RCLCPP_PUBLIC virtual ~AnyExecutable(); + bool is_one_field_set() const; + + rclcpp::subscription::SubscriptionBase::SharedPtr get_subscription() const; + rclcpp::subscription::SubscriptionBase::SharedPtr get_subscription_intra_process() const; + rclcpp::timer::TimerBase::SharedPtr get_timer() const; + rclcpp::service::ServiceBase::SharedPtr get_service() const; + rclcpp::client::ClientBase::SharedPtr get_client() const; + // These are used to keep the scope on the containing items + rclcpp::callback_group::CallbackGroup::SharedPtr get_callback_group() const; + rclcpp::node::Node::SharedPtr get_node() const; + + void set_subscription(const rclcpp::subscription::SubscriptionBase::SharedPtr subscription); + void set_subscription_intra_process(const rclcpp::subscription::SubscriptionBase::SharedPtr subscription); + void set_timer(const rclcpp::timer::TimerBase::SharedPtr timer); + void set_service(const rclcpp::service::ServiceBase::SharedPtr service); + void set_client(const rclcpp::client::ClientBase::SharedPtr client); + // These are used to keep the scope on the containing items + void set_callback_group(const rclcpp::callback_group::CallbackGroup::SharedPtr callback_group); + void set_node(const rclcpp::node::Node::SharedPtr node); + +private: // Only one of the following pointers will be set. - rclcpp::subscription::SubscriptionBase::SharedPtr subscription; - rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process; - rclcpp::timer::TimerBase::SharedPtr timer; - rclcpp::service::ServiceBase::SharedPtr service; - rclcpp::client::ClientBase::SharedPtr client; + rclcpp::subscription::SubscriptionBase::SharedPtr subscription_; + rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process_; + rclcpp::timer::TimerBase::SharedPtr timer_; + rclcpp::service::ServiceBase::SharedPtr service_; + rclcpp::client::ClientBase::SharedPtr client_; // These are used to keep the scope on the containing items - rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; - rclcpp::node::Node::SharedPtr node; + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group_; + rclcpp::node::Node::SharedPtr node_; }; } // namespace executor diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index b54c5136d5..924c2b0eec 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "rclcpp/any_executable.hpp" @@ -207,21 +208,21 @@ class Executor */ RCLCPP_PUBLIC void - execute_any_executable(AnyExecutable::ConstSharedPtr any_exec) const; + execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec); RCLCPP_PUBLIC static void execute_subscription( - rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription); + const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription); RCLCPP_PUBLIC static void execute_intra_process_subscription( - rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription); + const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription); RCLCPP_PUBLIC static void - execute_timer(rclcpp::timer::TimerBase::ConstSharedPtr timer); + execute_timer(const rclcpp::timer::TimerBase::ConstSharedPtr timer); RCLCPP_PUBLIC static void @@ -244,7 +245,7 @@ class Executor get_group_by_timer(rclcpp::timer::TimerBase::SharedPtr timer); RCLCPP_PUBLIC - void + bool get_next_timer(AnyExecutable::SharedPtr any_exec); RCLCPP_PUBLIC @@ -263,7 +264,8 @@ class Executor std::atomic_bool spinning; /// Guard condition for signaling the rmw layer to wake up for special events. - std::atomic interrupt_guard_condition_; + rmw_guard_condition_t * interrupt_guard_condition_; + std::mutex trigger_guard_condition_mutex_; /// The memory strategy: an interface for handling user-defined memory allocation strategies. memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index cd5cbfb1e3..30ca1b045d 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -62,7 +62,6 @@ class MultiThreadedExecutor : public executor::Executor std::mutex wait_mutex_; size_t number_of_threads_; std::unordered_map thread_number_by_thread_id_; - std::vector thread_executables; }; } // namespace multi_threaded_executor diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index b13bc39004..070bb87e4e 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -333,7 +333,7 @@ class IntraProcessManager /// Return true if the given rmw_gid_t matches any stored Publishers. RCLCPP_PUBLIC bool - matches_any_publishers(const rmw_gid_t * id) const; + matches_any_publishers(const rmw_gid_t * id); private: RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index e363914b60..4bed157f93 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -59,15 +59,15 @@ class RCLCPP_PUBLIC MemoryStrategy // \return Shared pointer to the fresh executable. virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0; - virtual void + virtual bool get_next_subscription(rclcpp::executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) = 0; - virtual void + virtual bool get_next_service(rclcpp::executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) = 0; - virtual void + virtual bool get_next_client(rclcpp::executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) = 0; @@ -87,7 +87,7 @@ class RCLCPP_PUBLIC MemoryStrategy static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const rclcpp::subscription::SubscriptionBase::SharedPtr subscription, const WeakNodeVector & weak_nodes); static rclcpp::callback_group::CallbackGroup::SharedPtr diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index fb42da33ba..d808887828 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -170,14 +170,13 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy return std::allocate_shared(*executable_allocator_.get()); } - virtual void + virtual bool get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) { - assert(any_exec); auto it = subscriber_handles_.begin(); while (it != subscriber_handles_.end()) { - auto subscription = get_subscription_by_handle(*it, weak_nodes); + const subscription::SubscriptionBase::SharedPtr subscription = get_subscription_by_handle(*it, weak_nodes); if (subscription) { // Figure out if this is for intra-process or not. bool is_intra_process = false; @@ -200,21 +199,22 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } // Otherwise it is safe to set and return the any_exec if (is_intra_process) { - any_exec->subscription_intra_process = subscription; + any_exec->set_subscription_intra_process(subscription); } else { - any_exec->subscription = subscription; + any_exec->set_subscription(subscription); } - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group, weak_nodes); + any_exec->set_callback_group(group); + any_exec->set_node(get_node_by_group(group, weak_nodes)); subscriber_handles_.erase(it); - return; + return true; } // Else, the subscription is no longer valid, remove it and continue subscriber_handles_.erase(it); } + return false; } - virtual void + virtual bool get_next_service(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) { @@ -237,18 +237,19 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy continue; } // Otherwise it is safe to set and return the any_exec - any_exec->service = service; - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group, weak_nodes); + any_exec->set_service(service); + any_exec->set_callback_group(group); + any_exec->set_node(get_node_by_group(group, weak_nodes)); service_handles_.erase(it); - return; + return true; } // Else, the service is no longer valid, remove it and continue service_handles_.erase(it); } + return false; } - virtual void + virtual bool get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) { auto it = client_handles_.begin(); @@ -270,15 +271,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy continue; } // Otherwise it is safe to set and return the any_exec - any_exec->client = client; - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group, weak_nodes); + any_exec->set_client(client); + any_exec->set_callback_group(group); + any_exec->set_node(get_node_by_group(group, weak_nodes)); client_handles_.erase(it); - return; + return true; } // Else, the service is no longer valid, remove it and continue client_handles_.erase(it); } + return false; } private: diff --git a/rclcpp/src/rclcpp/any_executable.cpp b/rclcpp/src/rclcpp/any_executable.cpp index 0cef731053..bc328c95a7 100644 --- a/rclcpp/src/rclcpp/any_executable.cpp +++ b/rclcpp/src/rclcpp/any_executable.cpp @@ -14,16 +14,18 @@ #include "rclcpp/any_executable.hpp" +#include + using rclcpp::executor::AnyExecutable; -AnyExecutable::AnyExecutable() -: subscription(nullptr), - subscription_intra_process(nullptr), - timer(nullptr), - service(nullptr), - client(nullptr), - callback_group(nullptr), - node(nullptr) +AnyExecutable::AnyExecutable() : + subscription_(nullptr), + subscription_intra_process_(nullptr), + timer_(nullptr), + service_(nullptr), + client_(nullptr), + callback_group_(nullptr), + node_(nullptr) {} AnyExecutable::~AnyExecutable() @@ -31,7 +33,93 @@ AnyExecutable::~AnyExecutable() // Make sure that discarded (taken but not executed) AnyExecutable's have // their callback groups reset. This can happen when an executor is canceled // between taking an AnyExecutable and executing it. - if (callback_group) { - callback_group->can_be_taken_from().store(true); + if (callback_group_) { + callback_group_->can_be_taken_from().store(true); + } +} + +bool AnyExecutable::is_one_field_set() const { + size_t fields_set = 0; + if (timer_) { + ++fields_set; + } + if (subscription_) { + ++fields_set; } + if (subscription_intra_process_) { + ++fields_set; + } + if (service_) { + ++fields_set; + } + if (client_) { + ++fields_set; + } + return fields_set == 1; +} + +rclcpp::subscription::SubscriptionBase::SharedPtr AnyExecutable::get_subscription() const { + return subscription_; +} + +rclcpp::subscription::SubscriptionBase::SharedPtr +AnyExecutable::get_subscription_intra_process() const { + return subscription_intra_process_; +} + +rclcpp::timer::TimerBase::SharedPtr AnyExecutable::get_timer() const { + return timer_; +} + +rclcpp::service::ServiceBase::SharedPtr AnyExecutable::get_service() const { + return service_; +} + +rclcpp::client::ClientBase::SharedPtr AnyExecutable::get_client() const { + return client_; +} + +rclcpp::callback_group::CallbackGroup::SharedPtr AnyExecutable::get_callback_group() const { + return callback_group_; +} + +rclcpp::node::Node::SharedPtr AnyExecutable::get_node() const { + return node_; +} + +void +AnyExecutable::set_subscription( + const rclcpp::subscription::SubscriptionBase::SharedPtr subscription) +{ + subscription_ = subscription; +} + +void +AnyExecutable::set_subscription_intra_process( + const rclcpp::subscription::SubscriptionBase::SharedPtr subscription) +{ + subscription_intra_process_ = subscription; +} + +void AnyExecutable::set_timer(const rclcpp::timer::TimerBase::SharedPtr timer) { + timer_ = timer; +} + +void AnyExecutable::set_service(const rclcpp::service::ServiceBase::SharedPtr service) { + service_ = service; +} + +void AnyExecutable::set_client(const rclcpp::client::ClientBase::SharedPtr client) { + client_ = client; +} + +void +AnyExecutable::set_callback_group( + const rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) +{ + callback_group_ = callback_group; +} + +void AnyExecutable::set_node(const rclcpp::node::Node::SharedPtr node) { + node_ = node; } diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index de1d9ddf1f..7bc5819785 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -31,7 +31,8 @@ Executor::~Executor() { // Try to deallocate the interrupt guard condition. if (interrupt_guard_condition_ != nullptr) { - rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_.load()); + std::lock_guard lock(trigger_guard_condition_mutex_); + rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); if (status != RMW_RET_OK) { fprintf(stderr, "[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe()); @@ -52,8 +53,9 @@ Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) } weak_nodes_.push_back(node_ptr); if (notify) { + std::lock_guard lock(trigger_guard_condition_mutex_); // Interrupt waiting to handle new node - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_.load()); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); if (status != RMW_RET_OK) { throw std::runtime_error(rmw_get_error_string_safe()); } @@ -80,7 +82,8 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) if (notify) { // If the node was matched and removed, interrupt waiting if (node_removed) { - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_.load()); + std::lock_guard lock(trigger_guard_condition_mutex_); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); if (status != RMW_RET_OK) { throw std::runtime_error(rmw_get_error_string_safe()); } @@ -137,7 +140,8 @@ void Executor::cancel() { spinning.store(false); - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_.load()); + std::lock_guard lock(trigger_guard_condition_mutex_); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); if (status != RMW_RET_OK) { throw std::runtime_error(rmw_get_error_string_safe()); } @@ -153,35 +157,43 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr } void -Executor::execute_any_executable(AnyExecutable::ConstSharedPtr any_exec) const +Executor::execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec) { if (!any_exec || !spinning.load()) { return; } - if (any_exec->timer) { - execute_timer(any_exec->timer); - } else if (any_exec->subscription) { - execute_subscription(any_exec->subscription); - } else if (any_exec->subscription_intra_process) { - execute_intra_process_subscription(any_exec->subscription_intra_process); - } else if (any_exec->service) { - execute_service(any_exec->service); - } else if (any_exec->client) { - execute_client(any_exec->client); + + if (any_exec->get_callback_group() && any_exec->get_callback_group()->can_be_taken_from().load()) { + return; + } + + if (timer::TimerBase::ConstSharedPtr timer = any_exec->get_timer()) { + execute_timer(timer); + } else if (subscription::SubscriptionBase::ConstSharedPtr subscription = any_exec->get_subscription()) { + execute_subscription(subscription); + } else if (subscription::SubscriptionBase::ConstSharedPtr subscription_intra_process = any_exec->get_subscription_intra_process()) { + execute_intra_process_subscription(subscription_intra_process); + } else if (any_exec->get_service()) { + execute_service(any_exec->get_service()); + } else if (any_exec->get_client()) { + execute_client(any_exec->get_client()); } // Reset the callback_group, regardless of type - any_exec->callback_group->can_be_taken_from().store(true); + any_exec->get_callback_group()->can_be_taken_from().store(true); // Wake the wait, because it may need to be recalculated or work that // was previously blocked is now available. - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_.load()); - if (status != RMW_RET_OK) { - throw std::runtime_error(rmw_get_error_string_safe()); + { + std::lock_guard lock(trigger_guard_condition_mutex_); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + if (status != RMW_RET_OK) { + throw std::runtime_error(rmw_get_error_string_safe()); + } } } void Executor::execute_subscription( - rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription) + const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription) { std::shared_ptr message = subscription->create_message(); bool taken = false; @@ -204,7 +216,7 @@ Executor::execute_subscription( void Executor::execute_intra_process_subscription( - rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription) + const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription) { rcl_interfaces::msg::IntraProcessMessage ipm; bool taken = false; @@ -228,7 +240,7 @@ Executor::execute_intra_process_subscription( void Executor::execute_timer( - rclcpp::timer::TimerBase::ConstSharedPtr timer) + const rclcpp::timer::TimerBase::ConstSharedPtr timer) { timer->execute_callback(); } @@ -333,7 +345,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) rclcpp::utilities::get_global_sigint_guard_condition()->data; // Put the executor's guard condition in guard_condition_handles.guard_conditions[1] = \ - interrupt_guard_condition_.load()->data; + interrupt_guard_condition_->data; rmw_time_t * wait_timeout = NULL; rmw_time_t rmw_timeout; @@ -423,7 +435,7 @@ Executor::get_group_by_timer(rclcpp::timer::TimerBase::SharedPtr timer) return rclcpp::callback_group::CallbackGroup::SharedPtr(); } -void +bool Executor::get_next_timer(AnyExecutable::SharedPtr any_exec) { for (auto & weak_node : weak_nodes_) { @@ -439,14 +451,15 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec) for (auto & timer_ref : group->get_timer_ptrs()) { auto timer = timer_ref.lock(); if (timer && timer->check_and_trigger()) { - any_exec->timer = timer; - any_exec->callback_group = group; - node = get_node_by_group(group); - return; + any_exec->set_timer(timer); + any_exec->set_callback_group(group); + any_exec->set_node(get_node_by_group(group)); + return true; } } } } + return false; } std::chrono::nanoseconds @@ -485,23 +498,19 @@ Executor::get_next_ready_executable() { auto any_exec = memory_strategy_->instantiate_next_executable(); // Check the timers to see if there are any that are ready, if so return - get_next_timer(any_exec); - if (any_exec->timer) { + if (get_next_timer(any_exec)) { return any_exec; } // Check the subscriptions to see if there are any that are ready - memory_strategy_->get_next_subscription(any_exec, weak_nodes_); - if (any_exec->subscription || any_exec->subscription_intra_process) { + if (memory_strategy_->get_next_subscription(any_exec, weak_nodes_)) { return any_exec; } // Check the services to see if there are any that are ready - memory_strategy_->get_next_service(any_exec, weak_nodes_); - if (any_exec->service) { + if (memory_strategy_->get_next_service(any_exec, weak_nodes_)) { return any_exec; } // Check the clients to see if there are any that are ready - memory_strategy_->get_next_client(any_exec, weak_nodes_); - if (any_exec->client) { + if (memory_strategy_->get_next_client(any_exec, weak_nodes_)) { return any_exec; } // If there is no ready executable, return a null ptr @@ -529,7 +538,7 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout) if (any_exec) { // If it is valid, check to see if the group is mutually exclusive or // not, then mark it accordingly - if (any_exec->callback_group && any_exec->callback_group->type() == \ + if (any_exec->get_callback_group() && any_exec->get_callback_group()->type() == \ callback_group::CallbackGroupType::MutuallyExclusive) { // It should not have been taken otherwise @@ -537,7 +546,7 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout) // Set to false to indicate something is being run from this group // This is reset to true either when the any_exec is executed or when the // any_exec is destructued - any_exec->callback_group->can_be_taken_from().store(false); + any_exec->get_callback_group()->can_be_taken_from().store(false); } } return any_exec; diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 5198803be9..3f2ba63ff5 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -32,7 +32,7 @@ MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStra if (number_of_threads_ == 0) { number_of_threads_ = 1; } - thread_executables.resize(number_of_threads_, nullptr); + //thread_executables.resize(number_of_threads_, nullptr); } MultiThreadedExecutor::~MultiThreadedExecutor() {} @@ -74,9 +74,29 @@ MultiThreadedExecutor::run(size_t this_thread_number) if (!rclcpp::utilities::ok() || !spinning.load()) { return; } - thread_executables[this_thread_number] = get_next_executable(); + //printf("Create executable in thread %u\n", this_thread_number); + //fflush(stdout); + any_exec = get_next_executable(); + /* + if (any_exec) { + assert(any_exec->is_one_field_set()); + } + */ } - execute_any_executable(thread_executables[this_thread_number]); + // Argh. + execute_any_executable(any_exec); + //printf("Execute executable in thread %u\n", this_thread_number); + //fflush(stdout); + + /* + if (any_exec) { + std::lock_guard wait_lock(wait_mutex_); + if (any_exec) { + assert(any_exec->is_one_field_set()); + } + any_exec.reset(); + } + */ } } diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index b919f4c10a..f977d58c14 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -51,8 +51,9 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) } bool -IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const +IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) { + std::lock_guard lock(mutex_); return state_->matches_any_publishers(id); } diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index 28c4bfddd2..af47d808f2 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -118,7 +118,7 @@ MemoryStrategy::get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedP rclcpp::callback_group::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const rclcpp::subscription::SubscriptionBase::SharedPtr subscription, const WeakNodeVector & weak_nodes) { for (auto & weak_node : weak_nodes) { @@ -132,7 +132,7 @@ MemoryStrategy::get_group_by_subscription( continue; } for (auto & weak_sub : group->get_subscription_ptrs()) { - auto sub = weak_sub.lock(); + const rclcpp::subscription::SubscriptionBase::SharedPtr sub = weak_sub.lock(); if (sub == subscription) { return group; } From c3ada4059610c4bc82ba70a7caf1a15f0a7b18f7 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Tue, 17 Nov 2015 17:20:42 -0800 Subject: [PATCH 5/8] small correction to exit condition in execute_any_executable --- rclcpp/src/rclcpp/executor.cpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 7bc5819785..5ff3ebec65 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -118,7 +118,7 @@ Executor::spin_some() } RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); AnyExecutable::SharedPtr any_exec; - while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && spinning.load()) { + while ((any_exec = get_next_executable(std::chrono::milliseconds(0))) && spinning.load()) { execute_any_executable(any_exec); } } @@ -163,10 +163,6 @@ Executor::execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec) return; } - if (any_exec->get_callback_group() && any_exec->get_callback_group()->can_be_taken_from().load()) { - return; - } - if (timer::TimerBase::ConstSharedPtr timer = any_exec->get_timer()) { execute_timer(timer); } else if (subscription::SubscriptionBase::ConstSharedPtr subscription = any_exec->get_subscription()) { From 35865cb4ba785abb51ec33f43fc78ff9bd23689b Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Wed, 18 Nov 2015 12:01:58 -0800 Subject: [PATCH 6/8] constness, atomic counter in is_one_field_set --- rclcpp/include/rclcpp/any_executable.hpp | 6 +++--- rclcpp/include/rclcpp/executor.hpp | 2 +- rclcpp/src/rclcpp/any_executable.cpp | 20 +++++++++---------- rclcpp/src/rclcpp/executor.cpp | 5 ++++- .../executors/multi_threaded_executor.cpp | 15 ++------------ 5 files changed, 20 insertions(+), 28 deletions(-) diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp index 429bfb8334..71067d373e 100644 --- a/rclcpp/include/rclcpp/any_executable.hpp +++ b/rclcpp/include/rclcpp/any_executable.hpp @@ -39,9 +39,9 @@ struct AnyExecutable bool is_one_field_set() const; - rclcpp::subscription::SubscriptionBase::SharedPtr get_subscription() const; - rclcpp::subscription::SubscriptionBase::SharedPtr get_subscription_intra_process() const; - rclcpp::timer::TimerBase::SharedPtr get_timer() const; + rclcpp::subscription::SubscriptionBase::ConstSharedPtr get_subscription() const; + rclcpp::subscription::SubscriptionBase::ConstSharedPtr get_subscription_intra_process() const; + rclcpp::timer::TimerBase::ConstSharedPtr get_timer() const; rclcpp::service::ServiceBase::SharedPtr get_service() const; rclcpp::client::ClientBase::SharedPtr get_client() const; // These are used to keep the scope on the containing items diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 924c2b0eec..7b181f4ebe 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -208,7 +208,7 @@ class Executor */ RCLCPP_PUBLIC void - execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec); + execute_any_executable(const AnyExecutable::ConstSharedPtr & any_exec); RCLCPP_PUBLIC static void diff --git a/rclcpp/src/rclcpp/any_executable.cpp b/rclcpp/src/rclcpp/any_executable.cpp index bc328c95a7..84fb86c650 100644 --- a/rclcpp/src/rclcpp/any_executable.cpp +++ b/rclcpp/src/rclcpp/any_executable.cpp @@ -39,35 +39,35 @@ AnyExecutable::~AnyExecutable() } bool AnyExecutable::is_one_field_set() const { - size_t fields_set = 0; + std::atomic fields_set(0); if (timer_) { - ++fields_set; + fields_set.fetch_add(1, std::memory_order_relaxed); } if (subscription_) { - ++fields_set; + fields_set.fetch_add(1, std::memory_order_relaxed); } if (subscription_intra_process_) { - ++fields_set; + fields_set.fetch_add(1, std::memory_order_relaxed); } if (service_) { - ++fields_set; + fields_set.fetch_add(1, std::memory_order_relaxed); } if (client_) { - ++fields_set; + fields_set.fetch_add(1, std::memory_order_relaxed); } - return fields_set == 1; + return fields_set <= 1; } -rclcpp::subscription::SubscriptionBase::SharedPtr AnyExecutable::get_subscription() const { +rclcpp::subscription::SubscriptionBase::ConstSharedPtr AnyExecutable::get_subscription() const { return subscription_; } -rclcpp::subscription::SubscriptionBase::SharedPtr +rclcpp::subscription::SubscriptionBase::ConstSharedPtr AnyExecutable::get_subscription_intra_process() const { return subscription_intra_process_; } -rclcpp::timer::TimerBase::SharedPtr AnyExecutable::get_timer() const { +rclcpp::timer::TimerBase::ConstSharedPtr AnyExecutable::get_timer() const { return timer_; } diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 5ff3ebec65..c9b05e6823 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -157,7 +157,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr } void -Executor::execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec) +Executor::execute_any_executable(const AnyExecutable::ConstSharedPtr & any_exec) { if (!any_exec || !spinning.load()) { return; @@ -185,6 +185,7 @@ Executor::execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec) throw std::runtime_error(rmw_get_error_string_safe()); } } + //assert(any_exec->is_one_field_set()); } void @@ -493,6 +494,8 @@ AnyExecutable::SharedPtr Executor::get_next_ready_executable() { auto any_exec = memory_strategy_->instantiate_next_executable(); + assert(any_exec->is_one_field_set()); + // Check the timers to see if there are any that are ready, if so return if (get_next_timer(any_exec)) { return any_exec; diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 3f2ba63ff5..bb21eca9b1 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -32,7 +32,6 @@ MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStra if (number_of_threads_ == 0) { number_of_threads_ = 1; } - //thread_executables.resize(number_of_threads_, nullptr); } MultiThreadedExecutor::~MultiThreadedExecutor() {} @@ -68,35 +67,25 @@ MultiThreadedExecutor::run(size_t this_thread_number) { thread_number_by_thread_id_[std::this_thread::get_id()] = this_thread_number; while (rclcpp::utilities::ok() && spinning.load()) { - executor::AnyExecutable::SharedPtr any_exec; + executor::AnyExecutable::SharedPtr any_exec = nullptr; { std::lock_guard wait_lock(wait_mutex_); if (!rclcpp::utilities::ok() || !spinning.load()) { return; } - //printf("Create executable in thread %u\n", this_thread_number); - //fflush(stdout); any_exec = get_next_executable(); - /* if (any_exec) { assert(any_exec->is_one_field_set()); } - */ } // Argh. execute_any_executable(any_exec); - //printf("Execute executable in thread %u\n", this_thread_number); - //fflush(stdout); - /* if (any_exec) { std::lock_guard wait_lock(wait_mutex_); - if (any_exec) { - assert(any_exec->is_one_field_set()); - } + assert(any_exec->is_one_field_set()); any_exec.reset(); } - */ } } From 95a816fd70315d014165189e36e9b0a3f38e59d0 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Wed, 18 Nov 2015 16:21:47 -0800 Subject: [PATCH 7/8] guard condition mutex --- rclcpp/include/rclcpp/any_executable.hpp | 12 +++--- rclcpp/include/rclcpp/executor.hpp | 2 +- rclcpp/include/rclcpp/memory_strategy.hpp | 4 +- .../strategies/allocator_memory_strategy.hpp | 15 +++++--- rclcpp/src/rclcpp/any_executable.cpp | 14 ++++--- rclcpp/src/rclcpp/executor.cpp | 38 ++++++++++--------- rclcpp/src/rclcpp/memory_strategy.cpp | 18 ++++----- 7 files changed, 57 insertions(+), 46 deletions(-) diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp index 71067d373e..9d39b4765e 100644 --- a/rclcpp/include/rclcpp/any_executable.hpp +++ b/rclcpp/include/rclcpp/any_executable.hpp @@ -48,9 +48,9 @@ struct AnyExecutable rclcpp::callback_group::CallbackGroup::SharedPtr get_callback_group() const; rclcpp::node::Node::SharedPtr get_node() const; - void set_subscription(const rclcpp::subscription::SubscriptionBase::SharedPtr subscription); - void set_subscription_intra_process(const rclcpp::subscription::SubscriptionBase::SharedPtr subscription); - void set_timer(const rclcpp::timer::TimerBase::SharedPtr timer); + void set_subscription(const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription); + void set_subscription_intra_process(const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription); + void set_timer(const rclcpp::timer::TimerBase::ConstSharedPtr timer); void set_service(const rclcpp::service::ServiceBase::SharedPtr service); void set_client(const rclcpp::client::ClientBase::SharedPtr client); // These are used to keep the scope on the containing items @@ -59,9 +59,9 @@ struct AnyExecutable private: // Only one of the following pointers will be set. - rclcpp::subscription::SubscriptionBase::SharedPtr subscription_; - rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process_; - rclcpp::timer::TimerBase::SharedPtr timer_; + rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription_; + rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription_intra_process_; + rclcpp::timer::TimerBase::ConstSharedPtr timer_; rclcpp::service::ServiceBase::SharedPtr service_; rclcpp::client::ClientBase::SharedPtr client_; // These are used to keep the scope on the containing items diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 7b181f4ebe..924c2b0eec 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -208,7 +208,7 @@ class Executor */ RCLCPP_PUBLIC void - execute_any_executable(const AnyExecutable::ConstSharedPtr & any_exec); + execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec); RCLCPP_PUBLIC static void diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 4bed157f93..51149b8935 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -71,7 +71,7 @@ class RCLCPP_PUBLIC MemoryStrategy get_next_client(rclcpp::executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) = 0; - static rclcpp::subscription::SubscriptionBase::SharedPtr + static rclcpp::subscription::SubscriptionBase::ConstSharedPtr get_subscription_by_handle(void * subscriber_handle, const WeakNodeVector & weak_nodes); @@ -87,7 +87,7 @@ class RCLCPP_PUBLIC MemoryStrategy static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_subscription( - const rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription, const WeakNodeVector & weak_nodes); static rclcpp::callback_group::CallbackGroup::SharedPtr diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index d808887828..f06054fefd 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ +#include #include #include #include @@ -65,7 +66,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy size_t fill_subscriber_handles(void ** & ptr) { - for (auto & subscription : subscriptions_) { + for (const auto subscription : subscriptions_) { subscriber_handles_.push_back(subscription->get_subscription_handle()->data); if (subscription->get_intra_process_subscription_handle()) { subscriber_handles_.push_back(subscription->get_intra_process_subscription_handle()->data); @@ -131,18 +132,18 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy { bool has_invalid_weak_nodes = false; for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); + const auto node = weak_node.lock(); if (!node) { has_invalid_weak_nodes = false; continue; } for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); + const auto group = weak_group.lock(); if (!group || !group->can_be_taken_from().load()) { continue; } for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); + const auto subscription = weak_subscription.lock(); if (subscription) { subscriptions_.push_back(subscription); } @@ -176,7 +177,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy { auto it = subscriber_handles_.begin(); while (it != subscriber_handles_.end()) { - const subscription::SubscriptionBase::SharedPtr subscription = get_subscription_by_handle(*it, weak_nodes); + const subscription::SubscriptionBase::ConstSharedPtr subscription = get_subscription_by_handle(*it, weak_nodes); if (subscription) { // Figure out if this is for intra-process or not. bool is_intra_process = false; @@ -184,7 +185,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy is_intra_process = subscription->get_intra_process_subscription_handle()->data == *it; } // Find the group for this handle and see if it can be serviced - auto group = get_group_by_subscription(subscription, weak_nodes); + const auto group = get_group_by_subscription(subscription, weak_nodes); if (!group) { // Group was not found, meaning the subscription is not valid... // Remove it from the ready list and continue looking @@ -298,6 +299,8 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy std::shared_ptr executable_allocator_; std::shared_ptr allocator_; + + std::mutex subscription_mutex_; }; } // namespace allocator_memory_strategy diff --git a/rclcpp/src/rclcpp/any_executable.cpp b/rclcpp/src/rclcpp/any_executable.cpp index 84fb86c650..646e67253e 100644 --- a/rclcpp/src/rclcpp/any_executable.cpp +++ b/rclcpp/src/rclcpp/any_executable.cpp @@ -42,10 +42,14 @@ bool AnyExecutable::is_one_field_set() const { std::atomic fields_set(0); if (timer_) { fields_set.fetch_add(1, std::memory_order_relaxed); - } + }/* else { + assert(timer_.use_count() == 0); + }*/ if (subscription_) { fields_set.fetch_add(1, std::memory_order_relaxed); - } + }/* else { + assert(subscription_.use_count() == 0); + }*/ if (subscription_intra_process_) { fields_set.fetch_add(1, std::memory_order_relaxed); } @@ -89,19 +93,19 @@ rclcpp::node::Node::SharedPtr AnyExecutable::get_node() const { void AnyExecutable::set_subscription( - const rclcpp::subscription::SubscriptionBase::SharedPtr subscription) + const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription) { subscription_ = subscription; } void AnyExecutable::set_subscription_intra_process( - const rclcpp::subscription::SubscriptionBase::SharedPtr subscription) + const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription) { subscription_intra_process_ = subscription; } -void AnyExecutable::set_timer(const rclcpp::timer::TimerBase::SharedPtr timer) { +void AnyExecutable::set_timer(const rclcpp::timer::TimerBase::ConstSharedPtr timer) { timer_ = timer; } diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index c9b05e6823..dca586ec11 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -157,17 +157,17 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr } void -Executor::execute_any_executable(const AnyExecutable::ConstSharedPtr & any_exec) +Executor::execute_any_executable(const AnyExecutable::ConstSharedPtr any_exec) { if (!any_exec || !spinning.load()) { return; } - if (timer::TimerBase::ConstSharedPtr timer = any_exec->get_timer()) { + if (const timer::TimerBase::ConstSharedPtr timer = any_exec->get_timer()) { execute_timer(timer); - } else if (subscription::SubscriptionBase::ConstSharedPtr subscription = any_exec->get_subscription()) { + } else if (const subscription::SubscriptionBase::ConstSharedPtr subscription = any_exec->get_subscription()) { execute_subscription(subscription); - } else if (subscription::SubscriptionBase::ConstSharedPtr subscription_intra_process = any_exec->get_subscription_intra_process()) { + } else if (const subscription::SubscriptionBase::ConstSharedPtr subscription_intra_process = any_exec->get_subscription_intra_process()) { execute_intra_process_subscription(subscription_intra_process); } else if (any_exec->get_service()) { execute_service(any_exec->get_service()); @@ -341,8 +341,12 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) guard_condition_handles.guard_conditions[0] = \ rclcpp::utilities::get_global_sigint_guard_condition()->data; // Put the executor's guard condition in - guard_condition_handles.guard_conditions[1] = \ - interrupt_guard_condition_->data; + + { + std::lock_guard lock(trigger_guard_condition_mutex_); + guard_condition_handles.guard_conditions[1] = \ + interrupt_guard_condition_->data; + } rmw_time_t * wait_timeout = NULL; rmw_time_t rmw_timeout; @@ -394,12 +398,12 @@ Executor::get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr gro return rclcpp::node::Node::SharedPtr(); } for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); + const auto node = weak_node.lock(); if (!node) { continue; } for (auto & weak_group : node->get_callback_groups()) { - auto callback_group = weak_group.lock(); + const auto callback_group = weak_group.lock(); if (callback_group == group) { return node; } @@ -412,17 +416,17 @@ rclcpp::callback_group::CallbackGroup::SharedPtr Executor::get_group_by_timer(rclcpp::timer::TimerBase::SharedPtr timer) { for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); + const auto node = weak_node.lock(); if (!node) { continue; } for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); + const auto group = weak_group.lock(); if (!group) { continue; } for (auto & weak_timer : group->get_timer_ptrs()) { - auto t = weak_timer.lock(); + const auto t = weak_timer.lock(); if (t == timer) { return group; } @@ -436,17 +440,17 @@ bool Executor::get_next_timer(AnyExecutable::SharedPtr any_exec) { for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); + const auto node = weak_node.lock(); if (!node) { continue; } for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); + const auto group = weak_group.lock(); if (!group || !group->can_be_taken_from().load()) { continue; } for (auto & timer_ref : group->get_timer_ptrs()) { - auto timer = timer_ref.lock(); + const auto timer = timer_ref.lock(); if (timer && timer->check_and_trigger()) { any_exec->set_timer(timer); any_exec->set_callback_group(group); @@ -465,19 +469,19 @@ Executor::get_earliest_timer() std::chrono::nanoseconds latest = std::chrono::nanoseconds::max(); bool timers_empty = true; for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); + const auto node = weak_node.lock(); if (!node) { continue; } for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); + const auto group = weak_group.lock(); if (!group || !group->can_be_taken_from().load()) { continue; } for (auto & timer_ref : group->get_timer_ptrs()) { timers_empty = false; // Check the expected trigger time - auto timer = timer_ref.lock(); + const auto timer = timer_ref.lock(); if (timer && timer->time_until_trigger() < latest) { latest = timer->time_until_trigger(); } diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index af47d808f2..0ed60e5f97 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -16,22 +16,22 @@ using rclcpp::memory_strategy::MemoryStrategy; -rclcpp::subscription::SubscriptionBase::SharedPtr +rclcpp::subscription::SubscriptionBase::ConstSharedPtr MemoryStrategy::get_subscription_by_handle( void * subscriber_handle, const WeakNodeVector & weak_nodes) { for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); + const auto node = weak_node.lock(); if (!node) { continue; } for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); + const auto group = weak_group.lock(); if (!group) { continue; } for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); + const auto subscription = weak_subscription.lock(); if (subscription) { if (subscription->get_subscription_handle()->data == subscriber_handle) { return subscription; @@ -102,12 +102,12 @@ MemoryStrategy::get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedP return nullptr; } for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); + const auto node = weak_node.lock(); if (!node) { continue; } for (auto & weak_group : node->get_callback_groups()) { - auto callback_group = weak_group.lock(); + const auto callback_group = weak_group.lock(); if (callback_group == group) { return node; } @@ -118,16 +118,16 @@ MemoryStrategy::get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedP rclcpp::callback_group::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_subscription( - const rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const rclcpp::subscription::SubscriptionBase::ConstSharedPtr subscription, const WeakNodeVector & weak_nodes) { for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); + const auto node = weak_node.lock(); if (!node) { continue; } for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); + const auto group = weak_group.lock(); if (!group) { continue; } From ecde7205609b564b442d8330043b95eaf1424888 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Thu, 19 Nov 2015 10:51:31 -0800 Subject: [PATCH 8/8] troubleshooting deadlock --- rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index bb21eca9b1..6e020c4e8c 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -82,10 +82,12 @@ MultiThreadedExecutor::run(size_t this_thread_number) // Argh. execute_any_executable(any_exec); + /* if (any_exec) { std::lock_guard wait_lock(wait_mutex_); assert(any_exec->is_one_field_set()); any_exec.reset(); } + */ } }