Skip to content

Commit

Permalink
Improve multiple task processing. Add additional QoS to endpoints. (#23)
Browse files Browse the repository at this point in the history
* Refs #20090: Add Reliability QoS to endpoints

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20090: Store task data in a Sample pool instead of std::map. Refactor publish_to_user_method accordingly().

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20090: Store one tuple of callback args per task_id instead of only one shared tuple. This lets multiple callback invocations effectively have its own arguments.

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20090: Start Dispatcher before endpoints are created and enabled

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20090: Uncrusitfy

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
Mario-DL authored Dec 11, 2023
1 parent a3edd7d commit c93ae2a
Show file tree
Hide file tree
Showing 12 changed files with 1,087 additions and 1,015 deletions.
32 changes: 27 additions & 5 deletions sustainml_cpp/include/sustainml_cpp/core/Callable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <functional>
#include <iostream>
#include <mutex>
#include <tuple>

#include <sustainml_cpp/config/Macros.hpp>
Expand Down Expand Up @@ -86,25 +87,46 @@ class Callable
*/
template <std::size_t... Is>
void invoke_user_cb(
int task_id,
helper::index<Is...>)
{
on_new_task_available(*std::get<Is>(user_cb_args_)...);
tuple* args;
{
std::lock_guard<std::mutex> lock (mtx_);
args = &user_cb_args_[task_id];
}
on_new_task_available(*std::get<Is>(*args)...);
}

/**
* @brief Returns the arguments with which the user callback
* will be later invoked
*
* @return Reference to the user callback args
* @return Reference to the user callback args (simple pointers)
*/
tuple& get_user_cb_args()
tuple& create_and_get_user_cb_args(
const int& task_id)
{
return user_cb_args_;
std::lock_guard<std::mutex> lock (mtx_);
user_cb_args_.insert({task_id, tuple()});
return user_cb_args_[task_id];
}

/**
* @brief Erases the element from the map by key
*
*/
void remove_task_args(
const int& task_id)
{
std::lock_guard<std::mutex> lock (mtx_);
user_cb_args_.erase(task_id);
}

private:

tuple user_cb_args_;
std::mutex mtx_;
std::map<int, tuple> user_cb_args_;
#else

public:
Expand Down
221 changes: 113 additions & 108 deletions sustainml_cpp/include/sustainml_cpp/core/Node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,121 +47,126 @@ class DataReaderListener;
namespace sustainml {
namespace core {

class NodeImpl;
class NodeControlListener;
class Dispatcher;
struct Options;
class NodeImpl;
class NodeControlListener;
class Dispatcher;
struct Options;

/**
* @brief This abstract class is the principal class of the project.
* Handles the DDS comunications, aggregates the dispatcher and provides
* the main methods for interacting with the user.
*
* This class is meant to be inherited by the different
* SustainML Nodes.
*/
class Node
{

friend class Dispatcher;
template<class T> friend class SamplesQueue;
template<class T> friend class NodeListener;

public:

SUSTAINML_CPP_DLL_API Node(
const std::string& name);

SUSTAINML_CPP_DLL_API Node(
const std::string& name,
const Options& opts);

SUSTAINML_CPP_DLL_API virtual ~Node();

/**
* @brief Called by the user to run the run.
*/
SUSTAINML_CPP_DLL_API void spin();

/**
* @brief Stops the execution of the node.
*/
SUSTAINML_CPP_DLL_API static void terminate();

/**
* @brief Retrieves the node name
*/
SUSTAINML_CPP_DLL_API const std::string& name();

/**
* @brief Retrieves the node status
*/
SUSTAINML_CPP_DLL_API const Status& status();

protected:

/**
* @brief This abstract class is the principal class of the project.
* Handles the DDS comunications, aggregates the dispatcher and provides
* the main methods for interacting with the user.
*
* This class is meant to be inherited by the different
* SustainML Nodes.
*/
class Node
{

friend class Dispatcher;
template<class T> friend class SamplesQueue;
template<class T> friend class NodeListener;

public:

SUSTAINML_CPP_DLL_API Node(const std::string &name);

SUSTAINML_CPP_DLL_API Node(const std::string &name,
const Options &opts);

SUSTAINML_CPP_DLL_API virtual ~Node();

/**
* @brief Called by the user to run the run.
*/
SUSTAINML_CPP_DLL_API void spin();

/**
* @brief Stops the execution of the node.
*/
SUSTAINML_CPP_DLL_API static void terminate();

/**
* @brief Retrieves the node name
*/
SUSTAINML_CPP_DLL_API const std::string& name();

/**
* @brief Retrieves the node status
*/
SUSTAINML_CPP_DLL_API const Status& status();

protected:

/**
* @brief Starts a new subscription (DataReader) in the
* given topic.
*
* @param topic The topic name
* @param type_name The type name
* @param listener Listener object inheriting from DataReaderListener
* @param opts Options to configure subscription QoS
*/
bool initialize_subscription(
* @brief Starts a new subscription (DataReader) in the
* given topic.
*
* @param topic The topic name
* @param type_name The type name
* @param listener Listener object inheriting from DataReaderListener
* @param opts Options to configure subscription QoS
*/
bool initialize_subscription(
const char* topic_name,
const char* type_name,
eprosima::fastdds::dds::DataReaderListener* listener,
const Options &opts);

/**
* @brief Starts a new publication (DataWriter) in the
* given topic.
*
* @param topic The topic name
* @param type_name The type name
* @param opts Options to configure publication QoS
*/
bool initialize_publication(
const Options& opts);

/**
* @brief Starts a new publication (DataWriter) in the
* given topic.
*
* @param topic The topic name
* @param type_name The type name
* @param opts Options to configure publication QoS
*/
bool initialize_publication(
const char* topic_name,
const char* type_name,
const Options &opts);

/**
* @brief Invokes the user callback with the provided inputs.
*
* @param inputs A vector containing the required samples. All the samples
* must correspond to the same task_id.
*/
virtual void publish_to_user(const std::vector<std::pair<int,void*>> inputs) = 0;

/**
* @brief Publishes the internal status of the node to DDS.
*/
void publish_node_status();

/**
* @brief Retrieves the node status
*/
void status(const Status& status);

/**
* @brief Retrieves the inner writers
*/
const std::vector<eprosima::fastdds::dds::DataWriter*>& writers();

private:

/**
* @brief Getter for the dispatcher
*
* @return A weak pointer to the Dispatcher object
*/
std::weak_ptr<Dispatcher> get_dispatcher();

//! Impl
NodeImpl* impl_;

};
const Options& opts);

/**
* @brief Invokes the user callback with the provided inputs.
*
* @param inputs A vector containing the required samples. All the samples
* must correspond to the same task_id.
*/
virtual void publish_to_user(
const int& task_id,
const std::vector<std::pair<int, void*>> inputs) = 0;

/**
* @brief Publishes the internal status of the node to DDS.
*/
void publish_node_status();

/**
* @brief Retrieves the node status
*/
void status(
const Status& status);

/**
* @brief Retrieves the inner writers
*/
const std::vector<eprosima::fastdds::dds::DataWriter*>& writers();

private:

/**
* @brief Getter for the dispatcher
*
* @return A weak pointer to the Dispatcher object
*/
std::weak_ptr<Dispatcher> get_dispatcher();

//! Impl
NodeImpl* impl_;

};

} // namespace core
} // namespace sustainml
Expand Down
Loading

0 comments on commit c93ae2a

Please sign in to comment.