Skip to content
This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Commit

Permalink
Make legacy ensemble members connect through websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
mortalisk authored and sondreso committed Apr 7, 2021
1 parent 7d29fdf commit 47c940c
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 133 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_and_deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
fail-fast: false
matrix:
os: ['ubuntu-latest', 'macos-latest']
python: ['3.6', '3.7', '3.8', '3.9']
python: ['3.6', '3.7', '3.8']

runs-on: ${{ matrix.os }}

Expand Down
7 changes: 4 additions & 3 deletions python/job_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
from job_runner import JOBS_FILE


def _setup_reporters(is_interactive_run, ee_id):
def _setup_reporters(is_interactive_run, ee_id, evaluator_url):
reporters = []
if is_interactive_run:
reporters.append(reporting.Interactive())
elif ee_id:
reporters.append(reporting.File(sync_disc_timeout=0))
reporters.append(reporting.Network())
reporters.append(reporting.Event())
reporters.append(reporting.Event(evaluator_url=evaluator_url))
else:
reporters.append(reporting.File())
reporters.append(reporting.Network())
Expand Down Expand Up @@ -49,11 +49,12 @@ def main(args):
with open(JOBS_FILE, "r") as json_file:
jobs_data = json.load(json_file)
ee_id = jobs_data.get("ee_id")
evaluator_url = jobs_data.get("dispatch_url")
except ValueError as e:
raise IOError("Job Runner cli failed to load JSON-file.{}".format(str(e)))

is_interactive_run = len(parsed_args.job) > 0
reporters = _setup_reporters(is_interactive_run, ee_id)
reporters = _setup_reporters(is_interactive_run, ee_id, evaluator_url)

job_runner = JobRunner(jobs_data)

Expand Down
15 changes: 6 additions & 9 deletions python/job_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
)
from pathlib import Path

from job_runner.util.client import Client

_FM_JOB_START = "com.equinor.ert.forward_model_job.start"
_FM_JOB_RUNNING = "com.equinor.ert.forward_model_job.running"
_FM_JOB_SUCCESS = "com.equinor.ert.forward_model_job.success"
Expand All @@ -23,15 +25,14 @@ class TransitionError(ValueError):


class Event:
def __init__(self, event_log="event_log"):
self._event_log = event_log
def __init__(self, evaluator_url):
self._evaluator_url = evaluator_url

self._ee_id = None
self._real_id = None
self._stage_id = None

self._initialize_state_machine()
self._clear_log()

def _initialize_state_machine(self):
initialized = (Init,)
Expand All @@ -49,10 +50,6 @@ def _initialize_state_machine(self):
}
self._state = None

def _clear_log(self):
with open(self._event_log, "w") as f:
pass

def report(self, msg):
new_state = None
for state in self._states.keys():
Expand All @@ -70,8 +67,8 @@ def report(self, msg):
self._state = new_state

def _dump_event(self, event):
with open(self._event_log, "a") as el:
el.write("{}\n".format(to_json(event).decode()))
with Client(self._evaluator_url) as client:
client.send(to_json(event).decode())

def _step_path(self):
return f"/ert/ee/{self._ee_id}/real/{self._real_id}/stage/{self._stage_id}/step/{0}"
Expand Down
59 changes: 59 additions & 0 deletions python/job_runner/util/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import websockets
from websockets import ConnectionClosedOK
import asyncio
import cloudevents
from websockets.exceptions import ConnectionClosed


class Client:
    """Blocking websocket client that drives its own asyncio event loop.

    The connection is opened lazily on the first ``send`` and reused for
    later messages. Use the instance as a context manager so that the
    websocket and the private event loop are released on exit.
    """

    def __init__(self, url, max_retries=10, timeout_multiplier=5):
        """Store connection settings and create a private event loop.

        :param url: websocket URL to connect to; must not be ``None``.
        :param max_retries: number of retries after the first failed attempt.
        :param timeout_multiplier: factor applied to the retry index when
            computing the delay (in seconds) before the next attempt.
        :raises ValueError: if ``url`` is ``None``.
        """
        if url is None:
            raise ValueError("url was None")
        self.url = url
        self._max_retries = max_retries
        self._timeout_multiplier = timeout_multiplier
        self.websocket = None
        self.loop = asyncio.new_event_loop()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Close the websocket (if one was opened) and the event loop."""
        try:
            if self.websocket is not None:
                self.loop.run_until_complete(self.websocket.close())
        finally:
            # Always release the loop, even when closing the socket fails;
            # otherwise the loop and its resources would leak.
            self.loop.close()

    async def get_websocket(self):
        """Open and return a new websocket connection to ``self.url``."""
        return await websockets.connect(self.url)

    async def _send(self, msg):
        """Send ``msg``, reconnecting and retrying on connection errors.

        A ``ConnectionClosedOK`` is re-raised immediately. Other connection
        errors are retried up to ``max_retries`` times with a delay that
        grows with the retry index; the last error is re-raised when the
        retries are exhausted.
        """
        for retry in range(self._max_retries + 1):
            try:
                if self.websocket is None:
                    self.websocket = await self.get_websocket()
                await self.websocket.send(msg)
                return
            except ConnectionClosedOK:
                # Connection was closed no point in trying to send more messages
                raise
            except (ConnectionClosed, ConnectionRefusedError, OSError):
                if retry == self._max_retries:
                    raise
                await asyncio.sleep(0.2 + self._timeout_multiplier * retry)
                # Drop the broken connection so the next attempt reconnects.
                self.websocket = None

    def send(self, msg):
        """Send ``msg`` synchronously by running ``_send`` on the own loop."""
        self.loop.run_until_complete(self._send(msg))

    def send_event(self, ev_type, ev_source, ev_data=None):
        """Build a CloudEvent from the arguments and send it as JSON.

        :param ev_type: value for the event's ``type`` attribute.
        :param ev_source: value for the event's ``source`` attribute.
        :param ev_data: event payload; defaults to an empty dict.
        """
        # ``import cloudevents`` alone does not load the ``http`` submodule,
        # so import it explicitly instead of relying on another module
        # having imported it first.
        from cloudevents.http import CloudEvent, to_json

        if ev_data is None:
            ev_data = {}
        event = CloudEvent(
            {
                "type": ev_type,
                "source": ev_source,
                "datacontenttype": "application/json",
            },
            ev_data,
        )
        self.send(to_json(event).decode())
3 changes: 2 additions & 1 deletion python/res/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,13 +567,14 @@ def snapshot(self) -> typing.Optional[typing.Dict[int, str]]:
return None
return snapshot

def add_ensemble_evaluator_information_to_jobs_file(self, ee_id):
def add_ensemble_evaluator_information_to_jobs_file(self, ee_id, dispatch_url):
for q_index, q_node in enumerate(self.job_list):
with open(f"{q_node.run_path}/{JOBS_FILE}", "r+") as jobs_file:
data = json.load(jobs_file)
data["ee_id"] = ee_id
data["real_id"] = self._qindex_to_iens[q_index]
data["stage_id"] = 0
data["dispatch_url"] = dispatch_url
jobs_file.seek(0)
jobs_file.truncate()
json.dump(data, jobs_file, indent=4)
1 change: 1 addition & 0 deletions test_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ decorator
pylint
black
click
pytest-asyncio
Loading

0 comments on commit 47c940c

Please sign in to comment.