Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow cylc hold to hold future tasks that haven't spawned yet #4238

Merged
merged 13 commits into from
Jun 22, 2021
22 changes: 13 additions & 9 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ Third beta release of Cylc 8.

### Enhancements

[#4103](https://github.com/cylc/cylc-flow/pull/4103)
- Expose runahead limiting to UIs; restore correct force-triggering of queued
- tasks for Cylc 8.
[#4103](https://github.com/cylc/cylc-flow/pull/4103) -
Expose runahead limiting to UIs; restore correct force-triggering of queued
tasks for Cylc 8.

[#4218](https://github.com/cylc/cylc-flow/pull/4218)
- Start a new run from specified tasks.
[#4218](https://github.com/cylc/cylc-flow/pull/4218) - Add ability to
start a new run from specified tasks instead of a cycle point.

[#4214](https://github.com/cylc/cylc-flow/pull/4214)
- Unify `-v --verbose`, `-q --quiet` and `--debug` options.
[#4214](https://github.com/cylc/cylc-flow/pull/4214) -
Unify `-v --verbose`, `-q --quiet` and `--debug` options.

[#4174](https://github.com/cylc/cylc-flow/pull/4174) - Terminology: replace
"suite" with "workflow".
Expand All @@ -77,8 +77,9 @@ functionality is now provided by `[symlink dirs]`.
[#4142](https://github.com/cylc/cylc-flow/pull/4142) - Record source directory
version control information on installation of a workflow.

[#4222](https://github.com/cylc/cylc-flow/pull/4222) - Fix bug where a
workflow's public database file was not closed properly.
[#4238](https://github.com/cylc/cylc-flow/pull/4238) - Future tasks can now
be held in advance using `cylc hold` (previously it was only active tasks
that could be held).

### Fixes

Expand All @@ -103,6 +104,9 @@ a workflow that uses the deprecated `suite.rc` filename would symlink `flow.cylc
to the `suite.rc` in the source dir instead of the run dir. Also fixes a couple
of other, small bugs.

[#4222](https://github.com/cylc/cylc-flow/pull/4222) - Fix bug where a
workflow's public database file was not closed properly.

-------------------------------------------------------------------------------
## __cylc-8.0b1 (<span actions:bind='release-date'>Released 2021-04-21</span>)__

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2103,14 +2103,14 @@ def _proc_triggers(self, triggers, original, seq, task_triggers):
self.generate_triggers(
expr, lefts, right, seq, suicide, task_triggers)

def find_taskdefs(self, name):
def find_taskdefs(self, name: str) -> List[TaskDef]:
"""Find TaskDef objects in family "name" or matching "name".

Return a list of TaskDef objects which:
* have names that glob matches "name".
* are in a family that glob matches "name".
"""
ret = []
ret: List[TaskDef] = []
if name in self.taskdefs:
# Match a task name
ret.append(self.taskdefs[name])
Expand Down
30 changes: 24 additions & 6 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from copy import deepcopy
import json
from time import time
from typing import Union, Tuple, TYPE_CHECKING
import zlib

from cylc.flow import __version__ as CYLC_VERSION, LOG, ID_DELIM
Expand Down Expand Up @@ -92,6 +93,9 @@
get_time_string_from_unix_time as time2str
)

if TYPE_CHECKING:
from cylc.flow.cyclers import PointBase


EDGES = 'edges'
FAMILIES = 'families'
Expand Down Expand Up @@ -797,6 +801,10 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
id=tp_id,
task=t_id,
cycle_point=point_string,
is_held=(
(itask.tdef.name, itask.point)
in self.schd.pool.tasks_to_hold
),
depth=task_def.depth,
name=task_def.name,
state=TASK_STATUS_WAITING,
Expand Down Expand Up @@ -1448,22 +1456,32 @@ def delta_task_state(self, itask):
PbTask(id=t_id)).MergeFrom(t_delta)
self.updates_pending = True

def delta_task_held(self, itask):
def delta_task_held(
self,
itask: Union[TaskProxy, Tuple[str, 'PointBase', bool]]
):
"""Create delta for change in task proxy held state.

Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
itask:
The TaskProxy to hold/release OR a tuple of the form
(name, cycle, is_held).

"""
tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point)
if isinstance(itask, TaskProxy):
name = itask.tdef.name
cycle = itask.point
is_held = itask.state.is_held
else:
name, cycle, is_held = itask

tp_id, tproxy = self.store_node_fetcher(name, cycle)
if not tproxy:
return
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{time()}'
tp_delta.is_held = itask.state.is_held
tp_delta.is_held = is_held
self.state_update_families.add(tproxy.first_parent)
self.updates_pending = True

Expand Down
22 changes: 18 additions & 4 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Provide data access object for the workflow runtime database."""

from os.path import expandvars
import sqlite3
import traceback
from os.path import expandvars
from typing import List, Tuple

from cylc.flow import LOG
import cylc.flow.flags
Expand Down Expand Up @@ -182,6 +183,7 @@ class CylcWorkflowDAO:
TABLE_TASK_PREREQUISITES = "task_prerequisites"
TABLE_TASK_STATES = "task_states"
TABLE_TASK_TIMEOUT_TIMERS = "task_timeout_timers"
TABLE_TASKS_TO_HOLD = "tasks_to_hold"
TABLE_XTRIGGERS = "xtriggers"
TABLE_ABS_OUTPUTS = "absolute_outputs"

Expand Down Expand Up @@ -295,6 +297,10 @@ class CylcWorkflowDAO:
["name"],
["output"],
],
TABLE_TASKS_TO_HOLD: [
["name"],
["cycle"],
],
}

def __init__(self, db_file_name, is_public=False):
Expand Down Expand Up @@ -731,7 +737,9 @@ def select_task_pool_for_restart(self, callback):
for row_idx, row in enumerate(self.connect().execute(stmt)):
callback(row_idx, list(row))

def select_task_prerequisites(self, cycle, name):
def select_task_prerequisites(
self, cycle: str, name: str
) -> List[Tuple[str, str, str, str]]:
"""Return prerequisites of a task of the given name & cycle point."""
stmt = f"""
SELECT
Expand All @@ -742,9 +750,15 @@ def select_task_prerequisites(self, cycle, name):
FROM
{self.TABLE_TASK_PREREQUISITES}
WHERE
cycle == '{cycle}' AND
name == '{name}'
cycle == ? AND
name == ?
"""
stmt_args = [cycle, name]
return list(self.connect().execute(stmt, stmt_args))

def select_tasks_to_hold(self) -> List[Tuple[str, str]]:
"""Return all tasks to hold stored in the DB."""
stmt = f"SELECT name, cycle FROM {self.TABLE_TASKS_TO_HOLD}"
return list(self.connect().execute(stmt))

def select_task_times(self):
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ def _load_pool_from_db(self):
self.xtrigger_mgr.load_xtrigger_for_restart)
self.workflow_db_mgr.pri_dao.select_abs_outputs_for_restart(
self.pool.load_abs_outputs_for_restart)
self.pool.load_db_tasks_to_hold()

def restart_remote_init(self):
"""Remote init for all submitted/running tasks in the pool."""
Expand Down Expand Up @@ -1449,7 +1450,7 @@ async def main_loop(self):
itask, self.workflow_db_mgr.put_xtriggers):
housekeep_xtriggers = True
if all(itask.is_ready_to_run()):
self.pool.queue_tasks([itask])
self.pool.queue_task(itask)

# Check for satisfied ext_triggers, and queue if ready.
if (
Expand All @@ -1459,7 +1460,7 @@ async def main_loop(self):
itask, self.ext_trigger_queue)
and all(itask.is_ready_to_run())
):
self.pool.queue_tasks([itask])
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
Expand Down
12 changes: 11 additions & 1 deletion cylc/flow/scripts/hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
Held tasks do not submit their jobs even if ready to run.

Examples:
# Hold mytask at cycle point 1234 in my_flow
# Hold mytask at cycle point 1234 in my_flow (if it has not yet spawned, it
# will hold as soon as it spawns)
$ cylc hold my_flow mytask.1234

# Hold all active tasks at cycle 1234 in my_flow (note: tasks before/after
Expand All @@ -33,13 +34,21 @@
# Hold all active instances of mytask in my_flow (note: this will not hold
# any unspawned tasks that might spawn in the future)
$ cylc hold my_flow 'mytask.*'
# or
$ cylc hold my_flow mytask

# Hold all active failed tasks
$ cylc hold my_flow '*:failed'

# Hold all tasks after cycle point 1234 in my_flow
$ cylc hold my_flow --after=1234

Note: To pause a workflow (immediately preventing all job submission), use
'cylc pause' instead.

Note: globs and ":<state>" selectors will only match active tasks;
to hold future tasks when they spawn, use exact identifiers e.g. "mytask.1234".

See also 'cylc release'.
"""

Expand Down Expand Up @@ -89,6 +98,7 @@ def get_option_parser() -> COP:
__doc__, comms=True, multitask=True,
argdoc=[
('REG', "Workflow name"),
# TODO: switch back to TASK_ID?
('[TASK_GLOB ...]', "Task matching patterns")]
)

Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/scripts/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

Held tasks do not submit their jobs even if ready to run.

Note: globs and ":<state>" selectors will only match active tasks;
to release future tasks, use exact identifiers e.g. "mytask.1234".

See also 'cylc hold'.
"""

Expand Down
Loading