Skip to content

Commit

Permalink
Service feature added to sustainml_cpp
Browse files Browse the repository at this point in the history
Signed-off-by: Javier Gil Aviles <javiergil@eprosima.com>
  • Loading branch information
Javgilavi committed Jan 22, 2025
1 parent 61e2015 commit 61eac3f
Show file tree
Hide file tree
Showing 8 changed files with 465 additions and 25 deletions.
142 changes: 142 additions & 0 deletions sustainml_cpp/include/sustainml_cpp/core/RequestReplier.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file RequestReplier.hpp
*/

#ifndef SUSTAINMLCPP_CORE_REQUESTREPLEIR_HPP
#define SUSTAINMLCPP_CORE_REQUESTREPLEIR_HPP

#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <sustainml_cpp/types/types.hpp>
#include <types/typesImplPubSubTypes.hpp>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/qos/SubscriberQos.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>

namespace sustainml {
namespace core {

/**
* @brief This class in charge of sending request or response
* and listen for "response" or "request" respectively.
*
*/
class RequestReplier
{

friend class Node;
friend class OrchestratorNode;

public:

RequestReplier(
std::function<void(void*)> callback,
const char* topicw,
const char* topicr);

~RequestReplier();

/**
* @brief Method used to send the response to the request in the Nodes.
*
* @param res Response message
*/
void write_res(
ResponseTypeImpl* res);

/**
* @brief Method used to send the request of configuration in the OrchestratorNode.
*
* @param req Request message
*/
void write_req(
RequestTypeImpl* req);

protected:

std::function<void(void*)> callback_;
const char* topicr_;
const char* topicw_;

eprosima::fastdds::dds::DomainParticipant* participant_;

eprosima::fastdds::dds::Publisher* publisher_;

eprosima::fastdds::dds::Subscriber* subscriber_;

eprosima::fastdds::dds::TypeSupport typeRes_;
eprosima::fastdds::dds::TypeSupport typeReq_;

eprosima::fastdds::dds::Topic* topicR_;
eprosima::fastdds::dds::Topic* topicW_;

eprosima::fastdds::dds::DataWriter* writer_;

eprosima::fastdds::dds::DataReader* reader_;

private:

class RequestReplyControlListener : public eprosima::fastdds::dds::DataReaderListener
{
public:

RequestReplyControlListener(
RequestReplier* node);

virtual ~RequestReplyControlListener();

/**
* @brief Callback executed when a new sample is available on the DataReader.
*
* @param reader The DataReader having new available samples.
*/
void on_data_available(
eprosima::fastdds::dds::DataReader* reader);

/**
* @brief Callback executed when a DataReader matching status change.
*
* @param reader The DataReader.
* @param status The status of the subscription.
*/
void on_subscription_matched(
eprosima::fastdds::dds::DataReader* reader,
const eprosima::fastdds::dds::SubscriptionMatchedStatus& status);

private:

RequestReplier* node_;
size_t matched_;
RequestTypeImpl req_;
ResponseTypeImpl res_;

}
listener_;
};

} // namespace core
} // namespace sustainml

#endif // SUSTAINMLCPP_CORE_REQUESTREPLEIR_HPP
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class DataWriter;
} // namespace eprosima

