From 731a2e98fe503bf9059d02c57425dcd09f9b06b1 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Mon, 19 Jun 2023 12:22:29 +0100 Subject: [PATCH] reload: wait for pending tasks to submit and pause the workflow * Closes https://github.com/cylc/cylc-flow/issues/5107 * Reload now waits for pending tasks to submit before attempting to reload the config itself. * Reload now also puts the workflow into the paused state during the reload process. This doesn't actually achieve anything as the reload command is blocking in the main loop, but it does help to communicate that the workflow will not de-queue or submit and new tasks during this process. * The workflow status message is now updated to reflect the reload progress. --- CHANGES.md | 4 + cylc/flow/scheduler.py | 225 +++++++++++++++++++++------ cylc/flow/workflow_status.py | 3 + tests/conftest.py | 32 ++++ tests/functional/logging/01-basic.t | 12 +- tests/functional/reload/09-garbage.t | 4 +- tests/integration/test_reload.py | 148 ++++++++++++++++++ tests/integration/test_resolvers.py | 2 +- tests/integration/test_scheduler.py | 74 --------- tests/integration/test_task_pool.py | 6 +- tests/unit/test_workflow_status.py | 10 +- 11 files changed, 386 insertions(+), 134 deletions(-) create mode 100644 tests/integration/test_reload.py diff --git a/CHANGES.md b/CHANGES.md index 7370fff01c7..41128f3a19e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,10 @@ ones in. --> ### Enhancements +[#5992](https://github.com/cylc/cylc-flow/pull/5992) - +The scheduler will now wait for preparing tasks to submit before attempting +to perform a reload and will also pause/unpause the workflow. + [#5405](https://github.com/cylc/cylc-flow/pull/5405) - Improve scan command help, and add scheduler PID to the output. diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 7b6cfe6a44d..2352997518d 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -20,6 +20,7 @@ from collections import deque from optparse import Values import os +import inspect from pathlib import Path from queue import Empty, Queue from shlex import quote @@ -32,12 +33,12 @@ from typing import ( TYPE_CHECKING, Callable, + Dict, Iterable, + List, NoReturn, Optional, - List, Set, - Dict, Tuple, Union, ) @@ -150,6 +151,10 @@ if TYPE_CHECKING: from cylc.flow.task_proxy import TaskProxy + # BACK COMPAT: typing_extensions.Literal + # FROM: Python 3.7 + # TO: Python 3.8 + from typing_extensions import Literal class SchedulerStop(CylcError): @@ -244,6 +249,7 @@ class Scheduler: stop_mode: Optional[StopMode] = None stop_task: Optional[str] = None stop_clock_time: Optional[int] = None + reload_pending: 'Union[Literal[False], str]' = False # task event loop is_paused = False @@ -427,7 +433,8 @@ async def configure(self): self.profiler.log_memory("scheduler.py: before load_flow_file") try: - self.load_flow_file() + cfg = self.load_flow_file() + self.apply_new_config(cfg, is_reload=False) except ParsecError as exc: # Mark this exc as expected (see docstring for .schd_expected): exc.schd_expected = True @@ -521,8 +528,7 @@ async def configure(self): self.command_set_hold_point(holdcp) if self.options.paused_start: - LOG.info("Paused on start up") - self.pause_workflow() + self.pause_workflow('Paused on start up') self.profiler.log_memory("scheduler.py: begin run while loop") self.is_updated = True @@ -881,7 +887,7 @@ def queue_command(self, command: str, kwargs: dict) -> None: tuple(kwargs.values()), {} )) - def process_command_queue(self) -> None: + async def process_command_queue(self) -> None: """Process queued commands.""" qsize = self.command_queue.qsize() if qsize <= 0: @@ -900,8 +906,12 @@ def process_command_queue(self) -> None: sep = ', ' if kwargs_string and args_string else '' cmdstr = f"{name}({args_string}{sep}{kwargs_string})" try: - n_warnings: Optional[int] = self.get_command_method(name)( - *args, **kwargs) + fcn = self.get_command_method(name) + n_warnings: Optional[int] + if inspect.iscoroutinefunction(fcn): + n_warnings = await fcn(*args, **kwargs) + else: + n_warnings = fcn(*args, **kwargs) except Exception as exc: # Don't let a bad command bring the workflow down. if ( @@ -1046,37 +1056,95 @@ def command_remove_tasks(self, items) -> int: """Remove tasks.""" return self.pool.remove_tasks(items) - def command_reload_workflow(self) -> None: + async def command_reload_workflow(self) -> None: """Reload workflow configuration.""" - LOG.info("Reloading the workflow definition.") - old_tasks = set(self.config.get_task_name_list()) - # Things that can't change on workflow reload: - self.workflow_db_mgr.pri_dao.select_workflow_params( - self._load_workflow_params - ) + # pause the workflow if not already + was_paused_before_reload = self.is_paused + if not was_paused_before_reload: + self.pause_workflow('Reloading workflow') + self.process_workflow_db_queue() # see #5593 + + # flush out preparing tasks before attempting reload + self.reload_pending = 'waiting for pending tasks to submit' + while self.release_queued_tasks(): + # Run the subset of main-loop functionality required to push + # preparing through the submission pipeline and keep the workflow + # responsive (e.g. to the `cylc stop` command). + + # NOTE: this reload method was called by process_command_queue + # which is called synchronously in the main loop so this call is + # blocking to other main loop functions + + # subproc pool - for issueing/tracking remote-init commands + self.proc_pool.process() + # task messages - for tracking task status changes + self.process_queued_task_messages() + # command queue - keeps the scheduler responsive + await self.process_command_queue() + # allows the scheduler to shutdown --now + await self.workflow_shutdown() + # keep the data store up to date with what's going on + await self.update_data_structure() + self.update_data_store() + # give commands time to complete + sleep(1) # give any remove-init's time to complete + # reload the workflow definition + self.reload_pending = 'loading the workflow definition' + self.update_data_store() # update workflow status msg + self._update_workflow_state() + LOG.info("Reloading the workflow definition.") try: - self.load_flow_file(is_reload=True) + cfg = self.load_flow_file(is_reload=True) except (ParsecError, CylcConfigError) as exc: - raise CommandFailedError(exc) - self.broadcast_mgr.linearized_ancestors = ( - self.config.get_linearized_ancestors()) - self.pool.set_do_reload(self.config) - self.task_events_mgr.mail_interval = self.cylc_config['mail'][ - 'task event batch interval'] - self.task_events_mgr.mail_smtp = self._get_events_conf("smtp") - self.task_events_mgr.mail_footer = self._get_events_conf("footer") - - # Log tasks that have been added by the reload, removed tasks are - # logged by the TaskPool. - add = set(self.config.get_task_name_list()) - old_tasks - for task in add: - LOG.warning(f"Added task: '{task}'") - self.workflow_db_mgr.put_workflow_template_vars(self.template_vars) - self.workflow_db_mgr.put_runtime_inheritance(self.config) - self.workflow_db_mgr.put_workflow_params(self) - self.is_updated = True - self.is_reloaded = True + if cylc.flow.flags.verbosity > 1: + # log full traceback in debug mode + LOG.exception(exc) + LOG.critical( + f'Reload failed - {exc.__class__.__name__}: {exc}' + '\nThis is probably due to an issue with the new' + ' configuration.' + '\nTo continue with the pre-reload config, un-pause the' + ' workflow.' + '\nOtherwise, fix the configuration and attempt to reload' + ' again.' + ) + else: + self.reload_pending = 'applying the new config' + old_tasks = set(self.config.get_task_name_list()) + # Things that can't change on workflow reload: + self.workflow_db_mgr.pri_dao.select_workflow_params( + self._load_workflow_params + ) + self.apply_new_config(cfg, is_reload=True) + self.broadcast_mgr.linearized_ancestors = ( + self.config.get_linearized_ancestors()) + self.pool.set_do_reload(self.config) + self.task_events_mgr.mail_interval = self.cylc_config['mail'][ + 'task event batch interval'] + self.task_events_mgr.mail_smtp = self._get_events_conf("smtp") + self.task_events_mgr.mail_footer = self._get_events_conf("footer") + + # Log tasks that have been added by the reload, removed tasks are + # logged by the TaskPool. + add = set(self.config.get_task_name_list()) - old_tasks + for task in add: + LOG.warning(f"Added task: '{task}'") + self.workflow_db_mgr.put_workflow_template_vars(self.template_vars) + self.workflow_db_mgr.put_runtime_inheritance(self.config) + self.workflow_db_mgr.put_workflow_params(self) + self.process_workflow_db_queue() # see #5593 + self.is_updated = True + self.is_reloaded = True + self._update_workflow_state() + + # resume the workflow if previously paused + self.reload_pending = False + self.update_data_store() # update workflow status msg + self._update_workflow_state() + if not was_paused_before_reload: + self.resume_workflow() + self.process_workflow_db_queue() # see #5593 def get_restart_num(self) -> int: """Return the number of the restart, else 0 if not a restart. @@ -1147,7 +1215,7 @@ def _configure_contact(self) -> None: def load_flow_file(self, is_reload=False): """Load, and log the workflow definition.""" # Local workflow environment set therein. - self.config = WorkflowConfig( + return WorkflowConfig( self.workflow, self.flow_file, self.options, @@ -1163,11 +1231,13 @@ def load_flow_file(self, is_reload=False): work_dir=self.workflow_work_dir, share_dir=self.workflow_share_dir, ) + + def apply_new_config(self, config, is_reload=False): + self.config = config self.cylc_config = DictTree( self.config.cfg['scheduler'], glbl_cfg().get(['scheduler']) ) - self.flow_file_update_time = time() # Dump the loaded flow.cylc file for future reference. config_dir = get_workflow_run_config_log_dir( @@ -1294,7 +1364,7 @@ def run_event_handlers(self, event, reason=""): return self.workflow_event_handler.handle(self, event, str(reason)) - def release_queued_tasks(self) -> None: + def release_queued_tasks(self) -> bool: """Release queued tasks, and submit jobs. The task queue manages references to task proxies in the task pool. @@ -1312,19 +1382,32 @@ def release_queued_tasks(self) -> None: * https://github.com/cylc/cylc-flow/pull/4620 * https://github.com/cylc/cylc-flow/issues/4974 + Returns: + True if tasks were passed through the submit-pipeline + (i.e. new waiting tasks have entered the preparing state OR + preparing tasks have been passed back through for + submission). + """ if ( not self.is_paused and self.stop_mode is None and self.auto_restart_time is None + and self.reload_pending is False ): pre_prep_tasks = self.pool.release_queued_tasks() elif ( - self.should_auto_restart_now() - and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL + ( + # Need to get preparing tasks to submit before auto restart + self.should_auto_restart_now() + and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL + ) or ( + # Need to get preparing tasks to submit before reload + self.reload_pending + ) ): - # Need to get preparing tasks to submit before auto restart + # don't release queued tasks, finish processing preparing tasks pre_prep_tasks = [ itask for itask in self.pool.get_tasks() if itask.state(TASK_STATUS_PREPARING) @@ -1332,9 +1415,9 @@ def release_queued_tasks(self) -> None: # Return, if no tasks to submit. else: - return + return False if not pre_prep_tasks: - return + return False # Start the job submission process. self.is_updated = True @@ -1362,6 +1445,9 @@ def release_queued_tasks(self) -> None: f"{itask.state.get_resolved_dependencies()} in flow {flow}" ) + # one or more tasks were passed through the submission pipeline + return True + def process_workflow_db_queue(self): """Update workflow DB.""" self.workflow_db_mgr.process_queued_ops() @@ -1431,7 +1517,7 @@ async def workflow_shutdown(self): LOG.info(stop_process_pool_empty_msg) stop_process_pool_empty_msg = None self.proc_pool.process() - self.process_command_queue() + await self.process_command_queue() if self.options.profile_mode: self.profiler.log_memory( "scheduler.py: end main loop (total loops %d): %s" % @@ -1574,7 +1660,7 @@ async def main_loop(self) -> None: self.is_reloaded = True self.is_updated = True - self.process_command_queue() + await self.process_command_queue() self.proc_pool.process() # Tasks in the main pool that are waiting but not queued must be @@ -1632,7 +1718,7 @@ async def main_loop(self) -> None: self.late_tasks_check() self.process_queued_task_messages() - self.process_command_queue() + await self.process_command_queue() self.task_events_mgr.process_events(self) # Update state summary, database, and uifeed @@ -1690,6 +1776,27 @@ async def main_loop(self) -> None: self.main_loop_intervals.append(time() - tinit) # END MAIN LOOP + def _update_workflow_state(self): + """Update workflow state in the data store and push out any deltas. + + A cut-down version of update_data_structure which only considers + workflow state changes e.g. status, status message, state totals, etc. + """ + # update the workflow state in the data store + self.data_store_mgr.update_workflow() + + # push out update deltas + self.data_store_mgr.batch_deltas() + self.data_store_mgr.apply_delta_batch() + self.data_store_mgr.apply_delta_checksum() + self.data_store_mgr.publish_deltas = self.data_store_mgr.get_publish_deltas() + self.server.publish_queue.put( + self.data_store_mgr.publish_deltas) + + # Non-async sleep - yield to other threads rather + # than event loop + sleep(0) + async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: """Update DB, UIS, Summary data elements""" updated_tasks = [ @@ -1818,7 +1925,7 @@ async def _shutdown(self, reason: BaseException) -> None: LOG.exception(ex) # disconnect from workflow-db, stop db queue try: - self.workflow_db_mgr.process_queued_ops() + self.process_workflow_db_queue() self.workflow_db_mgr.on_workflow_shutdown() except Exception as exc: LOG.exception(exc) @@ -1931,12 +2038,22 @@ def check_auto_shutdown(self): return True - def pause_workflow(self) -> None: - """Pause the workflow.""" + def pause_workflow(self, msg: Optional[str] = None) -> None: + """Pause the workflow. + + Args: + msg: + A user-facing string explaining why the workflow was paused if + helpful. + + """ if self.is_paused: LOG.info("Workflow is already paused") return - LOG.info("PAUSING the workflow now") + _msg = "PAUSING the workflow now" + if msg: + _msg += f': {msg}' + LOG.info(_msg) self.is_paused = True self.workflow_db_mgr.put_workflow_paused() self.update_data_store() @@ -1945,8 +2062,14 @@ def resume_workflow(self, quiet: bool = False) -> None: """Resume the workflow. Args: - quiet: whether to log anything. + quiet: + Whether to log anything in the event the workflow is not + paused. + """ + if self.reload_pending: + LOG.warning('Cannot resume - workflow is reloading') + return if not self.is_paused: if not quiet: LOG.warning("Cannot resume - workflow is not paused") diff --git a/cylc/flow/workflow_status.py b/cylc/flow/workflow_status.py index b0cdf4a976c..95f25e39358 100644 --- a/cylc/flow/workflow_status.py +++ b/cylc/flow/workflow_status.py @@ -165,6 +165,9 @@ def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]: if schd.stop_mode is not None: status = WorkflowStatus.STOPPING status_msg = f'stopping: {schd.stop_mode.explain()}' + elif schd.reload_pending: + status = WorkflowStatus.PAUSED + status_msg = f'reloading: {schd.reload_pending}' elif schd.is_stalled: status_msg = 'stalled' elif schd.is_paused: diff --git a/tests/conftest.py b/tests/conftest.py index fb7a7408f31..8f07a4f3441 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -109,6 +109,38 @@ def _log_filter( return _log_filter +@pytest.fixture +def log_scan(): + """Ensure log messages appear in the correct order. + + TRY TO AVOID DOING THIS! + + If you are trying to test a sequence of events you are likely better off + doing this a different way (e.g. mock the functions you are interested in + and test the call arguments/returns later). + + However, there are some occasions where this might be necessary, e.g. + testing a monolithic synchronous function. + + Args: + log: The caplog fixture. + items: Iterable of string messages to compare. All are tested + by "contains" i.e. "item in string". + + """ + def _log_scan(log, items): + records = iter(log.records) + record = next(records) + for item in items: + while item not in record.message: + try: + record = next(records) + except StopIteration: + raise Exception(f'Reached end of log looking for: {item}') + + return _log_scan + + @pytest.fixture(scope='session') def port_range(): return glbl_cfg().get(['scheduler', 'run hosts', 'ports']) diff --git a/tests/functional/logging/01-basic.t b/tests/functional/logging/01-basic.t index d16e6e8d3e9..98183026bb2 100644 --- a/tests/functional/logging/01-basic.t +++ b/tests/functional/logging/01-basic.t @@ -31,14 +31,18 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' R1 = reloader1 => stopper => reloader2 [runtime] [[reloader1, reloader2]] - script = cylc reload "${CYLC_WORKFLOW_ID}" + script = """ + cylc reload "${CYLC_WORKFLOW_ID}" + # wait for the command to complete + cylc__job__poll_grep_workflow_log 'Reload completed' + """ [[stopper]] script = cylc stop --now --now "${CYLC_WORKFLOW_ID}" __FLOW_CONFIG__ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" -workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" --main-loop='log db' # Check scheduler logs. ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_1.out @@ -55,7 +59,9 @@ cmp_ok conf_1.out << __EOF__ flow-processed.cylc __EOF__ -workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" +mv "$WORKFLOW_RUN_DIR/cylc.flow.main_loop.log_db.sql" "$WORKFLOW_RUN_DIR/01.cylc.flow.main_loop.log_db.sql" + +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" --main-loop='log db' ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_2.out cmp_ok schd_2.out << __EOF__ diff --git a/tests/functional/reload/09-garbage.t b/tests/functional/reload/09-garbage.t index 5c9e8c6b889..0853a4e4c1e 100644 --- a/tests/functional/reload/09-garbage.t +++ b/tests/functional/reload/09-garbage.t @@ -29,6 +29,8 @@ TEST_NAME="${TEST_NAME_BASE}-run" workflow_run_ok "${TEST_NAME}" \ cylc play --reference-test --debug --no-detach "${WORKFLOW_NAME}" #------------------------------------------------------------------------------- -grep_ok 'Command failed: reload_workflow' "${WORKFLOW_RUN_DIR}/log/scheduler/log" +grep_ok \ + 'Reload failed - IllegalItemError: \[scheduling\]garbage' \ + "${WORKFLOW_RUN_DIR}/log/scheduler/log" #------------------------------------------------------------------------------- purge diff --git a/tests/integration/test_reload.py b/tests/integration/test_reload.py new file mode 100644 index 00000000000..1a2ba1391fc --- /dev/null +++ b/tests/integration/test_reload.py @@ -0,0 +1,148 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Tests for reload behaviour in the scheduler.""" + +from contextlib import suppress + +from cylc.flow.task_state import ( + TASK_STATUS_WAITING, + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, +) + + +async def test_reload_waits_for_pending_tasks( + flow, + scheduler, + start, + monkeypatch, + capture_submission, + log_scan, +): + """Reload should flush out preparing tasks and pause the workflow. + + Reloading a workflow with preparing tasks may be unsafe and is at least + confusing. For safety we should pause the workflow and flush out any + preparing tasks before attempting reload. + + See https://github.com/cylc/cylc-flow/issues/5107 + """ + # a simple workflow with a single task + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'foo', + }, + }, + 'runtime': { + 'foo': {}, + }, + }) + schd = scheduler(id_, paused_start=False) + + # we will artificially push the task through these states + state_seq = [ + # repeat the preparing state a few times to simulate a task + # taking multiple main-loop cycles to submit + TASK_STATUS_PREPARING, + TASK_STATUS_PREPARING, + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + ] + + # start the scheduler + async with start(schd) as log: + # disable submission events to prevent anything from actually running + capture_submission(schd) + + # set the task to go through some state changes + def change_state(_=0): + with suppress(IndexError): + foo.state_reset(state_seq.pop(0)) + monkeypatch.setattr( + 'cylc.flow.scheduler.sleep', + change_state + ) + + # the task should start as waiting + tasks = schd.pool.get_tasks() + assert len(tasks) == 1 + foo = tasks[0] + assert tasks[0].state(TASK_STATUS_WAITING) + + # put the task into the preparing state + change_state() + + # reload the workflow + await schd.command_reload_workflow() + + # the task should end in the submitted state + assert foo.state(TASK_STATUS_SUBMITTED) + + # ensure the order of events was correct + log_scan( + log, + [ + # the task should have entered the preparing state before the + # reload was requested + '[1/foo waiting(queued) job:00 flows:1] => preparing(queued)', + # the reload should have put the workflow into the paused state + 'PAUSING the workflow now: Reloading workflow defintion', + # reload should have waited for the task to submit + '[1/foo preparing(queued) job:00 flows:1]' + ' => submitted(queued)', + # before then reloading the workflow config + 'Reloading the workflow definition.', + # post-reload the workflow should have been resumed + 'RESUMING the workflow now', + ], + ) + + +async def test_reload_failure( + flow, + one_conf, + scheduler, + start, + log_filter, +): + """Reload should not crash the workflow on config errors. + + A warning should be logged along with the error. + """ + id_ = flow(one_conf) + schd = scheduler(id_) + async with start(schd) as log: + # corrupt the config by removing the scheduling section + two_conf = {**one_conf, 'scheduling': {}} + flow(two_conf, id_=id_) + + # reload the workflow + await schd.command_reload_workflow() + + # the reload should have failed but the workflow should still be + # running + assert log_filter( + log, + contains=( + 'Reload failed - WorkflowConfigError:' + ' missing [scheduling][[graph]] section' + ) + ) + + # the config should be unchanged + assert schd.config.cfg['scheduling']['graph']['R1'] == 'one' diff --git a/tests/integration/test_resolvers.py b/tests/integration/test_resolvers.py index 2f452cda760..b6e32708505 100644 --- a/tests/integration/test_resolvers.py +++ b/tests/integration/test_resolvers.py @@ -232,7 +232,7 @@ async def test_stop( schd=one ) resolvers.stop(StopMode.REQUEST_CLEAN) - one.process_command_queue() + await one.process_command_queue() assert log_filter( log, level=logging.INFO, contains="Command actioned: stop" ) diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index c183036d004..5330e516758 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -241,80 +241,6 @@ async def test_no_poll_waiting_tasks( assert "Orphaned tasks:\n* 1/one (running)" in log.messages -@pytest.mark.parametrize('reload', [False, True]) -@pytest.mark.parametrize( - 'test_conf, expected_msg', - [ - pytest.param( - {'Alan Wake': "It's not a lake, it's an ocean"}, - "IllegalItemError: Alan Wake", - id="illegal item" - ), - pytest.param( - { - 'scheduling': { - 'initial cycle point': "2k22", - 'graph': {'R1': "a => b"} - } - }, - ("IllegalValueError: (type=cycle point) " - "[scheduling]initial cycle point = 2k22 - (Invalid cycle point)"), - id="illegal cycle point" - ) - ] -) -async def test_illegal_config_load( - test_conf: dict, - expected_msg: str, - reload: bool, - flow: Callable, - one_conf: dict, - start: Callable, - run: Callable, - scheduler: Callable, - log_filter: Callable -): - """Test that ParsecErrors (illegal config) - that occur during config load - when running a workflow - are displayed without traceback. - - Params: - test_conf: Dict to update one_conf with. - expected_msg: Expected log message at error level. - reload: If False, test a workflow start with invalid config. - If True, test a workflow start with valid config followed by - reload with invalid config. - """ - if not reload: - one_conf.update(test_conf) - id_: str = flow(one_conf) - schd: Scheduler = scheduler(id_) - log: pytest.LogCaptureFixture - - if reload: - one_conf.update(test_conf) - async with run(schd) as log: - # Shouldn't be any errors at this stage: - assert not log_filter(log, level=logging.ERROR) - # Modify flow.cylc: - flow(one_conf, id_=id_) - schd.queue_command('reload_workflow', {}) - assert log_filter( - log, level=logging.ERROR, - exact_match=f"Command failed: reload_workflow()\n{expected_msg}" - ) - else: - with pytest.raises(ParsecError): - async with start(schd) as log: - pass - assert log_filter( - log, - level=logging.ERROR, - exact_match=f"Workflow shutting down - {expected_msg}" - ) - - assert TRACEBACK_MSG not in log.text - - async def test_unexpected_ParsecError( one: Scheduler, start: Callable, diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 530e0063b70..7ca64d6bfc6 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -558,7 +558,7 @@ async def test_reload_stopcp( schd: Scheduler = scheduler(flow(cfg)) async with start(schd): assert str(schd.pool.stop_point) == '2020' - schd.command_reload_workflow() + await schd.command_reload_workflow() assert str(schd.pool.stop_point) == '2020' @@ -827,7 +827,7 @@ async def test_reload_prereqs( flow(conf, id_=id_) # Reload the workflow config - schd.command_reload_workflow() + await schd.command_reload_workflow() schd.pool.reload_taskdefs() assert list_tasks(schd) == expected_3 @@ -958,7 +958,7 @@ async def test_graph_change_prereq_satisfaction( flow(conf, id_=id_) # Reload the workflow config - schd.command_reload_workflow() + await schd.command_reload_workflow() schd.pool.reload_taskdefs() await test.asend(schd) diff --git a/tests/unit/test_workflow_status.py b/tests/unit/test_workflow_status.py index 046783d4ff1..af88de3daab 100644 --- a/tests/unit/test_workflow_status.py +++ b/tests/unit/test_workflow_status.py @@ -36,12 +36,14 @@ def schd( stop_mode=None, stop_point=None, stop_task_id=None, + reload_pending=False, ): return SimpleNamespace( is_paused=is_paused, is_stalled=is_stalled, stop_clock_time=stop_clock_time, stop_mode=stop_mode, + reload_pending=reload_pending, pool=SimpleNamespace( hold_point=hold_point, stop_point=stop_point, @@ -58,7 +60,13 @@ def schd( ( {'is_paused': True}, WorkflowStatus.PAUSED, - 'paused'), + 'paused' + ), + ( + {'reload_pending': 'message'}, + WorkflowStatus.PAUSED, + 'reloading: message' + ), ( {'stop_mode': StopMode.AUTO}, WorkflowStatus.STOPPING,