From b368e7cdf6514c66f1200a1aad8913a7da0b9b35 Mon Sep 17 00:00:00 2001
From: Oliver Sanders
Date: Mon, 19 Jun 2023 12:22:29 +0100
Subject: [PATCH] reload: wait for pending tasks to submit and pause the
 workflow

* Closes https://github.com/cylc/cylc-flow/issues/5107
* Reload now waits for pending tasks to submit before attempting to reload
  the config itself.
* Reload now also puts the workflow into the paused state during the reload
  process. This doesn't actually achieve anything as the reload command is
  blocking in the main loop, but it does help to communicate that the
  workflow will not de-queue or submit any new tasks during this process.
* The workflow status message is now updated to reflect the reload progress.
---
 CHANGES.md                           |   4 +
 cylc/flow/scheduler.py               | 164 ++++++++++++++++++++-------
 cylc/flow/workflow_status.py         |   2 +
 tests/conftest.py                    |  32 ++++++
 tests/functional/logging/01-basic.t  |  12 +-
 tests/functional/reload/09-garbage.t |   4 +-
 tests/integration/test_reload.py     | 148 ++++++++++++++++++++++++
 tests/integration/test_scheduler.py  |  74 ------------
 tests/unit/test_workflow_status.py   |  10 +-
 9 files changed, 327 insertions(+), 123 deletions(-)
 create mode 100644 tests/integration/test_reload.py

diff --git a/CHANGES.md b/CHANGES.md
index 7370fff01c7..41128f3a19e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,10 @@ ones in. -->
 
 ### Enhancements
 
