diff --git a/CHANGES.md b/CHANGES.md index 7370fff01c7..41128f3a19e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,10 @@ ones in. --> ### Enhancements +[#5992](https://github.com/cylc/cylc-flow/pull/5992) - +The scheduler will now wait for preparing tasks to submit before attempting +to perform a reload and will also pause/unpause the workflow. + [#5405](https://github.com/cylc/cylc-flow/pull/5405) - Improve scan command help, and add scheduler PID to the output. diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 7b6cfe6a44d..3d9083c4200 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -32,12 +32,12 @@ from typing import ( TYPE_CHECKING, Callable, + Dict, Iterable, + List, NoReturn, Optional, - List, Set, - Dict, Tuple, Union, ) @@ -150,6 +150,10 @@ if TYPE_CHECKING: from cylc.flow.task_proxy import TaskProxy + # BACK COMPAT: typing_extensions.Literal + # FROM: Python 3.7 + # TO: Python 3.8 + from typing_extensions import Literal class SchedulerStop(CylcError): @@ -244,6 +248,7 @@ class Scheduler: stop_mode: Optional[StopMode] = None stop_task: Optional[str] = None stop_clock_time: Optional[int] = None + reload_pending: 'Union[Literal[False], str]' = False # task event loop is_paused = False @@ -427,7 +432,8 @@ async def configure(self): self.profiler.log_memory("scheduler.py: before load_flow_file") try: - self.load_flow_file() + cfg = self.load_flow_file() + self.apply_new_config(cfg, is_reload=False) except ParsecError as exc: # Mark this exc as expected (see docstring for .schd_expected): exc.schd_expected = True @@ -521,8 +527,7 @@ async def configure(self): self.command_set_hold_point(holdcp) if self.options.paused_start: - LOG.info("Paused on start up") - self.pause_workflow() + self.pause_workflow('Paused on start up') self.profiler.log_memory("scheduler.py: begin run while loop") self.is_updated = True @@ -1048,35 +1053,71 @@ def command_remove_tasks(self, items) -> int: def command_reload_workflow(self) -> None: 
"""Reload workflow configuration.""" + # pause the workflow if not already + was_paused = self.is_paused + if not was_paused: + self.pause_workflow('Reloading workflow defintion') + self.process_workflow_db_queue() # see #5593 + + # flush out preparing tasks before attempting reload + self.reload_pending = 'waiting for pending tasks to submit' + while self.release_queued_tasks(): + # NOTE: this method was called by process_command_queue which is + # called synchronously in the main loop so this call is blocking to + # other main loop functions + sleep(1) # give any remove-init's time to complete + self.reload_pending = 'loading the workflow definition' + + # reload the workflow definition LOG.info("Reloading the workflow definition.") - old_tasks = set(self.config.get_task_name_list()) - # Things that can't change on workflow reload: - self.workflow_db_mgr.pri_dao.select_workflow_params( - self._load_workflow_params - ) - try: - self.load_flow_file(is_reload=True) + cfg = self.load_flow_file(is_reload=True) except (ParsecError, CylcConfigError) as exc: - raise CommandFailedError(exc) - self.broadcast_mgr.linearized_ancestors = ( - self.config.get_linearized_ancestors()) - self.pool.set_do_reload(self.config) - self.task_events_mgr.mail_interval = self.cylc_config['mail'][ - 'task event batch interval'] - self.task_events_mgr.mail_smtp = self._get_events_conf("smtp") - self.task_events_mgr.mail_footer = self._get_events_conf("footer") - - # Log tasks that have been added by the reload, removed tasks are - # logged by the TaskPool. 
- add = set(self.config.get_task_name_list()) - old_tasks - for task in add: - LOG.warning(f"Added task: '{task}'") - self.workflow_db_mgr.put_workflow_template_vars(self.template_vars) - self.workflow_db_mgr.put_runtime_inheritance(self.config) - self.workflow_db_mgr.put_workflow_params(self) - self.is_updated = True - self.is_reloaded = True + if cylc.flow.flags.verbosity > 1: + # log full traceback in debug mode + LOG.exception(exc) + LOG.critical( + f'Reload failed - {exc.__class__.__name__}: {exc}' + '\nThis is probably due to an issue with the new' + ' configuration.' + '\nTo continue with the pre-reload config, un-pause the' + ' workflow.' + '\nOtherwise, fix the configuration and attempt to reload' + ' again.' + ) + else: + self.reload_pending = 'applying the new config' + old_tasks = set(self.config.get_task_name_list()) + # Things that can't change on workflow reload: + self.workflow_db_mgr.pri_dao.select_workflow_params( + self._load_workflow_params + ) + self.apply_new_config(cfg, is_reload=True) + self.broadcast_mgr.linearized_ancestors = ( + self.config.get_linearized_ancestors()) + self.pool.set_do_reload(self.config) + self.task_events_mgr.mail_interval = self.cylc_config['mail'][ + 'task event batch interval'] + self.task_events_mgr.mail_smtp = self._get_events_conf("smtp") + self.task_events_mgr.mail_footer = self._get_events_conf("footer") + + # Log tasks that have been added by the reload, removed tasks are + # logged by the TaskPool. 
+ add = set(self.config.get_task_name_list()) - old_tasks + for task in add: + LOG.warning(f"Added task: '{task}'") + self.workflow_db_mgr.put_workflow_template_vars(self.template_vars) + self.workflow_db_mgr.put_runtime_inheritance(self.config) + self.workflow_db_mgr.put_workflow_params(self) + self.process_workflow_db_queue() # see #5593 + self.is_updated = True + self.is_reloaded = True + + # resume the workflow if previously paused + self.reload_pending = False + if not was_paused: + self.resume_workflow() + self.process_workflow_db_queue() # see #5593 def get_restart_num(self) -> int: """Return the number of the restart, else 0 if not a restart. @@ -1147,7 +1188,7 @@ def _configure_contact(self) -> None: def load_flow_file(self, is_reload=False): """Load, and log the workflow definition.""" # Local workflow environment set therein. - self.config = WorkflowConfig( + return WorkflowConfig( self.workflow, self.flow_file, self.options, @@ -1163,11 +1204,13 @@ def load_flow_file(self, is_reload=False): work_dir=self.workflow_work_dir, share_dir=self.workflow_share_dir, ) + + def apply_new_config(self, config, is_reload=False): + self.config = config self.cylc_config = DictTree( self.config.cfg['scheduler'], glbl_cfg().get(['scheduler']) ) - self.flow_file_update_time = time() # Dump the loaded flow.cylc file for future reference. config_dir = get_workflow_run_config_log_dir( @@ -1294,7 +1337,7 @@ def run_event_handlers(self, event, reason=""): return self.workflow_event_handler.handle(self, event, str(reason)) - def release_queued_tasks(self) -> None: + def release_queued_tasks(self) -> bool: """Release queued tasks, and submit jobs. The task queue manages references to task proxies in the task pool. @@ -1312,19 +1355,33 @@ def release_queued_tasks(self) -> None: * https://github.com/cylc/cylc-flow/pull/4620 * https://github.com/cylc/cylc-flow/issues/4974 + Returns: + True if tasks were passed through the submit-pipeline + (i.e. 
new waiting tasks have entered the preparing state OR + preparing tasks have been passed back through for + submission). + """ if ( not self.is_paused and self.stop_mode is None and self.auto_restart_time is None + and self.reload_pending is False ): pre_prep_tasks = self.pool.release_queued_tasks() elif ( - self.should_auto_restart_now() - and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL + ( + # Need to get preparing tasks to submit before auto restart + self.should_auto_restart_now() + and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL + ) or ( + # Need to get preparing tasks to submit before reload + self.reload_pending + ) ): - # Need to get preparing tasks to submit before auto restart + # don't release queued tasks, just finish processing any preparing + # ones pre_prep_tasks = [ itask for itask in self.pool.get_tasks() if itask.state(TASK_STATUS_PREPARING) @@ -1332,9 +1389,9 @@ def release_queued_tasks(self) -> None: # Return, if no tasks to submit. else: - return + return False if not pre_prep_tasks: - return + return False # Start the job submission process. self.is_updated = True @@ -1362,6 +1419,9 @@ def release_queued_tasks(self) -> None: f"{itask.state.get_resolved_dependencies()} in flow {flow}" ) + # one or more tasks were passed through the submission pipeline + return True + def process_workflow_db_queue(self): """Update workflow DB.""" self.workflow_db_mgr.process_queued_ops() @@ -1818,7 +1878,7 @@ async def _shutdown(self, reason: BaseException) -> None: LOG.exception(ex) # disconnect from workflow-db, stop db queue try: - self.workflow_db_mgr.process_queued_ops() + self.process_workflow_db_queue() self.workflow_db_mgr.on_workflow_shutdown() except Exception as exc: LOG.exception(exc) @@ -1931,12 +1991,22 @@ def check_auto_shutdown(self): return True - def pause_workflow(self) -> None: - """Pause the workflow.""" + def pause_workflow(self, msg: Optional[str] = None) -> None: + """Pause the workflow. 
+ + Args: + msg: + A user-facing string explaining why the workflow was paused if + helpful. + + """ if self.is_paused: LOG.info("Workflow is already paused") return - LOG.info("PAUSING the workflow now") + _msg = "PAUSING the workflow now" + if msg: + _msg += f': {msg}' + LOG.info(_msg) self.is_paused = True self.workflow_db_mgr.put_workflow_paused() self.update_data_store() @@ -1945,8 +2015,14 @@ def resume_workflow(self, quiet: bool = False) -> None: """Resume the workflow. Args: - quiet: whether to log anything. + quiet: + Whether to log anything in the event the workflow is not + paused. + """ + if self.reload_pending: + LOG.warning('Cannot resume - workflow is reloading') + return if not self.is_paused: if not quiet: LOG.warning("Cannot resume - workflow is not paused") diff --git a/cylc/flow/workflow_status.py b/cylc/flow/workflow_status.py index b0cdf4a976c..da36119f28d 100644 --- a/cylc/flow/workflow_status.py +++ b/cylc/flow/workflow_status.py @@ -165,6 +165,8 @@ def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]: if schd.stop_mode is not None: status = WorkflowStatus.STOPPING status_msg = f'stopping: {schd.stop_mode.explain()}' + elif schd.reload_pending: + status_msg = f'reloading: {schd.reload_pending}' elif schd.is_stalled: status_msg = 'stalled' elif schd.is_paused: diff --git a/tests/conftest.py b/tests/conftest.py index fb7a7408f31..8f07a4f3441 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -109,6 +109,38 @@ def _log_filter( return _log_filter +@pytest.fixture +def log_scan(): + """Ensure log messages appear in the correct order. + + TRY TO AVOID DOING THIS! + + If you are trying to test a sequence of events you are likely better off + doing this a different way (e.g. mock the functions you are interested in + and test the call arguments/returns later). + + However, there are some occasions where this might be necessary, e.g. + testing a monolithic synchronous function. + + Args: + log: The caplog fixture. 
+ items: Iterable of string messages to compare. All are tested + by "contains" i.e. "item in string". + + """ + def _log_scan(log, items): + records = iter(log.records) + record = next(records) + for item in items: + while item not in record.message: + try: + record = next(records) + except StopIteration: + raise Exception(f'Reached end of log looking for: {item}') + + return _log_scan + + @pytest.fixture(scope='session') def port_range(): return glbl_cfg().get(['scheduler', 'run hosts', 'ports']) diff --git a/tests/functional/logging/01-basic.t b/tests/functional/logging/01-basic.t index d16e6e8d3e9..98183026bb2 100644 --- a/tests/functional/logging/01-basic.t +++ b/tests/functional/logging/01-basic.t @@ -31,14 +31,18 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' R1 = reloader1 => stopper => reloader2 [runtime] [[reloader1, reloader2]] - script = cylc reload "${CYLC_WORKFLOW_ID}" + script = """ + cylc reload "${CYLC_WORKFLOW_ID}" + # wait for the command to complete + cylc__job__poll_grep_workflow_log 'Reload completed' + """ [[stopper]] script = cylc stop --now --now "${CYLC_WORKFLOW_ID}" __FLOW_CONFIG__ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" -workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" --main-loop='log db' # Check scheduler logs. 
ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_1.out @@ -55,7 +59,9 @@ cmp_ok conf_1.out << __EOF__ flow-processed.cylc __EOF__ -workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" +mv "$WORKFLOW_RUN_DIR/cylc.flow.main_loop.log_db.sql" "$WORKFLOW_RUN_DIR/01.cylc.flow.main_loop.log_db.sql" + +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" --main-loop='log db' ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_2.out cmp_ok schd_2.out << __EOF__ diff --git a/tests/functional/reload/09-garbage.t b/tests/functional/reload/09-garbage.t index 5c9e8c6b889..0853a4e4c1e 100644 --- a/tests/functional/reload/09-garbage.t +++ b/tests/functional/reload/09-garbage.t @@ -29,6 +29,8 @@ TEST_NAME="${TEST_NAME_BASE}-run" workflow_run_ok "${TEST_NAME}" \ cylc play --reference-test --debug --no-detach "${WORKFLOW_NAME}" #------------------------------------------------------------------------------- -grep_ok 'Command failed: reload_workflow' "${WORKFLOW_RUN_DIR}/log/scheduler/log" +grep_ok \ + 'Reload failed - IllegalItemError: \[scheduling\]garbage' \ + "${WORKFLOW_RUN_DIR}/log/scheduler/log" #------------------------------------------------------------------------------- purge diff --git a/tests/integration/test_reload.py b/tests/integration/test_reload.py new file mode 100644 index 00000000000..a36f45acda9 --- /dev/null +++ b/tests/integration/test_reload.py @@ -0,0 +1,148 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +"""Tests for reload behaviour in the scheduler.""" + +from contextlib import suppress + +from cylc.flow.task_state import ( + TASK_STATUS_WAITING, + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, +) + + +async def test_reload_waits_for_pending_tasks( + flow, + scheduler, + start, + monkeypatch, + capture_submission, + log_scan, +): + """Reload should flush out preparing tasks and pause the workflow. + + Reloading a workflow with preparing tasks may be unsafe and is at least + confusing. For safety we should pause the workflow and flush out any + preparing tasks before attempting reload. + + See https://github.com/cylc/cylc-flow/issues/5107 + """ + # a simple workflow with a single task + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'foo', + }, + }, + 'runtime': { + 'foo': {}, + }, + }) + schd = scheduler(id_, paused_start=False) + + # we will artificially push the task through these states + state_seq = [ + # repeat the preparing state a few times to simulate a task + # taking multiple main-loop cycles to submit + TASK_STATUS_PREPARING, + TASK_STATUS_PREPARING, + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + ] + + # start the scheduler + async with start(schd) as log: + # disable submission events to prevent anything from actually running + capture_submission(schd) + + # set the task to go through some state changes + def change_state(_=0): + with suppress(IndexError): + foo.state_reset(state_seq.pop(0)) + monkeypatch.setattr( + 'cylc.flow.scheduler.sleep', + change_state + ) + + # the task should start as waiting + tasks = schd.pool.get_tasks() + assert len(tasks) == 1 + foo = tasks[0] + assert 
tasks[0].state(TASK_STATUS_WAITING) + + # put the task into the preparing state + change_state() + + # reload the workflow + schd.command_reload_workflow() + + # the task should end in the submitted state + assert foo.state(TASK_STATUS_SUBMITTED) + + # ensure the order of events was correct + log_scan( + log, + [ + # the task should have entered the preparing state before the + # reload was requested + '[1/foo waiting(queued) job:00 flows:1] => preparing(queued)', + # the reload should have put the workflow into the paused state + 'PAUSING the workflow now: Reloading workflow defintion', + # reload should have waited for the task to submit + '[1/foo preparing(queued) job:00 flows:1]' + ' => submitted(queued)', + # before then reloading the workflow config + 'Reloading the workflow definition.', + # post-reload the workflow should have been resumed + 'RESUMING the workflow now', + ], + ) + + +async def test_reload_failure( + flow, + one_conf, + scheduler, + start, + log_filter, +): + """Reload should not crash the workflow on config errors. + + A warning should be logged along with the error. 
+ """ + id_ = flow(one_conf) + schd = scheduler(id_) + async with start(schd) as log: + # corrupt the config by removing the scheduling section + two_conf = {**one_conf, 'scheduling': {}} + flow(two_conf, id_=id_) + + # reload the workflow + schd.command_reload_workflow() + + # the reload should have failed but the workflow should still be + # running + assert log_filter( + log, + contains=( + 'Reload failed - WorkflowConfigError:' + ' missing [scheduling][[graph]] section' + ) + ) + + # the config should be unchanged + assert schd.config.cfg['scheduling']['graph']['R1'] == 'one' diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index c183036d004..5330e516758 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -241,80 +241,6 @@ async def test_no_poll_waiting_tasks( assert "Orphaned tasks:\n* 1/one (running)" in log.messages -@pytest.mark.parametrize('reload', [False, True]) -@pytest.mark.parametrize( - 'test_conf, expected_msg', - [ - pytest.param( - {'Alan Wake': "It's not a lake, it's an ocean"}, - "IllegalItemError: Alan Wake", - id="illegal item" - ), - pytest.param( - { - 'scheduling': { - 'initial cycle point': "2k22", - 'graph': {'R1': "a => b"} - } - }, - ("IllegalValueError: (type=cycle point) " - "[scheduling]initial cycle point = 2k22 - (Invalid cycle point)"), - id="illegal cycle point" - ) - ] -) -async def test_illegal_config_load( - test_conf: dict, - expected_msg: str, - reload: bool, - flow: Callable, - one_conf: dict, - start: Callable, - run: Callable, - scheduler: Callable, - log_filter: Callable -): - """Test that ParsecErrors (illegal config) - that occur during config load - when running a workflow - are displayed without traceback. - - Params: - test_conf: Dict to update one_conf with. - expected_msg: Expected log message at error level. - reload: If False, test a workflow start with invalid config. 
- If True, test a workflow start with valid config followed by - reload with invalid config. - """ - if not reload: - one_conf.update(test_conf) - id_: str = flow(one_conf) - schd: Scheduler = scheduler(id_) - log: pytest.LogCaptureFixture - - if reload: - one_conf.update(test_conf) - async with run(schd) as log: - # Shouldn't be any errors at this stage: - assert not log_filter(log, level=logging.ERROR) - # Modify flow.cylc: - flow(one_conf, id_=id_) - schd.queue_command('reload_workflow', {}) - assert log_filter( - log, level=logging.ERROR, - exact_match=f"Command failed: reload_workflow()\n{expected_msg}" - ) - else: - with pytest.raises(ParsecError): - async with start(schd) as log: - pass - assert log_filter( - log, - level=logging.ERROR, - exact_match=f"Workflow shutting down - {expected_msg}" - ) - - assert TRACEBACK_MSG not in log.text - - async def test_unexpected_ParsecError( one: Scheduler, start: Callable, diff --git a/tests/unit/test_workflow_status.py b/tests/unit/test_workflow_status.py index 046783d4ff1..c46a681f2bc 100644 --- a/tests/unit/test_workflow_status.py +++ b/tests/unit/test_workflow_status.py @@ -36,12 +36,14 @@ def schd( stop_mode=None, stop_point=None, stop_task_id=None, + reload_pending=False, ): return SimpleNamespace( is_paused=is_paused, is_stalled=is_stalled, stop_clock_time=stop_clock_time, stop_mode=stop_mode, + reload_pending=reload_pending, pool=SimpleNamespace( hold_point=hold_point, stop_point=stop_point, @@ -58,7 +60,13 @@ def schd( ( {'is_paused': True}, WorkflowStatus.PAUSED, - 'paused'), + 'paused' + ), + ( + {'reload_pending': 'message'}, + WorkflowStatus.RUNNING, + 'reloading: message' + ), ( {'stop_mode': StopMode.AUTO}, WorkflowStatus.STOPPING,