diff --git a/cylc/flow/async_util.py b/cylc/flow/async_util.py
index 1e103615b33..478c41dc297 100644
--- a/cylc/flow/async_util.py
+++ b/cylc/flow/async_util.py
@@ -16,6 +16,7 @@
"""Utilities for use with asynchronous code."""
import asyncio
+from contextlib import asynccontextmanager
from functools import partial, wraps
from inspect import signature
import os
@@ -478,3 +479,48 @@ async def _fcn(*args, executor=None, **kwargs):
async_listdir = make_async(os.listdir)
+
+
+@asynccontextmanager
+async def async_block():
+ """Ensure all tasks started within the context are awaited when it closes.
+
+ Normally, you would await a task e.g:
+
+ await three()
+
+ If it's possible to await the task, do that, however, this isn't always an
+ option. This interface exists is to help patch over issues where async code
+ (one) calls sync code (two) which calls async code (three) e.g:
+
+ async def one():
+ two()
+
+ def two():
+ # this breaks - event loop is already running
+ asyncio.get_event_loop().run_until_complete(three())
+
+ async def three():
+ await asyncio.sleep(1)
+
+ This code will error because you can't nest asyncio (without nest-asyncio)
+ which means you can schedule tasks the tasks in "two", but you can't await
+ them.
+
+ def two():
+ # this works, but it doesn't wait for three() to complete
+ asyncio.create_task(three())
+
+ This interface allows you to await the tasks
+
+ async def one()
+ async with async_block():
+ two()
+ # any tasks two() started will have been awaited by now
+ """
+ # make a list of all tasks running before we enter the context manager
+ tasks_before = asyncio.all_tasks()
+ # run the user code
+ yield
+ # await any new tasks
+ await asyncio.gather(*(asyncio.all_tasks() - tasks_before))
diff --git a/cylc/flow/parsec/fileparse.py b/cylc/flow/parsec/fileparse.py
index 42e8a4aa40b..5a1d1747692 100644
--- a/cylc/flow/parsec/fileparse.py
+++ b/cylc/flow/parsec/fileparse.py
@@ -36,14 +36,14 @@
import sys
import typing as t
-from cylc.flow import __version__, iter_entry_points
+from cylc.flow import __version__
from cylc.flow import LOG
-from cylc.flow.exceptions import PluginError
from cylc.flow.parsec.exceptions import (
FileParseError, ParsecError, TemplateVarLanguageClash
)
-from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults
from cylc.flow.parsec.include import inline
+from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults
+from cylc.flow.plugins import run_plugins
from cylc.flow.parsec.util import itemstr
from cylc.flow.templatevars import get_template_vars_from_db
from cylc.flow.workflow_files import (
@@ -271,23 +271,11 @@ def process_plugins(fpath, opts):
}
# Run entry point pre_configure items, trying to merge values with each.:
- for entry_point in iter_entry_points(
- 'cylc.pre_configure'
+ for entry_point, plugin_result in run_plugins(
+ 'cylc.pre_configure',
+ srcdir=fpath,
+ opts=opts,
):
- try:
- # If you want it to work on sourcedirs you need to get the options
- # to here.
- plugin_result = entry_point.load()(
- srcdir=fpath, opts=opts
- )
- except Exception as exc:
- # NOTE: except Exception (purposefully vague)
- # this is to separate plugin from core Cylc errors
- raise PluginError(
- 'cylc.pre_configure',
- entry_point.name,
- exc
- ) from None
for section in ['env', 'template_variables']:
if section in plugin_result and plugin_result[section] is not None:
# Raise error if multiple plugins try to update the same keys.
diff --git a/cylc/flow/plugins.py b/cylc/flow/plugins.py
new file mode 100644
index 00000000000..f291b2697d3
--- /dev/null
+++ b/cylc/flow/plugins.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python3
+
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Common functionality related to the loading and calling of plugins."""
+
+from time import time
+
+from cylc.flow import LOG, iter_entry_points
+from cylc.flow.exceptions import PluginError
+import cylc.flow.flags
+
+
+def get_entry_points(plugin_namespace):
+ """Yield all installed plugin entry points which match the given name."""
+ yield from iter_entry_points(plugin_namespace)
+
+
+def run_plugins(plugin_namespace, *args, **kwargs):
+ """Run all installed plugins for the given namespace.
+
+ This runs plugins in series, yielding the results one by one.
+
+ Args:
+ plugin_namespace:
+ The entry point namespace for the plugins to run,
+ e.g. "cylc.post_install".
+ args:
+ Any arguments to call plugins with.
+ kwargs:
+ Any kwargs to call plugins with.
+
+ Yields:
+ (entry_point, plugin_result)
+
+ Warning:
+ Remember to wrap "cylc.post_install" plugins with
+ "cylc.flow.async_util.async_block".
+
+ See https://github.com/cylc/cylc-rose/issues/274
+
+ """
+ for entry_point in get_entry_points(plugin_namespace):
+ try:
+ meth = entry_point.load()
+ start_time = time()
+ plugin_result = meth(*args, **kwargs)
+ LOG.debug(
+ f'ran {entry_point.name} in {time() - start_time:0.05f}s'
+ )
+ yield entry_point, plugin_result
+ except Exception as exc:
+ # NOTE: except Exception (purposefully vague)
+ # this is to separate plugin from core Cylc errors
+ if cylc.flow.flags.verbosity > 1:
+ # raise the full exception in debug mode
+ raise
+ # raise a user-friendly exception
+ raise PluginError(
+ plugin_namespace,
+ entry_point.name,
+ exc
+ ) from None
diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py
index 61fd408089b..785f8e1eb7f 100644
--- a/cylc/flow/scheduler_cli.py
+++ b/cylc/flow/scheduler_cli.py
@@ -369,6 +369,12 @@ async def scheduler_cli(
functionality.
"""
+ if options.starttask:
+ options.starttask = upgrade_legacy_ids(
+ *options.starttask,
+ relative=True,
+ )
+
# Parse workflow name but delay Cylc 7 suite.rc deprecation warning
# until after the start-up splash is printed.
# TODO: singleton
@@ -651,14 +657,4 @@ async def _run(scheduler: Scheduler) -> int:
@cli_function(get_option_parser)
def play(parser: COP, options: 'Values', id_: str):
"""Implement cylc play."""
- return _play(parser, options, id_)
-
-
-def _play(parser: COP, options: 'Values', id_: str):
- """Allows compound scripts to import play, but supply their own COP."""
- if options.starttask:
- options.starttask = upgrade_legacy_ids(
- *options.starttask,
- relative=True,
- )
return asyncio.run(scheduler_cli(options, id_))
diff --git a/cylc/flow/scripts/install.py b/cylc/flow/scripts/install.py
index be9583137e9..f8f9fff70a5 100755
--- a/cylc/flow/scripts/install.py
+++ b/cylc/flow/scripts/install.py
@@ -86,14 +86,9 @@
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
-from cylc.flow.scripts.scan import (
- get_pipe,
- _format_plain,
- FLOW_STATE_SYMBOLS,
- FLOW_STATE_CMAP
-)
-from cylc.flow import LOG, iter_entry_points
-from cylc.flow.exceptions import PluginError, InputError
+from cylc.flow import LOG
+from cylc.flow.async_util import async_block
+from cylc.flow.exceptions import InputError
from cylc.flow.loggingutil import CylcLogFormatter, set_timestamps
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
@@ -105,12 +100,19 @@
expand_path,
get_workflow_run_dir
)
+from cylc.flow.plugins import run_plugins
from cylc.flow.install import (
install_workflow,
parse_cli_sym_dirs,
search_install_source_dirs,
check_deprecation,
)
+from cylc.flow.scripts.scan import (
+ get_pipe,
+ _format_plain,
+ FLOW_STATE_SYMBOLS,
+ FLOW_STATE_CMAP
+)
from cylc.flow.terminal import cli_function
@@ -268,22 +270,20 @@ def main(
id_: Optional[str] = None
) -> None:
"""CLI wrapper."""
- install_cli(opts, id_)
+ asyncio.run(install_cli(opts, id_))
-def install_cli(
+async def install_cli(
opts: 'Values',
id_: Optional[str] = None
) -> Tuple[str, str]:
"""Install workflow and scan for already-running instances."""
- wf_name, wf_id = install(opts, id_)
- asyncio.run(
- scan(wf_name, not opts.no_ping)
- )
+ wf_name, wf_id = await install(opts, id_)
+ await scan(wf_name, not opts.no_ping)
return wf_name, wf_id
-def install(
+async def install(
opts: 'Values', id_: Optional[str] = None
) -> Tuple[str, str]:
set_timestamps(LOG, opts.log_timestamp and opts.verbosity > 1)
@@ -297,19 +297,12 @@ def install(
# for compatibility mode:
check_deprecation(source)
- for entry_point in iter_entry_points(
- 'cylc.pre_configure'
+ for _entry_point, _plugin_result in run_plugins(
+ 'cylc.pre_configure',
+ srcdir=source,
+ opts=opts,
):
- try:
- entry_point.load()(srcdir=source, opts=opts)
- except Exception as exc:
- # NOTE: except Exception (purposefully vague)
- # this is to separate plugin from core Cylc errors
- raise PluginError(
- 'cylc.pre_configure',
- entry_point.name,
- exc
- ) from None
+ pass
cli_symdirs: Optional[Dict[str, Dict[str, Any]]] = None
if opts.symlink_dirs == '':
@@ -325,23 +318,14 @@ def install(
cli_symlink_dirs=cli_symdirs
)
- for entry_point in iter_entry_points(
- 'cylc.post_install'
- ):
- try:
- entry_point.load()(
- srcdir=source_dir,
- opts=opts,
- rundir=str(rundir)
- )
- except Exception as exc:
- # NOTE: except Exception (purposefully vague)
- # this is to separate plugin from core Cylc errors
- raise PluginError(
- 'cylc.post_install',
- entry_point.name,
- exc
- ) from None
+ async with async_block():
+ for _entry_point, _plugin_result in run_plugins(
+ 'cylc.post_install',
+ srcdir=source_dir,
+ opts=opts,
+ rundir=str(rundir),
+ ):
+ pass
print(f'INSTALLED {workflow_id} from {source_dir}')
diff --git a/cylc/flow/scripts/reinstall.py b/cylc/flow/scripts/reinstall.py
index a2acbc7f771..006fabcfaa4 100644
--- a/cylc/flow/scripts/reinstall.py
+++ b/cylc/flow/scripts/reinstall.py
@@ -75,9 +75,8 @@
from ansimarkup import parse as cparse
-from cylc.flow import iter_entry_points
+from cylc.flow.async_util import async_block
from cylc.flow.exceptions import (
- PluginError,
ServiceFileError,
WorkflowFilesError,
)
@@ -90,8 +89,8 @@
OptionSettings,
ID_MULTI_ARG_DOC
)
-
from cylc.flow.pathutil import get_workflow_run_dir
+from cylc.flow.plugins import run_plugins
from cylc.flow.workflow_files import (
get_workflow_source_dir,
load_contact_file,
@@ -198,7 +197,7 @@ async def reinstall_cli(
if is_terminal() and not opts.skip_interactive:
# interactive mode - perform dry-run and prompt
# dry-mode reinstall
- if not reinstall(
+ if not await reinstall(
opts,
workflow_id,
source,
@@ -231,7 +230,7 @@ async def reinstall_cli(
if usr == 'y':
# reinstall for real
- reinstall(opts, workflow_id, source, run_dir, dry_run=False)
+ await reinstall(opts, workflow_id, source, run_dir, dry_run=False)
print(cparse('Successfully reinstalled.'))
if print_reload_tip:
display_cylc_reload_tip(workflow_id)
@@ -245,7 +244,7 @@ async def reinstall_cli(
return False
-def reinstall(
+async def reinstall(
opts: 'Values',
workflow_id: str,
src_dir: Path,
@@ -273,7 +272,12 @@ def reinstall(
# run pre_configure plugins
if not dry_run:
# don't run plugins in dry-mode
- pre_configure(opts, src_dir)
+ for _entry_point, _plugin_result in run_plugins(
+ 'cylc.pre_configure',
+ srcdir=src_dir,
+ opts=opts,
+ ):
+ pass
# reinstall from src_dir (will raise WorkflowFilesError on error)
stdout: str = reinstall_workflow(
@@ -295,7 +299,14 @@ def reinstall(
# run post_install plugins
if not dry_run:
# don't run plugins in dry-mode
- post_install(opts, src_dir, run_dir)
+ async with async_block():
+ for _entry_point, _plugin_result in run_plugins(
+ 'cylc.post_install',
+ srcdir=src_dir,
+ opts=opts,
+ rundir=str(run_dir),
+ ):
+ pass
return True
@@ -338,45 +349,6 @@ def format_rsync_out(out: str) -> List[str]:
return lines
-def pre_configure(opts: 'Values', src_dir: Path) -> None:
- """Run pre_configure plugins."""
- # don't run plugins in dry-mode
- for entry_point in iter_entry_points(
- 'cylc.pre_configure'
- ):
- try:
- entry_point.load()(srcdir=src_dir, opts=opts)
- except Exception as exc:
- # NOTE: except Exception (purposefully vague)
- # this is to separate plugin from core Cylc errors
- raise PluginError(
- 'cylc.pre_configure',
- entry_point.name,
- exc
- ) from None
-
-
-def post_install(opts: 'Values', src_dir: Path, run_dir: Path) -> None:
- """Run post_install plugins."""
- for entry_point in iter_entry_points(
- 'cylc.post_install'
- ):
- try:
- entry_point.load()(
- srcdir=src_dir,
- opts=opts,
- rundir=str(run_dir)
- )
- except Exception as exc:
- # NOTE: except Exception (purposefully vague)
- # this is to separate plugin from core Cylc errors
- raise PluginError(
- 'cylc.post_install',
- entry_point.name,
- exc
- ) from None
-
-
def display_rose_warning(src_dir: Path) -> None:
"""Explain why rose installed files are marked as deleted."""
if (src_dir / 'rose-suite.conf').is_file():
diff --git a/cylc/flow/scripts/validate.py b/cylc/flow/scripts/validate.py
index 7cfd42c2138..acd14537d1e 100755
--- a/cylc/flow/scripts/validate.py
+++ b/cylc/flow/scripts/validate.py
@@ -133,14 +133,10 @@ def get_option_parser():
@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', workflow_id: str) -> None:
- _main(parser, options, workflow_id)
+ asyncio.run(run(parser, options, workflow_id))
-def _main(parser: COP, options: 'Values', workflow_id: str) -> None:
- asyncio.run(wrapped_main(parser, options, workflow_id))
-
-
-async def wrapped_main(
+async def run(
parser: COP, options: 'Values', workflow_id: str
) -> None:
"""cylc validate CLI."""
diff --git a/cylc/flow/scripts/validate_install_play.py b/cylc/flow/scripts/validate_install_play.py
index 4ad66b5c454..db4adb80340 100644
--- a/cylc/flow/scripts/validate_install_play.py
+++ b/cylc/flow/scripts/validate_install_play.py
@@ -28,15 +28,9 @@
"""
+import asyncio
import sys
-from cylc.flow.scripts.validate import (
- VALIDATE_OPTIONS,
- _main as validate_main
-)
-from cylc.flow.scripts.install import (
- INSTALL_OPTIONS, install_cli as cylc_install, get_source_location
-)
from cylc.flow import LOG
from cylc.flow.scheduler_cli import PLAY_OPTIONS
from cylc.flow.loggingutil import set_timestamps
@@ -46,7 +40,16 @@
cleanup_sysargv,
log_subcommand,
)
-from cylc.flow.scheduler_cli import _play
+from cylc.flow.scheduler_cli import scheduler_cli as cylc_play
+from cylc.flow.scripts.validate import (
+ VALIDATE_OPTIONS,
+ run as cylc_validate,
+)
+from cylc.flow.scripts.install import (
+ INSTALL_OPTIONS,
+ install_cli as cylc_install,
+ get_source_location,
+)
from cylc.flow.terminal import cli_function
from typing import TYPE_CHECKING, Optional
@@ -89,16 +92,20 @@ def get_option_parser() -> COP:
@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', workflow_id: Optional[str] = None):
+ asyncio.run(run(parser, options, workflow_id))
+
+
+async def run(parser: COP, options: 'Values', workflow_id: Optional[str]):
"""Run Cylc validate - install - play in sequence."""
if not workflow_id:
workflow_id = '.'
orig_source = workflow_id
source = get_source_location(workflow_id)
log_subcommand('validate', source)
- validate_main(parser, options, str(source))
+ await cylc_validate(parser, options, str(source))
log_subcommand('install', source)
- _, workflow_id = cylc_install(options, workflow_id)
+ _, workflow_id = await cylc_install(options, workflow_id)
cleanup_sysargv(
'play',
@@ -114,4 +121,4 @@ def main(parser: COP, options: 'Values', workflow_id: Optional[str] = None):
set_timestamps(LOG, options.log_timestamp)
log_subcommand(*sys.argv[1:])
- _play(parser, options, workflow_id)
+ await cylc_play(options, workflow_id)
diff --git a/cylc/flow/scripts/validate_reinstall.py b/cylc/flow/scripts/validate_reinstall.py
index 3fb665cf999..808f27cd586 100644
--- a/cylc/flow/scripts/validate_reinstall.py
+++ b/cylc/flow/scripts/validate_reinstall.py
@@ -62,7 +62,7 @@
from cylc.flow.scheduler_cli import PLAY_OPTIONS, scheduler_cli
from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
- wrapped_main as cylc_validate
+ run as cylc_validate,
)
from cylc.flow.scripts.reinstall import (
REINSTALL_CYLC_ROSE_OPTIONS,
diff --git a/tests/functional/cylc-combination-scripts/01-vr-reload.t b/tests/functional/cylc-combination-scripts/01-vr-reload.t
index 3b63c64d896..c161f19d1e8 100644
--- a/tests/functional/cylc-combination-scripts/01-vr-reload.t
+++ b/tests/functional/cylc-combination-scripts/01-vr-reload.t
@@ -51,6 +51,6 @@ named_grep_ok "${TEST_NAME_BASE}-it-logged-reload" \
"${WORKFLOW_RUN_DIR}/log/scheduler/log"
# Clean Up.
-run_ok "teardown (stop workflow)" cylc stop "${WORKFLOW_NAME}" --now --now
+run_ok "${TEST_NAME_BASE}-stop" cylc stop "${WORKFLOW_NAME}" --now --now
purge
exit 0
diff --git a/tests/functional/cylc-combination-scripts/02-vr-restart.t b/tests/functional/cylc-combination-scripts/02-vr-restart.t
index 9238c45b723..9dce297d09c 100644
--- a/tests/functional/cylc-combination-scripts/02-vr-restart.t
+++ b/tests/functional/cylc-combination-scripts/02-vr-restart.t
@@ -44,6 +44,6 @@ named_grep_ok "${TEST_NAME_BASE}-it-installed" "$ cylc reinstall" "VIPOUT.txt"
named_grep_ok "${TEST_NAME_BASE}-it-played" "cylc play" "VIPOUT.txt"
# Clean Up.
-run_ok "teardown (stop workflow)" cylc stop "${WORKFLOW_NAME}" --now --now
+run_ok "${TEST_NAME_BASE}-stop" cylc stop "${WORKFLOW_NAME}" --now --now
purge
exit 0
diff --git a/tests/functional/cylc-combination-scripts/04-vr-fail-validate.t b/tests/functional/cylc-combination-scripts/04-vr-fail-validate.t
index ef819a32781..5432f6d1016 100644
--- a/tests/functional/cylc-combination-scripts/04-vr-fail-validate.t
+++ b/tests/functional/cylc-combination-scripts/04-vr-fail-validate.t
@@ -48,6 +48,6 @@ named_grep_ok "${TEST_NAME_BASE}-it-failed" \
# Clean Up:
-run_ok "teardown (stop workflow)" cylc stop "${WORKFLOW_NAME}" --now --now
+run_ok "${TEST_NAME_BASE}-stop" cylc stop "${WORKFLOW_NAME}" --now --now
purge
exit 0
diff --git a/tests/functional/cylc-combination-scripts/05-vr-fail-is-running.t b/tests/functional/cylc-combination-scripts/05-vr-fail-is-running.t
index 876c494d4f3..90275dae037 100644
--- a/tests/functional/cylc-combination-scripts/05-vr-fail-is-running.t
+++ b/tests/functional/cylc-combination-scripts/05-vr-fail-is-running.t
@@ -41,6 +41,7 @@ run_ok "setup (vip)" \
# Get the workflow into an unreachable state
CONTACTFILE="${RUN_DIR}/${WORKFLOW_NAME}/.service/contact"
+cp "$CONTACTFILE" "${CONTACTFILE}.old"
poll test -e "${CONTACTFILE}"
sed -i 's@CYLC_WORKFLOW_HOST=.*@CYLC_WORKFLOW_HOST=elephantshrew@' "${CONTACTFILE}"
@@ -54,7 +55,7 @@ run_fail "${TEST_NAME_BASE}-runs" cylc vr "${WORKFLOW_NAME}"
grep_ok "on elephantshrew." "${TEST_NAME_BASE}-runs.stderr"
# Clean Up:
-sed -i "s@CYLC_WORKFLOW_HOST=elephantshrew@CYLC_WORKFLOW_HOST=$HOSTNAME@" "${CONTACTFILE}"
-run_ok "teardown (stop workflow)" cylc stop "${WORKFLOW_NAME}" --now --now
+mv "${CONTACTFILE}.old" "$CONTACTFILE"
+run_ok "${TEST_NAME_BASE}-stop" cylc stop "${WORKFLOW_NAME}" --now --now
purge
exit 0
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index f2f24b09ab5..cb9102b5974 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -462,14 +462,14 @@ def install(test_dir, run_dir):
Returns:
Workflow id, including run directory.
"""
- def _inner(source, **kwargs):
+ async def _inner(source, **kwargs):
opts = InstallOpts(**kwargs)
# Note we append the source.name to the string rather than creating
# a subfolder because the extra layer of directories would exceed
# Cylc install's default limit.
opts.workflow_name = (
f'{str(test_dir.relative_to(run_dir))}.{source.name}')
- workflow_id, _ = cylc_install(opts, str(source))
+ workflow_id, _ = await cylc_install(opts, str(source))
workflow_id = infer_latest_run_from_id(workflow_id)
return workflow_id
yield _inner
diff --git a/tests/integration/scripts/test_validate_integration.py b/tests/integration/scripts/test_validate_integration.py
index e271726dafb..9a17f2c7740 100644
--- a/tests/integration/scripts/test_validate_integration.py
+++ b/tests/integration/scripts/test_validate_integration.py
@@ -29,7 +29,7 @@ async def test_validate_against_source_checks_source(
"""Validation fails if validating against source with broken config.
"""
src_dir = workflow_source(one_conf)
- workflow_id = install(src_dir)
+ workflow_id = await install(src_dir)
# Check that the original installation validates OK:
validate(workflow_id, against_source=True)
@@ -67,7 +67,7 @@ async def test_validate_against_source_gets_old_tvars(
}
})
- wf_id = install(src_dir)
+ wf_id = await install(src_dir)
installed_dir = run_dir / wf_id
# Check that the original installation validates OK:
diff --git a/tests/integration/test_get_old_tvars.py b/tests/integration/test_get_old_tvars.py
index cd1ee2f118d..1a2ee67fe23 100644
--- a/tests/integration/test_get_old_tvars.py
+++ b/tests/integration/test_get_old_tvars.py
@@ -20,7 +20,7 @@
from cylc.flow.option_parsers import Options
from cylc.flow.scripts.validate import (
- wrapped_main as validate,
+ run as validate,
get_option_parser as validate_gop
)
from cylc.flow.scripts.view import (
diff --git a/tests/integration/test_install.py b/tests/integration/test_install.py
index f681c5a2a01..5a9298c4a8f 100644
--- a/tests/integration/test_install.py
+++ b/tests/integration/test_install.py
@@ -18,8 +18,7 @@
import pytest
from pathlib import Path
-
-from .test_scan import init_flows
+from typing import Callable, Tuple
from cylc.flow.async_util import pipe
from cylc.flow.scripts import scan
@@ -29,7 +28,8 @@
install_cli
)
-from typing import Callable, Tuple
+from .test_scan import init_flows
+from .utils.entry_points import EntryPointWrapper
SRV_DIR = Path(WorkflowFiles.Service.DIRNAME)
CONTACT = Path(WorkflowFiles.Service.CONTACT)
@@ -97,8 +97,8 @@ def src_run_dirs(
return tmp_src_path, tmp_run_path
-def test_install_scan_no_ping(
- src_run_dirs: Callable,
+async def test_install_scan_no_ping(
+ src_run_dirs: Tuple[Path, Path],
capsys: pytest.CaptureFixture,
caplog: pytest.LogCaptureFixture
) -> None:
@@ -110,21 +110,21 @@ def test_install_scan_no_ping(
opts = InstallOptions()
opts.no_ping = True
- install_cli(opts, id_='w1')
+ await install_cli(opts, id_='w1')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w1/run2') in out
assert WF_ACTIVE_MSG.format(wf='w1') in out
# Empty contact file faked with "touch":
assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text
- install_cli(opts, id_='w2')
+ await install_cli(opts, id_='w2')
out = capsys.readouterr().out
assert WF_ACTIVE_MSG.format(wf='w2') not in out
assert INSTALLED_MSG.format(wfrun='w2/run1') in out
-def test_install_scan_ping(
- src_run_dirs: Callable,
+async def test_install_scan_ping(
+ src_run_dirs: Tuple[Path, Path],
capsys: pytest.CaptureFixture,
caplog: pytest.LogCaptureFixture,
patch_graphql_query: Callable
@@ -136,7 +136,7 @@ def test_install_scan_ping(
opts = InstallOptions()
opts.no_ping = False
- install_cli(opts, id_='w1')
+ await install_cli(opts, id_='w1')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w1/run2') in out
assert WF_ACTIVE_MSG.format(wf='w1') in out
@@ -144,47 +144,52 @@ def test_install_scan_ping(
# Empty contact file faked with "touch":
assert f"{BAD_CONTACT_MSG} w1/run1" in caplog.text
- install_cli(opts, id_='w2')
+ await install_cli(opts, id_='w2')
out = capsys.readouterr().out
assert INSTALLED_MSG.format(wfrun='w2/run1') in out
assert WF_ACTIVE_MSG.format(wf='w2') not in out
-def test_install_gets_back_compat_mode_for_plugins(
- src_run_dirs: Callable,
+async def test_install_gets_back_compat_mode_for_plugins(
+ src_run_dirs: Tuple[Path, Path],
monkeypatch: pytest.MonkeyPatch,
+ capcall,
capsys: pytest.CaptureFixture,
):
- """Assert that pre cylc install will detect whether a workflow
+ """Assert that cylc install will detect whether a workflow
should use back compat mode _before_ running pre_configure plugins
so that those plugins can use that information.
"""
- class failIfDeprecated:
+ # track calls of the check_deprecation method
+ # (this is the thing that sets cylc.flow.flags.back_compat)
+ check_deprecation_calls = capcall(
+ 'cylc.flow.scripts.install.check_deprecation'
+ )
+
+ @EntryPointWrapper
+ def failIfDeprecated(*args, **kwargs):
"""A fake Cylc Plugin entry point"""
- @staticmethod
- def load():
- return failIfDeprecated.raiser
-
- @staticmethod
- def raiser(*_, **__):
- import cylc.flow.flags
- if cylc.flow.flags.cylc7_back_compat:
- print('Plugin:True')
- return True
- print('Plugin:False')
- return False
+ nonlocal check_deprecation_calls
+ # print the number of times the check_deprecation method has been
+ # called
+ print(f'CALLS={len(check_deprecation_calls)}')
+ # return a blank result
+ return {
+ 'env': {},
+ 'template_variables': {},
+ }
# Monkeypatch our fake entry point into iter_entry_points:
monkeypatch.setattr(
- 'cylc.flow.scripts.install.iter_entry_points',
- lambda x: [failIfDeprecated]
+ 'cylc.flow.plugins.iter_entry_points',
+ lambda namespace: (
+ [failIfDeprecated] if namespace == 'cylc.pre_configure' else []
+ )
)
- opts = InstallOptions()
- monkeypatch.setattr('cylc.flow.flags.cylc7_back_compat', False)
- install_cli(opts, id_='w1')
- assert capsys.readouterr()[0].split('\n')[0] == 'Plugin:False'
+ # install the workflow
+ opts = InstallOptions()
+ await install_cli(opts, id_='w1')
- monkeypatch.setattr('cylc.flow.flags.cylc7_back_compat', True)
- install_cli(opts, id_='w1')
- assert capsys.readouterr()[0].split('\n')[0] == 'Plugin:True'
+ # ensure the check_deprecation method was called before the plugin was run
+ assert 'CALLS=1' in capsys.readouterr()[0]
diff --git a/tests/integration/test_reinstall.py b/tests/integration/test_reinstall.py
index 1b965f18b8e..b2acd7abae6 100644
--- a/tests/integration/test_reinstall.py
+++ b/tests/integration/test_reinstall.py
@@ -16,10 +16,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+import asyncio
+from contextlib import asynccontextmanager
from pathlib import Path
from types import SimpleNamespace
from uuid import uuid1
-from functools import partial
import pytest
@@ -34,11 +35,11 @@
get_option_parser as reinstall_gop,
reinstall_cli,
)
-from cylc.flow.terminal import cli_function
from cylc.flow.workflow_files import (
WorkflowFiles,
)
-from cylc.flow.network.multi import call_multi
+
+from .utils.entry_points import EntryPointWrapper
ReInstallOptions = Options(reinstall_gop())
@@ -299,3 +300,89 @@ async def test_rsync_fail(one_src, one_run, mock_glbl_cfg, non_interactive):
with pytest.raises(WorkflowFilesError) as exc_ctx:
await reinstall_cli(opts=ReInstallOptions(), workflow_id=one_run.id)
assert 'An error occurred reinstalling' in str(exc_ctx.value)
+
+
+@pytest.fixture
+def my_install_plugin(monkeypatch):
+ """This configures a single post_install plugin.
+
+ The plugin starts an async task, then returns.
+ """
+ progress = []
+
+ @EntryPointWrapper
+ def post_install_basic(*_, **__):
+ """Simple plugin that returns one env var and one template var."""
+ nonlocal progress
+
+ async def my_async():
+ # the async task
+ nonlocal progress
+ await asyncio.sleep(2)
+ progress.append('end')
+
+ # start the async task
+ progress.append('start')
+ asyncio.get_event_loop().create_task(my_async())
+ progress.append('return')
+
+ # return a blank result
+ return {
+ 'env': {},
+ 'template_variables': {},
+ }
+
+ monkeypatch.setattr(
+ 'cylc.flow.plugins.iter_entry_points',
+ lambda namespace: (
+ [post_install_basic] if namespace == 'cylc.post_install' else []
+ )
+ )
+
+ return progress
+
+
+async def test_async_block(
+ one_src,
+ one_run,
+ my_install_plugin,
+ monkeypatch,
+):
+ """Ensure async tasks created by post_install plugins are awaited.
+
+ The cylc-rose plugin may create asyncio tasks when run but cannot await
+ them (because it isn't async itself). To get around this we have
+ "cylc reinstall" use "async_block" which detects tasks created in the
+ background and awaits them.
+
+ This test ensures that the async_block mechanism is correctly plugged in
+ to "cylc reinstall".
+
+ See https://github.com/cylc/cylc-rose/issues/274
+ """
+ # this is what it should do
+ (one_src.path / 'a').touch() # give it something to install
+ assert my_install_plugin == []
+ await reinstall_cli(opts=ReInstallOptions(), workflow_id=one_run.id)
+ # the presence of "end" means that the task was awaited
+ assert my_install_plugin == ['start', 'return', 'end']
+
+ # substitute the "async_block" (which waits for asyncio tasks started in
+ # the background) for a fake implementation (which doesn't)
+
+ @asynccontextmanager
+ async def async_block():
+ yield
+
+ monkeypatch.setattr(
+ 'cylc.flow.scripts.reinstall.async_block',
+ async_block,
+ )
+
+ # this is would it would have done without async block
+ (one_src.path / 'b').touch() # give it something else to install
+ my_install_plugin.clear()
+ assert my_install_plugin == []
+ await reinstall_cli(opts=ReInstallOptions(), workflow_id=one_run.id)
+ # the absence of "end" means that the task was not awaited
+ assert my_install_plugin == ['start', 'return']
diff --git a/tests/integration/utils/entry_points.py b/tests/integration/utils/entry_points.py
new file mode 100644
index 00000000000..3430f74631b
--- /dev/null
+++ b/tests/integration/utils/entry_points.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python3
+
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Utilities for working with entry points."""
+
+
+class EntryPointWrapper:
+ """Wraps a method to make it look like an entry point."""
+
+ def __init__(self, fcn):
+ self.name = fcn.__name__
+ self.fcn = fcn
+
+ def load(self):
+ return self.fcn
diff --git a/tests/unit/plugins/test_pre_configure.py b/tests/unit/plugins/test_pre_configure.py
index e9f91054e12..87ecfda0e41 100644
--- a/tests/unit/plugins/test_pre_configure.py
+++ b/tests/unit/plugins/test_pre_configure.py
@@ -65,9 +65,12 @@ def pre_configure_error(*_, **__):
def test_pre_configure(monkeypatch):
"""It should call the plugin."""
monkeypatch.setattr(
- 'cylc.flow.parsec.fileparse.iter_entry_points',
- lambda x: [pre_configure_basic]
+ 'cylc.flow.plugins.iter_entry_points',
+ lambda namespace: (
+ [pre_configure_basic] if namespace == 'cylc.pre_configure' else []
+ )
)
+
extra_vars = process_plugins(None, None)
assert extra_vars == {
'env': {
@@ -83,11 +86,13 @@ def test_pre_configure(monkeypatch):
def test_pre_configure_duplicate(monkeypatch):
"""It should error when plugins clash."""
monkeypatch.setattr(
- 'cylc.flow.parsec.fileparse.iter_entry_points',
- lambda x: [
- pre_configure_basic,
- pre_configure_basic
- ]
+ 'cylc.flow.plugins.iter_entry_points',
+ lambda namespace: (
+ [
+ pre_configure_basic,
+ pre_configure_basic,
+ ] if namespace == 'cylc.pre_configure' else []
+ )
)
with pytest.raises(ParsecError):
process_plugins(None, None)
@@ -96,11 +101,13 @@ def test_pre_configure_duplicate(monkeypatch):
def test_pre_configure_templating_detected(monkeypatch):
"""It should error when plugins clash (for templating)."""
monkeypatch.setattr(
- 'cylc.flow.parsec.fileparse.iter_entry_points',
- lambda x: [
- pre_configure_templating_detected,
- pre_configure_templating_detected
- ]
+ 'cylc.flow.plugins.iter_entry_points',
+ lambda namespace: (
+ [
+ pre_configure_templating_detected,
+ pre_configure_templating_detected,
+ ] if namespace == 'cylc.pre_configure' else []
+ )
)
with pytest.raises(ParsecError):
process_plugins(None, None)
@@ -109,8 +116,12 @@ def test_pre_configure_templating_detected(monkeypatch):
def test_pre_configure_exception(monkeypatch):
"""It should wrap plugin errors."""
monkeypatch.setattr(
- 'cylc.flow.parsec.fileparse.iter_entry_points',
- lambda x: [pre_configure_error]
+ 'cylc.flow.plugins.iter_entry_points',
+ lambda namespace: (
+ [
+ pre_configure_error,
+ ] if namespace == 'cylc.pre_configure' else []
+ )
)
with pytest.raises(PluginError) as exc_ctx:
process_plugins(None, None)