-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TaskTrigger Refactor #2303
Merged
Merged
TaskTrigger Refactor #2303
Changes from 5 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
a371810
TaskTrigger refactor.
oliver-sanders 3896539
Addressed feedback.
oliver-sanders 7697159
Reverted "spurious" change.
oliver-sanders 52462d1
Fixed database lock tests.
oliver-sanders 4dbc1dc
Removed cylc 5 message offset regex.
oliver-sanders cfa6bcf
Moved task message offset check and added test.
oliver-sanders df0c1fa
Addressed feedback.
oliver-sanders File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import traceback | ||
|
||
from cylc.c3mro import C3 | ||
from cylc.conditional_simplifier import ConditionalSimplifier | ||
from cylc.exceptions import CylcError | ||
from cylc.graph_parser import GraphParser | ||
from cylc.param_expand import NameExpander | ||
|
@@ -44,30 +45,23 @@ | |
from cylc.print_tree import print_tree | ||
from cylc.taskdef import TaskDef, TaskDefError | ||
from cylc.task_id import TaskID | ||
from cylc.task_trigger import TaskTrigger | ||
from cylc.task_trigger import TaskTrigger, Dependency | ||
from cylc.wallclock import get_current_time_string | ||
from isodatetime.data import Calendar | ||
from isodatetime.parsers import DurationParser | ||
from parsec.OrderedDict import OrderedDictWithDefaults | ||
from parsec.util import replicate | ||
from cylc.suite_logging import OUT, ERR | ||
|
||
from cylc.task_outputs import TASK_OUTPUT_SUCCEEDED | ||
|
||
RE_SUITE_NAME_VAR = re.compile('\${?CYLC_SUITE_(REG_)?NAME}?') | ||
RE_TASK_NAME_VAR = re.compile('\${?CYLC_TASK_NAME}?') | ||
CLOCK_OFFSET_RE = re.compile(r'(' + TaskID.NAME_RE + r')(?:\(\s*(.+)\s*\))?') | ||
EXT_TRIGGER_RE = re.compile('(.*)\s*\(\s*(.+)\s*\)\s*') | ||
NUM_RUNAHEAD_SEQ_POINTS = 5 # Number of cycle points to look at per sequence. | ||
|
||
# Replace \W characters in conditional graph expressions. | ||
CONDITIONAL_REGEX_REPLACEMENTS = [ | ||
("\[", "_leftsquarebracket_"), | ||
("\]", "_rightsquarebracket_"), | ||
("-", "_minus_"), | ||
("\^", "_caret_"), | ||
(":", "_colon_"), | ||
("\+", "_plus_"), | ||
] | ||
# Message trigger offset regex. | ||
BCOMPAT_MSG_RE_C6 = re.compile(r'^(.*)\[\s*(([+-])?\s*(.*))?\s*\](.*)$') | ||
|
||
try: | ||
import cylc.graphing | ||
|
@@ -1504,55 +1498,105 @@ def generate_taskdefs(self, orig_expr, left_nodes, right, section, seq, | |
|
||
# Record custom message outputs. | ||
for item in self.cfg['runtime'][name]['outputs'].items(): | ||
if (item, base_interval) not in self.taskdefs[name].outputs: | ||
self.taskdefs[name].outputs.append((item, base_interval)) | ||
if item not in self.taskdefs[name].outputs: | ||
self.taskdefs[name].outputs.append(item) | ||
|
||
def generate_triggers(self, lexpression, left_nodes, right, seq, | ||
suicide, base_interval, task_triggers): | ||
"""Create Dependency and TaskTrigger objects. | ||
|
||
Register dependency with the relevant TaskDef object. | ||
|
||
def generate_triggers(self, lexpression, left_nodes, | ||
right, seq, suicide, base_interval): | ||
""" | ||
if not right or not left_nodes: | ||
# Lone nodes have no triggers. | ||
return | ||
|
||
ctrig = {} | ||
cname = {} | ||
# Convert expression to a (nested) list. | ||
try: | ||
expr_list = ConditionalSimplifier.listify(lexpression) | ||
except SyntaxError: | ||
raise SuiteConfigError('Error in expression "%s"' % lexpression) | ||
|
||
def recursive_replace(lst, obj, replacement): | ||
"""Replace occurances of an object in a list with a standin. | ||
|
||
Works recursively to replace instances in nested lists. | ||
|
||
""" | ||
ret = [] | ||
for item in lst: | ||
if type(item) is list: | ||
ret.append(recursive_replace(item, obj, replacement)) | ||
elif item == obj: | ||
ret.append(replacement) | ||
else: | ||
ret.append(item) | ||
return ret | ||
|
||
trigers = set([]) | ||
for left in left_nodes: | ||
# (GraphNodeError checked above) | ||
cycle_point = None | ||
lnode = graphnode(left, base_interval=base_interval) | ||
ltaskdef = self.taskdefs[lnode.name] | ||
|
||
# Determine intercycle offsets. | ||
abs_cycle_point = None | ||
cycle_point_offset = None | ||
if lnode.offset_is_from_ict: | ||
first_point = get_point_relative( | ||
lnode.offset_string, self.initial_point) | ||
last_point = seq.get_stop_point() | ||
abs_cycle_point = first_point | ||
if last_point is None: | ||
# This dependency persists for the whole suite run. | ||
ltaskdef.intercycle_offsets.add( | ||
(None, seq)) | ||
else: | ||
ltaskdef.intercycle_offsets.add( | ||
(str(-(last_point - first_point)), seq)) | ||
cycle_point = first_point | ||
elif lnode.is_absolute: | ||
abs_cycle_point = lnode.offset_string | ||
elif lnode.intercycle: | ||
if lnode.offset_is_irregular: | ||
offset_tuple = (lnode.offset_string, seq) | ||
else: | ||
offset_tuple = (lnode.offset_string, None) | ||
ltaskdef.intercycle_offsets.add(offset_tuple) | ||
cycle_point_offset = lnode.offset_string | ||
|
||
trig = TaskTrigger( | ||
lnode.name, lnode.output, lnode.offset_string, cycle_point, | ||
suicide, self.cfg['runtime'][lnode.name]['outputs'], | ||
base_interval) | ||
# Check for message trigger offsets. | ||
outputs = self.cfg['runtime'][lnode.name]['outputs'] | ||
for message in outputs.values(): | ||
if BCOMPAT_MSG_RE_C6.match(message): | ||
raise SuiteConfigError( | ||
'ERROR: Message trigger offsets are obsolete.') | ||
|
||
# Use fully qualified name for trigger expression label | ||
# (task name is not unique, e.g.: "F | F:fail => G"). | ||
label = self.get_conditional_label(left) | ||
ctrig[label] = trig | ||
cname[label] = lnode.name | ||
# Qualifier. | ||
if outputs and lnode.output in outputs: | ||
# Task message. | ||
qualifier = outputs[lnode.output] | ||
else: | ||
# Built-in qualifier. | ||
if lnode.output: | ||
qualifier = TaskTrigger.get_trigger_name(lnode.output) | ||
else: | ||
qualifier = TASK_OUTPUT_SUCCEEDED | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if outputs and lnode.output in outputs:
# Task message.
qualifier = outputs[lnode.output]
elif lnode.output:
# Built-in qualifier.
qualifier = TaskTrigger.get_trigger_name(lnode.output)
else:
qualifier = TASK_OUTPUT_SUCCEEDED A slightly better style? (This lines up all the assignment statements of |
||
|
||
expr = self.get_conditional_label(lexpression) | ||
self.taskdefs[right].add_trigger(ctrig, expr, seq) | ||
# Generate TaskTrigger if not already done. | ||
key = (lnode.name, abs_cycle_point, cycle_point_offset, qualifier) | ||
try: | ||
task_trigger = task_triggers[key] | ||
except KeyError: | ||
task_trigger = TaskTrigger(*key) | ||
task_triggers[key] = task_trigger | ||
|
||
trigers.add(task_trigger) | ||
|
||
expr_list = recursive_replace(expr_list, left, task_trigger) | ||
|
||
dependency = Dependency(expr_list, trigers, suicide) | ||
self.taskdefs[right].add_dependency(dependency, seq) | ||
|
||
def get_actual_first_point(self, start_point): | ||
# Get actual first cycle point for the suite (get all | ||
|
@@ -1574,30 +1618,6 @@ def get_actual_first_point(self, start_point): | |
self.actual_first_point = start_point | ||
return self.actual_first_point | ||
|
||
def get_conditional_label(self, expression): | ||
"""Return a label to ID the expression. | ||
|
||
Special characters such as [, or ^ are replaced with | ||
nice \w+ text for use in regular expressions and trigger | ||
task matching. We don't back-transform the label, so | ||
all it needs to is provide locally unique IDs for the | ||
bits of the trigger. | ||
|
||
For example, "foo[^] | bar" is represented in text as | ||
"foo_leftsquarebracket__caret__rightsquarebracket_ | bar". | ||
As long as no one uses that exact "foo_leftsquare...." text as | ||
a task name as part of a conditional trigger for the *same* | ||
task, we're OK. | ||
|
||
Should we use unicodedata.name to convert the character names, | ||
and support much more characters in the task names? | ||
|
||
""" | ||
label = expression | ||
for regex, replacement in CONDITIONAL_REGEX_REPLACEMENTS: | ||
label = re.sub(regex, replacement, label) | ||
return label | ||
|
||
def get_graph_raw(self, start_point_string, stop_point_string, | ||
group_nodes=None, ungroup_nodes=None, | ||
ungroup_recursive=False, group_all=False, | ||
|
@@ -1896,6 +1916,7 @@ def load_graph(self): | |
sections.append((section, sec_map['graph'])) | ||
|
||
# Parse and process each graph section. | ||
task_triggers = {} | ||
for section, graph in sections: | ||
try: | ||
seq = get_sequence(section, icp, fcp) | ||
|
@@ -1914,9 +1935,10 @@ def load_graph(self): | |
gp = GraphParser(family_map, self.parameters) | ||
gp.parse_graph(graph) | ||
self.suite_polling_tasks.update(gp.suite_state_polling_tasks) | ||
self._proc_triggers(gp.triggers, gp.original, section, seq) | ||
self._proc_triggers(gp.triggers, gp.original, section, seq, | ||
task_triggers) | ||
|
||
def _proc_triggers(self, triggers, original, section, seq): | ||
def _proc_triggers(self, triggers, original, section, seq, task_triggers): | ||
"""Define graph edges, taskdefs, and triggers, from graph sections.""" | ||
base_interval = seq.get_interval() | ||
for right, val in triggers.items(): | ||
|
@@ -1929,7 +1951,8 @@ def _proc_triggers(self, triggers, original, section, seq): | |
self.generate_taskdefs( | ||
orig_expr, lefts, right, section, seq, base_interval) | ||
self.generate_triggers( | ||
expr, lefts, right, seq, suicide, base_interval) | ||
expr, lefts, right, seq, suicide, base_interval, | ||
task_triggers) | ||
|
||
def find_taskdefs(self, name): | ||
"""Find TaskDef objects in family "name" or matching "name". | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The combination backslash escape + pipes (or-logic) + bracket (capture) are making the regular expression very difficult to read. Perhaps better to capture a set in square bracket like this
r'([&|()])'
?