Skip to content

Commit

Permalink
fileinstall: run coros in the background if loop already running
Browse files Browse the repository at this point in the history
* Allow Rose file installation to be called by code which already
  has an event loop running by scheduling coroutines to run in the
  background (i.e. schedule but don't await).
* The calling code can list these tasks using `asyncio.all_tasks()`
  and await them as appropriate.
* Addresses cylc/cylc-rose#274
  • Loading branch information
oliver-sanders committed Feb 20, 2024
1 parent 6293d65 commit be7004c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 7 deletions.
4 changes: 4 additions & 0 deletions metomi/rose/config_processors/fileinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Process "file:*" sections in node of a metomi.rose.config_tree.ConfigTree.
"""

from contextlib import suppress
from fnmatch import fnmatch
from glob import glob
from io import BytesIO
Expand Down Expand Up @@ -106,6 +107,9 @@ def process(
finally:
if cwd != os.getcwd():
self.manager.fs_util.chdir(cwd)
if loc_dao.conn:
with suppress(Exception):
loc_dao.conn.close()

def _process(self, conf_tree, nodes, loc_dao, **kwargs):
"""Helper for self.process."""
Expand Down
36 changes: 29 additions & 7 deletions metomi/rose/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@
"""A multiprocessing runner of jobs with dependencies."""

import asyncio

from metomi.rose.reporter import Event


# set containing references to "background" coroutines that are not referenced
# from any code (i.e. are not directly awaited), adding them to this list
# avoids the potential for garbage collection to delete them whilst they are
# running
_BACKGROUND_TASKS = set()


class JobEvent(Event):
"""Event raised when a job completes."""

Expand Down Expand Up @@ -175,19 +183,33 @@ def run(self, job_manager, *args, concurrency=6):
The maximum number of jobs to run concurrently.
"""
running = []
loop = asyncio.get_event_loop()
loop.set_exception_handler(self.job_processor.handle_event)
loop.run_until_complete(
asyncio.gather(
self._run_jobs(running, job_manager, args, concurrency),
self._post_process_jobs(running, job_manager, args),
)
)
coro = self._run(job_manager, *args, concurrency=concurrency)
try:
# event loop is not running (e.g. rose CLI use)
loop.run_until_complete(coro)
except RuntimeError:
# event loop is already running (e.g. cylc CLI use)
# WARNING: this starts the file installation running, but it
# doesn't wait for it to finish, that's your problem :(
task = loop.create_task(coro)
# reference this task from a global variable to prevent it from
# being garbage collected
_BACKGROUND_TASKS.add(task)
# tidy up afterwards
task.add_done_callback(_BACKGROUND_TASKS.discard)
dead_jobs = job_manager.get_dead_jobs()
if dead_jobs:
raise JobRunnerNotCompletedError(dead_jobs)

async def _run(self, job_manager, *args, concurrency=6):
running = []
await asyncio.gather(
self._run_jobs(running, job_manager, args, concurrency),
self._post_process_jobs(running, job_manager, args),
)

async def _run_jobs(self, running, job_manager, args, concurrency):
"""Run pending jobs subject to the concurrency limit.
Expand Down

0 comments on commit be7004c

Please sign in to comment.