Skip to content

Commit

Permalink
Merge pull request #324 from UCL-CCS/actions-rework
Browse files Browse the repository at this point in the history
The rework of the actions system
  • Loading branch information
orbitfold authored Mar 26, 2021
2 parents 4d95f4e + c3bff38 commit 5d503eb
Show file tree
Hide file tree
Showing 40 changed files with 1,387 additions and 1,605 deletions.
5 changes: 3 additions & 2 deletions easyvvuq/actions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .base import BaseAction
from .execute_local import ExecuteLocal, ExecuteLocalV2, ExecutePython
from .execute_local import ExecuteLocal, ExecutePython, CreateRunDirectory, Encode, Decode
from .execute_local import CleanUp, Actions
from .execute_kubernetes import ExecuteKubernetes
from .execute_slurm import ExecuteSLURM
from .action_statuses import ActionStatuses
from .action_statuses import ActionPool

__copyright__ = """
Expand Down
75 changes: 35 additions & 40 deletions easyvvuq/actions/action_statuses.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
import copy

__copyright__ = """
Expand All @@ -24,7 +24,7 @@
__license__ = "LGPL"


class ActionStatuses:
class ActionPool:
"""A class that tracks statuses of a list of actions.
Parameters
Expand All @@ -37,37 +37,31 @@ class ActionStatuses:
"""

def __init__(self, statuses, batch_size=8, poll_sleep_time=1):
self.statuses = list(statuses)
self.actions = []
self.poll_sleep_time = poll_sleep_time
self.pool = ThreadPoolExecutor(batch_size)
def __init__(self, campaign, actions, inits, max_workers=None, sequential=False):
self.campaign = campaign
self.actions = actions
self.inits = inits
self.max_workers = max_workers
self.sequential = sequential
self.futures = []
self.results = []

def job_handler(self, status):
"""Will handle the execution of this action status.
Parameters
----------
status: ActionStatus
ActionStatus of an action to be executed.
"""
status.start()
while not status.finished():
time.sleep(self.poll_sleep_time)
if status.succeeded():
status.finalise()
return True
else:
return False

def start(self):
def start(self, executor=ThreadPoolExecutor):
"""Start the actions.
Returns
-------
A list of Python futures representing action execution.
"""
self.actions = [self.pool.submit(self.job_handler, status) for status in self.statuses]
self.pool = executor(max_workers=self.max_workers)
for previous in self.inits:
previous = copy.copy(previous)
if self.sequential:
result = self.actions.start(previous)
self.results.append(result)
else:
future = self.pool.submit(self.actions.start, previous)
self.futures.append(future)
return self

def progress(self):
Expand All @@ -81,25 +75,26 @@ def progress(self):
running = 0
done = 0
failed = 0
for action in self.actions:
if action.running():
for future in self.futures:
if future.running():
running += 1
elif action.done():
if not action.result():
elif future.done():
if not future.result():
failed += 1
else:
done += 1
else:
ready += 1
return {'ready': ready, 'active': running, 'finished': done, 'failed': failed}

