reload: wait for pending tasks to submit and pause the workflow #5592

Merged: 5 commits, Jul 6, 2023
Changes from 4 commits

4 changes: 4 additions & 0 deletions CHANGES.md
@@ -21,6 +21,10 @@ issue which could cause jobs to fail if this variable became too long.

### Enhancements

[#5592](https://github.com/cylc/cylc-flow/pull/5592) -
The scheduler will now wait for preparing tasks to submit before attempting
to perform a reload, and will also pause/unpause the workflow.

[#5605](https://github.com/cylc/cylc-flow/pull/5605) - A shorthand for defining
a list of strings - Before: `cylc command -s "X=['a', 'bc', 'd']"` - After:
`cylc command -z X=a,bc,d`.
260 changes: 194 additions & 66 deletions cylc/flow/scheduler.py

Large diffs are not rendered by default.
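
Since the scheduler.py diff is not rendered, here is a minimal sketch of the
reload sequence this PR introduces, pieced together from the changelog entry
and the tests below. It is a sketch only: apart from reload_pending and
pool.reload_taskdefs, the method and helper names are assumptions, not the
real cylc-flow API.

    async def command_reload_workflow(self):
        """Pause the workflow, flush preparing tasks, reload, then resume."""
        # reload_pending doubles as the status message shown while reloading
        self.reload_pending = 'waiting for pending tasks to submit'
        self.pause_workflow('Reloading workflow')  # assumed method name

        # wait for preparing tasks to submit before touching the config
        while any(
            itask.state(TASK_STATUS_PREPARING)
            for itask in self.pool.get_tasks()
        ):
            sleep(1)  # module-level time.sleep; monkeypatched in the tests

        LOG.info('Reloading the workflow definition.')
        try:
            config = self.load_flow_file()  # assumed re-parse helper
        except Exception as exc:
            # a broken config must not bring down the running scheduler
            LOG.warning(f'Reload failed - {type(exc).__name__}: {exc}')
        else:
            self.pool.reload_taskdefs(config)
            LOG.info('Reload completed.')
        self.reload_pending = False
        self.resume_workflow()  # assumed; presumably skipped if the user
                                # had already paused the workflow

The integration test below drives exactly this sequence by monkeypatching
cylc.flow.scheduler.sleep so that each "sleep" advances the task state.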

49 changes: 21 additions & 28 deletions cylc/flow/task_pool.py
@@ -111,7 +111,6 @@ def __init__(
self.data_store_mgr: 'DataStoreMgr' = data_store_mgr
self.flow_mgr: 'FlowMgr' = flow_mgr

-self.do_reload = False
self.max_future_offset: Optional['IntervalBase'] = None
self._prev_runahead_base_point: Optional['PointBase'] = None
self._prev_runahead_sequence_points: Optional[Set['PointBase']] = None
@@ -132,7 +131,6 @@ def __init__(
self.abort_task_failed = False
self.expected_failed_tasks = self.config.get_expected_failed_tasks()

-self.orphans: List[str] = []
self.task_name_list = self.config.get_task_name_list()
self.task_queue_mgr = IndepQueueManager(
self.config.cfg['scheduling']['queues'],
@@ -918,25 +916,7 @@ def set_max_future_offset(self):
if max_offset != orig and self.compute_runahead(force=True):
self.release_runahead_tasks()

-def set_do_reload(self, config: 'WorkflowConfig') -> None:
-    """Set the task pool to reload mode."""
-    self.config = config
-    self.stop_point = config.stop_point or config.final_point
-    self.do_reload = True
-
-    # find any old tasks that have been removed from the workflow
-    old_task_name_list = self.task_name_list
-    self.task_name_list = self.config.get_task_name_list()
-    for name in old_task_name_list:
-        if name not in self.task_name_list:
-            self.orphans.append(name)
-    for name in self.task_name_list:
-        if name in self.orphans:
-            self.orphans.remove(name)
-    # adjust the new workflow config to handle the orphans
-    self.config.adopt_orphans(self.orphans)
-
-def reload_taskdefs(self) -> None:
def reload_taskdefs(self, config: 'WorkflowConfig') -> None:
"""Reload the definitions of task proxies in the pool.

Orphaned tasks (whose definitions were removed from the workflow):
@@ -946,18 +926,33 @@
Otherwise: replace task definitions but copy over existing outputs etc.
Otherwise: replace task definitions but copy over existing outputs etc.

"""
self.config = config
self.stop_point = config.stop_point or config.final_point

# find any old tasks that have been removed from the workflow
old_task_name_list = self.task_name_list
self.task_name_list = self.config.get_task_name_list()
orphans = [
task
for task in old_task_name_list
if task not in self.task_name_list
]

# adjust the new workflow config to handle the orphans
self.config.adopt_orphans(orphans)

LOG.info("Reloading task definitions.")
tasks = self.get_all_tasks()
# Log tasks orphaned by a reload but not currently in the task pool.
-for name in self.orphans:
for name in orphans:
if name not in (itask.tdef.name for itask in tasks):
LOG.warning("Removed task: '%s'", name)
for itask in tasks:
-if itask.tdef.name in self.orphans:
if itask.tdef.name in orphans:
if (
-                    itask.state(TASK_STATUS_WAITING)
-                    or itask.state.is_held
-                    or itask.state.is_queued
itask.state(TASK_STATUS_WAITING)
or itask.state.is_held
or itask.state.is_queued
):
# Remove orphaned task if it hasn't started running yet.
self.remove(itask, 'task definition removed')
@@ -1012,8 +1007,6 @@ def reload_taskdefs(self) -> None:
if all(ready_check_items) and not itask.state.is_runahead:
self.queue_task(itask)

-self.do_reload = False

def set_stop_point(self, stop_point: 'PointBase') -> bool:
"""Set the workflow stop cycle point.

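The orphan calculation in reload_taskdefs above is plain list arithmetic; a
worked example:

    old_task_name_list = ['a', 'b', 'c']   # task names before the reload
    task_name_list = ['a', 'c']            # task names after the reload
    orphans = [
        task
        for task in old_task_name_list
        if task not in task_name_list
    ]
    assert orphans == ['b']  # 'b' was removed, so it is adopted as an orphan

Computing orphans as a local variable, rather than accumulating them on the
old self.orphans attribute, means stale entries can no longer carry over
between successive reloads.
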
3 changes: 3 additions & 0 deletions cylc/flow/workflow_status.py
@@ -165,6 +165,9 @@ def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]:
if schd.stop_mode is not None:
status = WorkflowStatus.STOPPING
status_msg = f'stopping: {schd.stop_mode.explain()}'
elif schd.reload_pending:
status = WorkflowStatus.PAUSED
status_msg = f'reloading: {schd.reload_pending}'
elif schd.is_stalled:
status_msg = 'stalled'
elif schd.is_paused:
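
Read in full, the function resolves a single status in priority order, with
reloading slotted in just below stopping. A condensed sketch of the result;
everything outside the branches visible in the diff is assumed, not the real
cylc-flow code:

    def get_workflow_status(schd: 'Scheduler') -> Tuple[str, str]:
        # precedence: stopping > reloading > stalled > paused > running
        status = WorkflowStatus.RUNNING
        status_msg = 'running'
        if schd.stop_mode is not None:
            status = WorkflowStatus.STOPPING
            status_msg = f'stopping: {schd.stop_mode.explain()}'
        elif schd.reload_pending:
            # reload_pending holds a progress message, e.g.
            # 'waiting for pending tasks to submit'
            status = WorkflowStatus.PAUSED
            status_msg = f'reloading: {schd.reload_pending}'
        elif schd.is_stalled:
            status_msg = 'stalled'
        elif schd.is_paused:
            status = WorkflowStatus.PAUSED
            status_msg = 'paused'
        return status.value, status_msg
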
32 changes: 32 additions & 0 deletions tests/conftest.py
@@ -109,6 +109,38 @@ def _log_filter(
return _log_filter


@pytest.fixture
def log_scan():
"""Ensure log messages appear in the correct order.

TRY TO AVOID DOING THIS!

If you are trying to test a sequence of events you are likely better off
doing this a different way (e.g. mock the functions you are interested in
and test the call arguments/returns later).

However, there are some occasions where this might be necessary, e.g.
testing a monolithic synchronous function.

Args:
log: The caplog fixture.
items: Iterable of string messages to compare. All are tested
by "contains" i.e. "item in string".

"""
    def _log_scan(log, items):
        records = iter(log.records)
        try:
            record = next(records)
        except StopIteration:
            raise Exception(f'No log records found, looking for: {items[0]}')
        for item in items:
            while item not in record.message:
                try:
                    record = next(records)
                except StopIteration:
                    raise Exception(f'Reached end of log looking for: {item}')

return _log_scan


@pytest.fixture(scope='session')
def port_range():
return glbl_cfg().get(['scheduler', 'run hosts', 'ports'])
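
A hypothetical usage sketch for the log_scan fixture above; the test and the
messages are illustrative only:

    import logging

    def test_message_order(caplog, log_scan):
        logging.warning('step one')
        logging.warning('step two')
        # passes: both messages appear, in this order
        log_scan(caplog, ['step one', 'step two'])
        # would raise: the messages never appear in this order
        # log_scan(caplog, ['step two', 'step one'])
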
8 changes: 7 additions & 1 deletion tests/functional/logging/01-basic.t
@@ -31,7 +31,11 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
R1 = reloader1 => stopper => reloader2
[runtime]
[[reloader1, reloader2]]
-script = cylc reload "${CYLC_WORKFLOW_ID}"
script = """
cylc reload "${CYLC_WORKFLOW_ID}"
# wait for the command to complete
cylc__job__poll_grep_workflow_log 'Reload completed'
"""

Member Author: Orthogonal change: This test was timing dependent because it didn't wait for reload to complete.

[[stopper]]
script = cylc stop --now --now "${CYLC_WORKFLOW_ID}"
__FLOW_CONFIG__
@@ -55,6 +59,8 @@ cmp_ok conf_1.out << __EOF__
flow-processed.cylc
__EOF__

mv "$WORKFLOW_RUN_DIR/cylc.flow.main_loop.log_db.sql" "$WORKFLOW_RUN_DIR/01.cylc.flow.main_loop.log_db.sql"

workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "${WORKFLOW_NAME}"

ls "${WORKFLOW_RUN_DIR}/log/scheduler/" > schd_2.out
4 changes: 3 additions & 1 deletion tests/functional/reload/09-garbage.t
@@ -29,6 +29,8 @@ TEST_NAME="${TEST_NAME_BASE}-run"
workflow_run_ok "${TEST_NAME}" \
cylc play --reference-test --debug --no-detach "${WORKFLOW_NAME}"
#-------------------------------------------------------------------------------
-grep_ok 'Command failed: reload_workflow' "${WORKFLOW_RUN_DIR}/log/scheduler/log"
grep_ok \
'Reload failed - IllegalItemError: \[scheduling\]garbage' \
"${WORKFLOW_RUN_DIR}/log/scheduler/log"
#-------------------------------------------------------------------------------
purge
148 changes: 148 additions & 0 deletions tests/integration/test_reload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Tests for reload behaviour in the scheduler."""

from contextlib import suppress

from cylc.flow.task_state import (
TASK_STATUS_WAITING,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
)


async def test_reload_waits_for_pending_tasks(
flow,
scheduler,
start,
monkeypatch,
capture_submission,
log_scan,
):
"""Reload should flush out preparing tasks and pause the workflow.

Reloading a workflow with preparing tasks may be unsafe and is at least
confusing. For safety we should pause the workflow and flush out any
preparing tasks before attempting reload.

See https://github.com/cylc/cylc-flow/issues/5107
"""
# a simple workflow with a single task
id_ = flow({
'scheduling': {
'graph': {
'R1': 'foo',
},
},
'runtime': {
'foo': {},
},
})
schd = scheduler(id_, paused_start=False)

# we will artificially push the task through these states
state_seq = [
# repeat the preparing state a few times to simulate a task
# taking multiple main-loop cycles to submit
TASK_STATUS_PREPARING,
TASK_STATUS_PREPARING,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
]

# start the scheduler
async with start(schd) as log:
# disable submission events to prevent anything from actually running
capture_submission(schd)

        # patch the scheduler's sleep function so that each time the
        # scheduler sleeps (e.g. while waiting for this task to submit)
        # the task advances through the next state in state_seq
def change_state(_=0):
with suppress(IndexError):
foo.state_reset(state_seq.pop(0))
monkeypatch.setattr(
'cylc.flow.scheduler.sleep',
change_state
)

# the task should start as waiting
tasks = schd.pool.get_tasks()
assert len(tasks) == 1
foo = tasks[0]
assert tasks[0].state(TASK_STATUS_WAITING)

# put the task into the preparing state
change_state()

# reload the workflow
await schd.command_reload_workflow()

# the task should end in the submitted state
assert foo.state(TASK_STATUS_SUBMITTED)

# ensure the order of events was correct
log_scan(
log,
[
# the task should have entered the preparing state before the
# reload was requested
'[1/foo waiting(queued) job:00 flows:1] => preparing(queued)',
# the reload should have put the workflow into the paused state
'PAUSING the workflow now: Reloading workflow',
# reload should have waited for the task to submit
'[1/foo preparing(queued) job:00 flows:1]'
' => submitted(queued)',
# before then reloading the workflow config
'Reloading the workflow definition.',
# post-reload the workflow should have been resumed
'RESUMING the workflow now',
],
)


async def test_reload_failure(
flow,
one_conf,
scheduler,
start,
log_filter,
):
"""Reload should not crash the workflow on config errors.

A warning should be logged along with the error.
"""
id_ = flow(one_conf)
schd = scheduler(id_)
async with start(schd) as log:
        # corrupt the config by emptying the [scheduling] section (no graph)
two_conf = {**one_conf, 'scheduling': {}}
flow(two_conf, id_=id_)

# reload the workflow
await schd.command_reload_workflow()

# the reload should have failed but the workflow should still be
# running
assert log_filter(
log,
contains=(
'Reload failed - WorkflowConfigError:'
' missing [scheduling][[graph]] section'
)
)

# the config should be unchanged
assert schd.config.cfg['scheduling']['graph']['R1'] == 'one'
2 changes: 1 addition & 1 deletion tests/integration/test_resolvers.py
@@ -232,7 +232,7 @@ async def test_stop(
schd=one
)
resolvers.stop(StopMode.REQUEST_CLEAN)
-one.process_command_queue()
await one.process_command_queue()
assert log_filter(
log, level=logging.INFO, contains="Command actioned: stop"
)