+[#5992](https://github.com/cylc/cylc-flow/pull/5992) -
+The scheduler will now wait for preparing tasks to submit before attempting
+to perform a reload and will also pause/unpause the workflow.
+
 [#5405](https://github.com/cylc/cylc-flow/pull/5405) -
 Improve scan command help, and add scheduler PID to the output.
 
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 7b6cfe6a44d..3d9083c4200 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -32,12 +32,12 @@
 from typing import (
     TYPE_CHECKING,
     Callable,
+    Dict,
     Iterable,
+    List,
     NoReturn,
     Optional,
-    List,
     Set,
-    Dict,
     Tuple,
     Union,
 )
@@ -150,6 +150,10 @@
 
 if TYPE_CHECKING:
     from cylc.flow.task_proxy import TaskProxy
+    # BACK COMPAT: typing_extensions.Literal
+    # FROM: Python 3.7
+    # TO: Python 3.8
+    from typing_extensions import Literal
 
 
 class SchedulerStop(CylcError):
@@ -244,6 +248,7 @@ class Scheduler:
     stop_mode: Optional[StopMode] = None
     stop_task: Optional[str] = None
     stop_clock_time: Optional[int] = None
+    reload_pending: 'Union[Literal[False], str]' = False
 
     # task event loop
     is_paused = False
@@ -427,7 +432,8 @@ async def configure(self):
         self.profiler.log_memory("scheduler.py: before load_flow_file")
         try:
-            self.load_flow_file()
+            cfg = self.load_flow_file()
+            self.apply_new_config(cfg, is_reload=False)
         except ParsecError as exc:
             # Mark this exc as expected (see docstring for .schd_expected):
             exc.schd_expected = True
@@ -521,8 +527,7 @@ async def configure(self):
             self.command_set_hold_point(holdcp)
 
         if self.options.paused_start:
-            LOG.info("Paused on start up")
-            self.pause_workflow()
+            self.pause_workflow('Paused on start up')
 
         self.profiler.log_memory("scheduler.py: begin run while loop")
         self.is_updated = True
@@ -1048,35 +1053,71 @@ def command_remove_tasks(self, items) -> int:
 
     def command_reload_workflow(self) -> None:
         """Reload workflow configuration."""
+        # pause the workflow if not already
+        was_paused = self.is_paused
+        if not was_paused:
+            self.pause_workflow('Reloading workflow definition')
+            self.process_workflow_db_queue()  # see #5593
+
+        # flush out preparing tasks before attempting reload
+        self.reload_pending = 'waiting for pending tasks to submit'
+        while self.release_queued_tasks():
+            # NOTE: this method was called by process_command_queue which is
+            # called synchronously in the main loop so this call is blocking to
+            # other main loop functions
+            sleep(1)  # give any remote-inits time to complete
+        self.reload_pending = 'loading the workflow definition'
+
+        # reload the workflow definition
         LOG.info("Reloading the workflow definition.")
-        old_tasks = set(self.config.get_task_name_list())
-        # Things that can't change on workflow reload:
-        self.workflow_db_mgr.pri_dao.select_workflow_params(
-            self._load_workflow_params
-        )
-
         try:
-            self.load_flow_file(is_reload=True)
+            cfg = self.load_flow_file(is_reload=True)
         except (ParsecError, CylcConfigError) as exc:
-            raise CommandFailedError(exc)
-        self.broadcast_mgr.linearized_ancestors = (
-            self.config.get_linearized_ancestors())
-        self.pool.set_do_reload(self.config)
-        self.task_events_mgr.mail_interval = self.cylc_config['mail'][
-            'task event batch interval']
-        self.task_events_mgr.mail_smtp = self._get_events_conf("smtp")
-        self.task_events_mgr.mail_footer = self._get_events_conf("footer")
-
-        # Log tasks that have been added by the reload, removed tasks are
-        # logged by the TaskPool.
-        add = set(self.config.get_task_name_list()) - old_tasks
-        for task in add:
-            LOG.warning(f"Added task: '{task}'")
-        self.workflow_db_mgr.put_workflow_template_vars(self.template_vars)
-        self.workflow_db_mgr.put_runtime_inheritance(self.config)
-        self.workflow_db_mgr.put_workflow_params(self)
-        self.is_updated = True
-        self.is_reloaded = True
+            if cylc.flow.flags.verbosity > 1:
+                # log full traceback in debug mode
+                LOG.exception(exc)
+            LOG.critical(
+                f'Reload failed - {exc.__class__.__name__}: {exc}'
+                '\nThis is probably due to an issue with the new'
+                ' configuration.'
+                '\nTo continue with the pre-reload config, un-pause the'
+                ' workflow.'
+                '\nOtherwise, fix the configuration and attempt to reload'
+                ' again.'
+            )
+        else:
+            self.reload_pending = 'applying the new config'
+            old_tasks = set(self.config.get_task_name_list())
+            # Things that can't change on workflow reload:
+            self.workflow_db_mgr.pri_dao.select_workflow_params(
+                self._load_workflow_params
+            )
+            self.apply_new_config(cfg, is_reload=True)
+            self.broadcast_mgr.linearized_ancestors = (
+                self.config.get_linearized_ancestors())
+            self.pool.set_do_reload(self.config)
+            self.task_events_mgr.mail_interval = self.cylc_config['mail'][
+                'task event batch interval']
+            self.task_events_mgr.mail_smtp = self._get_events_conf("smtp")
+            self.task_events_mgr.mail_footer = self._get_events_conf("footer")
+
+            # Log tasks that have been added by the reload, removed tasks are
+            # logged by the TaskPool.
+            add = set(self.config.get_task_name_list()) - old_tasks
+            for task in add:
+                LOG.warning(f"Added task: '{task}'")
+            self.workflow_db_mgr.put_workflow_template_vars(self.template_vars)
+            self.workflow_db_mgr.put_runtime_inheritance(self.config)
+            self.workflow_db_mgr.put_workflow_params(self)
+            self.process_workflow_db_queue()  # see #5593
+            self.is_updated = True
+            self.is_reloaded = True
+
+        # resume the workflow if previously paused
+        self.reload_pending = False
+        if not was_paused:
+            self.resume_workflow()
+            self.process_workflow_db_queue()  # see #5593
 
     def get_restart_num(self) -> int:
         """Return the number of the restart, else 0 if not a restart.
@@ -1147,7 +1188,7 @@ def _configure_contact(self) -> None:
     def load_flow_file(self, is_reload=False):
         """Load, and log the workflow definition."""
         # Local workflow environment set therein.
-        self.config = WorkflowConfig(
+        return WorkflowConfig(
             self.workflow,
             self.flow_file,
             self.options,
@@ -1163,11 +1204,13 @@ def load_flow_file(self, is_reload=False):
             work_dir=self.workflow_work_dir,
             share_dir=self.workflow_share_dir,
         )
+
+    def apply_new_config(self, config, is_reload=False):
+        self.config = config
         self.cylc_config = DictTree(
             self.config.cfg['scheduler'],
             glbl_cfg().get(['scheduler'])
         )
-
         self.flow_file_update_time = time()
 
         # Dump the loaded flow.cylc file for future reference.
         config_dir = get_workflow_run_config_log_dir(
@@ -1294,7 +1337,7 @@ def run_event_handlers(self, event, reason=""):
             return
         self.workflow_event_handler.handle(self, event, str(reason))
 
-    def release_queued_tasks(self) -> None:
+    def release_queued_tasks(self) -> bool:
         """Release queued tasks, and submit jobs.
 
         The task queue manages references to task proxies in the task pool.
@@ -1312,19 +1355,33 @@ def release_queued_tasks(self) -> None:
         * https://github.com/cylc/cylc-flow/pull/4620
         * https://github.com/cylc/cylc-flow/issues/4974
 
+        Returns:
+            True if tasks were passed through the submit-pipeline
+            (i.e. new waiting tasks have entered the preparing state OR
+            preparing tasks have been passed back through for
+            submission).
+
         """
         if (
             not self.is_paused
             and self.stop_mode is None
             and self.auto_restart_time is None
+            and self.reload_pending is False
         ):
             pre_prep_tasks = self.pool.release_queued_tasks()
         elif (
-            self.should_auto_restart_now()
-            and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
+            (
+                # Need to get preparing tasks to submit before auto restart
+                self.should_auto_restart_now()
+                and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
+            ) or (
+                # Need to get preparing tasks to submit before reload
+                self.reload_pending
+            )
         ):
-            # Need to get preparing tasks to submit before auto restart
+            # don't release queued tasks, just finish processing any preparing
+            # ones
             pre_prep_tasks = [
                 itask for itask in self.pool.get_tasks()
                 if itask.state(TASK_STATUS_PREPARING)
@@ -1332,9 +1389,9 @@ def release_queued_tasks(self) -> None:
 
         # Return, if no tasks to submit.
         else:
-            return
+            return False
         if not pre_prep_tasks:
-            return
+            return False
 
         # Start the job submission process.
         self.is_updated = True
@@ -1362,6 +1419,9 @@ def release_queued_tasks(self) -> None:
                 f"{itask.state.get_resolved_dependencies()} in flow {flow}"
             )
 
+        # one or more tasks were passed through the submission pipeline
+        return True
+
     def process_workflow_db_queue(self):
         """Update workflow DB."""
         self.workflow_db_mgr.process_queued_ops()
@@ -1818,7 +1878,7 @@ async def _shutdown(self, reason: BaseException) -> None:
                 LOG.exception(ex)
         # disconnect from workflow-db, stop db queue
         try:
-            self.workflow_db_mgr.process_queued_ops()
+            self.process_workflow_db_queue()
             self.workflow_db_mgr.on_workflow_shutdown()
         except Exception as exc:
             LOG.exception(exc)
@@ -1931,12 +1991,22 @@ def check_auto_shutdown(self):
 
         return True
 
-    def pause_workflow(self) -> None:
-        """Pause the workflow."""
+    def pause_workflow(self, msg: Optional[str] = None) -> None:
+        """Pause the workflow.
+
+        Args:
+            msg:
+                An optional user-facing message explaining why the workflow
+                was paused.
+ + """ if self.is_paused: LOG.info("Workflow is already paused") return - LOG.info("PAUSING the workflow now") + _msg = "PAUSING the workflow now" + if msg: + _msg += f': {msg}' + LOG.info(_msg) self.is_paused = True self.workflow_db_mgr.put_workflow_paused() self.update_data_store() @@ -1945,8 +2015,14 @@ def resume_workflow(self, quiet: bool = False) -> None: """Resume the workflow. Args: - quiet: whether to log anything. + quiet: + Whether to log anything in the event the workflow is not + paused. + """ + if self.reload_pending: + LOG.warning('Cannot resume - workflow is reloading') + return if not self.is_paused: if not quiet: LOG.warning("Cannot resume - workflow is not paused") diff --git a/cylc/flow/workflow_status.py b/cylc/flow/workflow_status.py index b0cdf4a976c..da36119f28d 100644 --- a/cylc/flow/workflow_status.py +++ b/cylc/flow/workflow_status.py @@ -165,6 +165,8 @@ def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]: if schd.stop_mode is not None: status = WorkflowStatus.STOPPING status_msg = f'stopping: {schd.stop_mode.explain()}' + elif schd.reload_pending: + status_msg = f'reloading: {schd.reload_pending}' elif schd.is_stalled: status_msg = 'stalled' elif schd.is_paused: diff --git a/tests/conftest.py b/tests/conftest.py index fb7a7408f31..8f07a4f3441 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -109,6 +109,38 @@ def _log_filter( return _log_filter +@pytest.fixture +def log_scan(): + """Ensure log messages appear in the correct order. + + TRY TO AVOID DOING THIS! + + If you are trying to test a sequence of events you are likely better off + doing this a different way (e.g. mock the functions you are interested in + and test the call arguments/returns later). + + However, there are some occasions where this might be necessary, e.g. + testing a monolithic synchronous function. + + Args: + log: The caplog fixture. + items: Iterable of string messages to compare. All are tested + by "contains" i.e. "item in string". + + """ + def _log_scan(log, items): + records = iter(log.records) + record = next(records) + for item in items: + while item not in record.message: + try: + record = next(records) + except StopIteration: + raise Exception(f'Reached end of log looking for: {item}') + + return _log_scan + + @pytest.fixture(scope='session') def port_range(): return glbl_cfg().get(['scheduler', 'run hosts', 'ports']) diff --git a/tests/functional/logging/01-basic.t b/tests/functional/logging/01-basic.t index d16e6e8d3e9..98183026bb2 100644 --- a/tests/functional/logging/01-basic.t +++ b/tests/functional/logging/01-basic.t @@ -31,14 +31,18 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' R1 = reloader1 => stopper => reloader2 [runtime] [[reloader1, reloader2]] - script = cylc reload "${CYLC_WORKFLOW_ID}" + script = """ + cylc reload "${CYLC_WORKFLOW_ID}" + # wait for the command to complete + cylc__job__poll_grep_workflow_log 'Reload completed' + """ [[stopper]] script = cylc stop --now --now "${CYLC_WORKFLOW_ID}" __FLOW_CONFIG__ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" -workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" --main-loop='log db' # Check scheduler logs. 
ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_1.out @@ -55,7 +59,9 @@ cmp_ok conf_1.out << __EOF__ flow-processed.cylc __EOF__ -workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" +mv "$WORKFLOW_RUN_DIR/cylc.flow.main_loop.log_db.sql" "$WORKFLOW_RUN_DIR/01.cylc.flow.main_loop.log_db.sql" + +workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" --main-loop='log db' ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_2.out cmp_ok schd_2.out << __EOF__ diff --git a/tests/functional/reload/09-garbage.t b/tests/functional/reload/09-garbage.t index 5c9e8c6b889..0853a4e4c1e 100644 --- a/tests/functional/reload/09-garbage.t +++ b/tests/functional/reload/09-garbage.t @@ -29,6 +29,8 @@ TEST_NAME="${TEST_NAME_BASE}-run" workflow_run_ok "${TEST_NAME}" \ cylc play --reference-test --debug --no-detach "${WORKFLOW_NAME}" #------------------------------------------------------------------------------- -grep_ok 'Command failed: reload_workflow' "${WORKFLOW_RUN_DIR}/log/scheduler/log" +grep_ok \ + 'Reload failed - IllegalItemError: \[scheduling\]garbage' \ + "${WORKFLOW_RUN_DIR}/log/scheduler/log" #------------------------------------------------------------------------------- purge diff --git a/tests/integration/test_reload.py b/tests/integration/test_reload.py new file mode 100644 index 00000000000..a36f45acda9 --- /dev/null +++ b/tests/integration/test_reload.py @@ -0,0 +1,148 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Tests for reload behaviour in the scheduler.""" + +from contextlib import suppress + +from cylc.flow.task_state import ( + TASK_STATUS_WAITING, + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, +) + + +async def test_reload_waits_for_pending_tasks( + flow, + scheduler, + start, + monkeypatch, + capture_submission, + log_scan, +): + """Reload should flush out preparing tasks and pause the workflow. + + Reloading a workflow with preparing tasks may be unsafe and is at least + confusing. For safety we should pause the workflow and flush out any + preparing tasks before attempting reload. 
+
+    See https://github.com/cylc/cylc-flow/issues/5107
+    """
+    # a simple workflow with a single task
+    id_ = flow({
+        'scheduling': {
+            'graph': {
+                'R1': 'foo',
+            },
+        },
+        'runtime': {
+            'foo': {},
+        },
+    })
+    schd = scheduler(id_, paused_start=False)
+
+    # we will artificially push the task through these states
+    state_seq = [
+        # repeat the preparing state a few times to simulate a task
+        # taking multiple main-loop cycles to submit
+        TASK_STATUS_PREPARING,
+        TASK_STATUS_PREPARING,
+        TASK_STATUS_PREPARING,
+        TASK_STATUS_SUBMITTED,
+    ]
+
+    # start the scheduler
+    async with start(schd) as log:
+        # disable submission events to prevent anything from actually running
+        capture_submission(schd)
+
+        # set the task to go through some state changes
+        def change_state(_=0):
+            with suppress(IndexError):
+                foo.state_reset(state_seq.pop(0))
+        monkeypatch.setattr(
+            'cylc.flow.scheduler.sleep',
+            change_state
+        )
+
+        # the task should start as waiting
+        tasks = schd.pool.get_tasks()
+        assert len(tasks) == 1
+        foo = tasks[0]
+        assert foo.state(TASK_STATUS_WAITING)
+
+        # put the task into the preparing state
+        change_state()
+
+        # reload the workflow
+        schd.command_reload_workflow()
+
+        # the task should end in the submitted state
+        assert foo.state(TASK_STATUS_SUBMITTED)
+
+        # ensure the order of events was correct
+        log_scan(
+            log,
+            [
+                # the task should have entered the preparing state before the
+                # reload was requested
+                '[1/foo waiting(queued) job:00 flows:1] => preparing(queued)',
+                # the reload should have put the workflow into the paused state
+                'PAUSING the workflow now: Reloading workflow definition',
+                # reload should have waited for the task to submit
+                '[1/foo preparing(queued) job:00 flows:1]'
+                ' => submitted(queued)',
+                # before then reloading the workflow config
+                'Reloading the workflow definition.',
+                # post-reload the workflow should have been resumed
+                'RESUMING the workflow now',
+            ],
+        )
+
+
+async def test_reload_failure(
+    flow,
+    one_conf,
+    scheduler,
+    start,
+    log_filter,
+):
+    """Reload should not crash the workflow on config errors.
+
+    The error should be logged rather than raised.
+ """ + id_ = flow(one_conf) + schd = scheduler(id_) + async with start(schd) as log: + # corrupt the config by removing the scheduling section + two_conf = {**one_conf, 'scheduling': {}} + flow(two_conf, id_=id_) + + # reload the workflow + schd.command_reload_workflow() + + # the reload should have failed but the workflow should still be + # running + assert log_filter( + log, + contains=( + 'Reload failed - WorkflowConfigError:' + ' missing [scheduling][[graph]] section' + ) + ) + + # the config should be unchanged + assert schd.config.cfg['scheduling']['graph']['R1'] == 'one' diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index c183036d004..5330e516758 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -241,80 +241,6 @@ async def test_no_poll_waiting_tasks( assert "Orphaned tasks:\n* 1/one (running)" in log.messages -@pytest.mark.parametrize('reload', [False, True]) -@pytest.mark.parametrize( - 'test_conf, expected_msg', - [ - pytest.param( - {'Alan Wake': "It's not a lake, it's an ocean"}, - "IllegalItemError: Alan Wake", - id="illegal item" - ), - pytest.param( - { - 'scheduling': { - 'initial cycle point': "2k22", - 'graph': {'R1': "a => b"} - } - }, - ("IllegalValueError: (type=cycle point) " - "[scheduling]initial cycle point = 2k22 - (Invalid cycle point)"), - id="illegal cycle point" - ) - ] -) -async def test_illegal_config_load( - test_conf: dict, - expected_msg: str, - reload: bool, - flow: Callable, - one_conf: dict, - start: Callable, - run: Callable, - scheduler: Callable, - log_filter: Callable -): - """Test that ParsecErrors (illegal config) - that occur during config load - when running a workflow - are displayed without traceback. - - Params: - test_conf: Dict to update one_conf with. - expected_msg: Expected log message at error level. - reload: If False, test a workflow start with invalid config. - If True, test a workflow start with valid config followed by - reload with invalid config. 
- """ - if not reload: - one_conf.update(test_conf) - id_: str = flow(one_conf) - schd: Scheduler = scheduler(id_) - log: pytest.LogCaptureFixture - - if reload: - one_conf.update(test_conf) - async with run(schd) as log: - # Shouldn't be any errors at this stage: - assert not log_filter(log, level=logging.ERROR) - # Modify flow.cylc: - flow(one_conf, id_=id_) - schd.queue_command('reload_workflow', {}) - assert log_filter( - log, level=logging.ERROR, - exact_match=f"Command failed: reload_workflow()\n{expected_msg}" - ) - else: - with pytest.raises(ParsecError): - async with start(schd) as log: - pass - assert log_filter( - log, - level=logging.ERROR, - exact_match=f"Workflow shutting down - {expected_msg}" - ) - - assert TRACEBACK_MSG not in log.text - - async def test_unexpected_ParsecError( one: Scheduler, start: Callable, diff --git a/tests/unit/test_workflow_status.py b/tests/unit/test_workflow_status.py index 046783d4ff1..c46a681f2bc 100644 --- a/tests/unit/test_workflow_status.py +++ b/tests/unit/test_workflow_status.py @@ -36,12 +36,14 @@ def schd( stop_mode=None, stop_point=None, stop_task_id=None, + reload_pending=False, ): return SimpleNamespace( is_paused=is_paused, is_stalled=is_stalled, stop_clock_time=stop_clock_time, stop_mode=stop_mode, + reload_pending=reload_pending, pool=SimpleNamespace( hold_point=hold_point, stop_point=stop_point, @@ -58,7 +60,13 @@ def schd( ( {'is_paused': True}, WorkflowStatus.PAUSED, - 'paused'), + 'paused' + ), + ( + {'reload_pending': 'message'}, + WorkflowStatus.RUNNING, + 'reloading: message' + ), ( {'stop_mode': StopMode.AUTO}, WorkflowStatus.STOPPING,