
Flow trigger CLI and defaults #4739

Merged
merged 31 commits
Apr 13, 2022
Commits
4a0399a
Implement trigger command --flow arg.
hjoliver Mar 10, 2022
3f7e946
Upgrade old reflow tests.
hjoliver Mar 11, 2022
40d52e6
Add flow-trigger func tests.
hjoliver Mar 11, 2022
0ee756d
Fix integration test.
hjoliver Mar 13, 2022
83b9b71
Add doctest unit tests.
hjoliver Mar 13, 2022
1001b91
Remove func test replace by unit test.
hjoliver Mar 13, 2022
db64747
Improve trigger command help.
hjoliver Mar 15, 2022
4e105ee
Update change log.
hjoliver Mar 15, 2022
9e4c510
Small fix.
hjoliver Mar 21, 2022
dab7b9a
Fix a func test.
hjoliver Mar 22, 2022
b9b2d4a
Implement trigger command --flow arg.
hjoliver Mar 10, 2022
626d71b
Implement trigger command --flow arg.
hjoliver Mar 10, 2022
5ddb696
Basic --wait triggers.
hjoliver Mar 13, 2022
14cbe7e
Add --wait func tests
hjoliver Mar 14, 2022
fd271f9
Spawn on completed outputs only after trigger wait.
hjoliver Mar 21, 2022
1c16f14
Fix two func tests.
hjoliver Mar 22, 2022
aecf16a
Restore a nosec comment.
hjoliver Mar 22, 2022
e4e2343
Add new flow-trigger func tests.
hjoliver Mar 22, 2022
a49ca44
Don't share flow nums ref with parent task.
hjoliver Mar 23, 2022
5e9172d
Add new flow-trigger func test.
hjoliver Mar 23, 2022
f16334c
Convert new doctests to separate unit tests.
hjoliver Mar 23, 2022
1961249
Tweak change log.
hjoliver Mar 25, 2022
37904a9
Tweak new func test.
hjoliver Apr 4, 2022
a4b3a45
Tweak trigger help wording.
hjoliver Apr 4, 2022
c8384e9
graphql: add metadata for the trigger mutation
oliver-sanders Apr 5, 2022
be68c3c
Address review comments.
hjoliver Apr 10, 2022
8006bb6
Fix docstring.
hjoliver Apr 12, 2022
a847222
Use new Flow type from schema.
hjoliver Apr 12, 2022
73430d3
tests/f: test trigger --flow=all join behaviour
oliver-sanders Apr 12, 2022
061bd34
trigger: fixes & tests
oliver-sanders Apr 12, 2022
7b05a10
Merge pull request #22 from oliver-sanders/flow-triggers-wait
hjoliver Apr 12, 2022
7 changes: 7 additions & 0 deletions CHANGES.md
@@ -95,6 +95,13 @@ line when specifying a Cylc ID that includes your username (e.g. `'~user/workflo
[#4737](https://github.com/cylc/cylc-flow/pull/4737) -
Fix issue which prevented tasks with incomplete outputs from being rerun by
subsequent flows.
### Enhancements

[#4738](https://github.com/cylc/cylc-flow/pull/4738) and
[#4739](https://github.com/cylc/cylc-flow/pull/4739) - Implement `cylc trigger
[--flow=] [--wait]` for manual triggering with respect to active flows (the
default), specific flows, new flows, or one-off task runs.


-------------------------------------------------------------------------------
## __cylc-8.0rc1 (<span actions:bind='release-date'>Released 2022-02-17</span>)__
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
@@ -218,6 +218,7 @@ message PbTaskProxy {
map<string, PbTrigger> xtriggers = 24;
optional bool is_queued = 25;
optional bool is_runahead = 26;
optional bool flow_wait = 27;
}

message PbFamily {
85 changes: 50 additions & 35 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cylc/flow/data_store_mgr.py
@@ -1080,7 +1080,7 @@ def apply_task_proxy_db_history(self):
TASK_STATUS_SUCCEEDED
)
):
for message in json.loads(outputs_str).values():
for message in json.loads(outputs_str):
itask.state.outputs.set_completion(message, True)
# Gather tasks with flow id.
prereq_ids.add(f'{tokens.relative_id}/{flow_nums_str}')
2 changes: 1 addition & 1 deletion cylc/flow/dbstatecheck.py
@@ -153,7 +153,7 @@ def task_state_met(self, task, cycle, status=None, message=None):
return any(
message == value
for outputs_str, in res
for value in json.loads(outputs_str).values()
for value in json.loads(outputs_str)
)

@staticmethod
4 changes: 4 additions & 0 deletions cylc/flow/flow_mgr.py
@@ -24,6 +24,10 @@


FlowNums = Set[int]
# Flow constants
FLOW_ALL = "all"
FLOW_NEW = "new"
FLOW_NONE = "none"


class FlowMgr:
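The three flow constants added above are the special values accepted by the new `--flow` option alongside plain integer flow numbers. A minimal sketch of how such input could be validated — `validate_flow_opt` is a hypothetical helper for illustration, not cylc's actual validation code; the assumption here is that a special string must be the sole item given:

```python
FLOW_ALL = "all"
FLOW_NEW = "new"
FLOW_NONE = "none"


def validate_flow_opt(values):
    """Return True if `values` is a plausible --flow option list.

    Special strings must be the only item; otherwise every item
    must be a positive integer flow number.
    """
    if any(v in (FLOW_ALL, FLOW_NEW, FLOW_NONE) for v in values):
        return len(values) == 1
    return all(v.isdigit() and int(v) > 0 for v in values)


print(validate_flow_opt(["new"]))       # a special value on its own
print(validate_flow_opt(["1", "2"]))    # integer flow numbers
print(validate_flow_opt(["new", "2"]))  # mixing special and numeric
```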
18 changes: 12 additions & 6 deletions cylc/flow/log_diagnosis.py
@@ -28,17 +28,17 @@
from cylc.flow.scheduler import Scheduler


RE_TRIG = re.compile(r'(.*? -triggered off \[.*\])$')
RE_TRIG = re.compile(r'(^.*? -triggered off \[.*\].*$)')


def run_reftest(schd: 'Scheduler') -> None:
"""Run reference test at shutdown."""
reffilename = schd.config.get_ref_log_name()
curfilename = get_workflow_test_log_name(schd.workflow)
ref = _load_reflog(reffilename)
cur = _load_reflog(curfilename)
ref = _load_reflog(reffilename, False)
if not ref:
raise WorkflowEventError("No triggering events in reference log.")
cur = _load_reflog(curfilename, "in flow" not in ref[0])
if not cur:
raise WorkflowEventError("No triggering events in test log.")
if ref == cur:
@@ -53,13 +53,19 @@ def run_reftest(schd: 'Scheduler') -> None:
raise exc


def _load_reflog(filename):
"""Reference test: get trigger info from reference log."""
def _load_reflog(filename, strip_flows):
"""Reference test: get trigger info from reference log.

Back-compat for old logs: strip flow nums from each line.
"""
res = []
with open(os.path.expandvars(filename), 'r') as reflog:
for line in reflog:
match = RE_TRIG.search(line)
if match:
res.append(match.groups()[0])
if strip_flows:
res.append(re.sub(' in flow .*$', '', match.groups()[0]))
else:
res.append(match.groups()[0])
res.sort()
return res
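To illustrate the back-compat handling above: when the reference log predates flow numbers, the trailing `in flow N` suffix is stripped from the new-style log lines before comparison. A self-contained sketch of the same regex logic (a standalone function operating on a list of lines, unlike the module's `_load_reflog`, which reads from a file):

```python
import re

# Same pattern as the patched RE_TRIG above.
RE_TRIG = re.compile(r'(^.*? -triggered off \[.*\].*$)')


def extract_triggers(lines, strip_flows):
    """Collect sorted trigger lines, optionally stripping flow suffixes."""
    res = []
    for line in lines:
        match = RE_TRIG.search(line)
        if match:
            if strip_flows:
                # Back-compat: drop the " in flow N" suffix.
                res.append(re.sub(' in flow .*$', '', match.groups()[0]))
            else:
                res.append(match.groups()[0])
    res.sort()
    return res


new_log = ["2025/foo -triggered off ['2025/bar'] in flow 1"]
print(extract_triggers(new_log, strip_flows=True))
```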
17 changes: 10 additions & 7 deletions cylc/flow/network/resolvers.py
@@ -813,23 +813,25 @@ def stop(

def force_trigger_tasks(
self,
tasks=None,
reflow=False,
flow_descr=None,
tasks: Iterable[str],
flow: Iterable[str],
flow_wait: bool,
flow_descr: Optional[str] = None,
):
"""Trigger submission of task jobs where possible.

Args:
tasks (list):
List of identifiers or task globs.
reflow (bool):
Start new flow from triggered tasks.
flow (list):
Flow ownership of triggered tasks.
flow_wait (bool):
Wait for flows to catch up before continuing.
flow_descr (str):
Description of new flow.

Returns:
tuple: (outcome, message)

outcome (bool)
True if command successfully queued.
message (str)
@@ -841,7 +843,8 @@ def force_trigger_tasks(
"force_trigger_tasks",
(tasks or [],),
{
"reflow": reflow,
"flow": flow,
"flow_wait": flow_wait,
"flow_descr": flow_descr
}
),
55 changes: 52 additions & 3 deletions cylc/flow/network/schema.py
@@ -39,6 +39,7 @@
from graphene.utils.str_converters import to_snake_case

from cylc.flow.broadcast_mgr import ALL_CYCLE_POINTS_STRS, addict
from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE
from cylc.flow.id import Tokens
from cylc.flow.task_outputs import SORT_ORDERS
from cylc.flow.task_state import (
@@ -865,6 +866,7 @@ class Meta:
is_queued = Boolean()
is_runahead = Boolean()
flow_nums = String()
flow_wait = Boolean()
depth = Int()
job_submits = Int()
outputs = graphene.List(
@@ -1404,6 +1406,11 @@ def description(self):
return StopMode(self.value).describe()


class Flow(String):
"""An integer or one of {FLOW_ALL}, {FLOW_NEW} or {FLOW_NONE}."""
Member: Happy to leave those variables unexpanded?

Member Author: We can't expand them, or the docstring ceases to be a docstring (see my comment on that).

I think it's fine to leave like that. Docstrings describe code, so it's not unreasonable for them to refer to variable (or constant) names. I should perhaps have removed the braces, but I added a comment to note that it shouldn't be turned back into an f-string.

# (Note docstrings can't be f-strings).


# Mutations:

# TODO: re-instate:
@@ -1742,6 +1749,27 @@ class Arguments:
result = GenericScalar()


class FlowMutationArguments:
flow = graphene.List(
graphene.NonNull(Flow),
default_value=[FLOW_ALL],
description=sstrip(f'''
The flow(s) to trigger these tasks in.

This should be a list of flow numbers, or a single-item list
containing one of the following strings:

* {FLOW_ALL} - Triggered tasks belong to all active flows
(default).
* {FLOW_NEW} - Triggered tasks are assigned to a new flow.
* {FLOW_NONE} - Triggered tasks do not belong to any flow.
''')
)


class Hold(Mutation, TaskMutation):
class Meta:
description = sstrip('''
@@ -1832,9 +1860,30 @@ class Meta:
''')
resolver = partial(mutator, command='force_trigger_tasks')

class Arguments(TaskMutation.Arguments):
reflow = Boolean()
flow_descr = String()
class Arguments(TaskMutation.Arguments, FlowMutationArguments):
flow_wait = Boolean(
default_value=False,
description=sstrip('''
Should the workflow "wait" or "continue on" from this task?

If `false` the scheduler will spawn and run downstream tasks
as normal (default).

If `true` the scheduler will not spawn the downstream tasks
unless it has been caught by the same flow at a later time.

For example, you might set this to `true` to trigger a task
ahead of a flow, where you don't want the scheduler to
"continue on" from this task until the flow has caught up
with it.
''')
)
flow_descr = String(
description=sstrip('''
If starting a new flow, this field can be used to provide the
new flow with a description for later reference.
''')
)


def _mut_field(cls):
34 changes: 26 additions & 8 deletions cylc/flow/rundb.py
@@ -23,6 +23,7 @@
from typing import List, Tuple

from cylc.flow import LOG
from cylc.flow.util import deserialise
import cylc.flow.flags


@@ -263,6 +264,7 @@ class CylcWorkflowDAO:
TABLE_TASK_OUTPUTS: [
["cycle", {"is_primary_key": True}],
["name", {"is_primary_key": True}],
["flow_nums", {"is_primary_key": True}],
["outputs"],
],
TABLE_TASK_POOL: [
@@ -293,6 +295,7 @@ class CylcWorkflowDAO:
["time_updated"],
["submit_num", {"datatype": "INTEGER"}],
["status"],
["flow_wait", {"datatype": "INTEGER"}],
],
TABLE_TASK_TIMEOUT_TIMERS: [
["cycle", {"is_primary_key": True}],
@@ -709,11 +712,7 @@ def select_submit_nums(self, name, point):

Fetch submit number and flow_nums for spawning tasks.

Return:
{
flow_nums: submit_num,
...,
}
Return: {submit_num: (flow_wait, flow_nums)}

Args:
name: task name
@@ -724,13 +723,32 @@
# Not an injection, simply putting the table name in the SQL query
# expression as a string constant local to this module.
stmt = ( # nosec
r"SELECT flow_nums,submit_num FROM %(name)s"
r"SELECT flow_nums,submit_num,flow_wait FROM %(name)s"
r" WHERE name==? AND cycle==?"
) % {"name": self.TABLE_TASK_STATES}
ret = {}
for flow_nums, submit_num in self.connect().execute(
for flow_nums_str, submit_num, flow_wait in self.connect().execute(
stmt, (name, point,)):
ret[flow_nums] = submit_num
ret[submit_num] = (flow_wait == 1, deserialise(flow_nums_str))
return ret

def select_task_outputs(self, name, point):
"""Select task outputs for each flow.

Return: {outputs_list: flow_nums_set}

"""
stmt = rf'''
SELECT
flow_nums,outputs
FROM
{self.TABLE_TASK_OUTPUTS}
WHERE
name==? AND cycle==?
''' # nosec (table name is code constant)
ret = {}
for flow_nums, outputs in self.connect().execute(stmt, (name, point,)):
ret[outputs] = deserialise(flow_nums)
return ret

def select_xtriggers_for_restart(self, callback):
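Both queries above deserialise the stored `flow_nums` string before returning. A rough sketch of the two reshaped return values, using `json.loads` as a stand-in for `cylc.flow.util.deserialise` (an assumption about the storage format — a JSON list string such as `"[1, 2]"`):

```python
import json


def deserialise(flow_nums_str):
    # Assumed stand-in for cylc.flow.util.deserialise.
    return set(json.loads(flow_nums_str))


# Simulated task_states rows: (flow_nums, submit_num, flow_wait).
state_rows = [('[1]', 1, 0), ('[1, 2]', 2, 1)]

# select_submit_nums now returns {submit_num: (flow_wait, flow_nums)}.
submit_nums = {
    submit_num: (flow_wait == 1, deserialise(flow_nums_str))
    for flow_nums_str, submit_num, flow_wait in state_rows
}
print(submit_nums[2])

# Simulated task_outputs rows: (flow_nums, outputs).
output_rows = [('[1]', '["submitted", "started", "succeeded"]')]

# select_task_outputs returns {outputs_str: flow_nums_set}.
task_outputs = {
    outputs: deserialise(flow_nums)
    for flow_nums, outputs in output_rows
}
print(task_outputs)
```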
17 changes: 11 additions & 6 deletions cylc/flow/scheduler.py
@@ -50,7 +50,7 @@
from cylc.flow.cycling.loader import get_point
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.id import Tokens
from cylc.flow.flow_mgr import FlowMgr
from cylc.flow.flow_mgr import FLOW_NONE, FlowMgr, FLOW_NEW
from cylc.flow.exceptions import (
CommandFailedError,
CyclingError,
@@ -681,7 +681,7 @@ def _load_pool_from_tasks(self):
# flow number set in this call:
self.pool.force_trigger_tasks(
self.options.starttask,
reflow=True,
flow=[FLOW_NEW],
flow_descr=f"original flow from {self.options.starttask}"
)

@@ -1300,10 +1300,14 @@ def release_queued_tasks(self):
self.client_pub_key_dir,
self.config.run_mode('simulation')
):
# (Not using f"{itask}" here to avoid breaking func tests)
if itask.flow_nums:
flow = ','.join(str(i) for i in itask.flow_nums)
else:
flow = FLOW_NONE
meth(
f"{itask.identity} -triggered off "
f"{itask.state.get_resolved_dependencies()}"
f" in flow {flow}"
)
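The flow label appended to the `-triggered off` log line above could be factored as below — a sketch only; note that sorting is added here for deterministic output, whereas the patch joins the set in iteration order:

```python
FLOW_NONE = "none"


def flow_label(flow_nums):
    """Render a task's flow numbers for the reference-log line."""
    if flow_nums:
        # Sorted for deterministic output in this sketch.
        return ','.join(str(i) for i in sorted(flow_nums))
    return FLOW_NONE


line = f"1/foo -triggered off ['1/bar'] in flow {flow_label({2, 1})}"
print(line)
```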

def process_workflow_db_queue(self):
@@ -1884,9 +1888,10 @@ def resume_workflow(self, quiet: bool = False) -> None:
self.workflow_db_mgr.delete_workflow_paused()
self.update_data_store()

def command_force_trigger_tasks(self, items, reflow, flow_descr):
"""Trigger tasks."""
return self.pool.force_trigger_tasks(items, reflow, flow_descr)
def command_force_trigger_tasks(self, items, flow, flow_wait, flow_descr):
"""Manual task trigger."""
return self.pool.force_trigger_tasks(
items, flow, flow_wait, flow_descr)

def command_force_spawn_children(self, items, outputs, flow_num):
"""Force spawn task successors.
4 changes: 2 additions & 2 deletions cylc/flow/scripts/cylc.py
@@ -212,8 +212,8 @@ def get_version(long=False):
'use standard tools to inspect the environment'
' e.g. https://pypi.org/project/pipdeptree/',
'checkpoint':
'DB checkpoints have been removed, use a reflow to '
'"rewind" a workflow.',
'DB checkpoints have been removed. You can now "rewind" a'
' workflow by triggering the flow anywhere in the graph.',
'conditions':
'cylc conditions has been replaced by cylc help license',
'documentation':
3 changes: 1 addition & 2 deletions cylc/flow/scripts/set_outputs.py
@@ -24,8 +24,7 @@
This allows you to manually intervene with Cylc's scheduling algorithm by
artificially satisfying outputs of tasks.

If a flow number is given, the child tasks will start (or continue) that flow,
otherwise no reflow will occur.
If a flow number is given, the child tasks will start (or continue) that flow.

Examples:
# For example, for the following dependency graph: