Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fuse -> ROS 2 fuse_core : Nodes and Waitables #284

Merged
merged 17 commits into from
Nov 12, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions fuse_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,22 @@ include(boost-extras.cmake)

## fuse_core library
add_library(${PROJECT_NAME} SHARED
# src/async_motion_model.cpp
# src/async_publisher.cpp
# src/async_sensor_model.cpp
# src/ceres_options.cpp
src/constraint.cpp
src/graph.cpp
src/graph_deserializer.cpp
src/loss.cpp
src/serialization.cpp
src/timestamp_manager.cpp
src/transaction.cpp
src/transaction_deserializer.cpp
src/uuid.cpp
src/variable.cpp
src/async_motion_model.cpp
src/async_publisher.cpp
src/async_sensor_model.cpp
src/callback_wrapper.cpp
# src/ceres_options.cpp
src/constraint.cpp
src/graph.cpp
src/graph_deserializer.cpp
src/loss.cpp
src/serialization.cpp
src/time.cpp
src/timestamp_manager.cpp
src/transaction.cpp
src/transaction_deserializer.cpp
src/uuid.cpp
src/variable.cpp
)
target_include_directories(${PROJECT_NAME} PUBLIC
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>"
Expand Down
22 changes: 10 additions & 12 deletions fuse_core/include/fuse_core/async_motion_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@
#ifndef FUSE_CORE_ASYNC_MOTION_MODEL_H
#define FUSE_CORE_ASYNC_MOTION_MODEL_H

#include <fuse_core/graph.h>
#include <fuse_core/fuse_macros.h>
#include <fuse_core/motion_model.h>

#include <fuse_core/transaction.h>
// #include <rclcpp/rclcpp.hpp> TODO(CH3): Uncomment when ready
#include <ros/callback_queue.h>
#include <ros/node_handle.h>
#include <ros/spinner.h>
#include <fuse_core/graph.h>
#include <fuse_core/callback_wrapper.h>

#include <rclcpp/rclcpp.hpp>

#include <string>

Expand Down Expand Up @@ -171,13 +170,12 @@ class AsyncMotionModel : public MotionModel
void stop() override;

protected:
ros::CallbackQueue callback_queue_; //!< The local callback queue used for all subscriptions
std::shared_ptr<fuse_core::CallbackAdapter> callback_queue_; //!< The callback queue used for fuse internal callbacks
std::string name_; //!< The unique name for this motion model instance
// rclcpp::Node::SharedPtr node_; //!< The node for this motion model (TODO(CH3): Uncomment when it's time)
ros::NodeHandle node_handle_; //!< A node handle in the global namespace using the local callback queue
ros::NodeHandle private_node_handle_; //!< A node handle in the private namespace using the local callback queue
ros::AsyncSpinner spinner_; //!< A single/multi-threaded spinner assigned to the local callback queue

rclcpp::Node::SharedPtr node_; //!< The node for this motion model
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the node is unused except for adding the waitable to the executor.

Maybe we could pass the node or node interfaces via MotionModel::Initialize()? That eliminates the extra context and node, and it looks doable higher in the stack where MotionModel::initialize() is called by an Optimizer which does have a Node.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The node here is mostly used to structure parameters in the config files
It's reasonable to run the graph with dozens of named motion_models, and each one needs a unique name and parameters, each motion model can be associated with a different physical component that may or may not be included in any given deployment.

The separated node also allows complex motion_models (usually associated with a single robot chassis) to be targeted by ros cli tools as a separate entity in the ROS graph

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BrettRD would you say that it's preferable then to keep the nodes separated? This was one of the places I planned to circle back on to try to combine it all into a single node (with the iffy bit being what happens when all the callback queues are merged), but the point you raised about CLI tools is pretty valid..

I'm not sure of the implications on lifecycle nodes and node components though.

Copy link
Collaborator

@BrettRD BrettRD Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what's better. (edit: it's a pretty huge API divergence to change the shape of Fuse on the node graph)

Have a think about sensor_models and fuse_publishers too, they will need to interact with sensor nodes and planning tools that will definitely appear under ros namespaces, so it would be nice for a plugin to exist in the node graph under the relevant namespace

One node wouldn't be a disaster.
You can still isolate device-specific configs into separate conf-only packages and load multiple config files into the optimiser node at launch.
However, if it's one node, it's going to make a blob in the node graph so dense it'll become a running joke. edit: (I have this exact problem exposing gstreamer properties on a pipeline node, it's not unmanageable)

I was expecting to run separate nodes and have to do some fancy footwork to allow fuse_optimiser to side-load its plugins into its host composable node container, and then have a lot of trouble getting access to the parameters.
Either that, or have fuse_optimiser inherit from container, and cause a headache when any other package tries to do the same
I haven't studied lifecycle nodes enough to comment there

I don't think the callback queues will be a problem; executors are built to cope with multiple queues, and most of the fuse plugins are sharing the one optimiser queue

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The node here is mostly used to structure parameters in the config files
It's reasonable to run the graph with dozens of named motion_models, and each one needs a unique name and parameters, each motion model can be associated with a different physical component that may or may not be included in any given deployment.

I think we can still structure parameters in the config file by prepending name_ + '.param_name' to each parameter name.

The separated node also allows complex motion_models (usually associated with a single robot chassis) to be targeted by ros cli tools as a separate entity in the ROS graph

I can think of a few reasons to avoid multiple nodes, but I'm not familiar enough with fuse to judge how useful it would be to a node per async motion model/publisher/sensor model. @svwilliams what are your thoughts?

Reasons to avoid multiple nodes:

  • Nodes come with extra overhead like a graph guard condition, a rosout publisher, a parameter events publisher, and services for setting and getting parameters.
  • Atomically changing parameters is scoped to a single node. A single node would allow someone to change parameters to two motion models atomically, such that if one failed (maybe from an invalid value) none of the changes would go through
  • Building the nodes internally prevents choosing whether they use regular or lifecycle nodes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where my lack of experience with ROS2 is going to hinder me offering sound advice.

I can discuss the design intent in ROS1 and see if that helps.

  • The motion models and sensors models act very node-like. They subscribe to sensor topics, process the data, and "publish" the results for consumption by the optimizer.
  • The reason the sensor models and motion models are not stand-alone nodes or nodelets in ROS1 is the each model "publishes" a derived class object instead of a message. Each model creates derived Constraints and Variables for use by the Optimizer. New user-derived models may create Constraints and Variables the Optimizer node cannot know about at compile-time. I could not think of an elegant solution to transmit derived objects using the standard pub-sub mechanisms in ROS1.
  • Using ROS1 plugins, however, I can define an interface to the Optimizer that sends derived object pointers around, so that was what I implemented.
  • But there is a downside to the plugin system. The Optimizer class creates the sensor and motion model instances as class variables. The Optimizer can call methods of the loaded plugins, so it would be easy to poll each loaded sensor and motion model for new data. But the sensor models and motion models are producing data for the Optimizer. In an ideal world, the models would decide for themselves when they had new data and would push that into the Optimizer. It is difficult for the plugins to call methods of the Optimizer.
  • To solve the polling/pushing issues, there are a series callbacks registered between the Optimizer and the Models. This results in rather complex threading implications.
  • To hide all all of that complexity, the AsyncModels each run their own thread(s) and callback spinner. This makes them act a lot like a conventional ROS node or nodelet. Each derived Model can define the number of threads used to service its callbacks, it gets a private node handle in its namespace for parameter reading, etc.

Does any of that help?

rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_; //!< A single/multi-threaded spinner assigned to the local callback queue
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr waitables_interface_;
size_t executor_thread_count_;
/**
* @brief Constructor
*
Expand Down
28 changes: 16 additions & 12 deletions fuse_core/include/fuse_core/async_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,22 @@
#ifndef FUSE_CORE_ASYNC_PUBLISHER_H
#define FUSE_CORE_ASYNC_PUBLISHER_H

#include <fuse_core/graph.h>
#include <fuse_core/fuse_macros.h>
#include <fuse_core/publisher.h>

#include <fuse_core/transaction.h>
// #include <rclcpp/rclcpp.hpp> TODO(CH3): Uncomment when ready
#include <ros/callback_queue.h>
#include <ros/node_handle.h>
#include <ros/spinner.h>
#include <fuse_core/graph.h>
#include <fuse_core/callback_wrapper.h>

#include <fuse_core/fuse_macros.h>

#include <rclcpp/rclcpp.hpp>

#include <functional>
#include <utility>
#include <string>



namespace fuse_core
{

Expand Down Expand Up @@ -134,12 +138,12 @@ class AsyncPublisher : public Publisher
void stop() override;

protected:
ros::CallbackQueue callback_queue_; //!< The local callback queue used for all subscriptions
std::shared_ptr<fuse_core::CallbackAdapter> callback_queue_; //!< The callback queue used for fuse internal callbacks
std::string name_; //!< The unique name for this publisher instance
// rclcpp::Node::SharedPtr node_; //!< The node for this publisher (TODO(CH3): Uncomment when it's time)
ros::NodeHandle node_handle_; //!< A node handle in the global namespace using the local callback queue
ros::NodeHandle private_node_handle_; //!< A node handle in the private namespace using the local callback queue
ros::AsyncSpinner spinner_; //!< A single/multi-threaded spinner assigned to the local callback queue
rclcpp::Node::SharedPtr node_; //!< The node for this publisher
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_; //!< A single/multi-threaded spinner assigned to the local callback queue
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr waitables_interface_;
size_t executor_thread_count_;

/**
* @brief Constructor
Expand All @@ -153,7 +157,7 @@ class AsyncPublisher : public Publisher
/**
* @brief Perform any required initialization for the publisher
*
* The node_handle_ and private_node_handle_ member variables will be completely initialized before this
* The node_ and member variables will be completely initialized before this
* call occurs. Spinning of the callback queue will not begin until after the call to AsyncPublisher::onInit()
* completes.
*
Expand Down
19 changes: 8 additions & 11 deletions fuse_core/include/fuse_core/async_sensor_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,11 @@
#ifndef FUSE_CORE_ASYNC_SENSOR_MODEL_H
#define FUSE_CORE_ASYNC_SENSOR_MODEL_H

#include <fuse_core/graph.h>
#include <fuse_core/fuse_macros.h>
#include <fuse_core/sensor_model.h>

#include <fuse_core/transaction.h>
// #include <rclcpp/rclcpp.hpp> TODO(CH3): Uncomment when ready
#include <ros/callback_queue.h>
#include <ros/node_handle.h>
#include <ros/spinner.h>
#include <fuse_core/graph.h>
#include <fuse_core/callback_wrapper.h>

#include <functional>
#include <string>
Expand Down Expand Up @@ -175,13 +172,13 @@ class AsyncSensorModel : public SensorModel
void stop() override;

protected:
ros::CallbackQueue callback_queue_; //!< The local callback queue used for all subscriptions
std::shared_ptr<fuse_core::CallbackAdapter> callback_queue_; //!< The callback queue used for fuse internal callbacks
std::string name_; //!< The unique name for this sensor model instance
// rclcpp::Node::SharedPtr node_; //!< The node for this sensor model (TODO(CH3): Uncomment when it's time)
ros::NodeHandle node_handle_; //!< A node handle in the global namespace using the local callback queue
ros::NodeHandle private_node_handle_; //!< A node handle in the private namespace using the local callback queue
ros::AsyncSpinner spinner_; //!< A single/multi-threaded spinner assigned to the local callback queue
rclcpp::Node::SharedPtr node_; //!< The node for this sensor model
rclcpp::executors::MultiThreadedExecutor::SharedPtr executor_; //!< A single/multi-threaded spinner assigned to the local callback queue
TransactionCallback transaction_callback_; //!< The function to be executed every time a Transaction is "published"
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr waitables_interface_;
size_t executor_thread_count_;

/**
* @brief Constructor
Expand Down
77 changes: 69 additions & 8 deletions fuse_core/include/fuse_core/callback_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef FUSE_CORE_CALLBACK_WRAPPER_H
#define FUSE_CORE_CALLBACK_WRAPPER_H

#include <ros/callback_queue_interface.h>

#include <functional>
#include <future>
#include <deque>

#include <rclcpp/rclcpp.hpp>

namespace fuse_core
{
Expand Down Expand Up @@ -76,18 +77,32 @@ namespace fuse_core
* }
* };
*
* auto callback_queue = std::make_shared<CallbackAdapter>(
* rclcpp::contexts::get_global_default_context());
* node->get_node_waitables_interface()->add_waitable(callback_queue, (rclcpp::CallbackGroup::SharedPtr) nullptr);
* MyClass my_object;
* std::vector<double> really_big_data(1000000);
* auto callback = boost::make_shared<CallbackWrapper<double> >(
* std::bind(&MyClass::processData, &my_object, std::ref(really_big_data)));
* callback_queue->add_callback(callback);
* std::future<double> result = callback->getFuture();
* ros::getGlobalCallbackQueue()->addCallback(callback);
* result.wait();
* RCLCPP_INFO_STREAM(rclcpp::get_logger("fuse"), "The result is: " << result.get());
* @endcode
*/

class CallbackWrapperBase
{
public:
/**
* @brief Call this function. This is used by the callback queue.
*/
virtual void call() = 0;

};

template <typename T>
class CallbackWrapper : public ros::CallbackInterface
class CallbackWrapper : public CallbackWrapperBase
{
public:
using CallbackFunction = std::function<T(void)>;
Expand All @@ -113,10 +128,9 @@ class CallbackWrapper : public ros::CallbackInterface
/**
* @brief Call this function. This is used by the callback queue.
*/
CallResult call() override
void call()
{
promise_.set_value(callback_());
return Success;
}

private:
Expand All @@ -127,13 +141,60 @@ class CallbackWrapper : public ros::CallbackInterface
// Specialization to handle 'void' return types
// Specifically, promise_.set_value(callback_()) does not work if callback_() returns void.
template <>
inline ros::CallbackInterface::CallResult CallbackWrapper<void>::call()
inline void CallbackWrapper<void>::call()
{
callback_();
promise_.set_value();
return Success;
}


class CallbackAdapter : public rclcpp::Waitable
{
public:

CallbackAdapter(std::shared_ptr<rclcpp::Context> context_ptr);

/**
* @brief tell the CallbackGroup how many guard conditions are ready in this waitable
*/
size_t get_number_of_ready_guard_conditions() override;


/**
* @brief tell the CallbackGroup that this waitable is ready to execute anything
*/
bool is_ready(rcl_wait_set_t * wait_set) override;


/**
* @brief add_to_wait_set is called by rclcpp during NodeWaitables::add_waitable() and CallbackGroup::add_waitable()
waitable_ptr = std::make_shared<CallbackWrapper>();
node->get_node_waitables_interface()->add_waitable(waitable_ptr, (rclcpp::CallbackGroup::SharedPtr) nullptr);
*/
void add_to_wait_set(rcl_wait_set_t * wait_set) override;

std::shared_ptr< void > take_data() override;

// XXX check this against the threading model of the multi-threaded executor.
void execute(std::shared_ptr<void> & data) override;

void addCallback(const std::shared_ptr<CallbackWrapperBase> &callback);

void addCallback(std::shared_ptr<CallbackWrapperBase> && callback);

void removeAllCallbacks();


private:
std::recursive_mutex reentrant_mutex_; //!< mutex to allow multiple threads to add callbacks into a single queue simultaneously
rcl_guard_condition_t gc_; //!< guard condition to drive the waitable

std::recursive_mutex queue_mutex_; //!< mutex to allow this callback to be added to multiple callback groups simultaneously
std::mutex ready_mutex_; //!< mutex to prevent multiple simultaneous calls of is_ready
std::deque<std::shared_ptr<CallbackWrapperBase> > callback_queue_;
};


} // namespace fuse_core

#endif // FUSE_CORE_CALLBACK_WRAPPER_H
2 changes: 0 additions & 2 deletions fuse_core/include/fuse_core/throttled_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
#ifndef FUSE_CORE_THROTTLED_CALLBACK_H
#define FUSE_CORE_THROTTLED_CALLBACK_H

//#include <ros/subscriber.h>

#include <functional>
#include <utility>

Expand Down
Loading