Skip to content

Commit

Permalink
Services added on swig, py and module with examples and test
Browse files Browse the repository at this point in the history
Signed-off-by: Javier Gil Aviles <javiergil@eprosima.com>
  • Loading branch information
Javgilavi committed Jan 27, 2025
1 parent 0cd2a24 commit 553df20
Show file tree
Hide file tree
Showing 26 changed files with 556 additions and 48 deletions.
12 changes: 6 additions & 6 deletions sustainml_modules/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ configure_project_cpp()
# C++ Project
###############################################################################
# Update submodules
message(STATUS "Updating submodules")
execute_process(
COMMAND git submodule update --recursive --init
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
RESULT_VARIABLE EXECUTE_RESULT
)
# message(STATUS "Updating submodules")
# execute_process(
# COMMAND git submodule update --recursive --init
# WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
# RESULT_VARIABLE EXECUTE_RESULT
# )

###############################################################################
# Test
Expand Down
2 changes: 1 addition & 1 deletion sustainml_modules/sustainml_modules/sustainml-wp2
2 changes: 1 addition & 1 deletion sustainml_modules/sustainml_modules/sustainml-wp3
10 changes: 10 additions & 0 deletions sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ def user_input():
return jsonify({'message': 'User input data sent successfully.',
'task_id': utils.task_json(task_id)}), 200

# Handle configuration request from the front-end
@server.route('/config_request', methods=['POST'])
def config_request():
data = request.json
res = orchestrator.send_request(data)
if res is None:
return jsonify({'error': 'Invalid input data'}), 400
return jsonify({'message': 'Configuration request sent successfully.',
'response': utils.response_json(res)}), 200

# Retrieve Node status methods
@server.route('/status', methods=['GET'])
def status():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,18 @@ def send_user_input(self, json_data):
return task_id
else:
return None

def send_request(self, json_data):
request_type = sustainml_swig.RequestType()
request_type.node_id(json_data.get('node_id'))
request_type.transaction_id(json_data.get('transaction_id'))
request_type.configuration(json_data.get('configuration'))

if request_type.node_id() is None or request_type.transaction_id() is None or request_type.configuration() is None:
return None

res = self.node_.configuration_request(request_type)
if res.success:
return res
else:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,12 @@ def json_dict(dict):

def dict_from_json(json_obj):
return json.loads(json_obj)

def response_json(response):
return {
"node_id": response.node_id(),
"transaction_id": response.transaction_id(),
"success": response.success(),
"err_code": response.err_code(),
"configuration": response.configuration()
}
13 changes: 13 additions & 0 deletions sustainml_modules/test/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/chained_nodes_validator.py
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/SimpleOrchestratorNode.py
${CMAKE_CURRENT_BINARY_DIR}/SimpleOrchestratorNode.py COPYONLY)

configure_file(${CMAKE_CURRENT_SOURCE_DIR}/RequestOrchestratorNode.py
${CMAKE_CURRENT_BINARY_DIR}/RequestOrchestratorNode.py COPYONLY)

if (Python3_FOUND)

add_test(NAME SimpleCommunicationSixNodesChainedPython
Expand Down Expand Up @@ -90,4 +93,14 @@ if (Python3_FOUND)
-ap app_requirements_node.py
-py-orc SimpleOrchestratorNode.py)

