Skip to content

Commit

Permalink
Add a localhost server to the back-end node (#55)
Browse files Browse the repository at this point in the history
* Refs #21816: Sort requirements list alphabetically

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #21816: Add Flask dependency in requirements

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #21816: Add utility methods to retrieve information easily

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #21816: Include orchestrator node as a python package

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #21816: Update back-end node to create a Flask server with the orchestrator

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #21816: Address partial revision

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #21816: Retrieve data from orchestrator with interface helpers

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #21816: Adapt node module to allow data query

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

* Refs #21816: Update WP1 submodule with NIT PR

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>

---------

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>
  • Loading branch information
JesusPoderoso authored Oct 14, 2024
1 parent 441377f commit ab2285c
Show file tree
Hide file tree
Showing 8 changed files with 537 additions and 46 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Ignore pycache Python dirs
__pycache__/
19 changes: 10 additions & 9 deletions sustainml_modules/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
PyQt5>=5.15.10
rdflib>=7.0.0
torch>=2.1.2+cu121
numpy>=1.24.1
Pillow>=9.3.0
opencv-python>=4.9.0.80
transformers>=4.37.0
flask
flask-socketio==5.2.0
networkx>=3.0
numpy>=1.24.1
ollama
optimum
onnx
onnxruntime
rdflib
opencv-python>=4.9.0.80
optimum
Pillow>=9.3.0
PyQt5>=5.15.10
rdflib>=7.0.0
timm
torch>=2.1.2+cu121
transformers>=4.37.0
ultralytics
2 changes: 1 addition & 1 deletion sustainml_modules/sustainml_modules/sustainml-wp1
134 changes: 99 additions & 35 deletions sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,51 +13,115 @@
# limitations under the License.
"""SustainML Backend Node Implementation."""

from sustainml_py.nodes.OrchestratorNode import OrchestratorNode

# Manage signaling
import signal
from flask import Flask, request, jsonify
import threading
import time
import signal
import sys
from orchestrator_node import orchestrator_node, utils
from werkzeug.serving import make_server

# Whether to go on spinning or interrupt
running = False
running = True
orchestrator = orchestrator_node.Orchestrator()
server = Flask(__name__)
server_ip_address = '127.0.0.1'
server_port = 5000

# Signal handler
def signal_handler(sig, frame):
    """SIGINT handler: terminate the orchestrator node and stop the spin loop.

    Sets the module-level ``running`` flag to False so the worker thread's
    loop can exit; the node itself is stopped via the class-level terminate.
    """
    print("\nExiting")
    OrchestratorNode.terminate()
    global running
    running = False
# Flask server default route
@server.route('/')
def hello_world():
    """Landing route; points the user at the /terminate endpoint."""
    greeting = 'Hello world! Use "/terminate" route to stop Back-end node.\n'
    return greeting

def new_output(id, data):
    """Callback invoked when a node publishes new output; only logs the event."""
    print("New output")
@server.route('/status', methods=['GET'])
def status():
    """Report the last known status of every SustainML node as plain text."""
    report = orchestrator.get_all_status()
    return report

def status_change(id, status):
    """Callback invoked when a node's status changes; only logs the event."""
    # NodeID_ID_APP_REQUIREMENTS are the kind of ids
    print("Status change")
@server.route('/status', methods=['POST'])
def status_args():
    """Report the status of a single node, selected by 'node_id' in the JSON body.

    Returns 400 when no JSON body is present instead of crashing with an
    AttributeError (request.json can be None), which would surface as a 500.
    """
    data = request.json
    if data is None:
        return jsonify({'Error': 'JSON body with a "node_id" field is required'}), 400
    node_id = data.get('node_id')
    return jsonify({'status': f'{orchestrator.get_status(node_id)}'}), 200

# Main workflow routine
def run():
global running
running = True
@server.route('/results', methods=['GET'])
def results():
    """Return the latest results reported by every SustainML node as JSON.

    Uses the highest task id seen so far; responds with a plain message when
    no node has produced any output yet.
    """
    last_task_id = orchestrator.get_last_task_id()
    if last_task_id is None:
        return 'Nodes have not reported any output yet.\n', 200
    # One entry per node kind, all queried for the same (latest) task.
    # Replaces six copy-pasted getter lines and a local named `json`
    # (which shadows the common stdlib module name).
    node_kinds = (utils.node_id.APP_REQUIREMENTS.value,
                  utils.node_id.ML_MODEL_METADATA.value,
                  utils.node_id.HW_CONSTRAINTS.value,
                  utils.node_id.ML_MODEL_PROVIDER.value,
                  utils.node_id.HW_PROVIDER.value,
                  utils.node_id.CARBONTRACKER.value)
    payload = {f'{utils.string_node(kind)}': f'{orchestrator.get_results(kind, last_task_id)}'
               for kind in node_kinds}
    return jsonify(payload), 200

node = OrchestratorNode(
new_node_output=new_output,
node_status_change=status_change)
node.spin()
@server.route('/results', methods=['POST'])
def results_args():
    """Return the results of one node for one task.

    Expects a JSON body with 'node_id' and 'task_id'. Returns 400 when no
    JSON body is present instead of crashing with an AttributeError
    (request.json can be None), which would surface as a 500.
    """
    data = request.json
    if data is None:
        return jsonify({'Error': 'JSON body with "node_id" and "task_id" fields is required'}), 400
    node_id = data.get('node_id')
    task_id = data.get('task_id')
    return jsonify({f'{utils.string_node(node_id)}': f'{orchestrator.get_results(node_id, task_id)}'}), 200

# Call main in program execution
if __name__ == '__main__':
    # Install the Ctrl+C handler before starting any worker thread.
    signal.signal(signal.SIGINT, signal_handler)

    # NOTE(review): the bare-string note below has a typo ("tun" -> "run");
    # kept verbatim since it is a runtime (no-op) string literal.
    """Python does not process signals async if
    the main thread is blocked (spin()) so, tun
    user work flow in another thread """
    runner = threading.Thread(target=run)
    runner.start()
# Flask server shutdown route
@server.route('/shutdown', methods=['GET'])
def shutdown():
    """Stop the werkzeug dev server via its legacy shutdown hook, if present.

    NOTE(review): the 'werkzeug.server.shutdown' environ hook was removed in
    Werkzeug 2.1+, so this route usually just redirects users to /terminate
    -- confirm against the pinned werkzeug version.
    """
    legacy_stop = request.environ.get('werkzeug.server.shutdown')
    if legacy_stop is not None:
        legacy_stop()
        return 'Terminating...\n'
    return 'Use "/terminate" route to stop Back-end node.\n'

# Flask server terminate route
@server.route('/terminate', methods=['GET'])
def terminate():
    """Trigger a clean shutdown by delivering SIGINT to this process."""
    response = 'Terminating...\n'
    signal.raise_signal(signal.SIGINT)
    return response

class ServerThread(threading.Thread):
    """Thread that runs the Flask server alongside the orchestrator node.

    Owns two pieces of work: a child thread spinning the orchestrator, and
    the werkzeug server loop executed in this thread's run().
    """

    def __init__(self):
        # Create orchestrator node
        self.orchestrator_thread = threading.Thread(target=orchestrator.run)
        # Create Flask server
        threading.Thread.__init__(self)
        # make_server binds the address immediately; serve_forever is deferred
        # until run() so construction can happen on the main thread.
        self.srv = make_server(server_ip_address, server_port, server)
        self.ctx = server.app_context()
        self.ctx.push()

    def run(self):
        """Thread body: start the orchestrator, then serve HTTP until shutdown."""
        # Start the orchestrator thread
        self.orchestrator_thread.start()
        # Start the Flask server
        self.srv.serve_forever()

    def shutdown(self):
        """Stop both workers in order: orchestrator first, then the HTTP server."""
        # Terminate the orchestrator and its thread
        orchestrator.terminate()
        # Terminate the Flask server
        self.srv.shutdown()
        # Wait for the orchestrator thread to finish
        self.orchestrator_thread.join()

# Process signal handler
def signal_handler(sig, frame):
    """SIGINT handler: stop the server/orchestrator thread and end the main loop."""
    global running
    print("\nBack-end node terminating...")
    flask_server_thread.shutdown()
    running = False

# Main program execution
if __name__ == '__main__':
    # Install the Ctrl+C handler before spawning worker threads.
    signal.signal(signal.SIGINT, signal_handler)
    print("Back-end Node running, use Ctrl+C to terminate. Server listening at http://" + server_ip_address + ":" + str(server_port) + "/")
    flask_server_thread = ServerThread()
    flask_server_thread.start()  # Start the Flask server with the orchestrator
    # Poll-sleep so the main thread stays responsive to SIGINT
    # (Python only delivers signals to an unblocked main thread).
    while running:
        time.sleep(1)

    # Fix: removed stray `runner.join()` left over from the removed workflow
    # thread -- `runner` no longer exists and would raise NameError here.
    flask_server_thread.join()
    sys.exit(0)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SustainML Orchestrator Node API specification."""

from . import utils

from sustainml_swig import OrchestratorNodeHandle as cpp_OrchestratorNodeHandle
from sustainml_swig import OrchestratorNode as cpp_OrchestratorNode
from sustainml_swig import NodeStatus
import sustainml_swig

class OrchestratorNodeHandle(cpp_OrchestratorNodeHandle):
    """Callback handle that tracks per-node status and the newest task id."""

    def __init__(self):
        # Maps node id -> last reported status value.
        self.node_status_ = {}
        # Highest task id seen in any node output; None until one arrives.
        self.last_task_id = None
        # Parent class constructor
        super().__init__()

    # Callback
    def on_node_status_change(
            self,
            id: int,
            status: NodeStatus):
        """Record the latest status reported by node `id` and log it.

        Fix: the previous code first seeded the entry with INACTIVE and then
        immediately overwrote it -- that dead store has been removed.
        """
        self.node_status_[id] = status.node_status()
        print(utils.string_node(id), "node status", utils.string_status(status.node_status()), "received.")

    # Callback
    def on_new_node_output(
            self,
            id: int,
            data):
        """Track the highest task id seen in any node output and log the event."""
        task = sustainml_swig.get_task_id(id, data)
        # Simplified from a redundant two-branch boolean: update whenever the
        # task exists and is newer than (or the first after) the stored one.
        if task is not None and (self.last_task_id is None or task > self.last_task_id):
            self.last_task_id = task
        if task is None:
            print(utils.string_node(id), "node output received.")
        else:
            print(utils.string_node(id), "node output received from task", utils.string_task(task))

class Orchestrator:
    """Thin wrapper over the SWIG orchestrator node with JSON-friendly query helpers."""

    def __init__(self):
        # The handle receives callbacks; the C++ node performs the communication.
        self.handler_ = OrchestratorNodeHandle()
        self.node_ = cpp_OrchestratorNode(self.handler_)

    def run(self):
        """Spin the underlying node (blocks the calling thread)."""
        self.node_.spin()

    def terminate(self):
        """Ask the underlying node to stop spinning."""
        self.node_.terminate()

    def get_last_task_id(self):
        """Most recent task id reported by any node, or None if none yet."""
        return self.handler_.last_task_id

    def get_all_status(self):
        """One human-readable status line per known node, or a placeholder."""
        lines = [utils.string_node(node) + " node status " + utils.string_status(value) + "\n"
                 for node, value in self.handler_.node_status_.items()]
        if not lines:
            return "No nodes have reported their status yet.\n"
        return "".join(lines)

    def get_status(self, node_id):
        """Readable status of a single node; unknown nodes report as INACTIVE."""
        value = self.handler_.node_status_.get(node_id, utils.node_status.INACTIVE.value)
        return utils.string_status(value)

    def _error(self, node_value, task_id):
        # Shared error payload used when the SWIG call returned no data.
        return {'Error': f"Failed to get {utils.string_node(node_value)} data for task {utils.string_task(task_id)}"}

    def get_app_requirements(self, task_id):
        """Application-requirements output for the given task, as a dict."""
        node_data = sustainml_swig.get_app_requirements(self.node_, task_id)
        if node_data is None:
            return self._error(utils.node_id.APP_REQUIREMENTS.value, task_id)
        requirements = node_data.app_requirements()
        return {'app_requirements': f'{utils.string_std_vector(requirements)}'}

    def get_model_metadata(self, task_id):
        """ML-model-metadata output for the given task, as a dict."""
        node_data = sustainml_swig.get_model_metadata(self.node_, task_id)
        if node_data is None:
            return self._error(utils.node_id.ML_MODEL_METADATA.value, task_id)
        return {'keywords': f'{utils.string_std_vector(node_data.keywords())}',
                'metadata': f'{utils.string_std_vector(node_data.ml_model_metadata())}'}

    def get_hw_constraints(self, task_id):
        """Hardware-constraints output for the given task, as a dict."""
        node_data = sustainml_swig.get_hw_constraints(self.node_, task_id)
        if node_data is None:
            return self._error(utils.node_id.HW_CONSTRAINTS.value, task_id)
        return {'max_memory_footprint': f'{node_data.max_memory_footprint()}'}

    def get_ml_model_provider(self, task_id):
        """ML-model-provider output for the given task, as a dict."""
        node_data = sustainml_swig.get_model_provider(self.node_, task_id)
        if node_data is None:
            return self._error(utils.node_id.ML_MODEL_PROVIDER.value, task_id)
        return {'model': f'{node_data.model()}',
                'model_path': f'{node_data.model_path()}',
                'model_properties': f'{node_data.model_properties()}',
                'model_properties_path': f'{node_data.model_properties_path()}',
                'input_batch': f'{utils.string_std_vector(node_data.input_batch())}',
                'target_latency': f'{node_data.target_latency()}'}

    def get_hw_provider(self, task_id):
        """Hardware-provider output for the given task, as a dict."""
        node_data = sustainml_swig.get_hw_provider(self.node_, task_id)
        if node_data is None:
            return self._error(utils.node_id.HW_PROVIDER.value, task_id)
        return {'hw_description': f'{node_data.hw_description()}',
                'power_consumption': f'{node_data.power_consumption()}',
                'latency': f'{node_data.latency()}',
                'memory_footprint_of_ml_model': f'{node_data.memory_footprint_of_ml_model()}'}

    def get_carbontracker(self, task_id):
        """Carbon-tracker output for the given task, as a dict."""
        node_data = sustainml_swig.get_carbontracker(self.node_, task_id)
        if node_data is None:
            return self._error(utils.node_id.CARBONTRACKER.value, task_id)
        return {'carbon_footprint': f'{node_data.carbon_footprint()}',
                'energy_consumption': f'{node_data.energy_consumption()}',
                'carbon_intensity': f'{node_data.carbon_intensity()}'}

    def get_results(self, node_id, task_id):
        """Dispatch to the per-node getter; unknown ids yield an explanatory string."""
        getters = {utils.node_id.APP_REQUIREMENTS.value: self.get_app_requirements,
                   utils.node_id.ML_MODEL_METADATA.value: self.get_model_metadata,
                   utils.node_id.HW_CONSTRAINTS.value: self.get_hw_constraints,
                   utils.node_id.ML_MODEL_PROVIDER.value: self.get_ml_model_provider,
                   utils.node_id.HW_PROVIDER.value: self.get_hw_provider,
                   utils.node_id.CARBONTRACKER.value: self.get_carbontracker}
        getter = getters.get(node_id)
        if getter is None:
            return utils.string_node(node_id) + " node does not have any results to show."
        return getter(task_id)
Loading

0 comments on commit ab2285c

Please sign in to comment.