namespace sustainml {
namespace core {
class RequestReplier;
} // namespace core
namespace orchestrator {

class ModuleNodeProxy;
Expand Down Expand Up @@ -174,14 +177,14 @@ class OrchestratorNode
void send_control_command(
const types::NodeControl& cmd);

/**
/**
* @brief This method sends a request to node via service for changing its configuration.
* @param [in] req configuration request, contain which node and configuration file
* @param [out] res response from the node with its new configuration
*/
bool configuration_request(
const types::RequestType req,
const types::ResponseType& res);
types::RequestType req,
types::ResponseType& res);

/**
* @brief Public method to get the mutex in order to correctly synchronise user
Expand Down Expand Up @@ -267,6 +270,11 @@ class OrchestratorNode
std::atomic_bool terminated_{false};
std::condition_variable initialization_cv_;

std::condition_variable cv_;

types::ResponseType res_;
sustainml::core::RequestReplier* req_res_;

/**
* @brief This class implements the callbacks for the DomainParticipant
*/
Expand Down
14 changes: 14 additions & 0 deletions sustainml_cpp/include/sustainml_cpp/types/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2435,6 +2435,13 @@ class RequestType
eProsima_user_DllExport RequestType& operator =(
RequestType&& x) noexcept;

/*!
* @brief Copy assignment.
* @param x Reference to the object RequestTypeImpl that will be copied.
*/
eProsima_user_DllExport RequestType& operator =(
const RequestTypeImpl* x);

/*!
* @brief Comparison operator.
* @param x RequestType object to compare.
Expand Down Expand Up @@ -2582,6 +2589,13 @@ class ResponseType
eProsima_user_DllExport ResponseType& operator =(
ResponseType&& x) noexcept;

/*!
* @brief Copy assignment.
* @param x Reference to the object ResponseTypeImpl that will be copied.
*/
eProsima_user_DllExport ResponseType& operator =(
const ResponseTypeImpl* x);

/*!
* @brief Comparison operator.
* @param x ResponseType object to compare.
Expand Down
43 changes: 41 additions & 2 deletions sustainml_cpp/src/cpp/core/NodeImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
#include <fastdds/dds/topic/TypeSupport.hpp>

#include <common/Common.hpp>
#include <core/NodeImpl.hpp>
#include <core/Dispatcher.hpp>
#include <core/NodeImpl.hpp>
#include <core/Options.hpp>
#include <types/typesImplPubSubTypes.hpp>
#include <types/typesImplTypeObjectSupport.hpp>

Expand Down Expand Up @@ -83,9 +84,28 @@ NodeImpl::NodeImpl(
, control_listener_(this)
, req_res_listener_(req_res_listener)
{
req_res_ = new RequestReplier([this, name](void* input)
{
RequestTypeImpl* in = static_cast<RequestTypeImpl*>(input);
types::RequestType req;
req = in;
std::cout << "Size of request received: " << sizeof(req) << " bytes" << std::endl;
std::cout << "Configuration received: " << static_cast<RequestTypeImpl*>(input)->configuration() << std::endl;
std::cout << "Node ID " << static_cast<int32_t>(common::get_node_id_from_name(name))
<< " receive request for node_id " << req.node_id() << std::endl;
if (req.node_id() == static_cast<int32_t>(common::get_node_id_from_name(name)))
{
std::cout << "Request successfully receive in " << name << std::endl;
std::cout << "Configuring..." << std::endl;
types::ResponseType res;
req_res_listener_.on_configuration_request(req, res);
req_res_->write_res(res.get_impl());
}
}, "sustainml/response", "sustainml/request");

if (!init(name))
{
EPROSIMA_LOG_ERROR(NODE, "Initialization Failed with the provided Options");
EPROSIMA_LOG_ERROR(NODE, "Initialization Failed");
}
}

Expand All @@ -102,6 +122,25 @@ NodeImpl::NodeImpl(
, control_listener_(this)
, req_res_listener_(req_res_listener)
{
req_res_ = new RequestReplier([this, name](void* input)
{
RequestTypeImpl* in = static_cast<RequestTypeImpl*>(input);
types::RequestType req;
req = in;
std::cout << "Size of request received: " << sizeof(req) << " bytes" << std::endl;
std::cout << "Configuration received: " << static_cast<RequestTypeImpl*>(input)->configuration() << std::endl;
std::cout << "Node ID " << static_cast<int32_t>(common::get_node_id_from_name(name))
<< " receive request for node_id " << req.node_id() << std::endl;
if (req.node_id() == static_cast<int32_t>(common::get_node_id_from_name(name)))
{
std::cout << "Request successfully receive in " << name << std::endl;
std::cout << "Configuring..." << std::endl;
types::ResponseType res;
req_res_listener_.on_configuration_request(req, res);
req_res_->write_res(res.get_impl());
}
}, "sustainml/response", "sustainml/request");

if (!init(name, opts))
{
EPROSIMA_LOG_ERROR(NODE, "Initialization Failed with the provided Options");
Expand Down
4 changes: 3 additions & 1 deletion sustainml_cpp/src/cpp/core/NodeImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <core/Options.hpp>
#include <core/RequestReplyListener.hpp>
#include <core/RequestReplier.hpp>
#include <types/typesImplPubSubTypes.hpp>

#include <utility>
Expand Down Expand Up @@ -53,7 +54,6 @@ namespace core {

class Dispatcher;
class Node;
class RequestReplyListener;
struct Options;

/**
Expand Down Expand Up @@ -166,6 +166,8 @@ class NodeImpl

NodeStatusImpl node_status_;

RequestReplier* req_res_;

RequestReplyListener& req_res_listener_;

private:
Expand Down
Loading

0 comments on commit 61eac3f

Please sign in to comment.