def wait(self, poll_interval=1):
"""A command that will automatically poll job statuses. For use in scripts.
Parameters
----------
poll_interval: int
Polling interval in seconds.
def collate(self):
"""A command that will block until all futures in the pool have finished.
"""
while self.progress()['ready'] > 0:
time.sleep(poll_interval)
if self.sequential:
for result in self.results:
self.campaign.campaign_db.store_result(result['run_id'], result)
else:
for future in as_completed(self.futures):
result = future.result()
self.campaign.campaign_db.store_result(result['run_id'], result)
self.campaign.campaign_db.session.commit()
133 changes: 47 additions & 86 deletions easyvvuq/actions/execute_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import logging
import uuid
import copy
import time

from kubernetes.client.api import core_v1_api
from kubernetes import config
from kubernetes.client import V1ConfigMap, V1ObjectMeta
Expand Down Expand Up @@ -54,7 +56,7 @@
logger = logging.getLogger(__name__)


class ActionStatusKubernetes():
class ExecuteKubernetes():
"""Provides a way to track the status of an on-going Kubernetes
action.
Expand All @@ -71,31 +73,56 @@ class ActionStatusKubernetes():
outfile : str
a filename to write the output of the simulation
"""

def __init__(self, api, body, config_names, namespace, outfile):
self.core_v1 = api
self.body = dict(body)
self.pod_name = body['metadata']['name']
self.config_names = config_names
self.namespace = namespace
self.outfile = outfile
def __init__(self, image, command, input_file_names=None, output_file_name=None):
pod_name = str(uuid.uuid4())
container_name = str(uuid.uuid4())
self.body = {
'apiVersion': 'v1', 'kind': 'Pod', 'metadata': {'name': pod_name},
'spec': {
'restartPolicy': 'Never',
'containers': [
{
'name': container_name,
'image': image,
'command': ['/bin/sh', '-c'],
'args': [command]
}
]
}
}
self.input_file_names = input_file_names
self.output_file_name = output_file_name
config.load_kube_config()
self.core_v1 = core_v1_api.CoreV1Api()
self.pod_name = self.body['metadata']['name']
self.namespace = "default"
self._succeeded = False
self._started = False

def start(self):
def start(self, previous=None):
"""Will create the Kubernetes pod and hence start the action.
"""
if self.started():
raise RuntimeError('The pod has already started!')
target_dir = previous['rundir']
if self.input_file_names is None:
self.input_file_names = [previous['encoder_filename']]
if self.output_file_name is None:
self.output_file_name = previous['decoder_filename']
file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4()))
for input_file_name in self.input_file_names]
self.config_names = file_names
dep = copy.deepcopy(self.body)
dep['metadata']['name'] = str(uuid.uuid4())
self.create_config_maps(self.config_names)
self.create_volumes(self.config_names, self.body)
self.core_v1.create_namespaced_pod(body=self.body, namespace="default")
print('configmaps created')
self.create_volumes(self.config_names, dep)
print('configs and volumes created')
self.core_v1.create_namespaced_pod(body=dep, namespace="default")
self._started = True

def started(self):
"""Will return true if start() was called.
"""
return self._started
self.result = previous
while not self.finished():
time.wait(5)
self.finalise()
return previous

def finished(self):
"""Will return True if the pod has finished, otherwise will return False.
Expand Down Expand Up @@ -160,71 +187,5 @@ def create_config_maps(self, file_names):
data={os.path.basename(file_name): data},
metadata=metadata
)
print(configmap)
self.core_v1.create_namespaced_config_map(namespace='default', body=configmap)


class ExecuteKubernetes(BaseAction):
""" Provides an action element to run a shell command in a specified
directory.
Parameters
----------
pod_config : str
Filename of the YAML file with the Kubernetes Pod configuration.
input_file_names : list of str
A list of input file names for your simulation.
output_file_name : str
An output file name for the output of the simulation.
"""

def __init__(self, image, command, input_file_names=None, output_file_name=None):
if os.name == 'nt':
msg = ('Local execution is provided for testing on Posix systems'
'only. We detect you are using Windows.')
logger.error(msg)
raise NotImplementedError(msg)
# with open(pod_config, 'r') as fd:
# self.dep = yaml.load(fd, Loader=yaml.BaseLoader)
#import pdb; pdb.set_trace()
pod_name = str(uuid.uuid4())
container_name = str(uuid.uuid4())
self.dep = {'apiVersion': 'v1', 'kind': 'Pod', 'metadata': {'name': pod_name},
'spec': {
'restartPolicy': 'Never',
'containers': [
{
'name': container_name,
'image': image,
'command': ['/bin/sh', '-c'],
'args': [command]
}
]
}
}
self.input_file_names = input_file_names
self.output_file_name = output_file_name
config.load_kube_config()
#c = Configuration()
#c.assert_hostname = False
# Configuration.set_default(c)
self.core_v1 = core_v1_api.CoreV1Api()

def act_on_dir(self, target_dir):
"""Executes a dockerized simulation on input files found in `target_dir`.
target_dir : str
Directory in which to execute simulation.
"""
# this is suboptimal and a better interface is needed to get those filenames
if self.input_file_names is None:
self.input_file_names = [self.campaign._active_app_encoder.target_filename]
if self.output_file_name is None:
self.output_file_name = self.campaign._active_app_decoder.target_filename
file_names = [(os.path.join(target_dir, input_file_name), str(uuid.uuid4()))
for input_file_name in self.input_file_names]
dep = copy.deepcopy(self.dep)
dep['metadata']['name'] = str(uuid.uuid4())
return ActionStatusKubernetes(
self.core_v1, dep, file_names, 'default',
os.path.join(target_dir, self.output_file_name))
Loading

0 comments on commit 5d503eb

Please sign in to comment.