diff --git a/sustainml_cpp/include/sustainml_cpp/core/RequestReplier.hpp b/sustainml_cpp/include/sustainml_cpp/core/RequestReplier.hpp new file mode 100644 index 0000000..7e2ac8f --- /dev/null +++ b/sustainml_cpp/include/sustainml_cpp/core/RequestReplier.hpp @@ -0,0 +1,142 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file RequestReplier.hpp + */ + +#ifndef SUSTAINMLCPP_CORE_REQUESTREPLEIR_HPP +#define SUSTAINMLCPP_CORE_REQUESTREPLEIR_HPP + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sustainml { +namespace core { + +/** + * @brief This class in charge of sending request or response + * and listen for "response" or "request" respectively. + * + */ +class RequestReplier +{ + + friend class Node; + friend class OrchestratorNode; + +public: + + RequestReplier( + std::function callback, + const char* topicw, + const char* topicr); + + ~RequestReplier(); + + /** + * @brief Method used to send the response to the request in the Nodes. + * + * @param res Response message + */ + void write_res( + ResponseTypeImpl* res); + + /** + * @brief Method used to send the request of configuration in the OrchestratorNode. + * + * @param req Request message + */ + void write_req( + RequestTypeImpl* req); + +protected: + + std::function callback_; + const char* topicr_; + const char* topicw_; + + eprosima::fastdds::dds::DomainParticipant* participant_; + + eprosima::fastdds::dds::Publisher* publisher_; + + eprosima::fastdds::dds::Subscriber* subscriber_; + + eprosima::fastdds::dds::TypeSupport typeRes_; + eprosima::fastdds::dds::TypeSupport typeReq_; + + eprosima::fastdds::dds::Topic* topicR_; + eprosima::fastdds::dds::Topic* topicW_; + + eprosima::fastdds::dds::DataWriter* writer_; + + eprosima::fastdds::dds::DataReader* reader_; + +private: + + class RequestReplyControlListener : public eprosima::fastdds::dds::DataReaderListener + { + public: + + RequestReplyControlListener( + RequestReplier* node); + + virtual ~RequestReplyControlListener(); + + /** + * @brief Callback executed when a new sample is available on the DataReader. + * + * @param reader The DataReader having new available samples. + */ + void on_data_available( + eprosima::fastdds::dds::DataReader* reader); + + /** + * @brief Callback executed when a DataReader matching status change. + * + * @param reader The DataReader. + * @param status The status of the subscription. + */ + void on_subscription_matched( + eprosima::fastdds::dds::DataReader* reader, + const eprosima::fastdds::dds::SubscriptionMatchedStatus& status); + + private: + + RequestReplier* node_; + size_t matched_; + RequestTypeImpl req_; + ResponseTypeImpl res_; + + } + listener_; +}; + +} // namespace core +} // namespace sustainml + +#endif // SUSTAINMLCPP_CORE_REQUESTREPLEIR_HPP diff --git a/sustainml_cpp/include/sustainml_cpp/orchestrator/OrchestratorNode.hpp b/sustainml_cpp/include/sustainml_cpp/orchestrator/OrchestratorNode.hpp index bf23565..79f828c 100644 --- a/sustainml_cpp/include/sustainml_cpp/orchestrator/OrchestratorNode.hpp +++ b/sustainml_cpp/include/sustainml_cpp/orchestrator/OrchestratorNode.hpp @@ -42,6 +42,9 @@ class DataWriter; } // namespace eprosima namespace sustainml { +namespace core { +class RequestReplier; +} // namespace core namespace orchestrator { class ModuleNodeProxy; @@ -174,14 +177,14 @@ class OrchestratorNode void send_control_command( const types::NodeControl& cmd); - /** + /** * @brief This method sends a request to node via service for changing its configuration. * @param [in] req configuration request, contain which node and configuration file * @param [out] res response from the node with its new configuration */ bool configuration_request( - const types::RequestType req, - const types::ResponseType& res); + types::RequestType req, + types::ResponseType& res); /** * @brief Public method to get the mutex in order to correctly synchronise user @@ -267,6 +270,11 @@ class OrchestratorNode std::atomic_bool terminated_{false}; std::condition_variable initialization_cv_; + std::condition_variable cv_; + + types::ResponseType res_; + sustainml::core::RequestReplier* req_res_; + /** * @brief This class implements the callbacks for the DomainParticipant */ diff --git a/sustainml_cpp/include/sustainml_cpp/types/types.hpp b/sustainml_cpp/include/sustainml_cpp/types/types.hpp index 851dacb..2864d81 100644 --- a/sustainml_cpp/include/sustainml_cpp/types/types.hpp +++ b/sustainml_cpp/include/sustainml_cpp/types/types.hpp @@ -2435,6 +2435,13 @@ class RequestType eProsima_user_DllExport RequestType& operator =( RequestType&& x) noexcept; + /*! + * @brief Copy assignment. + * @param x Reference to the object RequestTypeImpl that will be copied. + */ + eProsima_user_DllExport RequestType& operator =( + const RequestTypeImpl* x); + /*! * @brief Comparison operator. * @param x RequestType object to compare. @@ -2582,6 +2589,13 @@ class ResponseType eProsima_user_DllExport ResponseType& operator =( ResponseType&& x) noexcept; + /*! + * @brief Copy assignment. + * @param x Reference to the object ResponseTypeImpl that will be copied. + */ + eProsima_user_DllExport ResponseType& operator =( + const ResponseTypeImpl* x); + /*! * @brief Comparison operator. * @param x ResponseType object to compare. diff --git a/sustainml_cpp/src/cpp/core/NodeImpl.cpp b/sustainml_cpp/src/cpp/core/NodeImpl.cpp index eeceda8..a1f833e 100644 --- a/sustainml_cpp/src/cpp/core/NodeImpl.cpp +++ b/sustainml_cpp/src/cpp/core/NodeImpl.cpp @@ -23,8 +23,9 @@ #include #include -#include #include +#include +#include #include #include @@ -83,9 +84,28 @@ NodeImpl::NodeImpl( , control_listener_(this) , req_res_listener_(req_res_listener) { + req_res_ = new RequestReplier([this, name](void* input) + { + RequestTypeImpl* in = static_cast(input); + types::RequestType req; + req = in; + std::cout << "Size of request received: " << sizeof(req) << " bytes" << std::endl; + std::cout << "Configuration received: " << static_cast(input)->configuration() << std::endl; + std::cout << "Node ID " << static_cast(common::get_node_id_from_name(name)) + << " receive request for node_id " << req.node_id() << std::endl; + if (req.node_id() == static_cast(common::get_node_id_from_name(name))) + { + std::cout << "Request successfully receive in " << name << std::endl; + std::cout << "Configuring..." << std::endl; + types::ResponseType res; + req_res_listener_.on_configuration_request(req, res); + req_res_->write_res(res.get_impl()); + } + }, "sustainml/response", "sustainml/request"); + if (!init(name)) { - EPROSIMA_LOG_ERROR(NODE, "Initialization Failed with the provided Options"); + EPROSIMA_LOG_ERROR(NODE, "Initialization Failed"); } } @@ -102,6 +122,25 @@ NodeImpl::NodeImpl( , control_listener_(this) , req_res_listener_(req_res_listener) { + req_res_ = new RequestReplier([this, name](void* input) + { + RequestTypeImpl* in = static_cast(input); + types::RequestType req; + req = in; + std::cout << "Size of request received: " << sizeof(req) << " bytes" << std::endl; + std::cout << "Configuration received: " << static_cast(input)->configuration() << std::endl; + std::cout << "Node ID " << static_cast(common::get_node_id_from_name(name)) + << " receive request for node_id " << req.node_id() << std::endl; + if (req.node_id() == static_cast(common::get_node_id_from_name(name))) + { + std::cout << "Request successfully receive in " << name << std::endl; + std::cout << "Configuring..." << std::endl; + types::ResponseType res; + req_res_listener_.on_configuration_request(req, res); + req_res_->write_res(res.get_impl()); + } + }, "sustainml/response", "sustainml/request"); + if (!init(name, opts)) { EPROSIMA_LOG_ERROR(NODE, "Initialization Failed with the provided Options"); diff --git a/sustainml_cpp/src/cpp/core/NodeImpl.hpp b/sustainml_cpp/src/cpp/core/NodeImpl.hpp index 3ece340..5b9c2b3 100644 --- a/sustainml_cpp/src/cpp/core/NodeImpl.hpp +++ b/sustainml_cpp/src/cpp/core/NodeImpl.hpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -53,7 +54,6 @@ namespace core { class Dispatcher; class Node; -class RequestReplyListener; struct Options; /** @@ -166,6 +166,8 @@ class NodeImpl NodeStatusImpl node_status_; + RequestReplier* req_res_; + RequestReplyListener& req_res_listener_; private: diff --git a/sustainml_cpp/src/cpp/core/RequestReplier.cpp b/sustainml_cpp/src/cpp/core/RequestReplier.cpp new file mode 100644 index 0000000..50e2f18 --- /dev/null +++ b/sustainml_cpp/src/cpp/core/RequestReplier.cpp @@ -0,0 +1,188 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file RequestReplier.cpp + */ + +#include + +#include +#include + +using namespace eprosima::fastdds::dds; + +namespace sustainml { +namespace core { + +RequestReplier::RequestReplier( + std::function callback, + const char* topicw, + const char* topicr) + : callback_(callback) + , topicw_(topicw) + , topicr_(topicr) + , participant_(nullptr) + , typeRes_(new ResponseTypeImplPubSubType()) + , typeReq_(new RequestTypeImplPubSubType()) + , publisher_(nullptr) + , subscriber_(nullptr) + , listener_(this) +{ + auto dpf = DomainParticipantFactory::get_instance(); + + DomainParticipantQos pqos; + + // Create a DomainParticipant + participant_ = dpf->create_participant(0, pqos); + + subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + + publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT); + + // Register the type + typeRes_.register_type(participant_); + typeReq_.register_type(participant_); + + // Create a Topics + if(std::string(topicw_) == "sustainml/request") + { + topicR_= participant_->create_topic(topicr_, typeRes_.get_type_name(), TOPIC_QOS_DEFAULT); + topicW_= participant_->create_topic(topicw_, typeReq_.get_type_name(), TOPIC_QOS_DEFAULT); + } + else + { + topicR_= participant_->create_topic(topicr_, typeReq_.get_type_name(), TOPIC_QOS_DEFAULT); + topicW_= participant_->create_topic(topicw_, typeRes_.get_type_name(), TOPIC_QOS_DEFAULT); + } + + // Configure DataReader QoS + DataReaderQos rqos = DATAREADER_QOS_DEFAULT; + rqos.reliability().kind = RELIABLE_RELIABILITY_QOS; + rqos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS; + rqos.history().kind = KEEP_LAST_HISTORY_QOS; + rqos.history().depth = 1; + + // Create a DataReader + reader_ = subscriber_->create_datareader(topicR_, rqos, &listener_); + + // Configure DataWriter QoS + DataWriterQos wqos = DATAWRITER_QOS_DEFAULT; + wqos.reliability().kind = RELIABLE_RELIABILITY_QOS; + wqos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS; + wqos.history().kind = KEEP_LAST_HISTORY_QOS; + wqos.history().depth = 1; + + // Create a DataWriter + writer_ = publisher_->create_datawriter(topicW_, wqos); +} + +RequestReplier::~RequestReplier() +{ + +} + +void RequestReplier::write_res( + ResponseTypeImpl* res) +{ + writer_->write(res); +} + +void RequestReplier::write_req( + RequestTypeImpl* req) +{ + writer_->write(req); +} + +RequestReplier::RequestReplyControlListener::RequestReplyControlListener( + RequestReplier* node) + : node_(node) +{ + +} + +RequestReplier::RequestReplyControlListener::~RequestReplyControlListener() +{ + +} + +void RequestReplier::RequestReplyControlListener::on_data_available( + eprosima::fastdds::dds::DataReader* reader) +{ + // Create a data and SampleInfo instance + SampleInfo info; + void* data; + + if (reader->get_topicdescription()->get_type_name() == node_->typeReq_.get_type_name()) + { + data = &req_; + + } + else if (reader->get_topicdescription()->get_type_name() == node_->typeRes_.get_type_name()) + { + data = &res_; + } + else + { + return; + } + + // Keep taking data until there is nothing to take + while (reader->take_next_sample(data, &info) == RETCODE_OK) + { + if (info.valid_data) + { + std::cout << "Received new data value for topic " + << reader->get_topicdescription()->get_name() + << " with type " + << reader->get_topicdescription()->get_type_name() + << std::endl; + node_->callback_(data); + } + else + { + std::cout << "Remote writer for topic " + << reader->get_topicdescription()->get_name() + << " is dead" << std::endl; + } + } +} + +void RequestReplier::RequestReplyControlListener::on_subscription_matched( + eprosima::fastdds::dds::DataReader* reader, + const eprosima::fastdds::dds::SubscriptionMatchedStatus& status) +{ + + // New remote DataWriter discovered + if (status.current_count_change == 1) + { + matched_ = status.current_count; + std::cout << "Subscriber matched." << std::endl; + } + // New remote DataWriter undiscovered + else if (status.current_count_change == -1) + { + matched_ = status.current_count; + std::cout << "Subscriber unmatched." << std::endl; + } + // Non-valid option + else + { + std::cout << status.current_count_change + << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl; + } +} + +} // namespace core +} // namespace sustainml diff --git a/sustainml_cpp/src/cpp/orchestrator/OrchestratorNode.cpp b/sustainml_cpp/src/cpp/orchestrator/OrchestratorNode.cpp index 8892ddc..04e3ee9 100644 --- a/sustainml_cpp/src/cpp/orchestrator/OrchestratorNode.cpp +++ b/sustainml_cpp/src/cpp/orchestrator/OrchestratorNode.cpp @@ -16,12 +16,14 @@ * @file OrchestratorNode.cpp */ +#include #include #include "ModuleNodeProxyFactory.hpp" #include "TaskDB.ipp" #include +#include #include #include #include @@ -113,6 +115,21 @@ OrchestratorNode::OrchestratorNode( task_man_(new TaskManager()), participant_listener_(new OrchestratorParticipantListener(this)) { + req_res_ = new core::RequestReplier([this](void* input) + { + ResponseTypeImpl* in = static_cast(input); + types::ResponseType res; + res = in; + std::cout << "Size of response received: " << sizeof(res) << " bytes" << std::endl; + std::cout << "Configuration received: " << static_cast(input)->configuration() << std::endl; + std::cout << "Response receive response from node_id " << res.node_id() << std::endl; + { + std::lock_guard lock(this->mtx_); + this->res_ = res; + } + this->cv_.notify_all(); + }, "sustainml/request", "sustainml/response"); + if (!init()) { EPROSIMA_LOG_ERROR(ORCHESTRATOR, "Orchestrator initialization Failed"); @@ -482,11 +499,19 @@ void OrchestratorNode::send_control_command( } bool OrchestratorNode::configuration_request( - const types::RequestType req, - const types::ResponseType& res) + types::RequestType req, + types::ResponseType& res) { //TODO: Implement this method - return true; + req_res_->write_req(req.get_impl()); + std::unique_lock lck(mtx_); + cv_.wait(lck, [this, &req, &res] + { + return res_.node_id() == req.node_id() && res_.transaction_id() == req.transaction_id(); + }); + res = res_; + res_ = types::ResponseType(); + return res.success(); } void OrchestratorNode::spin() diff --git a/sustainml_cpp/src/cpp/types/types.cpp b/sustainml_cpp/src/cpp/types/types.cpp index a8561ba..7a02939 100644 --- a/sustainml_cpp/src/cpp/types/types.cpp +++ b/sustainml_cpp/src/cpp/types/types.cpp @@ -2170,6 +2170,15 @@ RequestType& RequestType::operator =( return *this; } +RequestType& RequestType::operator =( + const RequestTypeImpl* x) +{ + this->impl_->node_id() = x->node_id(); + this->impl_->transaction_id() = x->transaction_id(); + this->impl_->configuration() = x->configuration(); + return *this; +} + bool RequestType::operator ==( const RequestType& x) const { @@ -2183,7 +2192,8 @@ bool RequestType::operator !=( } void RequestType::node_id( - int32_t _node_id){ + int32_t _node_id) +{ impl_->node_id(_node_id); } @@ -2198,7 +2208,8 @@ int32_t& RequestType::node_id() } void RequestType::transaction_id( - int32_t _transaction_id){ + int32_t _transaction_id) +{ impl_->transaction_id(_transaction_id); } @@ -2270,14 +2281,14 @@ ResponseType::ResponseType( } ResponseType::ResponseType( - ResponseType&& x) noexcept + ResponseType&& x) noexcept { this->impl_ = x.impl_; x.impl_ = nullptr; } ResponseType& ResponseType::operator =( - const ResponseType& x) + const ResponseType& x) { this->impl_->node_id() = x.impl_->node_id(); this->impl_->transaction_id() = x.impl_->transaction_id(); @@ -2288,32 +2299,43 @@ ResponseType& ResponseType::operator =( } ResponseType& ResponseType::operator =( - ResponseType&& x) noexcept + ResponseType&& x) noexcept { if (x.impl_ != this->impl_) { - delete this->impl_; - this->impl_ = x.impl_; - x.impl_ = nullptr; + delete this->impl_; + this->impl_ = x.impl_; + x.impl_ = nullptr; } return *this; } +ResponseType& ResponseType::operator =( + const ResponseTypeImpl* x) +{ + this->impl_->node_id() = x->node_id(); + this->impl_->transaction_id() = x->transaction_id(); + this->impl_->success() = x->success(); + this->impl_->err_code() = x->err_code(); + this->impl_->configuration() = x->configuration(); + return *this; +} + bool ResponseType::operator ==( - const ResponseType& x) const + const ResponseType& x) const { return (this->impl_ == x.impl_); } bool ResponseType::operator !=( - const ResponseType& x) const + const ResponseType& x) const { return !(*this == x); } void ResponseType::node_id( - int32_t _node_id) + int32_t _node_id) { impl_->node_id(_node_id); } @@ -2329,7 +2351,7 @@ int32_t& ResponseType::node_id() } void ResponseType::transaction_id( - int32_t _transaction_id) + int32_t _transaction_id) { impl_->transaction_id(_transaction_id); } @@ -2345,7 +2367,7 @@ int32_t& ResponseType::transaction_id() } void ResponseType::success( - bool _success) + bool _success) { impl_->success(_success); } @@ -2361,7 +2383,7 @@ bool& ResponseType::success() } void ResponseType::err_code( - ErrorCode _err_code) + ErrorCode _err_code) { impl_->err_code(_err_code); } @@ -2377,13 +2399,13 @@ ErrorCode& ResponseType::err_code() } void ResponseType::configuration( - const std::string& _configuration) + const std::string& _configuration) { impl_->configuration(_configuration); } void ResponseType::configuration( - std::string&& _configuration) + std::string&& _configuration) { impl_->configuration(std::forward(_configuration)); }