reload: wait for pending tasks to submit and pause the workflow
* Closes cylc#5107
* Reload now waits for pending tasks to submit before attempting to
  reload the config itself.
* Reload now also puts the workflow into the paused state during the
  reload process. This doesn't actually change behaviour, as the
  reload command is blocking in the main loop, but it does help to
  communicate that the workflow will not de-queue or submit any new
  tasks during this process.
* The workflow status message is now updated to reflect the reload
  progress.
oliver-sanders committed Jun 19, 2023
1 parent fee6776 commit b368e7c
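Taken together, the changes amount to the sequence below. This is a minimal, hypothetical sketch assuming a Scheduler-like object `schd`; the real implementation (including the error handling on reload failure) is in the cylc/flow/scheduler.py diff that follows.

```python
from time import sleep

def reload(schd):
    """Hypothetical outline of the new reload sequence (not cylc code)."""
    was_paused = schd.is_paused
    if not was_paused:
        schd.pause_workflow('Reloading workflow definition')

    # 1. flush preparing tasks out of the submission pipeline first
    schd.reload_pending = 'waiting for pending tasks to submit'
    while schd.release_queued_tasks():  # True while tasks are submitting
        sleep(1)

    # 2. load and apply the new config; on failure the workflow stays
    #    paused so the user can fix the config and reload again
    schd.reload_pending = 'loading the workflow definition'
    cfg = schd.load_flow_file(is_reload=True)
    schd.reload_pending = 'applying the new config'
    schd.apply_new_config(cfg, is_reload=True)

    # 3. clear the status message and resume (only if we paused it)
    schd.reload_pending = False
    if not was_paused:
        schd.resume_workflow()
```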
Showing 9 changed files with 327 additions and 123 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -14,6 +14,10 @@ ones in. -->

### Enhancements

[#5992](https://github.com/cylc/cylc-flow/pull/5992) -
The scheduler now waits for preparing tasks to submit before attempting
to perform a reload, and pauses the workflow for the duration of the
reload.

[#5405](https://github.com/cylc/cylc-flow/pull/5405) - Improve scan command
help, and add scheduler PID to the output.

164 changes: 120 additions & 44 deletions cylc/flow/scheduler.py
@@ -32,12 +32,12 @@
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
List,
NoReturn,
Optional,
List,
Set,
Dict,
Tuple,
Union,
)
@@ -150,6 +150,10 @@

if TYPE_CHECKING:
from cylc.flow.task_proxy import TaskProxy
# BACK COMPAT: typing_extensions.Literal
# FROM: Python 3.7
# TO: Python 3.8
from typing_extensions import Literal


class SchedulerStop(CylcError):
@@ -244,6 +248,7 @@ class Scheduler:
stop_mode: Optional[StopMode] = None
stop_task: Optional[str] = None
stop_clock_time: Optional[int] = None
reload_pending: 'Union[Literal[False], str]' = False

# task event loop
is_paused = False
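The new `reload_pending` attribute doubles as a flag and a status message: `False` when no reload is in flight, otherwise a string describing the current reload stage. A standalone illustration of the `Union[Literal[False], str]` idiom (hypothetical values, not cylc code):

```python
from typing import Union
from typing_extensions import Literal

# False => no reload in progress; str => human-readable progress stage
reload_pending: Union[Literal[False], str] = False

reload_pending = 'waiting for pending tasks to submit'
if reload_pending:
    # truthy only while a reload is in flight, so the same value can
    # gate behaviour (e.g. blocking resume) and feed the status message
    print(f'reloading: {reload_pending}')
```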
@@ -427,7 +432,8 @@ async def configure(self):

self.profiler.log_memory("scheduler.py: before load_flow_file")
try:
self.load_flow_file()
cfg = self.load_flow_file()
self.apply_new_config(cfg, is_reload=False)
except ParsecError as exc:
# Mark this exc as expected (see docstring for .schd_expected):
exc.schd_expected = True
@@ -521,8 +527,7 @@ async def configure(self):
self.command_set_hold_point(holdcp)

if self.options.paused_start:
LOG.info("Paused on start up")
self.pause_workflow()
self.pause_workflow('Paused on start up')

self.profiler.log_memory("scheduler.py: begin run while loop")
self.is_updated = True
@@ -1048,35 +1053,71 @@ def command_remove_tasks(self, items) -> int:

def command_reload_workflow(self) -> None:
"""Reload workflow configuration."""
# pause the workflow if not already
was_paused = self.is_paused
if not was_paused:
self.pause_workflow('Reloading workflow definition')
self.process_workflow_db_queue() # see #5593

# flush out preparing tasks before attempting reload
self.reload_pending = 'waiting for pending tasks to submit'
while self.release_queued_tasks():
# NOTE: this method is called by process_command_queue, which is
# called synchronously in the main loop, so this loop blocks
# other main loop functions until the submit pipeline is drained
sleep(1)  # give any remote-inits time to complete
self.reload_pending = 'loading the workflow definition'

# reload the workflow definition
LOG.info("Reloading the workflow definition.")
old_tasks = set(self.config.get_task_name_list())
# Things that can't change on workflow reload:
self.workflow_db_mgr.pri_dao.select_workflow_params(
self._load_workflow_params
)

try:
self.load_flow_file(is_reload=True)
cfg = self.load_flow_file(is_reload=True)
except (ParsecError, CylcConfigError) as exc:
raise CommandFailedError(exc)
self.broadcast_mgr.linearized_ancestors = (
self.config.get_linearized_ancestors())
self.pool.set_do_reload(self.config)
self.task_events_mgr.mail_interval = self.cylc_config['mail'][
'task event batch interval']
self.task_events_mgr.mail_smtp = self._get_events_conf("smtp")
self.task_events_mgr.mail_footer = self._get_events_conf("footer")

# Log tasks that have been added by the reload, removed tasks are
# logged by the TaskPool.
add = set(self.config.get_task_name_list()) - old_tasks
for task in add:
LOG.warning(f"Added task: '{task}'")
self.workflow_db_mgr.put_workflow_template_vars(self.template_vars)
self.workflow_db_mgr.put_runtime_inheritance(self.config)
self.workflow_db_mgr.put_workflow_params(self)
self.is_updated = True
self.is_reloaded = True
if cylc.flow.flags.verbosity > 1:
# log full traceback in debug mode
LOG.exception(exc)
LOG.critical(
f'Reload failed - {exc.__class__.__name__}: {exc}'
'\nThis is probably due to an issue with the new'
' configuration.'
'\nTo continue with the pre-reload config, un-pause the'
' workflow.'
'\nOtherwise, fix the configuration and attempt to reload'
' again.'
)
else:
self.reload_pending = 'applying the new config'
old_tasks = set(self.config.get_task_name_list())
# Things that can't change on workflow reload:
self.workflow_db_mgr.pri_dao.select_workflow_params(
self._load_workflow_params
)
self.apply_new_config(cfg, is_reload=True)
self.broadcast_mgr.linearized_ancestors = (
self.config.get_linearized_ancestors())
self.pool.set_do_reload(self.config)
self.task_events_mgr.mail_interval = self.cylc_config['mail'][
'task event batch interval']
self.task_events_mgr.mail_smtp = self._get_events_conf("smtp")
self.task_events_mgr.mail_footer = self._get_events_conf("footer")

# Log tasks that have been added by the reload, removed tasks are
# logged by the TaskPool.
add = set(self.config.get_task_name_list()) - old_tasks
for task in add:
LOG.warning(f"Added task: '{task}'")
self.workflow_db_mgr.put_workflow_template_vars(self.template_vars)
self.workflow_db_mgr.put_runtime_inheritance(self.config)
self.workflow_db_mgr.put_workflow_params(self)
self.process_workflow_db_queue() # see #5593
self.is_updated = True
self.is_reloaded = True

# resume the workflow if previously paused
self.reload_pending = False
if not was_paused:
self.resume_workflow()
self.process_workflow_db_queue() # see #5593

def get_restart_num(self) -> int:
"""Return the number of the restart, else 0 if not a restart.
@@ -1147,7 +1188,7 @@ def _configure_contact(self) -> None:
def load_flow_file(self, is_reload=False):
"""Load, and log the workflow definition."""
# Local workflow environment set therein.
self.config = WorkflowConfig(
return WorkflowConfig(
self.workflow,
self.flow_file,
self.options,
@@ -1163,11 +1204,13 @@ def load_flow_file(self, is_reload=False):
work_dir=self.workflow_work_dir,
share_dir=self.workflow_share_dir,
)

def apply_new_config(self, config, is_reload=False):
self.config = config
self.cylc_config = DictTree(
self.config.cfg['scheduler'],
glbl_cfg().get(['scheduler'])
)

self.flow_file_update_time = time()
# Dump the loaded flow.cylc file for future reference.
config_dir = get_workflow_run_config_log_dir(
@@ -1294,7 +1337,7 @@ def run_event_handlers(self, event, reason=""):
return
self.workflow_event_handler.handle(self, event, str(reason))

def release_queued_tasks(self) -> None:
def release_queued_tasks(self) -> bool:
"""Release queued tasks, and submit jobs.
The task queue manages references to task proxies in the task pool.
@@ -1312,29 +1355,43 @@ def release_queued_tasks(self) -> None:
* https://github.com/cylc/cylc-flow/pull/4620
* https://github.com/cylc/cylc-flow/issues/4974
Returns:
True if tasks were passed through the submit-pipeline
(i.e. new waiting tasks have entered the preparing state OR
preparing tasks have been passed back through for
submission).
"""
if (
not self.is_paused
and self.stop_mode is None
and self.auto_restart_time is None
and self.reload_pending is False
):
pre_prep_tasks = self.pool.release_queued_tasks()

elif (
self.should_auto_restart_now()
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
(
# Need to get preparing tasks to submit before auto restart
self.should_auto_restart_now()
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
) or (
# Need to get preparing tasks to submit before reload
self.reload_pending
)
):
# Need to get preparing tasks to submit before auto restart
# don't release queued tasks, just finish processing any preparing
# ones
pre_prep_tasks = [
itask for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
]

# Return, if no tasks to submit.
else:
return
return False
if not pre_prep_tasks:
return
return False

# Start the job submission process.
self.is_updated = True
@@ -1362,6 +1419,9 @@ def release_queued_tasks(self) -> None:
f"{itask.state.get_resolved_dependencies()} in flow {flow}"
)

# one or more tasks were passed through the submission pipeline
return True

def process_workflow_db_queue(self):
"""Update workflow DB."""
self.workflow_db_mgr.process_queued_ops()
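The new boolean return is what makes the blocking drain loop in `command_reload_workflow` (above) possible. A hedged, standalone restatement of the pattern, assuming only the interface shown in this diff:

```python
from time import sleep

def drain_submission_pipeline(schd) -> None:
    """Block until no tasks remain in the preparing state.

    Illustrative only: `schd` is assumed to provide the
    release_queued_tasks() method from this diff.
    """
    while schd.release_queued_tasks():
        # True means tasks moved through the submit pipeline this pass;
        # wait briefly, then retry until a pass reports no activity
        sleep(1)
```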
@@ -1818,7 +1878,7 @@ async def _shutdown(self, reason: BaseException) -> None:
LOG.exception(ex)
# disconnect from workflow-db, stop db queue
try:
self.workflow_db_mgr.process_queued_ops()
self.process_workflow_db_queue()
self.workflow_db_mgr.on_workflow_shutdown()
except Exception as exc:
LOG.exception(exc)
@@ -1931,12 +1991,22 @@ def check_auto_shutdown(self):

return True

def pause_workflow(self) -> None:
"""Pause the workflow."""
def pause_workflow(self, msg: Optional[str] = None) -> None:
"""Pause the workflow.
Args:
msg:
A user-facing string explaining why the workflow was paused, if
helpful.
"""
if self.is_paused:
LOG.info("Workflow is already paused")
return
LOG.info("PAUSING the workflow now")
_msg = "PAUSING the workflow now"
if msg:
_msg += f': {msg}'
LOG.info(_msg)
self.is_paused = True
self.workflow_db_mgr.put_workflow_paused()
self.update_data_store()
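With the optional `msg` argument the reason is appended to the existing log line rather than logged separately. A hedged illustration of the resulting log output (assuming a Scheduler instance `schd` that is not yet paused):

```python
schd.pause_workflow('Reloading workflow definition')
# logs: "PAUSING the workflow now: Reloading workflow definition"

schd.pause_workflow('another reason')
# logs: "Workflow is already paused" (the message is dropped)
```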
@@ -1945,8 +2015,14 @@ def resume_workflow(self, quiet: bool = False) -> None:
"""Resume the workflow.
Args:
quiet: whether to log anything.
quiet:
Whether to log anything in the event the workflow is not
paused.
"""
if self.reload_pending:
LOG.warning('Cannot resume - workflow is reloading')
return
if not self.is_paused:
if not quiet:
LOG.warning("Cannot resume - workflow is not paused")
2 changes: 2 additions & 0 deletions cylc/flow/workflow_status.py
@@ -165,6 +165,8 @@ def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]:
if schd.stop_mode is not None:
status = WorkflowStatus.STOPPING
status_msg = f'stopping: {schd.stop_mode.explain()}'
elif schd.reload_pending:
status_msg = f'reloading: {schd.reload_pending}'
elif schd.is_stalled:
status_msg = 'stalled'
elif schd.is_paused:
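For context, the new branch slots in just below `stopping` in the status precedence, so a reloading workflow reports its reload stage even while paused. A hypothetical condensation of the message logic (the real `get_workflow_status` also handles stop points, hold points, etc.):

```python
def status_msg(schd) -> str:
    # order matters: stopping trumps reloading, which trumps the rest
    if schd.stop_mode is not None:
        return f'stopping: {schd.stop_mode.explain()}'
    if schd.reload_pending:  # new in this commit
        return f'reloading: {schd.reload_pending}'
    if schd.is_stalled:
        return 'stalled'
    if schd.is_paused:
        return 'paused'
    return 'running'
```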
32 changes: 32 additions & 0 deletions tests/conftest.py
@@ -109,6 +109,38 @@ def _log_filter(
return _log_filter


@pytest.fixture
def log_scan():
"""Ensure log messages appear in the correct order.
TRY TO AVOID DOING THIS!
If you are trying to test a sequence of events you are likely better off
doing this a different way (e.g. mock the functions you are interested in
and test the call arguments/returns later).
However, there are some occasions where this might be necessary, e.g.
testing a monolithic synchronous function.
Args:
log: The caplog fixture.
items: Iterable of string messages to compare. All are tested
by "contains" i.e. "item in string".
"""
def _log_scan(log, items):
records = iter(log.records)
record = next(records)
for item in items:
while item not in record.message:
try:
record = next(records)
except StopIteration:
raise Exception(f'Reached end of log looking for: {item}')

return _log_scan


@pytest.fixture(scope='session')
def port_range():
return glbl_cfg().get(['scheduler', 'run hosts', 'ports'])
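A hedged example of how a test might use the new fixture. The `one` and `start` fixtures and the exact log lines are assumptions based on cylc's integration-test conventions, not part of this diff:

```python
async def test_reload_sequence(one, start, log_scan, caplog):
    """Check the reload steps are logged in the right order."""
    # `one`/`start` are assumed integration-test fixtures providing a
    # running scheduler; the messages below are illustrative
    async with start(one):
        one.command_reload_workflow()
        log_scan(
            caplog,
            [
                'PAUSING the workflow now: Reloading workflow definition',
                'Reloading the workflow definition.',
                'RESUMING the workflow now',
            ],
        )
```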
12 changes: 9 additions & 3 deletions tests/functional/logging/01-basic.t
@@ -31,14 +31,18 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
R1 = reloader1 => stopper => reloader2
[runtime]
[[reloader1, reloader2]]
script = cylc reload "${CYLC_WORKFLOW_ID}"
script = """
cylc reload "${CYLC_WORKFLOW_ID}"
# wait for the command to complete
cylc__job__poll_grep_workflow_log 'Reload completed'
"""
[[stopper]]
script = cylc stop --now --now "${CYLC_WORKFLOW_ID}"
__FLOW_CONFIG__

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"

workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" --main-loop='log db'

# Check scheduler logs.
ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_1.out
@@ -55,7 +59,9 @@ cmp_ok conf_1.out << __EOF__
flow-processed.cylc
__EOF__

workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}"
mv "$WORKFLOW_RUN_DIR/cylc.flow.main_loop.log_db.sql" "$WORKFLOW_RUN_DIR/01.cylc.flow.main_loop.log_db.sql"

workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}" --main-loop='log db'

ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_2.out
cmp_ok schd_2.out << __EOF__
4 changes: 3 additions & 1 deletion tests/functional/reload/09-garbage.t
@@ -29,6 +29,8 @@ TEST_NAME="${TEST_NAME_BASE}-run"
workflow_run_ok "${TEST_NAME}" \
cylc play --reference-test --debug --no-detach "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
grep_ok 'Command failed: reload_workflow' "${WORKFLOW_RUN_DIR}/log/scheduler/log"
grep_ok \
'Reload failed - IllegalItemError: \[scheduling\]garbage' \
"${WORKFLOW_RUN_DIR}/log/scheduler/log"
#-------------------------------------------------------------------------------
purge
