From 9a8317f771ac2389e895abd4ce5727bd5074e318 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 6 Dec 2023 14:19:37 +0000 Subject: [PATCH] post_configure: ensure asyncio tasks created by plugins are awaited * Await any background tasks started by a plugin before continuing. * Addresses https://github.com/cylc/cylc-rose/issues/274 * Centralise the plugin loading/running/reporting logic. * Fix the installation test for back-compat-mode. --- cylc/flow/async_util.py | 46 +++++++++ cylc/flow/parsec/fileparse.py | 26 ++---- cylc/flow/plugins.py | 77 +++++++++++++++ cylc/flow/scheduler_cli.py | 16 ++-- cylc/flow/scripts/install.py | 72 ++++++-------- cylc/flow/scripts/reinstall.py | 66 ++++--------- cylc/flow/scripts/validate.py | 8 +- cylc/flow/scripts/validate_install_play.py | 29 +++--- cylc/flow/scripts/validate_reinstall.py | 2 +- .../cylc-combination-scripts/01-vr-reload.t | 2 +- .../cylc-combination-scripts/02-vr-restart.t | 2 +- .../04-vr-fail-validate.t | 2 +- .../05-vr-fail-is-running.t | 5 +- tests/integration/conftest.py | 4 +- .../scripts/test_validate_integration.py | 4 +- tests/integration/test_get_old_tvars.py | 2 +- tests/integration/test_install.py | 77 ++++++++------- tests/integration/test_reinstall.py | 93 ++++++++++++++++++- tests/integration/utils/entry_points.py | 30 ++++++ tests/unit/plugins/test_pre_configure.py | 39 +++++--- 20 files changed, 401 insertions(+), 201 deletions(-) create mode 100644 cylc/flow/plugins.py create mode 100644 tests/integration/utils/entry_points.py diff --git a/cylc/flow/async_util.py b/cylc/flow/async_util.py index 1e103615b33..478c41dc297 100644 --- a/cylc/flow/async_util.py +++ b/cylc/flow/async_util.py @@ -16,6 +16,7 @@ """Utilities for use with asynchronous code.""" import asyncio +from contextlib import asynccontextmanager from functools import partial, wraps from inspect import signature import os @@ -478,3 +479,48 @@ async def _fcn(*args, executor=None, **kwargs): async_listdir = make_async(os.listdir) + + +@asynccontextmanager +async def async_block(): + """Ensure all tasks started within the context are awaited when it closes. + + Normally, you would await a task e.g: + + await three() + + If it's possible to await the task, do that, however, this isn't always an + option. This interface exists is to help patch over issues where async code + (one) calls sync code (two) which calls async code (three) e.g: + + async def one(): + two() + + def two(): + # this breaks - event loop is already running + asyncio.get_event_loop().run_until_complete(three()) + + async def three(): + await asyncio.sleep(1) + + This code will error because you can't nest asyncio (without nest-asyncio) + which means you can schedule tasks the tasks in "two", but you can't await + them. + + def two(): + # this works, but it doesn't wait for three() to complete + asyncio.create_task(three()) + + This interface allows you to await the tasks + + async def one() + async with async_block(): + two() + # any tasks two() started will have been awaited by now + """ + # make a list of all tasks running before we enter the context manager + tasks_before = asyncio.all_tasks() + # run the user code + yield + # await any new tasks + await asyncio.gather(*(asyncio.all_tasks() - tasks_before)) diff --git a/cylc/flow/parsec/fileparse.py b/cylc/flow/parsec/fileparse.py index 42e8a4aa40b..5a1d1747692 100644 --- a/cylc/flow/parsec/fileparse.py +++ b/cylc/flow/parsec/fileparse.py @@ -36,14 +36,14 @@ import sys import typing as t -from cylc.flow import __version__, iter_entry_points +from cylc.flow import __version__ from cylc.flow import LOG -from cylc.flow.exceptions import PluginError from cylc.flow.parsec.exceptions import ( FileParseError, ParsecError, TemplateVarLanguageClash ) -from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults from cylc.flow.parsec.include import inline +from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults +from cylc.flow.plugins import run_plugins from cylc.flow.parsec.util import itemstr from cylc.flow.templatevars import get_template_vars_from_db from cylc.flow.workflow_files import ( @@ -271,23 +271,11 @@ def process_plugins(fpath, opts): } # Run entry point pre_configure items, trying to merge values with each.: - for entry_point in iter_entry_points( - 'cylc.pre_configure' + for entry_point, plugin_result in run_plugins( + 'cylc.pre_configure', + srcdir=fpath, + opts=opts, ): - try: - # If you want it to work on sourcedirs you need to get the options - # to here. - plugin_result = entry_point.load()( - srcdir=fpath, opts=opts - ) - except Exception as exc: - # NOTE: except Exception (purposefully vague) - # this is to separate plugin from core Cylc errors - raise PluginError( - 'cylc.pre_configure', - entry_point.name, - exc - ) from None for section in ['env', 'template_variables']: if section in plugin_result and plugin_result[section] is not None: # Raise error if multiple plugins try to update the same keys. diff --git a/cylc/flow/plugins.py b/cylc/flow/plugins.py new file mode 100644 index 00000000000..f291b2697d3 --- /dev/null +++ b/cylc/flow/plugins.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 + +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Common functionality related to the loading and calling of plugins.""" + +from time import time + +from cylc.flow import LOG, iter_entry_points +from cylc.flow.exceptions import PluginError +import cylc.flow.flags + + +def get_entry_points(plugin_namespace): + """Yield all installed plugin entry points which match the given name.""" + yield from iter_entry_points(plugin_namespace) + + +def run_plugins(plugin_namespace, *args, **kwargs): + """Run all installed plugins for the given namespace. + + This runs plugins in series, yielding the results one by one. + + Args: + plugin_namespace: + The entry point namespace for the plugins to run, + e.g. "cylc.post_install". + args: + Any arguments to call plugins with. + kwargs: + Any kwargs to call plugins with. + + Yields: + (entry_point, plugin_result) + + Warning: + Remember to wrap "cylc.post_install" plugins with + "cylc.flow.async_util.async_block". + + See https://github.com/cylc/cylc-rose/issues/274 + + """ + for entry_point in get_entry_points(plugin_namespace): + try: + meth = entry_point.load() + start_time = time() + plugin_result = meth(*args, **kwargs) + LOG.debug( + f'ran {entry_point.name} in {time() - start_time:0.05f}s' + ) + yield entry_point, plugin_result + except Exception as exc: + # NOTE: except Exception (purposefully vague) + # this is to separate plugin from core Cylc errors + if cylc.flow.flags.verbosity > 1: + # raise the full exception in debug mode + raise + # raise a user-friendly exception + raise PluginError( + plugin_namespace, + entry_point.name, + exc + ) from None diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py index 61fd408089b..785f8e1eb7f 100644 --- a/cylc/flow/scheduler_cli.py +++ b/cylc/flow/scheduler_cli.py @@ -369,6 +369,12 @@ async def scheduler_cli( functionality. """ + if options.starttask: + options.starttask = upgrade_legacy_ids( + *options.starttask, + relative=True, + ) + # Parse workflow name but delay Cylc 7 suite.rc deprecation warning # until after the start-up splash is printed. # TODO: singleton @@ -651,14 +657,4 @@ async def _run(scheduler: Scheduler) -> int: @cli_function(get_option_parser) def play(parser: COP, options: 'Values', id_: str): """Implement cylc play.""" - return _play(parser, options, id_) - - -def _play(parser: COP, options: 'Values', id_: str): - """Allows compound scripts to import play, but supply their own COP.""" - if options.starttask: - options.starttask = upgrade_legacy_ids( - *options.starttask, - relative=True, - ) return asyncio.run(scheduler_cli(options, id_)) diff --git a/cylc/flow/scripts/install.py b/cylc/flow/scripts/install.py index be9583137e9..f8f9fff70a5 100755 --- a/cylc/flow/scripts/install.py +++ b/cylc/flow/scripts/install.py @@ -86,14 +86,9 @@ from pathlib import Path from typing import Any, Dict, Optional, Tuple -from cylc.flow.scripts.scan import ( - get_pipe, - _format_plain, - FLOW_STATE_SYMBOLS, - FLOW_STATE_CMAP -) -from cylc.flow import LOG, iter_entry_points -from cylc.flow.exceptions import PluginError, InputError +from cylc.flow import LOG +from cylc.flow.async_util import async_block +from cylc.flow.exceptions import InputError from cylc.flow.loggingutil import CylcLogFormatter, set_timestamps from cylc.flow.option_parsers import ( CylcOptionParser as COP, @@ -105,12 +100,19 @@ expand_path, get_workflow_run_dir ) +from cylc.flow.plugins import run_plugins from cylc.flow.install import ( install_workflow, parse_cli_sym_dirs, search_install_source_dirs, check_deprecation, ) +from cylc.flow.scripts.scan import ( + get_pipe, + _format_plain, + FLOW_STATE_SYMBOLS, + FLOW_STATE_CMAP +) from cylc.flow.terminal import cli_function @@ -268,22 +270,20 @@ def main( id_: Optional[str] = None ) -> None: """CLI wrapper.""" - install_cli(opts, id_) + asyncio.run(install_cli(opts, id_)) -def install_cli( +async def install_cli( opts: 'Values', id_: Optional[str] = None ) -> Tuple[str, str]: """Install workflow and scan for already-running instances.""" - wf_name, wf_id = install(opts, id_) - asyncio.run( - scan(wf_name, not opts.no_ping) - ) + wf_name, wf_id = await install(opts, id_) + await scan(wf_name, not opts.no_ping) return wf_name, wf_id -def install( +async def install( opts: 'Values', id_: Optional[str] = None ) -> Tuple[str, str]: set_timestamps(LOG, opts.log_timestamp and opts.verbosity > 1) @@ -297,19 +297,12 @@ def install( # for compatibility mode: check_deprecation(source) - for entry_point in iter_entry_points( - 'cylc.pre_configure' + for _entry_point, _plugin_result in run_plugins( + 'cylc.pre_configure', + srcdir=source, + opts=opts, ): - try: - entry_point.load()(srcdir=source, opts=opts) - except Exception as exc: - # NOTE: except Exception (purposefully vague) - # this is to separate plugin from core Cylc errors - raise PluginError( - 'cylc.pre_configure', - entry_point.name, - exc - ) from None + pass cli_symdirs: Optional[Dict[str, Dict[str, Any]]] = None if opts.symlink_dirs == '': @@ -325,23 +318,14 @@ def install( cli_symlink_dirs=cli_symdirs ) - for entry_point in iter_entry_points( - 'cylc.post_install' - ): - try: - entry_point.load()( - srcdir=source_dir, - opts=opts, - rundir=str(rundir) - ) - except Exception as exc: - # NOTE: except Exception (purposefully vague) - # this is to separate plugin from core Cylc errors - raise PluginError( - 'cylc.post_install', - entry_point.name, - exc - ) from None + async with async_block(): + for _entry_point, _plugin_result in run_plugins( + 'cylc.post_install', + srcdir=source_dir, + opts=opts, + rundir=str(rundir), + ): + pass print(f'INSTALLED {workflow_id} from {source_dir}') diff --git a/cylc/flow/scripts/reinstall.py b/cylc/flow/scripts/reinstall.py index a2acbc7f771..006fabcfaa4 100644 --- a/cylc/flow/scripts/reinstall.py +++ b/cylc/flow/scripts/reinstall.py @@ -75,9 +75,8 @@ from ansimarkup import parse as cparse -from cylc.flow import iter_entry_points +from cylc.flow.async_util import async_block from cylc.flow.exceptions import ( - PluginError, ServiceFileError, WorkflowFilesError, ) @@ -90,8 +89,8 @@ OptionSettings, ID_MULTI_ARG_DOC ) - from cylc.flow.pathutil import get_workflow_run_dir +from cylc.flow.plugins import run_plugins from cylc.flow.workflow_files import ( get_workflow_source_dir, load_contact_file, @@ -198,7 +197,7 @@ async def reinstall_cli( if is_terminal() and not opts.skip_interactive: # interactive mode - perform dry-run and prompt # dry-mode reinstall - if not reinstall( + if not await reinstall( opts, workflow_id, source, @@ -231,7 +230,7 @@ async def reinstall_cli( if usr == 'y': # reinstall for real - reinstall(opts, workflow_id, source, run_dir, dry_run=False) + await reinstall(opts, workflow_id, source, run_dir, dry_run=False) print(cparse('Successfully reinstalled.')) if print_reload_tip: display_cylc_reload_tip(workflow_id) @@ -245,7 +244,7 @@ async def reinstall_cli( return False -def reinstall( +async def reinstall( opts: 'Values', workflow_id: str, src_dir: Path, @@ -273,7 +272,12 @@ def reinstall( # run pre_configure plugins if not dry_run: # don't run plugins in dry-mode - pre_configure(opts, src_dir) + for _entry_point, _plugin_result in run_plugins( + 'cylc.pre_configure', + srcdir=src_dir, + opts=opts, + ): + pass # reinstall from src_dir (will raise WorkflowFilesError on error) stdout: str = reinstall_workflow( @@ -295,7 +299,14 @@ def reinstall( # run post_install plugins if not dry_run: # don't run plugins in dry-mode - post_install(opts, src_dir, run_dir) + async with async_block(): + for _entry_point, _plugin_result in run_plugins( + 'cylc.post_install', + srcdir=src_dir, + opts=opts, + rundir=str(run_dir), + ): + pass return True @@ -338,45 +349,6 @@ def format_rsync_out(out: str) -> List[str]: return lines -def pre_configure(opts: 'Values', src_dir: Path) -> None: - """Run pre_configure plugins.""" - # don't run plugins in dry-mode - for entry_point in iter_entry_points( - 'cylc.pre_configure' - ): - try: - entry_point.load()(srcdir=src_dir, opts=opts) - except Exception as exc: - # NOTE: except Exception (purposefully vague) - # this is to separate plugin from core Cylc errors - raise PluginError( - 'cylc.pre_configure', - entry_point.name, - exc - ) from None - - -def post_install(opts: 'Values', src_dir: Path, run_dir: Path) -> None: - """Run post_install plugins.""" - for entry_point in iter_entry_points( - 'cylc.post_install' - ): - try: - entry_point.load()( - srcdir=src_dir, - opts=opts, - rundir=str(run_dir) - ) - except Exception as exc: - # NOTE: except Exception (purposefully vague) - # this is to separate plugin from core Cylc errors - raise PluginError( - 'cylc.post_install', - entry_point.name, - exc - ) from None - - def display_rose_warning(src_dir: Path) -> None: """Explain why rose installed files are marked as deleted.""" if (src_dir / 'rose-suite.conf').is_file(): diff --git a/cylc/flow/scripts/validate.py b/cylc/flow/scripts/validate.py index 7cfd42c2138..acd14537d1e 100755 --- a/cylc/flow/scripts/validate.py +++ b/cylc/flow/scripts/validate.py @@ -133,14 +133,10 @@ def get_option_parser(): @cli_function(get_option_parser) def main(parser: COP, options: 'Values', workflow_id: str) -> None: - _main(parser, options, workflow_id) + asyncio.run(run(parser, options, workflow_id)) -def _main(parser: COP, options: 'Values', workflow_id: str) -> None: - asyncio.run(wrapped_main(parser, options, workflow_id)) - - -async def wrapped_main( +async def run( parser: COP, options: 'Values', workflow_id: str ) -> None: """cylc validate CLI.""" diff --git a/cylc/flow/scripts/validate_install_play.py b/cylc/flow/scripts/validate_install_play.py index 4ad66b5c454..db4adb80340 100644 --- a/cylc/flow/scripts/validate_install_play.py +++ b/cylc/flow/scripts/validate_install_play.py @@ -28,15 +28,9 @@ """ +import asyncio import sys -from cylc.flow.scripts.validate import ( - VALIDATE_OPTIONS, - _main as validate_main -) -from cylc.flow.scripts.install import ( - INSTALL_OPTIONS, install_cli as cylc_install, get_source_location -) from cylc.flow import LOG from cylc.flow.scheduler_cli import PLAY_OPTIONS from cylc.flow.loggingutil import set_timestamps @@ -46,7 +40,16 @@ cleanup_sysargv, log_subcommand, ) -from cylc.flow.scheduler_cli import _play +from cylc.flow.scheduler_cli import scheduler_cli as cylc_play +from cylc.flow.scripts.validate import ( + VALIDATE_OPTIONS, + run as cylc_validate, +) +from cylc.flow.scripts.install import ( + INSTALL_OPTIONS, + install_cli as cylc_install, + get_source_location, +) from cylc.flow.terminal import cli_function from typing import TYPE_CHECKING, Optional @@ -89,16 +92,20 @@ def get_option_parser() -> COP: @cli_function(get_option_parser) def main(parser: COP, options: 'Values', workflow_id: Optional[str] = None): + asyncio.run(run(parser, options, workflow_id)) + + +async def run(parser: COP, options: 'Values', workflow_id: Optional[str]): """Run Cylc validate - install - play in sequence.""" if not workflow_id: workflow_id = '.' orig_source = workflow_id source = get_source_location(workflow_id) log_subcommand('validate', source) - validate_main(parser, options, str(source)) + await cylc_validate(parser, options, str(source)) log_subcommand('install', source) - _, workflow_id = cylc_install(options, workflow_id) + _, workflow_id = await cylc_install(options, workflow_id) cleanup_sysargv( 'play', @@ -114,4 +121,4 @@ def main(parser: COP, options: 'Values', workflow_id: Optional[str] = None): set_timestamps(LOG, options.log_timestamp) log_subcommand(*sys.argv[1:]) - _play(parser, options, workflow_id) + await cylc_play(options, workflow_id) diff --git a/cylc/flow/scripts/validate_reinstall.py b/cylc/flow/scripts/validate_reinstall.py index 3fb665cf999..808f27cd586 100644 --- a/cylc/flow/scripts/validate_reinstall.py +++ b/cylc/flow/scripts/validate_reinstall.py @@ -62,7 +62,7 @@ from cylc.flow.scheduler_cli import PLAY_OPTIONS, scheduler_cli from cylc.flow.scripts.validate import ( VALIDATE_OPTIONS, - wrapped_main as cylc_validate + run as cylc_validate, ) from cylc.flow.scripts.reinstall import ( REINSTALL_CYLC_ROSE_OPTIONS, diff --git a/tests/functional/cylc-combination-scripts/01-vr-reload.t b/tests/functional/cylc-combination-scripts/01-vr-reload.t index 3b63c64d896..c161f19d1e8 100644 --- a/tests/functional/cylc-combination-scripts/01-vr-reload.t +++ b/tests/functional/cylc-combination-scripts/01-vr-reload.t @@ -51,6 +51,6 @@ named_grep_ok "${TEST_NAME_BASE}-it-logged-reload" \ "${WORKFLOW_RUN_DIR}/log/scheduler/log" # Clean Up. -run_ok "teardown (stop workflow)" cylc stop "${WORKFLOW_NAME}" --now --now +run_ok "${TEST_NAME_BASE}-stop" cylc stop "${WORKFLOW_NAME}" --now --now purge exit 0 diff --git a/tests/functional/cylc-combination-scripts/02-vr-restart.t b/tests/functional/cylc-combination-scripts/02-vr-restart.t index 9238c45b723..9dce297d09c 100644 --- a/tests/functional/cylc-combination-scripts/02-vr-restart.t +++ b/tests/functional/cylc-combination-scripts/02-vr-restart.t @@ -44,6 +44,6 @@ named_grep_ok "${TEST_NAME_BASE}-it-installed" "$ cylc reinstall" "VIPOUT.txt" named_grep_ok "${TEST_NAME_BASE}-it-played" "cylc play" "VIPOUT.txt" # Clean Up. -run_ok "teardown (stop workflow)" cylc stop "${WORKFLOW_NAME}" --now --now +run_ok "${TEST_NAME_BASE}-stop" cylc stop "${WORKFLOW_NAME}" --now --now purge exit 0 diff --git a/tests/functional/cylc-combination-scripts/04-vr-fail-validate.t b/tests/functional/cylc-combination-scripts/04-vr-fail-validate.t index ef819a32781..5432f6d1016 100644 --- a/tests/functional/cylc-combination-scripts/04-vr-fail-validate.t +++ b/tests/functional/cylc-combination-scripts/04-vr-fail-validate.t @@ -48,6 +48,6 @@ named_grep_ok "${TEST_NAME_BASE}-it-failed" \ # Clean Up: -run_ok "teardown (stop workflow)" cylc stop "${WORKFLOW_NAME}" --now --now +run_ok "${TEST_NAME_BASE}-stop" cylc stop "${WORKFLOW_NAME}" --now --now purge exit 0 diff --git a/tests/functional/cylc-combination-scripts/05-vr-fail-is-running.t b/tests/functional/cylc-combination-scripts/05-vr-fail-is-running.t index 876c494d4f3..90275dae037 100644 --- a/tests/functional/cylc-combination-scripts/05-vr-fail-is-running.t +++ b/tests/functional/cylc-combination-scripts/05-vr-fail-is-running.t @@ -41,6 +41,7 @@ run_ok "setup (vip)" \ # Get the workflow into an unreachable state CONTACTFILE="${RUN_DIR}/${WORKFLOW_NAME}/.service/contact" +cp "$CONTACTFILE" "${CONTACTFILE}.old" poll test -e "${CONTACTFILE}" sed -i 's@CYLC_WORKFLOW_HOST=.*@CYLC_WORKFLOW_HOST=elephantshrew@' "${CONTACTFILE}" @@ -54,7 +55,7 @@ run_fail "${TEST_NAME_BASE}-runs" cylc vr "${WORKFLOW_NAME}" grep_ok "on elephantshrew." "${TEST_NAME_BASE}-runs.stderr" # Clean Up: -sed -i "s@CYLC_WORKFLOW_HOST=elephantshrew@CYLC_WORKFLOW_HOST=$HOSTNAME@" "${CONTACTFILE}" -run_ok "teardown (stop workflow)" cylc stop "${WORKFLOW_NAME}" --now --now +mv "${CONTACTFILE}.old" "$CONTACTFILE" +run_ok "${TEST_NAME_BASE}-stop" cylc stop "${WORKFLOW_NAME}" --now --now purge exit 0 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f2f24b09ab5..cb9102b5974 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -462,14 +462,14 @@ def install(test_dir, run_dir): Returns: Workflow id, including run directory. """ - def _inner(source, **kwargs): + async def _inner(source, **kwargs): opts = InstallOpts(**kwargs) # Note we append the source.name to the string rather than creating # a subfolder because the extra layer of directories would exceed # Cylc install's default limit. opts.workflow_name = ( f'{str(test_dir.relative_to(run_dir))}.{source.name}') - workflow_id, _ = cylc_install(opts, str(source)) + workflow_id, _ = await cylc_install(opts, str(source)) workflow_id = infer_latest_run_from_id(workflow_id) return workflow_id yield _inner diff --git a/tests/integration/scripts/test_validate_integration.py b/tests/integration/scripts/test_validate_integration.py index e271726dafb..9a17f2c7740 100644 --- a/tests/integration/scripts/test_validate_integration.py +++ b/tests/integration/scripts/test_validate_integration.py @@ -29,7 +29,7 @@ async def test_validate_against_source_checks_source( """Validation fails if validating against source with broken config. """ src_dir = workflow_source(one_conf) - workflow_id = install(src_dir) + workflow_id = await install(src_dir) # Check that the original installation validates OK: validate(workflow_id, against_source=True) @@ -67,7 +67,7 @@ async def test_validate_against_source_gets_old_tvars( } }) - wf_id = install(src_dir) + wf_id = await install(src_dir) installed_dir = run_dir / wf_id # Check that the original installation validates OK: diff --git a/tests/integration/test_get_old_tvars.py b/tests/integration/test_get_old_tvars.py index cd1ee2f118d..1a2ee67fe23 100644 --- a/tests/integration/test_get_old_tvars.py +++ b/tests/integration/test_get_old_tvars.py @@ -20,7 +20,7 @@ from cylc.flow.option_parsers import Options from cylc.flow.scripts.validate import ( - wrapped_main as validate, + run as validate, get_option_parser as validate_gop ) from cylc.flow.scripts.view import ( diff --git a/tests/integration/test_install.py b/tests/integration/test_install.py index f681c5a2a01..5a9298c4a8f 100644 --- a/tests/integration/test_install.py +++ b/tests/integration/test_install.py @@ -18,8 +18,7 @@ import pytest from pathlib import Path - -from .test_scan import init_flows +from typing import Callable, Tuple from cylc.flow.async_util import pipe from cylc.flow.scripts import scan @@ -29,7 +28,8 @@ install_cli ) -from typing import Callable, Tuple +from .test_scan import init_flows +from .utils.entry_points import EntryPointWrapper SRV_DIR = Path(WorkflowFiles.Service.DIRNAME) CONTACT = Path(WorkflowFiles.Service.CONTACT) @@ -97,8 +97,8 @@ def src_run_dirs( return tmp_src_path, tmp_run_path -def test_install_scan_no_ping( - src_run_dirs: Callable, +async def test_install_scan_no_ping( + src_run_dirs: Tuple[Path, Path], capsys: pytest.CaptureFixture, caplog: pytest.LogCaptureFixture ) -> None: @@ -110,21 +110,21 @@ def test_install_scan_no_ping( opts = InstallOptions() opts.no_ping = True - install_cli(opts, id_='w1') + await install_cli(opts, id_='w1') out = capsys.readouterr().out assert INSTALLED_MSG.format(wfrun='w1/run2') in out assert WF_ACTIVE_MSG.format(wf='w1') in out # Empty contact file faked with "touch": assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text - install_cli(opts, id_='w2') + await install_cli(opts, id_='w2') out = capsys.readouterr().out assert WF_ACTIVE_MSG.format(wf='w2') not in out assert INSTALLED_MSG.format(wfrun='w2/run1') in out -def test_install_scan_ping( - src_run_dirs: Callable, +async def test_install_scan_ping( + src_run_dirs: Tuple[Path, Path], capsys: pytest.CaptureFixture, caplog: pytest.LogCaptureFixture, patch_graphql_query: Callable @@ -136,7 +136,7 @@ def test_install_scan_ping( opts = InstallOptions() opts.no_ping = False - install_cli(opts, id_='w1') + await install_cli(opts, id_='w1') out = capsys.readouterr().out assert INSTALLED_MSG.format(wfrun='w1/run2') in out assert WF_ACTIVE_MSG.format(wf='w1') in out @@ -144,47 +144,52 @@ def test_install_scan_ping( # Empty contact file faked with "touch": assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text - install_cli(opts, id_='w2') + await install_cli(opts, id_='w2') out = capsys.readouterr().out assert INSTALLED_MSG.format(wfrun='w2/run1') in out assert WF_ACTIVE_MSG.format(wf='w2') not in out -def test_install_gets_back_compat_mode_for_plugins( - src_run_dirs: Callable, +async def test_install_gets_back_compat_mode_for_plugins( + src_run_dirs: Tuple[Path, Path], monkeypatch: pytest.MonkeyPatch, + capcall, capsys: pytest.CaptureFixture, ): - """Assert that pre cylc install will detect whether a workflow + """Assert that cylc install will detect whether a workflow should use back compat mode _before_ running pre_configure plugins so that those plugins can use that information. """ - class failIfDeprecated: + # track calls of the check_deprecation method + # (this is the thing that sets cylc.flow.flags.back_compat) + check_deprecation_calls = capcall( + 'cylc.flow.scripts.install.check_deprecation' + ) + + @EntryPointWrapper + def failIfDeprecated(*args, **kwargs): """A fake Cylc Plugin entry point""" - @staticmethod - def load(): - return failIfDeprecated.raiser - - @staticmethod - def raiser(*_, **__): - import cylc.flow.flags - if cylc.flow.flags.cylc7_back_compat: - print('Plugin:True') - return True - print('Plugin:False') - return False + nonlocal check_deprecation_calls + # print the number of times the check_deprecation method has been + # called + print(f'CALLS={len(check_deprecation_calls)}') + # return a blank result + return { + 'env': {}, + 'template_variables': {}, + } # Monkeypatch our fake entry point into iter_entry_points: monkeypatch.setattr( - 'cylc.flow.scripts.install.iter_entry_points', - lambda x: [failIfDeprecated] + 'cylc.flow.plugins.iter_entry_points', + lambda namespace: ( + [failIfDeprecated] if namespace == 'cylc.pre_configure' else [] + ) ) - opts = InstallOptions() - monkeypatch.setattr('cylc.flow.flags.cylc7_back_compat', False) - install_cli(opts, id_='w1') - assert capsys.readouterr()[0].split('\n')[0] == 'Plugin:False' + # install the workflow + opts = InstallOptions() + await install_cli(opts, id_='w1') - monkeypatch.setattr('cylc.flow.flags.cylc7_back_compat', True) - install_cli(opts, id_='w1') - assert capsys.readouterr()[0].split('\n')[0] == 'Plugin:True' + # ensure the check_deprecation method was called before the plugin was run + assert 'CALLS=1' in capsys.readouterr()[0] diff --git a/tests/integration/test_reinstall.py b/tests/integration/test_reinstall.py index 1b965f18b8e..b2acd7abae6 100644 --- a/tests/integration/test_reinstall.py +++ b/tests/integration/test_reinstall.py @@ -16,10 +16,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import asyncio +from contextlib import asynccontextmanager from pathlib import Path from types import SimpleNamespace from uuid import uuid1 -from functools import partial import pytest @@ -34,11 +35,11 @@ get_option_parser as reinstall_gop, reinstall_cli, ) -from cylc.flow.terminal import cli_function from cylc.flow.workflow_files import ( WorkflowFiles, ) -from cylc.flow.network.multi import call_multi + +from .utils.entry_points import EntryPointWrapper ReInstallOptions = Options(reinstall_gop()) @@ -299,3 +300,89 @@ async def test_rsync_fail(one_src, one_run, mock_glbl_cfg, non_interactive): with pytest.raises(WorkflowFilesError) as exc_ctx: await reinstall_cli(opts=ReInstallOptions(), workflow_id=one_run.id) assert 'An error occurred reinstalling' in str(exc_ctx.value) + + +@pytest.fixture +def my_install_plugin(monkeypatch): + """This configures a single post_install plugin. + + The plugin starts an async task, then returns. + """ + progress = [] + + @EntryPointWrapper + def post_install_basic(*_, **__): + """Simple plugin that returns one env var and one template var.""" + nonlocal progress + + async def my_async(): + # the async task + nonlocal progress + await asyncio.sleep(2) + progress.append('end') + + # start the async task + progress.append('start') + asyncio.get_event_loop().create_task(my_async()) + progress.append('return') + + # return a blank result + return { + 'env': {}, + 'template_variables': {}, + } + + monkeypatch.setattr( + 'cylc.flow.plugins.iter_entry_points', + lambda namespace: ( + [post_install_basic] if namespace == 'cylc.post_install' else [] + ) + ) + + return progress + + +async def test_async_block( + one_src, + one_run, + my_install_plugin, + monkeypatch, +): + """Ensure async tasks created by post_install plugins are awaited. + + The cylc-rose plugin may create asyncio tasks when run but cannot await + them (because it isn't async itself). To get around this we have + "cylc reinstall" use "async_block" which detects tasks created in the + background and awaits them. + + This test ensures that the async_block mechanism is correctly plugged in + to "cylc reinstall". + + See https://github.com/cylc/cylc-rose/issues/274 + """ + # this is what it should do + (one_src.path / 'a').touch() # give it something to install + assert my_install_plugin == [] + await reinstall_cli(opts=ReInstallOptions(), workflow_id=one_run.id) + # the presence of "end" means that the task was awaited + assert my_install_plugin == ['start', 'return', 'end'] + + # substitute the "async_block" (which waits for asyncio tasks started in + # the background) for a fake implementation (which doesn't) + + @asynccontextmanager + async def async_block(): + yield + + monkeypatch.setattr( + 'cylc.flow.scripts.reinstall.async_block', + async_block, + ) + + # this is would it would have done without async block + (one_src.path / 'b').touch() # give it something else to install + my_install_plugin.clear() + assert my_install_plugin == [] + await reinstall_cli(opts=ReInstallOptions(), workflow_id=one_run.id) + # the absence of "end" means that the task was not awaited + assert my_install_plugin == ['start', 'return'] diff --git a/tests/integration/utils/entry_points.py b/tests/integration/utils/entry_points.py new file mode 100644 index 00000000000..3430f74631b --- /dev/null +++ b/tests/integration/utils/entry_points.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 + +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Utilities for working with entry points.""" + + +class EntryPointWrapper: + """Wraps a method to make it look like an entry point.""" + + def __init__(self, fcn): + self.name = fcn.__name__ + self.fcn = fcn + + def load(self): + return self.fcn diff --git a/tests/unit/plugins/test_pre_configure.py b/tests/unit/plugins/test_pre_configure.py index e9f91054e12..87ecfda0e41 100644 --- a/tests/unit/plugins/test_pre_configure.py +++ b/tests/unit/plugins/test_pre_configure.py @@ -65,9 +65,12 @@ def pre_configure_error(*_, **__): def test_pre_configure(monkeypatch): """It should call the plugin.""" monkeypatch.setattr( - 'cylc.flow.parsec.fileparse.iter_entry_points', - lambda x: [pre_configure_basic] + 'cylc.flow.plugins.iter_entry_points', + lambda namespace: ( + [pre_configure_basic] if namespace == 'cylc.pre_configure' else [] + ) ) + extra_vars = process_plugins(None, None) assert extra_vars == { 'env': { @@ -83,11 +86,13 @@ def test_pre_configure(monkeypatch): def test_pre_configure_duplicate(monkeypatch): """It should error when plugins clash.""" monkeypatch.setattr( - 'cylc.flow.parsec.fileparse.iter_entry_points', - lambda x: [ - pre_configure_basic, - pre_configure_basic - ] + 'cylc.flow.plugins.iter_entry_points', + lambda namespace: ( + [ + pre_configure_basic, + pre_configure_basic, + ] if namespace == 'cylc.pre_configure' else [] + ) ) with pytest.raises(ParsecError): process_plugins(None, None) @@ -96,11 +101,13 @@ def test_pre_configure_duplicate(monkeypatch): def test_pre_configure_templating_detected(monkeypatch): """It should error when plugins clash (for templating).""" monkeypatch.setattr( - 'cylc.flow.parsec.fileparse.iter_entry_points', - lambda x: [ - pre_configure_templating_detected, - pre_configure_templating_detected - ] + 'cylc.flow.plugins.iter_entry_points', + lambda namespace: ( + [ + pre_configure_templating_detected, + pre_configure_templating_detected, + ] if namespace == 'cylc.pre_configure' else [] + ) ) with pytest.raises(ParsecError): process_plugins(None, None) @@ -109,8 +116,12 @@ def test_pre_configure_templating_detected(monkeypatch): def test_pre_configure_exception(monkeypatch): """It should wrap plugin errors.""" monkeypatch.setattr( - 'cylc.flow.parsec.fileparse.iter_entry_points', - lambda x: [pre_configure_error] + 'cylc.flow.plugins.iter_entry_points', + lambda namespace: ( + [ + pre_configure_error, + ] if namespace == 'cylc.pre_configure' else [] + ) ) with pytest.raises(PluginError) as exc_ctx: process_plugins(None, None)