Skip to content

Commit

Permalink
Move get_requisites out of task_pool; format strings.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Aug 14, 2019
1 parent e4f5f76 commit f5a1f10
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 67 deletions.
7 changes: 3 additions & 4 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1690,14 +1690,13 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
xtrig = SubFuncContext(
'wall_clock', 'wall_clock', [], {})
else:
raise SuiteConfigError(
"undefined xtrigger label: %s" % label)
raise SuiteConfigError(f"xtrigger not defined: {label}")
if (xtrig.func_name == 'wall_clock' and
self.cfg['scheduling']['cycling mode'] == (
INTEGER_CYCLING_TYPE)):
sig = xtrig.get_signature()
raise SuiteConfigError(
"clock triggers are not compatible with integer "
"cycling.\n %s = %s" % (label, xtrig.get_signature()))
f"clock xtriggers need date-time cycling: {label} = {sig}")
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
self.taskdefs[right].add_xtrig_label(label, seq)

Expand Down
42 changes: 40 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,8 +701,46 @@ def info_get_graph_raw(self, cto, ctn, group_nodes=None,
self.config.feet)

def info_get_task_requisites(self, items, list_prereqs=False):
"""Return prerequisites of a task."""
return self.pool.get_task_requisites(items, list_prereqs=list_prereqs)
"""Return prerequisites and outputs etc. of a task."""
itasks, bad_items = self.pool.filter_task_proxies(items)
results = {}
now = time()
for itask in itasks:
if list_prereqs:
results[itask.identity] = {
'prerequisites': itask.state.prerequisites_dump(
list_prereqs=True)}
continue
extras = {}
if itask.tdef.clocktrigger_offset is not None:
extras['Clock trigger time reached'] = (
not itask.is_waiting_clock(now))
extras['Triggers at'] = get_time_string_from_unix_time(
itask.clock_trigger_time)
for trig, satisfied in itask.state.external_triggers.items():
key = f'External trigger "{trig}"'
if satisfied:
extras[key] = 'satisfied'
else:
extras[key] = 'NOT satisfied'
for label, satisfied in itask.state.xtriggers.items():
sig = self.xtrigger_mgr.get_xtrig_ctx(
itask, label).get_signature()
extra = f'xtrigger "{label} = {sig}"'
if satisfied:
extras[extra] = 'satisfied'
else:
extras[extra] = 'NOT satisfied'
outputs = []
for _, msg, is_completed in itask.state.outputs.get_all():
outputs.append(
[f"{itask.identity} {msg})", is_completed])
results[itask.identity] = {
"meta": itask.tdef.describe(),
"prerequisites": itask.state.prerequisites_dump(),
"outputs": outputs,
"extras": extras}
return results, bad_items

def info_ping_task(self, task_id, exists_only=False):
"""Return True if task exists and running."""
Expand Down
52 changes: 0 additions & 52 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,58 +1225,6 @@ def ping_task(self, id_, exists_only=False):
else:
return False, "task not found"

def get_task_requisites(self, items, list_prereqs=False):
"""Return task prerequisites.
Result in a dict of a dict:
{
"task_id": {
"meta": {key: value, ...},
"prerequisites": {key: value, ...},
"outputs": {key: value, ...},
"extras": {key: value, ...},
},
...
}
"""
itasks, bad_items = self.filter_task_proxies(items)
results = {}
now = time()
for itask in itasks:
if list_prereqs:
results[itask.identity] = {
'prerequisites': itask.state.prerequisites_dump(
list_prereqs=True)}
continue

extras = {}
if itask.tdef.clocktrigger_offset is not None:
extras['Clock trigger time reached'] = (
not itask.is_waiting_clock(now))
extras['Triggers at'] = get_time_string_from_unix_time(
itask.clock_trigger_time)
for trig, satisfied in itask.state.external_triggers.items():
if satisfied:
extras['External trigger "%s"' % trig] = 'satisfied'
else:
extras['External trigger "%s"' % trig] = 'NOT satisfied'
for label, satisfied in itask.state.xtriggers.items():
extra = 'xtrigger "%s"' % label
if satisfied:
extras[extra] = 'satisfied'
else:
extras[extra] = 'NOT satisfied'

outputs = []
for _, msg, is_completed in itask.state.outputs.get_all():
outputs.append(["%s %s" % (itask.identity, msg), is_completed])
results[itask.identity] = {
"meta": itask.tdef.describe(),
"prerequisites": itask.state.prerequisites_dump(),
"outputs": outputs,
"extras": extras}
return results, bad_items

def filter_task_proxies(self, items):
"""Return task proxies that match names, points, states in items.
Expand Down
16 changes: 7 additions & 9 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,18 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str):
ValueError: if any string template in the function context
arguments are not present in the expected template values.
"""
fname = fctx.func_name
try:
func = get_func(fctx.func_name, fdir)
func = get_func(fname, fdir)
except ImportError:
raise ImportError(
"ERROR: xtrigger module '%s' not found" % fctx.func_name)
f"ERROR: xtrigger module '{fname}' not found")
except AttributeError:
raise AttributeError(
"ERROR: attribute '%s' not found in xtrigger module '%s'" % (
fctx.func_name, fctx.func_name))
f"ERROR: '{fname}' not found in xtrigger module '{fname}'")
if not callable(func):
raise ValueError(
"ERROR: '%s' in xtrigger module '%s' is not callable" % (
fctx.func_name, fctx.func_name))
f"ERROR: '{fname}' not callable in xtrigger module '{fname}'")
self.functx_map[label] = fctx
# Check any string templates in the function arg values (note this
# won't catch bad task-specific values - which are added dynamically).
Expand All @@ -172,8 +171,7 @@ def add_trig(self, label: str, fctx: SubFuncContext, fdir: str):
for match in RE_STR_TMPL.findall(argv):
if match not in ARG_VAL_TEMPLATES:
raise ValueError(
"Illegal template in xtrigger %s: %s" % (
label, match))
f"Illegal template in xtrigger {label}: {match}")
except TypeError:
# Not a string arg.
pass
Expand Down Expand Up @@ -330,7 +328,7 @@ def callback(self, ctx: SubFuncContext):
satisfied, results = json.loads(ctx.out)
except (ValueError, TypeError):
return
LOG.debug('%s: returned %s' % (sig, results))
LOG.debug('%s: returned %s', sig, results)
if satisfied:
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
self.pflag = True
Expand Down

0 comments on commit f5a1f10

Please sign in to comment.