Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't release from runahead if paused. #4617

Closed
wants to merge 3 commits
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ access to information on configured platforms.

### Fixes

[#4617](https://github.com/cylc/cylc-flow/pull/4617) - Fix task hold on paused
start-up.

[#4566](https://github.com/cylc/cylc-flow/pull/4566) - Fix `cylc scan`
invocation for remote scheduler host on a shared filesystem.

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ async def main_loop(self):
tinit = time()

# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)
self.pool.log_task_pool(logging.CRITICAL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you'll want to re-comment-out this?

Suggested change
self.pool.log_task_pool(logging.CRITICAL)
# self.pool.log_task_pool(logging.CRITICAL)


if self.pool.do_reload:
# Re-initialise data model on reload
Expand All @@ -1470,7 +1470,7 @@ async def main_loop(self):

self.process_command_queue()

if self.pool.release_runahead_tasks():
if not self.is_paused and self.pool.release_runahead_tasks():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this would appear to be the reverse of a previous commit:

22e7a20#diff-90711b005fe27b9f894e6cd09f08816d847dcf853518894c6b7e2139784cbcb8R1444

Copy link
Member

@oliver-sanders oliver-sanders Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think runahead tasks should still be released in "paused" mode, but that the "paused" state should stop them from running.

I.E. as implemented in #4436

Copy link
Member Author

@hjoliver hjoliver Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spotting! Admittedly this attempted fix was done in a desperate rush.

self.is_updated = True
self.reset_inactivity_timer()

Expand Down
95 changes: 94 additions & 1 deletion tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def assert_expected_log(


@pytest.fixture(scope='module')
async def mod_example_flow(
async def mod_example_flow_paused_start(
mod_flow: Callable, mod_scheduler: Callable, mod_run: Callable
) -> Scheduler:
"""Return a scheduler for interrogating its task pool.
Expand All @@ -94,6 +94,23 @@ async def mod_example_flow(
return schd


@pytest.fixture(scope='module')
async def mod_example_flow(
    mod_flow: Callable, mod_scheduler: Callable, mod_run: Callable
) -> Scheduler:
    """Return a (non-paused-start) scheduler for interrogating its task pool.

    Module-scoped, so faster than the function-scoped ``example_flow``
    fixture; only use it where the test does not mutate the state of the
    scheduler or task pool.
    """
    workflow_id = mod_flow(EXAMPLE_FLOW_CFG)
    sched: Scheduler = mod_scheduler(workflow_id, paused_start=False)
    async with mod_run(sched):
        pass
    return sched



@pytest.fixture
async def example_flow(
flow: Callable, scheduler: Callable, caplog: pytest.LogCaptureFixture
Expand All @@ -114,6 +131,82 @@ async def example_flow(
return schd


@pytest.mark.parametrize(
    'items, expected_task_ids, expected_bad_items, expected_warnings',
    [
        param(
            ['*/foo'], ['1/foo'], [], [],
            id="Basic"
        ),
        param(
            ['1/*'],
            ['1/foo', '1/bar'], [], [],
            id="Name glob"
        ),
        param(
            ['1/FAM'], ['1/bar'], [], [],
            id="Family name"
        ),
        param(
            ['*/foo'], ['1/foo'], [], [],
            id="Point glob"
        ),
        param(
            ['*:waiting'],
            ['1/foo', '1/bar'], [], [],
            id="Task state"
        ),
        param(
            ['8/foo'], [], ['8/foo'], ["No active tasks matching: 8/foo"],
            id="Task not yet spawned"
        ),
        param(
            ['1/foo', '8/bar'], ['1/foo'], ['8/bar'],
            ["No active tasks matching: 8/bar"],
            id="Multiple items"
        ),
        param(
            ['1/grogu', '*/grogu'], [], ['1/grogu', '*/grogu'],
            ["No active tasks matching: 1/grogu",
             "No active tasks matching: */grogu"],
            id="No such task"
        ),
        param(
            ['*'],
            ['1/foo', '1/bar'], [], [],
            id="No items given - get all tasks"
        )
    ]
)
async def test_filter_task_proxies_paused_start(
    items: List[str],
    expected_task_ids: List[str],
    expected_bad_items: List[str],
    expected_warnings: List[str],
    mod_example_flow_paused_start: Scheduler,
    caplog: pytest.LogCaptureFixture
) -> None:
    """Test TaskPool.filter_task_proxies() on a paused-start scheduler.

    The NOTE before EXAMPLE_FLOW_CFG above explains which tasks should be
    expected for the tests here.

    Params:
        items: Arg passed to filter_task_proxies().
        expected_task_ids: IDs of the TaskProxys that are expected to be
            returned, of the form "{point}/{name}".
        expected_bad_items: Expected to be returned.
        expected_warnings: Expected to be logged.
    """
    # Capture warnings from the Cylc logger so the expected "No active
    # tasks matching: ..." messages can be asserted against.
    caplog.set_level(logging.WARNING, CYLC_LOG)
    task_pool = mod_example_flow_paused_start.pool
    itasks, _, bad_items = task_pool.filter_task_proxies(items)
    task_ids = [itask.identity for itask in itasks]
    # Order of returned proxies is not significant, so compare sorted.
    assert sorted(task_ids) == sorted(expected_task_ids)
    assert sorted(bad_items) == sorted(expected_bad_items)
    assert_expected_log(caplog, expected_warnings)


@pytest.mark.parametrize(
'items, expected_task_ids, expected_bad_items, expected_warnings',
[
Expand Down