Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow triggers wait #22

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def get_option_parser():
parser.add_option(
"--meta", metavar="DESCRIPTION", action="store",
dest="flow_descr", default=None,
help="description of triggered flow (with --flow=new)."
help=f"description of triggered flow (with --flow={FLOW_NEW})."
)

parser.add_option(
Expand Down
38 changes: 23 additions & 15 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,7 @@ def spawn_task(
self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
).items():
if set(flow_nums).intersection(fnums):
if flow_nums.intersection(fnums):
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flow_nums is already a set.

for msg in json.loads(outputs_str):
itask.state.outputs.set_completed_by_msg(msg)
break
Expand Down Expand Up @@ -1476,6 +1476,28 @@ def force_trigger_tasks(
Queue the task if not queued, otherwise release it to run.

"""
if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}):
if len(flow) != 1:
LOG.warning(
f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}'
' cannot be used in combination with integer flow numbers.'
)
return 0
if flow[0] == FLOW_ALL:
flow_nums = self._get_active_flow_nums()
elif flow[0] == FLOW_NEW:
flow_nums = {self.flow_mgr.get_new_flow(flow_descr)}
elif flow[0] == FLOW_NONE:
flow_nums = set()
else:
try:
flow_nums = {int(n) for n in flow}
except ValueError:
LOG.warning(
f"Trigger ignored, illegal flow values {flow}"
)
return 0

# n_warnings, task_items = self.match_taskdefs(items)
itasks, future_tasks, unmatched = self.filter_task_proxies(
items,
Expand All @@ -1495,20 +1517,6 @@ def force_trigger_tasks(
)
# Flow values already validated by the trigger client.
if itask is None:
if flow[0] == FLOW_ALL:
flow_nums = self._get_active_flow_nums()
elif flow[0] == FLOW_NEW:
flow_nums = {self.flow_mgr.get_new_flow(flow_descr)}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this was within the for name, point in future_tasks loop this was creating a new flow for each triggered task rather than for each trigger event.

elif flow[0] == FLOW_NONE:
flow_nums = set()
else:
try:
flow_nums = {int(n) for n in flow}
except ValueError:
LOG.warning(
f"Trigger ignored, illegal flow values {flow}"
)
return 0
itask = self.spawn_task(
name,
point,
Expand Down
21 changes: 21 additions & 0 deletions tests/functional/flow-triggers/12-all-future-multi.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
. "$(dirname "$0")/test_header"
set_test_number 2
reftest
exit
53 changes: 53 additions & 0 deletions tests/functional/flow-triggers/12-all-future-multi/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# flow:1
# 1/a(running)
# flow:2
# 3/a(running)
# flow:1,2
# 5/a(running)
#
# Result:
# The task 5/a is triggered in both flows so joins the two.
#
# flow:1
# 1/a
# 2/a
# 3/a
# 4/a
# flow:2
# 3/a
# 4/a
# flow:1,2
# 5/a
# 6/a
# 7/a

[scheduler]
allow implicit tasks = True
[[events]]
abort on stall timeout = True
stall timeout = PT0S
abort on inactivity timeout = True
inactivity timeout = PT1M

[scheduling]
cycling mode = integer
initial cycle point = 1
final cycle point = 7
[[graph]]
P1 = a[-P1] => a

[runtime]
[[a]]
script = """
if ((
CYLC_TASK_CYCLE_POINT == 1
&& CYLC_TASK_SUBMIT_NUMBER == 1
)); then
# trigger 3/a in a new flow
cylc trigger --flow=new ${CYLC_WORKFLOW_ID}//3/a
cylc__job__poll_grep_workflow_log -E '3/a.*started'
# trigger 5/a in all flows
cylc trigger ${CYLC_WORKFLOW_ID}//5/a
cylc__job__poll_grep_workflow_log -E '5/a.*started'
fi
"""
14 changes: 14 additions & 0 deletions tests/functional/flow-triggers/12-all-future-multi/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Initial point: 1
Final point: 1
# the first flow
1/a -triggered off ['0/a'] in flow 1
2/a -triggered off ['1/a'] in flow 1
3/a -triggered off ['2/a'] in flow 1
4/a -triggered off ['3/a'] in flow 1
# the second flow
3/a -triggered off [] in flow 2
4/a -triggered off ['3/a'] in flow 2
# the joined flow
5/a -triggered off [] in flow 1,2
6/a -triggered off ['5/a'] in flow 1,2
7/a -triggered off ['6/a'] in flow 1,2
40 changes: 40 additions & 0 deletions tests/integration/test_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging

from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE

import pytest


@pytest.mark.parametrize(
'flow_strs',
(
[FLOW_ALL, '1'],
['1', FLOW_ALL],
[FLOW_NEW, '1'],
[FLOW_NONE, '1'],
['a'],
['1', 'a'],
)
)
async def test_trigger_invalid(mod_one, start, log_filter, flow_strs):
"""Ensure invalid flow values are rejected."""
async with start(mod_one) as log:
log.clear()
assert mod_one.pool.force_trigger_tasks(['*'], flow_strs) == 0
assert len(log_filter(log, level=logging.WARN)) == 1