From a1487a740846ff774b3739b147b9da37a72fa407 Mon Sep 17 00:00:00 2001
From: "Jonas G. Drange"
Date: Thu, 8 Oct 2020 15:10:14 +0200
Subject: [PATCH] Support first iteration of ensemble evaluator

- Add an Event reporter to job_runner so that it produces events
- Slightly refactor the job_runner cli to accommodate the need for
  reporters to vary
---
 python/job_runner/cli.py                      |  33 +++-
 python/job_runner/reporting/__init__.py       |   1 +
 python/job_runner/reporting/event.py          | 186 ++++++++++++++++++
 python/job_runner/reporting/file.py           |  55 +++---
 python/job_runner/reporting/message.py        |  24 ++-
 python/job_runner/runner.py                   |  21 +-
 python/job_runner/util/data.py                |  33 ++++
 python/res/job_queue/job_status_type_enum.py  |   2 +-
 .../tests/job_runner/test_event_reporter.py   | 146 ++++++++++++++
 python/tests/job_runner/test_job_dispatch.py  |  52 ++++-
 .../job_runner/test_job_manager_runtime_kw.py |  11 +-
 python/tests/job_runner/test_jobmanager.py    |  69 ++-----
 requirements.txt                              |   1 +
 setup.py                                      |   1 +
 14 files changed, 516 insertions(+), 119 deletions(-)
 create mode 100644 python/job_runner/reporting/event.py
 create mode 100644 python/job_runner/util/data.py
 create mode 100644 python/tests/job_runner/test_event_reporter.py

diff --git a/python/job_runner/cli.py b/python/job_runner/cli.py
index 8523359613..f361c3a68e 100644
--- a/python/job_runner/cli.py
+++ b/python/job_runner/cli.py
@@ -2,10 +2,26 @@
 import os
 import signal
 import sys
+import json
 
 import job_runner.reporting as reporting
 from job_runner.reporting.message import Finish
 from job_runner.runner import JobRunner
+from job_runner import JOBS_FILE
+
+
+def _setup_reporters(is_interactive_run, ee_id):
+    reporters = []
+    if is_interactive_run:
+        reporters.append(reporting.Interactive())
+    elif ee_id:
+        reporters.append(reporting.File())
+        reporters.append(reporting.Network())
+        reporters.append(reporting.Event())
+    else:
+        reporters.append(reporting.File())
+        reporters.append(reporting.Network())
+    return reporters
 
 
 def main(args):
@@ -28,15 +44,18 @@ def main(args):
         sys.exit("No such directory: {}".format(parsed_args.run_path))
     os.chdir(parsed_args.run_path)
 
-    reporters = []
+    ee_id = None
+    try:
+        with open(JOBS_FILE, "r") as json_file:
+            jobs_data = json.load(json_file)
+            ee_id = jobs_data.get("ee_id")
+    except ValueError as e:
+        raise IOError("Job Runner cli failed to load JSON-file. {}".format(str(e)))
 
-    if len(parsed_args.job) > 0:
-        reporters.append(reporting.Interactive())
-    else:
-        reporters.append(reporting.File())
-        reporters.append(reporting.Network())
+    is_interactive_run = len(parsed_args.job) > 0
+    reporters = _setup_reporters(is_interactive_run, ee_id)
 
-    job_runner = JobRunner()
+    job_runner = JobRunner(jobs_data)
 
     for job_status in job_runner.run(parsed_args.job):
         for reporter in reporters:
diff --git a/python/job_runner/reporting/__init__.py b/python/job_runner/reporting/__init__.py
index 29dea0bc95..01d6980e6a 100644
--- a/python/job_runner/reporting/__init__.py
+++ b/python/job_runner/reporting/__init__.py
@@ -5,3 +5,4 @@
 from .file import File
 from .interactive import Interactive
 from .network import Network
+from .event import Event
diff --git a/python/job_runner/reporting/event.py b/python/job_runner/reporting/event.py
new file mode 100644
index 0000000000..03f90d3446
--- /dev/null
+++ b/python/job_runner/reporting/event.py
@@ -0,0 +1,186 @@
+from cloudevents.http import CloudEvent, to_json
+from job_runner.reporting.message import (
+    Exited,
+    Finish,
+    Init,
+    Running,
+    Start,
+)
+
"com.equinor.ert.forward_model_job.start" +_FM_JOB_RUNNING = "com.equinor.ert.forward_model_job.running" +_FM_JOB_SUCCESS = "com.equinor.ert.forward_model_job.success" +_FM_JOB_FAILURE = "com.equinor.ert.forward_model_job.failure" + +_FM_STEP_START = "com.equinor.ert.forward_model_step.start" +_FM_STEP_FAILURE = "com.equinor.ert.forward_model_step.failure" +_FM_STEP_SUCCESS = "com.equinor.ert.forward_model_step.success" + + +class TransitionError(ValueError): + pass + + +class Event: + def __init__(self, event_log="event_log"): + self._event_log = event_log + + self._ee_id = None + self._real_id = None + self._stage_id = None + + self._initialize_state_machine() + self._clear_log() + + def _initialize_state_machine(self): + initialized = (Init,) + jobs = (Start, Running, Exited) + finished = (Finish,) + self._states = { + initialized: self._init_handler, + jobs: self._job_handler, + finished: self._end_handler, + } + self._transitions = { + None: initialized, + initialized: jobs + finished, + jobs: jobs + finished, + } + self._state = None + + def _clear_log(self): + with open(self._event_log, "w") as f: + pass + + def report(self, msg): + new_state = None + for state in self._states.keys(): + if isinstance(msg, state): + new_state = state + + if self._state not in self._transitions or not isinstance( + msg, self._transitions[self._state] + ): + raise TransitionError( + f"Illegal transition {self._state} -> {new_state} for {msg}, expected to transition into {self._transitions[self._state]}" + ) + + self._states[new_state](msg) + self._state = new_state + + def _dump_event(self, event): + with open(self._event_log, "a") as el: + el.write("{}\n".format(to_json(event).decode())) + + def _step_path(self): + return f"/ert/ee/{self._ee_id}/real/{self._real_id}/stage/{self._stage_id}/step/{0}" + + def _init_handler(self, msg): + self._ee_id = msg.ee_id + self._real_id = msg.real_id + self._stage_id = msg.stage_id + self._dump_event( + CloudEvent( + { + "type": _FM_STEP_START, + "source": self._step_path(), + "datacontenttype": "application/json", + }, + { + "jobs": [job.job_data for job in msg.jobs], + }, + ) + ) + + def _job_handler(self, msg): + job_path = f"{self._step_path()}/job/{msg.job.index}" + + if isinstance(msg, Start): + self._dump_event( + CloudEvent( + { + "type": _FM_JOB_START, + "source": job_path, + }, + None, + ) + ) + if not msg.success(): + self._dump_event( + CloudEvent( + { + "type": _FM_JOB_FAILURE, + "source": job_path, + "datacontenttype": "application/json", + }, + { + "error_msg": msg.error_message, + }, + ) + ) + + elif isinstance(msg, Exited): + if msg.success(): + self._dump_event( + CloudEvent( + { + "type": _FM_JOB_SUCCESS, + "source": job_path, + }, + None, + ) + ) + else: + self._dump_event( + CloudEvent( + { + "type": _FM_JOB_FAILURE, + "source": job_path, + "datacontenttype": "application/json", + }, + { + "exit_code": msg.exit_code, + "error_msg": msg.error_message, + }, + ) + ) + + elif isinstance(msg, Running): + self._dump_event( + CloudEvent( + { + "type": _FM_JOB_RUNNING, + "source": job_path, + "datacontenttype": "application/json", + }, + { + "max_memory_usage": msg.max_memory_usage, + "current_memory_usage": msg.current_memory_usage, + }, + ) + ) + + def _end_handler(self, msg): + step_path = self._step_path() + if msg.success(): + self._dump_event( + CloudEvent( + { + "type": _FM_STEP_SUCCESS, + "source": step_path, + } + ) + ) + else: + self._dump_event( + CloudEvent( + { + "type": _FM_STEP_FAILURE, + "source": step_path, + "datacontenttype": 
"application/json", + }, + { + "error_msg": msg.error_message, + }, + ) + ) diff --git a/python/job_runner/reporting/file.py b/python/job_runner/reporting/file.py index af75aad773..f6890a7297 100644 --- a/python/job_runner/reporting/file.py +++ b/python/job_runner/reporting/file.py @@ -1,11 +1,20 @@ import json import os -import shutil import socket import time from job_runner.io import cond_unlink -from job_runner.reporting.message import Exited, Finish, Init, Running, Start +from job_runner.reporting.message import ( + _JOB_STATUS_FAILURE, + _JOB_STATUS_RUNNING, + _JOB_STATUS_SUCCESS, + Exited, + Finish, + Init, + Running, + Start, +) +from job_runner.util import data as data_util class File(object): @@ -36,25 +45,25 @@ def report(self, msg, sync_disc_timeout=10): if msg.success(): self._start_status_file(msg) self._add_log_line(msg.job) - job_status["status"] = "Running" - job_status["start_time"] = self._datetime_serialize(msg.timestamp) + job_status["status"] = _JOB_STATUS_RUNNING + job_status["start_time"] = data_util.datetime_serialize(msg.timestamp) else: error_msg = msg.error_message - job_status["status"] = "Failure" + job_status["status"] = _JOB_STATUS_FAILURE job_status["error"] = error_msg - job_status["end_time"] = self._datetime_serialize(msg.timestamp) + job_status["end_time"] = data_util.datetime_serialize(msg.timestamp) self._complete_status_file(msg) elif isinstance(msg, Exited): - job_status["end_time"] = self._datetime_serialize(msg.timestamp) + job_status["end_time"] = data_util.datetime_serialize(msg.timestamp) if msg.success(): - job_status["status"] = "Success" + job_status["status"] = _JOB_STATUS_SUCCESS self._complete_status_file(msg) else: error_msg = msg.error_message job_status["error"] = error_msg - job_status["status"] = "Failure" + job_status["status"] = _JOB_STATUS_FAILURE # A STATUS_file is not written if there is no exit_code, i.e. # when the job is killed due to timeout. 
@@ -65,11 +74,13 @@ def report(self, msg, sync_disc_timeout=10):
         elif isinstance(msg, Running):
             job_status["max_memory_usage"] = msg.max_memory_usage
             job_status["current_memory_usage"] = msg.current_memory_usage
-            job_status["status"] = "Running"
+            job_status["status"] = _JOB_STATUS_RUNNING
 
         elif isinstance(msg, Finish):
             if msg.success():
-                self.status_dict["end_time"] = self._datetime_serialize(msg.timestamp)
+                self.status_dict["end_time"] = data_util.datetime_serialize(
+                    msg.timestamp
+                )
                 self._dump_ok_file(sync_disc_timeout)
             else:
                 # this has already been handled by earlier event
@@ -89,27 +100,9 @@ def _init_status_file(self):
     def _init_job_status_dict(self, start_time, run_id, jobs):
         return {
             "run_id": run_id,
-            "start_time": self._datetime_serialize(start_time),
-            "end_time": None,
-            "jobs": [self._create_job_dict(j) for j in jobs],
-        }
-
-    def _datetime_serialize(self, dt):
-        if dt is None:
-            return None
-        return time.mktime(dt.timetuple())
-
-    def _create_job_dict(self, job):
-        return {
-            "name": job.name(),
-            "status": "Waiting",
-            "error": None,
-            "start_time": None,
+            "start_time": data_util.datetime_serialize(start_time),
             "end_time": None,
-            "stdout": job.std_out,
-            "stderr": job.std_err,
-            "current_memory_usage": None,
-            "max_memory_usage": None,
+            "jobs": [data_util.create_job_dict(j) for j in jobs],
         }
 
     def _start_status_file(self, msg):
diff --git a/python/job_runner/reporting/message.py b/python/job_runner/reporting/message.py
index d033dd6ebc..3765dc8411 100644
--- a/python/job_runner/reporting/message.py
+++ b/python/job_runner/reporting/message.py
@@ -1,12 +1,29 @@
 from datetime import datetime as dt
 
+_JOB_STATUS_SUCCESS = "Success"
+_JOB_STATUS_RUNNING = "Running"
+_JOB_STATUS_FAILURE = "Failure"
+_JOB_STATUS_WAITING = "Waiting"
 
-class Message(object):
+_RUNNER_STATUS_INITIALIZED = "Initialized"
+_RUNNER_STATUS_SUCCESS = "Success"
+_RUNNER_STATUS_FAILURE = "Failure"
+
+
+class _MetaMessage(type):
+    def __repr__(cls):
+        return f"MessageType<{cls.__name__}>"
+
+
+class Message(metaclass=_MetaMessage):
     def __init__(self, job=None):
         self.timestamp = dt.now()
         self.job = job
         self.error_message = None
 
+    def __repr__(self):
+        return type(self).__name__
+
     def with_error(self, message):
         self.error_message = message
         return self
@@ -19,11 +36,14 @@ def success(self):
 
 
 class Init(Message):
-    def __init__(self, jobs, run_id, ert_pid):
+    def __init__(self, jobs, run_id, ert_pid, ee_id=None, real_id=None, stage_id=None):
         super(Init, self).__init__()
         self.jobs = jobs
         self.run_id = run_id
         self.ert_pid = ert_pid
+        self.ee_id = ee_id
+        self.real_id = real_id
+        self.stage_id = stage_id
 
 
 class Finish(Message):
diff --git a/python/job_runner/runner.py b/python/job_runner/runner.py
index 1b04fa6ff6..83e657f4f2 100644
--- a/python/job_runner/runner.py
+++ b/python/job_runner/runner.py
@@ -1,19 +1,12 @@
 import json
 import os
 
-from job_runner import JOBS_FILE
 from job_runner.job import Job
 from job_runner.reporting.message import Init, Finish
 
 
 class JobRunner(object):
-    def __init__(self, jobs_file=JOBS_FILE):
-        try:
-            with open(jobs_file, "r") as json_file:
-                jobs_data = json.load(json_file)
-        except ValueError as e:
-            raise IOError("Job Runner failed to load JSON-file.{}".format(str(e)))
-
+    def __init__(self, jobs_data):
         os.umask(int(jobs_data["umask"], 8))
 
         self._data_root = jobs_data.get("DATA_ROOT")
@@ -21,6 +14,9 @@ def __init__(self, jobs_file=JOBS_FILE):
             os.environ["DATA_ROOT"] = self._data_root
 
         self.simulation_id = jobs_data.get("run_id")
+        self.ee_id = jobs_data.get("ee_id")
+        self.real_id = jobs_data.get("real_id")
+        self.stage_id = jobs_data.get("stage_id")
         self.ert_pid = jobs_data.get("ert_pid")
         self.global_environment = jobs_data.get("global_environment")
         self.global_update_path = jobs_data.get("global_update_path")
@@ -44,7 +40,14 @@ def run(self, names_of_jobs_to_run):
         else:
             job_queue = [j for j in self.jobs if j.name() in names_of_jobs_to_run]
 
-        init_message = Init(job_queue, self.simulation_id, self.ert_pid)
+        init_message = Init(
+            job_queue,
+            self.simulation_id,
+            self.ert_pid,
+            self.ee_id,
+            self.real_id,
+            self.stage_id,
+        )
 
         unused = set(names_of_jobs_to_run) - set([j.name() for j in job_queue])
         if unused:
diff --git a/python/job_runner/util/data.py b/python/job_runner/util/data.py
new file mode 100644
index 0000000000..6fd309eadd
--- /dev/null
+++ b/python/job_runner/util/data.py
@@ -0,0 +1,33 @@
+"""Utility to compensate for a weak job type."""
+import time
+from job_runner.reporting.message import (
+    Exited,
+    Finish,
+    Init,
+    Running,
+    Start,
+    _JOB_STATUS_SUCCESS,
+    _JOB_STATUS_RUNNING,
+    _JOB_STATUS_FAILURE,
+    _JOB_STATUS_WAITING,
+)
+
+
+def create_job_dict(job):
+    return {
+        "name": job.name(),
+        "status": _JOB_STATUS_WAITING,
+        "error": None,
+        "start_time": None,
+        "end_time": None,
+        "stdout": job.std_out,
+        "stderr": job.std_err,
+        "current_memory_usage": None,
+        "max_memory_usage": None,
+    }
+
+
+def datetime_serialize(dt):
+    if dt is None:
+        return None
+    return time.mktime(dt.timetuple())
diff --git a/python/res/job_queue/job_status_type_enum.py b/python/res/job_queue/job_status_type_enum.py
index de798091e0..ae76877ae9 100644
--- a/python/res/job_queue/job_status_type_enum.py
+++ b/python/res/job_queue/job_status_type_enum.py
@@ -37,7 +37,7 @@ class JobStatusType(BaseCEnum):
 
     @classmethod
     def from_string(cls, string):
-        pass
+        return super().from_string(string)
 
 
 JobStatusType.addEnum("JOB_QUEUE_NOT_ACTIVE", 1)
diff --git a/python/tests/job_runner/test_event_reporter.py b/python/tests/job_runner/test_event_reporter.py
new file mode 100644
index 0000000000..3d2041e2b2
--- /dev/null
+++ b/python/tests/job_runner/test_event_reporter.py
@@ -0,0 +1,146 @@
+import os
+
+from job_runner.job import Job
+from job_runner.reporting import Event
+from job_runner.reporting.event import (
+    _FM_JOB_FAILURE,
+    _FM_JOB_RUNNING,
+    _FM_JOB_START,
+    _FM_JOB_SUCCESS,
+    _FM_STEP_FAILURE,
+    _FM_STEP_START,
+    _FM_STEP_SUCCESS,
+)
+from job_runner.reporting.message import Exited, Finish, Init, Running, Start
+
+
+def test_report_with_init_message_argument(tmpdir):
+
+    reporter = Event(event_log=tmpdir / "event_log")
+    job1 = Job({"name": "job1", "stdout": "/stdout", "stderr": "/stderr"}, 0)
+
+    reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
+
+    with open(reporter._event_log, "r") as f:
+        lines = f.readlines()
+        assert len(lines) == 1
+        assert '"name": "job1"' in lines[0], "log missing job1"
+        assert f'"type": "{_FM_STEP_START}"' in lines[0], "logged wrong type"
+        assert (
+            '"source": "/ert/ee/ee_id/real/0/stage/0/step/0"' in lines[0]
+        ), "bad source"
+
+
+def test_report_with_successful_start_message_argument(tmpdir):
+    reporter = Event(event_log=tmpdir / "event_log")
+    job1 = Job({"name": "job1", "stdout": "/stdout", "stderr": "/stderr"}, 0)
+    reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
+    msg = Start(job1)
+
+    reporter.report(msg)
+
+    with open(reporter._event_log, "r") as f:
+        lines = f.readlines()
+        assert len(lines) == 2
+        assert f'"type": "{_FM_JOB_START}"' in lines[1], "logged wrong type"
+        assert (
'"source": "/ert/ee/ee_id/real/0/stage/0/step/0/job/0"' in lines[1] + ), "bad source" + + +def test_report_with_failed_start_message_argument(tmpdir): + reporter = Event(event_log=tmpdir / "event_log") + + job1 = Job({"name": "job1"}, 0) + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + + msg = Start(job1).with_error("massive_failure") + + reporter.report(msg) + + with open(reporter._event_log, "r") as f: + lines = f.readlines() + assert len(lines) == 3 + assert f'"type": "{_FM_JOB_FAILURE}"' in lines[2], "logged wrong type" + assert '"error_msg": "massive_failure"' in lines[2], "log missing error message" + + +def test_report_with_successful_exit_message_argument(tmpdir): + reporter = Event(event_log=tmpdir / "event_log") + job1 = Job({"name": "job1"}, 0) + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Exited(job1, 0)) + + with open(reporter._event_log, "r") as f: + lines = f.readlines() + assert len(lines) == 2 + assert f'"type": "{_FM_JOB_SUCCESS}"' in lines[1], "logged wrong type" + + +def test_report_with_failed_exit_message_argument(tmpdir): + reporter = Event(event_log=tmpdir / "event_log") + job1 = Job({"name": "job1"}, 0) + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Exited(job1, 1).with_error("massive_failure")) + + with open(reporter._event_log, "r") as f: + lines = f.readlines() + assert len(lines) == 2 + assert '"error_msg": "massive_failure"' in lines[1], "log missing error message" + assert f'"type": "{_FM_JOB_FAILURE}"' in lines[1], "logged wrong type" + + +def test_report_with_running_message_argument(tmpdir): + reporter = Event(event_log=tmpdir / "event_log") + job1 = Job({"name": "job1"}, 0) + + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Running(job1, 100, 10)) + + with open(reporter._event_log, "r") as f: + lines = f.readlines() + assert len(lines) == 2 + assert f'"type": "{_FM_JOB_RUNNING}"' in lines[1], "logged wrong type" + assert '"max_memory_usage": 100' in lines[1], "log missing max_memory_usage" + assert ( + '"current_memory_usage": 10' in lines[1] + ), "log missing current_memory_usage" + + +def test_report_with_successful_finish_message_argument(tmpdir): + reporter = Event(event_log=tmpdir / "event_log") + job1 = Job({"name": "job1"}, 0) + + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Running(job1, 100, 10)) + reporter.report(Finish()) + + with open(reporter._event_log, "r") as f: + lines = f.readlines() + assert len(lines) == 3 + assert f'"type": "{_FM_STEP_SUCCESS}"' in lines[2], "logged wrong type" + + +def test_report_with_failed_finish_message_argument(tmpdir): + reporter = Event(event_log=tmpdir / "event_log") + job1 = Job({"name": "job1"}, 0) + + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Running(job1, 100, 10)) + reporter.report(Finish().with_error("massive_failure")) + + with open(reporter._event_log, "r") as f: + lines = f.readlines() + assert len(lines) == 3 + assert f'"type": "{_FM_STEP_FAILURE}"' in lines[2], "logged wrong type" + assert '"error_msg": "massive_failure"' in lines[2], "log missing error message" + + +def test_report_startup_clearing_of_event_log_file(tmpdir): + reporter1 = Event(event_log=tmpdir / "event_log") + job1 = Job({"name": "job1"}, 0) + reporter1.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + + reporter2 = Event(event_log=tmpdir / 
"event_log") + + assert os.path.getsize(tmpdir / "event_log") == 0 diff --git a/python/tests/job_runner/test_job_dispatch.py b/python/tests/job_runner/test_job_dispatch.py index 7f5747a15e..03830f2d33 100644 --- a/python/tests/job_runner/test_job_dispatch.py +++ b/python/tests/job_runner/test_job_dispatch.py @@ -10,12 +10,14 @@ from textwrap import dedent import psutil +import pytest from subprocess import Popen -from job_runner.cli import main -from job_runner.reporting.message import Finish +from job_runner.cli import main, _setup_reporters +from job_runner.reporting.message import Init, Finish +from job_runner.reporting import Event, Interactive from tests.utils import tmpdir, wait_until -from unittest.mock import patch +from unittest.mock import patch, mock_open class JobDispatchTest(unittest.TestCase): @@ -109,11 +111,14 @@ def test_terminate_jobs(self): @tmpdir(None) def test_job_dispatch_kills_itself_after_unsuccessful_job(self): + jobs_json = json.dumps({"ee_id": "_id_"}) + with patch("job_runner.cli.os") as mock_os, patch( - "job_runner.cli.JobRunner" - ) as mock_runner: + "job_runner.cli.open", new=mock_open(read_data=jobs_json) + ) as mock_file, patch("job_runner.cli.JobRunner") as mock_runner: mock_runner.return_value.run.return_value = [ - Finish().with_error("overall bad run") + Init([], 0, 0), + Finish().with_error("overall bad run"), ] mock_os.getpgid.return_value = 17 @@ -238,3 +243,38 @@ def test_job_dispatch_run_subset_specified_as_parmeter(self): assert not os.path.isfile("job_A.out") assert os.path.isfile("job_B.out") assert os.path.isfile("job_C.out") + + def test_no_jobs_json_file(self): + with self.assertRaises(IOError): + main(["script.py", os.path.realpath(os.curdir)]) + + @tmpdir(None) + def test_no_json_jobs_json_file(self): + path = os.path.realpath(os.curdir) + jobs_file = os.path.join(path, "jobs.json") + + with open(jobs_file, "w") as f: + f.write("not json") + + with self.assertRaises(OSError): + main(["script.py", path]) + + +@pytest.mark.parametrize( + "is_interactive_run, ee_id", + [(False, None), (False, "1234"), (True, None), (True, "1234")], +) +def test_setup_reporters(is_interactive_run, ee_id): + reporters = _setup_reporters(is_interactive_run, ee_id) + + if not is_interactive_run and not ee_id: + assert len(reporters) == 2 + assert not any([isinstance(r, Event) for r in reporters]) + + if not is_interactive_run and ee_id: + assert len(reporters) == 3 + assert any([isinstance(r, Event) for r in reporters]) + + if is_interactive_run and ee_id: + assert len(reporters) == 1 + assert any([isinstance(r, Interactive) for r in reporters]) diff --git a/python/tests/job_runner/test_job_manager_runtime_kw.py b/python/tests/job_runner/test_job_manager_runtime_kw.py index 0a60a23dfc..b75c8fcf7f 100644 --- a/python/tests/job_runner/test_job_manager_runtime_kw.py +++ b/python/tests/job_runner/test_job_manager_runtime_kw.py @@ -31,10 +31,7 @@ def test_run_one_job_with_an_integer_arg_is_actually_a_fractional(self): data = {"umask": "0000", "DATA_ROOT": "/path/to/data", "jobList": [job_0]} - jobs_file = os.path.join(os.getcwd(), "jobs.json") - with open(jobs_file, "w") as f: - f.write(json.dumps(data)) - runner = JobRunner() + runner = JobRunner(data) statuses = list(runner.run([])) starts = [e for e in statuses if isinstance(e, Start)] @@ -76,11 +73,7 @@ def test_run_given_one_job_with_missing_file_and_one_file_present(self): "jobList": [job_0, job_1], } - jobs_file = os.path.join(os.getcwd(), "jobs.json") - with open(jobs_file, "w") as f: - 
-            f.write(json.dumps(data))
-
-        runner = JobRunner()
+        runner = JobRunner(data)
 
         statuses = list(runner.run([]))
diff --git a/python/tests/job_runner/test_jobmanager.py b/python/tests/job_runner/test_jobmanager.py
index 57d43ff112..2e1ee397b4 100644
--- a/python/tests/job_runner/test_jobmanager.py
+++ b/python/tests/job_runner/test_jobmanager.py
@@ -2,13 +2,12 @@
 import os
 import os.path
 import stat
-from unittest import TestCase
 
+from unittest import TestCase
 from job_runner.reporting.message import Exited, Start
 from job_runner.runner import JobRunner
 from res.job_queue import EnvironmentVarlist, ExtJob, ExtJoblist, ForwardModel
 from res.util import SubstitutionList
-
 from tests.utils import tmpdir
 
 # Test data generated by ForwardModel
@@ -57,11 +56,7 @@ def create_jobs_json(job_list, umask="0000"):
-    data = {"umask": umask, "DATA_ROOT": "/path/to/data", "jobList": job_list}
-
-    jobs_file = os.path.join(os.getcwd(), "jobs.json")
-    with open(jobs_file, "w") as f:
-        f.write(json.dumps(data))
+    return {"umask": umask, "DATA_ROOT": "/path/to/data", "jobList": job_list}
 
 
 class JobRunnerTest(TestCase):
@@ -89,48 +84,15 @@ def tearDown(self):
             if key in os.environ:
                 del os.environ[key]
 
-    def assert_clean_slate(self):
-        self.assertFalse(os.path.isfile("jobs.py"))
-        self.assertFalse(os.path.isfile("jobs.json"))
-
-    @tmpdir(None)
-    def test_no_jobs_json(self):
-        self.assert_clean_slate()
-        with self.assertRaises(IOError):
-            JobRunner(jobs_file="does/not/exist")
-
-    @tmpdir(None)
-    def test_invalid_jobs_json(self):
-        self.assert_clean_slate()
-        # Syntax error
-        with open("jobs.json", "w") as f:
-            f.write("Hello - this is not valid JSON ...")
-
-        with self.assertRaises(IOError):
-            JobRunner()
-
     @tmpdir(None)
     def test_missing_joblist_json(self):
-        self.assert_clean_slate()
-        with open("jobs.json", "w") as f:
-            f.write(json.dumps({"umask": "0000"}))
-
         with self.assertRaises(KeyError):
-            JobRunner()
+            JobRunner({"umask": "0000"})
 
     @tmpdir(None)
     def test_missing_umask_json(self):
-        self.assert_clean_slate()
-        with open("jobs.json", "w") as f:
-            f.write(json.dumps({"jobList": "[]"}))
-
         with self.assertRaises(KeyError):
-            JobRunner()
-
-    @tmpdir(None)
-    def test_jobs_zero(self):
-        with self.assertRaises(IOError):
-            JobRunner(jobs_file="Does/not/exist")
+            JobRunner({"jobList": "[]"})
 
     @tmpdir(None)
     def test_run_output_rename(self):
@@ -141,8 +103,8 @@ def test_run_output_rename(self):
             "stderr": "err",
         }
         joblist = [job, job, job, job, job]
-        create_jobs_json(joblist)
-        jobm = JobRunner()
+
+        jobm = JobRunner(create_jobs_json(joblist))
 
         for status in enumerate(jobm.run([])):
             if isinstance(status, Start):
@@ -162,8 +124,8 @@ def test_run_multiple_ok(self):
                 "argList": ["-p", "-v", job_index],
             }
             joblist.append(job)
-        create_jobs_json(joblist)
-        jobm = JobRunner()
+
+        jobm = JobRunner(create_jobs_json(joblist))
 
         statuses = [s for s in list(jobm.run([])) if isinstance(s, Exited)]
@@ -187,15 +149,14 @@ def test_run_multiple_fail_only_runs_one(self):
                 "stdout": "exit_out",
                 "stderr": "exit_err",
                 # produces something on stderr, and exits with
-
                 # exit_code=index
                 "argList": [
                     "-c",
                     'echo "failed with {}" 1>&2 ; exit {}'.format(index, index),
                 ],
             }
             joblist.append(job)
-        create_jobs_json(joblist)
-        jobm = JobRunner()
+
+        jobm = JobRunner(create_jobs_json(joblist))
 
         statuses = [s for s in list(jobm.run([])) if isinstance(s, Exited)]
@@ -243,10 +204,7 @@ def test_given_global_env_and_update_path_executable_env_is_updated(self):
             "jobList": [job],
         }
 
-        jobs_file = os.path.join(os.getcwd(), "jobs.json")
-        with open(jobs_file, "w") as f:
-            f.write(json.dumps(data))
-        statuses = list(JobRunner().run([]))
+        statuses = list(JobRunner(data).run([]))
 
         exited_messages = [m for m in statuses if isinstance(m, Exited) and m.success()]
         number_of_finished_scripts = len(exited_messages)
@@ -300,7 +258,10 @@ def test_exec_env(self):
             "run_id", None, "data_root", global_args, 0, env_varlist
         )
 
-        for msg in list(JobRunner().run([])):
+        with open("jobs.json", "r") as f:
+            jobs_json = json.load(f)
+
+        for msg in list(JobRunner(jobs_json).run([])):
             if isinstance(msg, Start):
                 with open("exec_env_exec_env.json") as f:
                     exec_env = json.load(f)
diff --git a/requirements.txt b/requirements.txt
index b431a62dd4..49a423cd76 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,6 +7,7 @@ cmake
 ninja
 
 # From setup.py install_requires
+cloudevents
 ecl
 futures
 jinja2
diff --git a/setup.py b/setup.py
index f11c2c9c9a..84cd88338b 100644
--- a/setup.py
+++ b/setup.py
@@ -51,6 +51,7 @@ def get_data_files():
         license="GPL-3.0",
         platforms="any",
         install_requires=[
+            "cloudevents",
             "ecl",
             "futures",
             "jinja2",
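
Notes on the new Event reporter (a minimal sketch, not part of the diff above;
the run_id/ert_pid/ee_id values and the job payload are illustrative values
borrowed from the tests in this patch):

    from job_runner.job import Job
    from job_runner.reporting import Event
    from job_runner.reporting.message import Exited, Finish, Init, Running, Start

    # The reporter is a small state machine: None -> Init -> (Start | Running
    # | Exited)* -> Finish. Constructing it truncates the event_log file.
    reporter = Event(event_log="event_log")
    job = Job({"name": "job1", "stdout": "/stdout", "stderr": "/stderr"}, 0)

    # Init must come first; it carries the ids used to build each event's
    # "source" path, here /ert/ee/ee_id/real/0/stage/0/step/0.
    reporter.report(Init([job], 1, 19, ee_id="ee_id", real_id=0, stage_id=0))
    reporter.report(Start(job))             # ...forward_model_job.start
    reporter.report(Running(job, 100, 10))  # ...job.running, with memory usage
    reporter.report(Exited(job, 0))         # exit code 0 -> ...job.success
    reporter.report(Finish())               # ...forward_model_step.success

    # Out-of-order messages (a Start before Init, or anything after Finish)
    # raise TransitionError, since Finish has no outgoing transitions.

Each line of event_log is then one JSON-serialized CloudEvent, roughly of the
form below (key order may vary; id and time are generated by the cloudevents
library):

    {"specversion": "1.0", "id": "...", "source": "/ert/ee/ee_id/real/0/stage/0/step/0/job/0", "type": "com.equinor.ert.forward_model_job.running", "datacontenttype": "application/json", "time": "...", "data": {"max_memory_usage": 100, "current_memory_usage": 10}}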