add_test(NAME SimpleServicePythonOrchestratorSixNodesPython
COMMAND ${Python3_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/chained_nodes_validator.py
-mlm ml_model_metadata_node.py
-ml ml_model_provider_node.py
-hw hw_resources_provider_node.py
-co2 carbon_footprint_node.py
-hwc hw_constraints_node.py
-ap app_requirements_node.py
-py-orc RequestOrchestratorNode.py)

endif()
159 changes: 159 additions & 0 deletions sustainml_modules/test/communication/RequestOrchestratorNode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SustainML Client Node API specification."""

import threading

from sustainml_swig import OrchestratorNodeHandle as cpp_OrchestratorNodeHandle
from sustainml_swig import OrchestratorNode as cpp_OrchestratorNode

from sustainml_swig import NodeStatus, RequestType, ResponseType

class OrchestratorNodeHandle(cpp_OrchestratorNodeHandle):

def __init__(self):
self.node_data_received_ = {}
# Condition variable
self.cv_ = threading.Condition()

# Parent class constructor
super().__init__()

# Callback
def on_node_status_change(
self,
id: int,
status : NodeStatus):

if id not in self.node_data_received_:
self.node_data_received_[id] = (0, 0)

self.node_data_received_[id] = (status.node_status(), self.node_data_received_[id][1])

self.cv_.acquire()
self.cv_.notify()
self.cv_.release()

# Callback
def on_new_node_output(
self,
id : int,
data):

if id not in self.node_data_received_:
self.node_data_received_[id] = (0, 0)

self.node_data_received_[id] = (self.node_data_received_[id][0], 1 + self.node_data_received_[id][1])

self.cv_.acquire()
self.cv_.notify()
self.cv_.release()

print("NodeOutput received ", id)

def prepare_expected_data(
self,
expected_data):

self.expected_data_ = expected_data

def check_data(
self):
for key, value in self.node_data_received_.items():
print(f"Arrived {key} {value[0]} {value[1]}")

for key, value in self.expected_data_.items():
print(f"Expected {key} {value[0]} {value[1]}")

return self.expected_data_ == self.node_data_received_

def wait_for_data(
self):
print("Waiting for data")
with self.cv_:
while not self.check_data():
print("Waiting for data...")
self.cv_.wait()
print("Data received")

# Proxy class to instantiate by the user
class OrchestratorNode:

def __init__(self):

self.handler_ = OrchestratorNodeHandle()
self.node_ = cpp_OrchestratorNode(self.handler_)

# Proxy method to run the node
def spin(self):

self.node_.spin()

# Proxy method to manually terminate
def terminate(self):

self.node_.terminate()
self.node_.destroy()

def send_request(self, json_data):
request_type = RequestType()
request_type.node_id(json_data.get('node_id'))
request_type.transaction_id(json_data.get('transaction_id'))
request_type.configuration(json_data.get('configuration'))

if request_type.node_id() is None or request_type.transaction_id() is None or request_type.configuration() is None:
return None
res = self.node_.configuration_request(request_type)
if res.success:
return res
else:
return None

# Call main in program execution
if __name__ == '__main__':

node = OrchestratorNode()
print("Node created")
res = node.send_request({
"node_id": 0,
"transaction_id": 1,
"configuration": "dummy1"
})
print(f"Response: {res.success()}")
res = node.send_request({
"node_id": 1,
"transaction_id": 1,
"configuration": "dummy2"
})
res = node.send_request({
"node_id": 2,
"transaction_id": 1,
"configuration": "dummy3"
})
res = node.send_request({
"node_id": 3,
"transaction_id": 1,
"configuration": "dummy4"
})
res = node.send_request({
"node_id": 4,
"transaction_id": 1,
"configuration": "dummy5"
})
res = node.send_request({
"node_id": 5,
"transaction_id": 1,
"configuration": "dummy6"
})
node.terminate()
14 changes: 13 additions & 1 deletion sustainml_py/examples/app_requirements_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""SustainML Example Python Node API."""

from sustainml_py.nodes.AppRequirementsNode import AppRequirementsNode
from sustainml_swig import ErrorCode

# Manage signaling
import signal
Expand All @@ -40,9 +41,20 @@ def task_callback(user_input, node_status, app_requirements):
app_requirements.app_requirements().append("New")
app_requirements.app_requirements().append("Requirement")

# User Configuration Callback implementation
# Inputs: req
# Outputs: res
def configuration_callback(req, res):
print (req.configuration())
res.node_id(req.node_id())
res.transaction_id(req.transaction_id())
res.configuration(req.configuration())
res.success(True)
res.err_code(ErrorCode.NO_ERROR)

# Main workflow routine
def run():
node = AppRequirementsNode(callback=task_callback)
node = AppRequirementsNode(callback=task_callback, service_callback=configuration_callback)
global running
running = True
node.spin()
Expand Down
15 changes: 14 additions & 1 deletion sustainml_py/examples/co2_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""SustainML Example Python Node API."""

from sustainml_py.nodes.CarbonFootprintNode import CarbonFootprintNode
from sustainml_swig import ErrorCode

# Manage signaling
import signal
Expand All @@ -40,11 +41,23 @@ def task_callback(ml_model, user_input, hw, node_status, co2):
print (node_status.node_status())
co2.carbon_intensity(4)

# User Configuration Callback implementation
# Inputs: req
# Outputs: res
def configuration_callback(req, res):
print (req.configuration())
res.node_id(req.node_id())
res.transaction_id(req.transaction_id())
res.configuration(req.configuration())
res.success(True)
res.err_code(ErrorCode.NO_ERROR)


# Main workflow routine
def run():
global running
running = True
node = CarbonFootprintNode(callback=task_callback)
node = CarbonFootprintNode(callback=task_callback, service_callback=configuration_callback)
node.spin()

# Call main in program execution
Expand Down
15 changes: 14 additions & 1 deletion sustainml_py/examples/hw_constratins_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"""SustainML Example Python Node API."""

from sustainml_py.nodes.HardwareConstraintsNode import HardwareConstraintsNode
from sustainml_swig import ErrorCode


# Manage signaling
import signal
Expand All @@ -38,9 +40,20 @@ def task_callback(user_input, node_status, hw_constraints):
hw_constraints.max_memory_footprint(100)
hw_constraints.hardware_required(['PIM_AI_1chip'])

# User Configuration Callback implementation
# Inputs: req
# Outputs: res
def configuration_callback(req, res):
print (req.configuration())
res.node_id(req.node_id())
res.transaction_id(req.transaction_id())
res.configuration(req.configuration())
res.success(True)
res.err_code(ErrorCode.NO_ERROR)

# Main workflow routine
def run():
node = HardwareConstraintsNode(callback=task_callback)
node = HardwareConstraintsNode(callback=task_callback, service_callback=configuration_callback)
global running
running = True
node.spin()
Expand Down
15 changes: 14 additions & 1 deletion sustainml_py/examples/hw_resources_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"""SustainML Example Python Node API."""

from sustainml_py.nodes.HardwareResourcesNode import HardwareResourcesNode
from sustainml_swig import ErrorCode


# Manage signaling
import signal
Expand All @@ -37,9 +39,20 @@ def task_callback(ml_model, app_requirements, hw_constraints, node_status, hw):
print (ml_model.model())
hw.hw_description("This is a HW description")

# User Configuration Callback implementation
# Inputs: req
# Outputs: res
def configuration_callback(req, res):
print (req.configuration())
res.node_id(req.node_id())
res.transaction_id(req.transaction_id())
res.configuration(req.configuration())
res.success(True)
res.err_code(ErrorCode.NO_ERROR)

# Main workflow routine
def run():
node = HardwareResourcesNode(callback=task_callback)
node = HardwareResourcesNode(callback=task_callback, service_callback=configuration_callback)
global running
running = True
node.spin()
Expand Down
Loading

0 comments on commit 553df20

Please sign in to comment.