Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fuse -> ROS 2 fuse_core: Fix Async #293

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions fuse_core/include/fuse_core/async_motion_model.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace fuse_core
{

/**
* @brief A motion model base class that provides node handles and a private callback queue.
* @brief A motion model base class that provides an internal node and an internal callback queue.
*
* A model model plugin is responsible for generating constraints that link together timestamps
* introduced by other sensors in the system. The AsyncMotionModel class is designed similar to a
Expand All @@ -56,11 +56,11 @@ namespace fuse_core
* There are a few notable differences between the AsyncMotionModel class and a standard ROS
* node. First and most obvious, the AsyncMotionModel class is designed as a plugin, with all of
* the stipulations and requirements that come with all ROS plugins (must be derived from a known
* base class, will be default constructed). Second, the AsyncMotionModel class provides a global
* and private node handle, both hooked to a local callback queue and local spinner. This makes
* base class, will be default constructed). Second, the AsyncMotionModel class provides an internal
* node that is hooked to a local callback queue and local executor on init. This makes
* it act like a full ROS node -- subscriptions trigger message callbacks, callbacks will fire
* sequentially, etc. However, authors of derived classes should be aware of this fact and avoid
* creating additional node handles, or at least take care when creating new node handles and
* creating additional sub-nodes, or at least take care when creating new sub-nodes and
* additional callback queues. Finally, the interaction of motion model nodes is best compared to
* a service call -- an external actor will provide a set of timestamps and wait for the motion
* model to respond with the required set of constraints to link the timestamps together (along
Expand Down Expand Up @@ -120,9 +120,9 @@ class AsyncMotionModel : public MotionModel
* This method will be called by the optimizer, in the optimizer's thread, after each Graph
* update is complete. This implementation repackages the provided \p graph, and inserts a
* call to onGraphUpdate() into this motion model's callback queue. This is meant to simplify
* thread synchronization. If this motion model uses a single-threaded spinner, then all
* thread synchronization. If this motion model uses a single-threaded executor, then all
* callbacks will fire sequentially and no semaphores are needed. If this motion model uses a
* multi-threaded spinner, then normal multithreading rules apply and data accessed in more
* multi-threaded executor, then normal multithreading rules apply and data accessed in more
* than one place should be guarded.
*
* @param[in] graph A read-only pointer to the graph object, allowing queries to be performed
Expand All @@ -135,9 +135,9 @@ class AsyncMotionModel : public MotionModel
* reading from the parameter server.
*
* This will be called for each plugin after construction and after the ros node has been
* initialized. The provided private node handle will be in a namespace based on the plugin's
* name. This should prevent conflicts and allow the same plugin to be used multiple times
* with different settings and topics.
* initialized. The provided node will be in a namespace based on the plugin's name. This should
* prevent conflicts and allow the same plugin to be used multiple times with different settings
* and topics.
*
* @param[in] name A unique name to give this plugin instance
*/
Expand All @@ -159,9 +159,9 @@ class AsyncMotionModel : public MotionModel
*
* This implementation inserts a call to onStart() into this motion model's callback queue.
* This method then blocks until the call to onStart() has completed. This is meant to
* simplify thread synchronization. If this motion model uses a single-threaded spinner, then
* simplify thread synchronization. If this motion model uses a single-threaded executor, then
* all callbacks will fire sequentially and no semaphores are needed. If this motion model
* uses a multithreaded spinner, then normal multithreading rules apply and data accessed in
* uses a multithreaded executor, then normal multithreading rules apply and data accessed in
* more than one place should be guarded.
*/
void start() override;
Expand All @@ -178,9 +178,9 @@ class AsyncMotionModel : public MotionModel
*
* This implementation inserts a call to onStop() into this motion model's callback queue.
* This method then blocks until the call to onStop() has completed. This is meant to
* simplify thread synchronization. If this motion model uses a single-threaded spinner, then
* simplify thread synchronization. If this motion model uses a single-threaded executor, then
* all callbacks will fire sequentially and no semaphores are needed. If this motion model
* uses a multithreaded spinner, then normal multithreading rules apply and data accessed in
* uses a multithreaded executor, then normal multithreading rules apply and data accessed in
* more than one place should be guarded.
*/
void stop() override;
Expand All @@ -192,16 +192,17 @@ class AsyncMotionModel : public MotionModel
std::string name_; //!< The unique name for this motion model instance
rclcpp::Node::SharedPtr node_; //!< The node for this motion model

//! A single/multi-threaded spinner assigned to the local callback queue
//! A single/multi-threaded executor assigned to the local callback queue
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;

size_t executor_thread_count_;
std::thread spinner_; //!< Internal thread for spinning the executor
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized

/**
* @brief Constructor
*
* Construct a new motion model and create a local callback queue and thread spinner.
* Construct a new motion model and create a local callback queue and internal executor.
*
* @param[in] thread_count The number of threads used to service the local callback queue
*/
Expand Down Expand Up @@ -250,7 +251,7 @@ class AsyncMotionModel : public MotionModel
* @brief Perform any required initialization for the motion model
*
* This could include things like reading from the parameter server or subscribing to topics.
* The class's node handles will be properly initialized before onInit() is called. Spinning
* The class's node will be properly initialized before onInit() is called. Spinning
* of the callback queue will not begin until after the call to onInit() completes.
*/
virtual void onInit() {}
Expand Down
26 changes: 13 additions & 13 deletions fuse_core/include/fuse_core/async_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,16 @@ namespace fuse_core
{

/**
* @brief A publisher base class that provides node handles attached to an internal callback queue
* serviced by a local thread (or threads) using a spinner.
* @brief A publisher base class that provides an internal callback queue attached to a ROS 2 Node
* serviced by a local executor.
*
* This allows publishers derived from this base class to operate similarly to a stand alone node
* -- any subscription or service callbacks will be executed within an independent thread. The
* notable differences include:
* - a default constructor is required (because this is a plugin)
* - subscriptions, services, parameter server interactions, etc. should be placed in the onInit()
* call instead of in the constructor
* - a global node handle and private node handle have already been created and attached to a local
* callback queue
* - a node has already been created and attached to a local callback queue
* - a special callback, notifyCallback(), will be fired every time the optimizer completes an
* optimization cycle
*/
Expand All @@ -77,10 +76,10 @@ class AsyncPublisher : public Publisher
/**
* @brief Initialize the AsyncPublisher object
*
* This will store the provided name and graph object, and create a global and private node
* handle that both use an internal callback queue serviced by a local thread. The
* AsyncPublisher::onInit() method will be called from here, once the node handles are
* properly configured.
* This will store the provided name and graph object, and create an internal node for this
* instance that will use an internal callback queue serviced by a local thread. The
* AsyncPublisher::onInit() method will be called from here, once the internal node is properly
* configured.
*
* @param[in] name A unique name to give this plugin instance
*/
Expand Down Expand Up @@ -120,9 +119,9 @@ class AsyncPublisher : public Publisher
*
* This implementation inserts a call to onStart() into this publisher's callback queue. This
* method then blocks until the call to onStart() has completed. This is meant to simplify
* thread synchronization. If this publisher uses a single-threaded spinner, then all
* thread synchronization. If this publisher uses a single-threaded executor, then all
* callbacks will fire sequentially and no semaphores are needed. If this publisher uses a
* multithreaded spinner, then normal multithreading rules apply and data accessed in more
* multithreaded executor, then normal multithreading rules apply and data accessed in more
* than one place should be guarded.
*/
void start() override;
Expand All @@ -139,7 +138,7 @@ class AsyncPublisher : public Publisher
*
* This implementation inserts a call to onStop() into this publisher's callback queue. This
* method then blocks until the call to onStop() has completed. This is meant to simplify
* thread synchronization. If this publisher uses a single-threaded spinner, then all
* thread synchronization. If this publisher uses a single-threaded executor, then all
* callbacks will fire sequentially and no semaphores are needed. If this publisher uses a
* multithreaded spinner, then normal multithreading rules apply and data accessed in more
* than one place should be guarded.
Expand All @@ -153,15 +152,16 @@ class AsyncPublisher : public Publisher
std::string name_; //!< The unique name for this publisher instance
rclcpp::Node::SharedPtr node_; //!< The node for this publisher

//! A single/multi-threaded spinner assigned to the local callback queue
//! A single/multi-threaded executor assigned to the local callback queue
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;
size_t executor_thread_count_;
std::thread spinner_; //!< Internal thread for spinning the executor
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized

/**
* @brief Constructor
*
* Constructs a new publisher with node handles, a local callback queue, and thread spinner.
* Constructs a new publisher with a local node, a local callback queue, and internal executor.
*
* @param[in] thread_count The number of threads used to service the local callback queue
*/
Expand Down
37 changes: 19 additions & 18 deletions fuse_core/include/fuse_core/async_sensor_model.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ namespace fuse_core
{

/**
* @brief A sensor model base class that provides node handles and a private callback queue.
* @brief A sensor model base class that provides an internal node and an internal callback queue.
*
* A sensor model plugin is responsible for generating new constraints, packaging them in a
* fuse_core::Transaction object, and passing them along to the optimizer. The asynchronous
* sensor model plugin is designed similar to the nodelet plugin, attempting to be as generic and
* sensor model plugin is designed similar to a nodelet plugin, attempting to be as generic and
* easy to use as a standard ROS node.
*
* There are a few notable differences between the asynchronous sensor model plugin and a
* standard ROS node. First and most obvious, the sensor model is designed as a plugin, with all
* of the stipulations and requirements that come with all ROS plugins (must be derived from a
* known base class, will be default constructed). Second, the base AsyncSensorModel class
* provides a global and private node handle, both hooked to a local callback queue and local
* spinner. This makes it act like a full ROS node -- subscriptions trigger message callbacks,
* provides an internal node that is hooked to a local callback queue and local executor on
* init. This makes it act like a full ROS node -- subscriptions trigger message callbacks,
* callbacks will fire sequentially, etc. However, authors of derived sensor models should be
* aware of this fact and avoid creating additional node handles, or at least take care when
* creating new node handles and additional callback queues. Finally, instead of publishing the
* aware of this fact and avoid creating additional sub-nodes, or at least take care when
* creating new sub-nodes and additional callback queues. Finally, instead of publishing the
* generated constraints using a ROS publisher, the asynchronous sensor model provides a
* "sendTransaction()" method that should be called whenever the sensor is ready to send a
* fuse_core::Transaction object to the Optimizer. Under the hood, this executes the
Expand Down Expand Up @@ -103,9 +103,9 @@ class AsyncSensorModel : public SensorModel
* This method will be called by the optimizer, in the optimizer's thread, after each Graph
* update is complete. This implementation repackages the provided \p graph, and inserts a
* call to onGraphUpdate() into this sensor's callback queue. This is meant to simplify
* thread synchronization. If this sensor uses a single-threaded spinner, then all callbacks
* thread synchronization. If this sensor uses a single-threaded executor, then all callbacks
* will fire sequentially and no semaphores are needed. If this sensor uses a multi-threaded
* spinner, then normal multithreading rules apply and data accessed in more than one place
* executor, then normal multithreading rules apply and data accessed in more than one place
* should be guarded.
*
* @param[in] graph A read-only pointer to the graph object, allowing queries to be performed
Expand All @@ -118,9 +118,9 @@ class AsyncSensorModel : public SensorModel
* reading from the parameter server.
*
* This will be called for each plugin after construction and after the ROS node has been
* initialized. The provided private node handle will be in a namespace based on the plugin's
* name. This should prevent conflicts and allow the same plugin to be used multiple times
* with different settings and topics.
* initialized. The provided node will be in a namespace based on the plugin's name. This should
* prevent conflicts and allow the same plugin to be used multiple times with different settings
* and topics.
*
* @param[in] name A unique name to give this plugin instance
* @param[in] transaction_callback The function to call every time a transaction is published
Expand Down Expand Up @@ -162,9 +162,9 @@ class AsyncSensorModel : public SensorModel
*
* This implementation inserts a call to onStart() into this sensor model's callback queue.
* This method then blocks until the call to onStart() has completed. This is meant to
* simplify thread synchronization. If this sensor model uses a single-threaded spinner, then
* simplify thread synchronization. If this sensor model uses a single-threaded executor, then
* all callbacks will fire sequentially and no semaphores are needed. If this sensor model
* uses a multithreaded spinner, then normal multithreading rules apply and data accessed in
* uses a multithreaded executor, then normal multithreading rules apply and data accessed in
* more than one place should be guarded.
*/
void start() override;
Expand All @@ -180,9 +180,9 @@ class AsyncSensorModel : public SensorModel
*
* This implementation inserts a call to onStop() into this sensor model's callback queue.
* This method then blocks until the call to onStop() has completed. This is meant to
* simplify thread synchronization. If this sensor model uses a single-threaded spinner, then
* simplify thread synchronization. If this sensor model uses a single-threaded executor, then
* all callbacks will fire sequentially and no semaphores are needed. If this sensor model
* uses a multithreaded spinner, then normal multithreading rules apply and data accessed in
* uses a multithreaded executor, then normal multithreading rules apply and data accessed in
* more than one place should be guarded.
*/
void stop() override;
Expand All @@ -194,18 +194,19 @@ class AsyncSensorModel : public SensorModel
std::string name_; //!< The unique name for this sensor model instance
rclcpp::Node::SharedPtr node_; //!< The node for this sensor model

//! A single/multi-threaded spinner assigned to the local callback queue
//! A single/multi-threaded executor assigned to the local callback queue
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_;

//! The function to be executed every time a Transaction is "published"
TransactionCallback transaction_callback_;
size_t executor_thread_count_;
std::thread spinner_; //!< Internal thread for spinning the executor
std::atomic<bool> initialized_ = false; //!< True if instance has been fully initialized

/**
* @brief Constructor
*
* Construct a new sensor model and create a local callback queue and thread spinner.
* Construct a new sensor model and create a local callback queue and internal executor.
*
* @param[in] thread_count The number of threads used to service the local callback queue
*/
Expand Down Expand Up @@ -234,7 +235,7 @@ class AsyncSensorModel : public SensorModel
* @brief Perform any required initialization for the sensor model
*
* This could include things like reading from the parameter server or subscribing to topics.
* The class's node handles will be properly initialized before onInit() is called. Spinning
* The class's node will be properly initialized before onInit() is called. Spinning
* of the callback queue will not begin until after the call to onInit() completes.
*
* Derived sensor models classes must implement this function, because otherwise I'm not sure
Expand Down
2 changes: 2 additions & 0 deletions fuse_core/include/fuse_core/callback_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ class CallbackAdapter : public rclcpp::Waitable

void removeAllCallbacks();

void triggerGuardCondition();

private:
rcl_guard_condition_t gc_; //!< guard condition to drive the waitable

Expand Down
1 change: 1 addition & 0 deletions fuse_core/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

<buildtool_depend>ament_cmake</buildtool_depend>

<depend>boost</depend>
<depend>eigen</depend>
<depend>libceres-dev</depend>
<depend>libgoogle-glog-dev</depend>
Expand Down
Loading