From 47c940c1364130173eaae74190748d9489c2dff1 Mon Sep 17 00:00:00 2001 From: Morten Bendiksen Date: Fri, 19 Mar 2021 10:22:04 +0100 Subject: [PATCH] Make legacy ensemble members connect through websocket --- .github/workflows/test_and_deploy.yml | 2 +- python/job_runner/cli.py | 7 +- python/job_runner/reporting/event.py | 15 +- python/job_runner/util/client.py | 59 +++++++ python/res/job_queue/queue.py | 3 +- test_requirements.txt | 1 + tests/job_runner/test_event_reporter.py | 211 +++++++++++++----------- tests/job_runner/test_job_dispatch.py | 44 ++--- tests/utils/__init__.py | 45 +++++ 9 files changed, 254 insertions(+), 133 deletions(-) create mode 100644 python/job_runner/util/client.py diff --git a/.github/workflows/test_and_deploy.yml b/.github/workflows/test_and_deploy.yml index 1a82a12739..2f72ab54ff 100644 --- a/.github/workflows/test_and_deploy.yml +++ b/.github/workflows/test_and_deploy.yml @@ -53,7 +53,7 @@ jobs: fail-fast: false matrix: os: ['ubuntu-latest', 'macos-latest'] - python: ['3.6', '3.7', '3.8', '3.9'] + python: ['3.6', '3.7', '3.8'] runs-on: ${{ matrix.os }} diff --git a/python/job_runner/cli.py b/python/job_runner/cli.py index 612f0ad026..ec2aa1ca40 100644 --- a/python/job_runner/cli.py +++ b/python/job_runner/cli.py @@ -10,14 +10,14 @@ from job_runner import JOBS_FILE -def _setup_reporters(is_interactive_run, ee_id): +def _setup_reporters(is_interactive_run, ee_id, evaluator_url): reporters = [] if is_interactive_run: reporters.append(reporting.Interactive()) elif ee_id: reporters.append(reporting.File(sync_disc_timeout=0)) reporters.append(reporting.Network()) - reporters.append(reporting.Event()) + reporters.append(reporting.Event(evaluator_url=evaluator_url)) else: reporters.append(reporting.File()) reporters.append(reporting.Network()) @@ -49,11 +49,12 @@ def main(args): with open(JOBS_FILE, "r") as json_file: jobs_data = json.load(json_file) ee_id = jobs_data.get("ee_id") + evaluator_url = jobs_data.get("dispatch_url") except ValueError as e: raise IOError("Job Runner cli failed to load JSON-file.{}".format(str(e))) is_interactive_run = len(parsed_args.job) > 0 - reporters = _setup_reporters(is_interactive_run, ee_id) + reporters = _setup_reporters(is_interactive_run, ee_id, evaluator_url) job_runner = JobRunner(jobs_data) diff --git a/python/job_runner/reporting/event.py b/python/job_runner/reporting/event.py index e916263fee..a21093b9e7 100644 --- a/python/job_runner/reporting/event.py +++ b/python/job_runner/reporting/event.py @@ -8,6 +8,8 @@ ) from pathlib import Path +from job_runner.util.client import Client + _FM_JOB_START = "com.equinor.ert.forward_model_job.start" _FM_JOB_RUNNING = "com.equinor.ert.forward_model_job.running" _FM_JOB_SUCCESS = "com.equinor.ert.forward_model_job.success" @@ -23,15 +25,14 @@ class TransitionError(ValueError): class Event: - def __init__(self, event_log="event_log"): - self._event_log = event_log + def __init__(self, evaluator_url): + self._evaluator_url = evaluator_url self._ee_id = None self._real_id = None self._stage_id = None self._initialize_state_machine() - self._clear_log() def _initialize_state_machine(self): initialized = (Init,) @@ -49,10 +50,6 @@ def _initialize_state_machine(self): } self._state = None - def _clear_log(self): - with open(self._event_log, "w") as f: - pass - def report(self, msg): new_state = None for state in self._states.keys(): @@ -70,8 +67,8 @@ def report(self, msg): self._state = new_state def _dump_event(self, event): - with open(self._event_log, "a") as el: - el.write("{}\n".format(to_json(event).decode())) + with Client(self._evaluator_url) as client: + client.send(to_json(event).decode()) def _step_path(self): return f"/ert/ee/{self._ee_id}/real/{self._real_id}/stage/{self._stage_id}/step/{0}" diff --git a/python/job_runner/util/client.py b/python/job_runner/util/client.py new file mode 100644 index 0000000000..9f6e3d315a --- /dev/null +++ b/python/job_runner/util/client.py @@ -0,0 +1,59 @@ +import websockets +from websockets import ConnectionClosedOK +import asyncio +import cloudevents +from websockets.exceptions import ConnectionClosed + + +class Client: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + if self.websocket is not None: + self.loop.run_until_complete(self.websocket.close()) + self.loop.close() + + def __init__(self, url, max_retries=10, timeout_multiplier=5): + if url is None: + raise ValueError("url was None") + self.url = url + self._max_retries = max_retries + self._timeout_multiplier = timeout_multiplier + self.websocket = None + self.loop = asyncio.new_event_loop() + + async def get_websocket(self): + return await websockets.connect(self.url) + + async def _send(self, msg): + for retry in range(self._max_retries + 1): + try: + if self.websocket is None: + self.websocket = await self.get_websocket() + await self.websocket.send(msg) + return + except ConnectionClosedOK: + # Connection was closed no point in trying to send more messages + raise + except (ConnectionClosed, ConnectionRefusedError, OSError): + if retry == self._max_retries: + raise + await asyncio.sleep(0.2 + self._timeout_multiplier * retry) + self.websocket = None + + def send(self, msg): + self.loop.run_until_complete(self._send(msg)) + + def send_event(self, ev_type, ev_source, ev_data=None): + if ev_data is None: + ev_data = {} + event = cloudevents.http.CloudEvent( + { + "type": ev_type, + "source": ev_source, + "datacontenttype": "application/json", + }, + ev_data, + ) + self.send(cloudevents.http.to_json(event).decode()) diff --git a/python/res/job_queue/queue.py b/python/res/job_queue/queue.py index 0615718dfa..3c9a410a82 100644 --- a/python/res/job_queue/queue.py +++ b/python/res/job_queue/queue.py @@ -567,13 +567,14 @@ def snapshot(self) -> typing.Optional[typing.Dict[int, str]]: return None return snapshot - def add_ensemble_evaluator_information_to_jobs_file(self, ee_id): + def add_ensemble_evaluator_information_to_jobs_file(self, ee_id, dispatch_url): for q_index, q_node in enumerate(self.job_list): with open(f"{q_node.run_path}/{JOBS_FILE}", "r+") as jobs_file: data = json.load(jobs_file) data["ee_id"] = ee_id data["real_id"] = self._qindex_to_iens[q_index] data["stage_id"] = 0 + data["dispatch_url"] = dispatch_url jobs_file.seek(0) jobs_file.truncate() json.dump(data, jobs_file, indent=4) diff --git a/test_requirements.txt b/test_requirements.txt index 5cf52c3cfb..c23fe9aed3 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -3,3 +3,4 @@ decorator pylint black click +pytest-asyncio diff --git a/tests/job_runner/test_event_reporter.py b/tests/job_runner/test_event_reporter.py index bfb1ba9cda..7ecba19952 100644 --- a/tests/job_runner/test_event_reporter.py +++ b/tests/job_runner/test_event_reporter.py @@ -1,4 +1,7 @@ +import contextlib import os +import threading +from functools import partial from job_runner.job import Job from job_runner.reporting import Event @@ -14,140 +17,148 @@ from job_runner.reporting.message import Exited, Finish, Init, Running, Start import json +from tests.utils import _mock_ws_thread -def test_report_with_init_message_argument(tmpdir): - reporter = Event(event_log=tmpdir / "event_log") +def test_report_with_init_message_argument(unused_tcp_port): + host = "localhost" + url = f"ws://{host}:{unused_tcp_port}" + reporter = Event(evaluator_url=url) job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - - reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - - with open(reporter._event_log, "r") as f: - lines = f.readlines() - assert len(lines) == 1 - event = json.loads(lines[0]) - job = event.get("data", {}).get("jobs", {}).get("0", {}) - assert job - assert job["name"] == "job1" - assert job["stdout"].startswith("/") and job["stdout"].endswith("stdout") - assert job["stderr"].startswith("/") and job["stderr"].endswith("stderr") - assert event["type"] == _FM_STEP_START - assert event["source"] == "/ert/ee/ee_id/real/0/stage/0/step/0" - - -def test_report_with_successful_start_message_argument(tmpdir): - reporter = Event(event_log=tmpdir / "event_log") + lines = [] + with _mock_ws_thread(host, unused_tcp_port, lines): + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + + assert len(lines) == 1 + event = json.loads(lines[0]) + job = event.get("data", {}).get("jobs", {}).get("0", {}) + assert job + assert job["name"] == "job1" + assert job["stdout"].startswith("/") and job["stdout"].endswith("stdout") + assert job["stderr"].startswith("/") and job["stderr"].endswith("stderr") + assert event["type"] == _FM_STEP_START + assert event["source"] == "/ert/ee/ee_id/real/0/stage/0/step/0" + + +def test_report_with_successful_start_message_argument(unused_tcp_port): + host = "localhost" + url = f"ws://{host}:{unused_tcp_port}" + reporter = Event(evaluator_url=url) job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - msg = Start(job1) - - reporter.report(msg) + lines = [] + with _mock_ws_thread(host, unused_tcp_port, lines): + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + msg = Start(job1) + reporter.report(msg) - with open(reporter._event_log, "r") as f: - lines = f.readlines() - assert len(lines) == 2 - event = json.loads(lines[1]) - assert event["type"] == _FM_JOB_START - assert event["source"] == "/ert/ee/ee_id/real/0/stage/0/step/0/job/0" + assert len(lines) == 2 + event = json.loads(lines[1]) + assert event["type"] == _FM_JOB_START + assert event["source"] == "/ert/ee/ee_id/real/0/stage/0/step/0/job/0" -def test_report_with_failed_start_message_argument(tmpdir): - reporter = Event(event_log=tmpdir / "event_log") +def test_report_with_failed_start_message_argument(unused_tcp_port): + host = "localhost" + url = f"ws://{host}:{unused_tcp_port}" + reporter = Event(evaluator_url=url) job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - msg = Start(job1).with_error("massive_failure") + lines = [] + with _mock_ws_thread(host, unused_tcp_port, lines): + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - reporter.report(msg) + msg = Start(job1).with_error("massive_failure") - with open(reporter._event_log, "r") as f: - lines = f.readlines() - assert len(lines) == 3 - event = json.loads(lines[2]) - assert event["type"] == _FM_JOB_FAILURE - assert event["data"]["error_msg"] == "massive_failure" + reporter.report(msg) + assert len(lines) == 3 + event = json.loads(lines[2]) + assert event["type"] == _FM_JOB_FAILURE + assert event["data"]["error_msg"] == "massive_failure" -def test_report_with_successful_exit_message_argument(tmpdir): - reporter = Event(event_log=tmpdir / "event_log") - job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - reporter.report(Exited(job1, 0)) - - with open(reporter._event_log, "r") as f: - lines = f.readlines() - assert len(lines) == 2 - event = json.loads(lines[1]) - assert event["type"] == _FM_JOB_SUCCESS - -def test_report_with_failed_exit_message_argument(tmpdir): - reporter = Event(event_log=tmpdir / "event_log") +def test_report_with_successful_exit_message_argument(unused_tcp_port): + host = "localhost" + url = f"ws://{host}:{unused_tcp_port}" + reporter = Event(evaluator_url=url) job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - reporter.report(Exited(job1, 1).with_error("massive_failure")) - with open(reporter._event_log, "r") as f: - lines = f.readlines() - assert len(lines) == 2 - event = json.loads(lines[1]) - assert event["type"] == _FM_JOB_FAILURE - assert event["data"]["error_msg"] == "massive_failure" + lines = [] + with _mock_ws_thread(host, unused_tcp_port, lines): + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Exited(job1, 0)) + + assert len(lines) == 2 + event = json.loads(lines[1]) + assert event["type"] == _FM_JOB_SUCCESS -def test_report_with_running_message_argument(tmpdir): - reporter = Event(event_log=tmpdir / "event_log") +def test_report_with_failed_exit_message_argument(unused_tcp_port): + host = "localhost" + url = f"ws://{host}:{unused_tcp_port}" + reporter = Event(evaluator_url=url) job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - reporter.report(Running(job1, 100, 10)) + lines = [] + with _mock_ws_thread(host, unused_tcp_port, lines): + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Exited(job1, 1).with_error("massive_failure")) - with open(reporter._event_log, "r") as f: - lines = f.readlines() - assert len(lines) == 2 - event = json.loads(lines[1]) - assert event["type"] == _FM_JOB_RUNNING - assert event["data"]["max_memory_usage"] == 100 - assert event["data"]["current_memory_usage"] == 10 + assert len(lines) == 2 + event = json.loads(lines[1]) + assert event["type"] == _FM_JOB_FAILURE + assert event["data"]["error_msg"] == "massive_failure" -def test_report_with_successful_finish_message_argument(tmpdir): - reporter = Event(event_log=tmpdir / "event_log") +def test_report_with_running_message_argument(unused_tcp_port): + host = "localhost" + url = f"ws://{host}:{unused_tcp_port}" + reporter = Event(evaluator_url=url) job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - reporter.report(Running(job1, 100, 10)) - reporter.report(Finish()) + lines = [] + with _mock_ws_thread(host, unused_tcp_port, lines): + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Running(job1, 100, 10)) - with open(reporter._event_log, "r") as f: - lines = f.readlines() - assert len(lines) == 3 - event = json.loads(lines[2]) - assert event["type"] == _FM_STEP_SUCCESS + assert len(lines) == 2 + event = json.loads(lines[1]) + assert event["type"] == _FM_JOB_RUNNING + assert event["data"]["max_memory_usage"] == 100 + assert event["data"]["current_memory_usage"] == 10 -def test_report_with_failed_finish_message_argument(tmpdir): - reporter = Event(event_log=tmpdir / "event_log") +def test_report_with_successful_finish_message_argument(unused_tcp_port): + host = "localhost" + url = f"ws://{host}:{unused_tcp_port}" + reporter = Event(evaluator_url=url) job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - reporter.report(Running(job1, 100, 10)) - reporter.report(Finish().with_error("massive_failure")) + lines = [] + with _mock_ws_thread(host, unused_tcp_port, lines): + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Running(job1, 100, 10)) + reporter.report(Finish()) - with open(reporter._event_log, "r") as f: - lines = f.readlines() - assert len(lines) == 3 - event = json.loads(lines[2]) - assert event["type"] == _FM_STEP_FAILURE - assert event["data"]["error_msg"] == "massive_failure" + assert len(lines) == 3 + event = json.loads(lines[2]) + assert event["type"] == _FM_STEP_SUCCESS -def test_report_startup_clearing_of_event_log_file(tmpdir): - reporter1 = Event(event_log=tmpdir / "event_log") +def test_report_with_failed_finish_message_argument(unused_tcp_port): + host = "localhost" + url = f"ws://{host}:{unused_tcp_port}" + reporter = Event(evaluator_url=url) job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - reporter1.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) - reporter2 = Event(event_log=tmpdir / "event_log") + lines = [] + with _mock_ws_thread(host, unused_tcp_port, lines): + reporter.report(Init([job1], 1, 19, ee_id="ee_id", real_id=0, stage_id=0)) + reporter.report(Running(job1, 100, 10)) + reporter.report(Finish().with_error("massive_failure")) - assert os.path.getsize(tmpdir / "event_log") == 0 + assert len(lines) == 3 + event = json.loads(lines[2]) + assert event["type"] == _FM_STEP_FAILURE + assert event["data"]["error_msg"] == "massive_failure" diff --git a/tests/job_runner/test_job_dispatch.py b/tests/job_runner/test_job_dispatch.py index 03830f2d33..cf627d4611 100644 --- a/tests/job_runner/test_job_dispatch.py +++ b/tests/job_runner/test_job_dispatch.py @@ -16,7 +16,9 @@ from job_runner.cli import main, _setup_reporters from job_runner.reporting.message import Init, Finish from job_runner.reporting import Event, Interactive -from tests.utils import tmpdir, wait_until +from pytest_asyncio.plugin import _unused_tcp_port + +from tests.utils import tmpdir, wait_until, _mock_ws_thread from unittest.mock import patch, mock_open @@ -109,23 +111,6 @@ def test_terminate_jobs(self): os.wait() # allow os to clean up zombie processes - @tmpdir(None) - def test_job_dispatch_kills_itself_after_unsuccessful_job(self): - jobs_json = json.dumps({"ee_id": "_id_"}) - - with patch("job_runner.cli.os") as mock_os, patch( - "job_runner.cli.open", new=mock_open(read_data=jobs_json) - ) as mock_file, patch("job_runner.cli.JobRunner") as mock_runner: - mock_runner.return_value.run.return_value = [ - Init([], 0, 0), - Finish().with_error("overall bad run"), - ] - mock_os.getpgid.return_value = 17 - - main(["script.py", "/foo/bar/baz"]) - - mock_os.killpg.assert_called_with(17, signal.SIGKILL) - @tmpdir(None) def test_job_dispatch_run_subset_specified_as_parmeter(self): with open("dummy_executable", "w") as f: @@ -265,7 +250,7 @@ def test_no_json_jobs_json_file(self): [(False, None), (False, "1234"), (True, None), (True, "1234")], ) def test_setup_reporters(is_interactive_run, ee_id): - reporters = _setup_reporters(is_interactive_run, ee_id) + reporters = _setup_reporters(is_interactive_run, ee_id, "") if not is_interactive_run and not ee_id: assert len(reporters) == 2 @@ -278,3 +263,24 @@ def test_setup_reporters(is_interactive_run, ee_id): if is_interactive_run and ee_id: assert len(reporters) == 1 assert any([isinstance(r, Interactive) for r in reporters]) + + +@tmpdir(None) +def test_job_dispatch_kills_itself_after_unsuccessful_job(unused_tcp_port): + host = "localhost" + port = unused_tcp_port + jobs_json = json.dumps({"ee_id": "_id_", "dispatch_url": f"ws://localhost:{port}"}) + + with patch("job_runner.cli.os") as mock_os, patch( + "job_runner.cli.open", new=mock_open(read_data=jobs_json) + ) as mock_file, patch("job_runner.cli.JobRunner") as mock_runner: + mock_runner.return_value.run.return_value = [ + Init([], 0, 0), + Finish().with_error("overall bad run"), + ] + mock_os.getpgid.return_value = 17 + + with _mock_ws_thread(host, port, []): + main(["script.py", "/foo/bar/baz"]) + + mock_os.killpg.assert_called_with(17, signal.SIGKILL) diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index 952d70e632..f39e7b33e6 100644 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -1,12 +1,18 @@ +import asyncio import contextlib import logging import os import tempfile import shutil +import threading +from functools import partial import decorator import time +import websockets +from job_runner.util.client import Client + """ Swiped from https://github.com/equinor/everest/blob/master/tests/utils/__init__.py @@ -86,3 +92,42 @@ def wait_until(func, interval=0.5, timeout=30): func.__name__, timeout ) ) + + +def _mock_ws(host, port, messages, delay_startup=0): + loop = asyncio.new_event_loop() + done = loop.create_future() + + async def _handler(websocket, path): + while True: + msg = await websocket.recv() + messages.append(msg) + if msg == "stop": + done.set_result(None) + break + + async def _run_server(): + await asyncio.sleep(delay_startup) + async with websockets.serve(_handler, host, port): + await done + + loop.run_until_complete(_run_server()) + loop.close() + + +@contextlib.contextmanager +def _mock_ws_thread(host, port, messages): + mock_ws_thread = threading.Thread( + target=partial(_mock_ws, messages=messages), + args=( + host, + port, + ), + ) + mock_ws_thread.start() + yield + url = f"ws://{host}:{port}" + with Client(url) as client: + client.send("stop") + mock_ws_thread.join() + messages.pop()