Skip to content

Commit

Permalink
Merge pull request #1761 from hjoliver/1604.no-taskid-in-messages
Browse files Browse the repository at this point in the history
Clean up task output message handling
  • Loading branch information
arjclark committed Mar 29, 2016
2 parents 793725a + a2a2ccf commit 7abb89d
Show file tree
Hide file tree
Showing 25 changed files with 293 additions and 326 deletions.
23 changes: 6 additions & 17 deletions doc/cug.tex
Original file line number Diff line number Diff line change
Expand Up @@ -3307,23 +3307,12 @@ \subsubsection{Trigger Types}
\paragraph{Message Triggers}
\label{MessageTriggers}

Message triggers allow triggering off custom messages emitted by a task as it
runs. {\em Message outputs} must be registered for the task in the suite
definition, and matching messages sent back to the suite daemon by the
task at the appropriate time. {\em Note that polling does not yet detect custom
message output completion.} Custom output messages should normally contain the
cycle point in order to
distinguish between the outputs of different instances of the same task.
The task implementation can use \lstinline=$CYLC_TASK_CYCLE_POINT= for
this, or \lstinline@cylc cycle-point --offset=P2M@ for an offset value.
The matching message string registered in the suite definition, however, does
not get interpreted by the shell, so a cycle point placeholder is used instead:
\lstinline=[]= for the current cycle point, or for an offset \lstinline=[-P2M]=.

The \lstinline=$CYLC_DIR/examples/message-triggers/= suite is a self-contained
example that illustrates message triggering. Note that the graph trigger
notation uses a label that selects the message registered under the task
\lstinline=[runtime]= section.
Tasks can also trigger off custom output messages. These must be registered in
the \lstinline=[runtime]= section of the emitting task, and reported using the
\lstinline=cylc message= command in task scripting. The graph trigger notation
refers to the item name of the registered output message.
The example suite \lstinline=$CYLC_DIR/examples/message-triggers= illustrates
message triggering.

\lstset{language=suiterc}
\lstinputlisting{../examples/message-triggers/suite.rc}
Expand Down
17 changes: 7 additions & 10 deletions doc/suiterc.tex
Original file line number Diff line number Diff line change
Expand Up @@ -1706,26 +1706,23 @@ \subsection{[runtime]}

\paragraph[{[[[}outputs{]]]}]{[runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow [[[outputs]]]}

This section is for registering custom message outputs that other tasks can
trigger off instead of the standard triggers. The task implementation must send
corresponding messages using the \lstinline=cylc task message= command at the
appropriate time. See~\ref{MessageTriggers} for more information.
Register custom task outputs for use in message triggering in this section
(\ref{MessageTriggers})

\subparagraph[\_\_OUTPUT\_\_ ]{[runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow [[[outputs]]] \textrightarrow \_\_OUTPUT\_\_}

Replace \_\_OUTPUT\_\_ with one or more labelled output messages, and use the
labels in graph trigger notation. Messages should contain a placeholder for
the current cycle point (\lstinline=[]=) or some offset from it (e.g.\ \lstinline=[-P2M]=).
Replace \_\_OUTPUT\_\_ with one or more custom task output messages
(\ref{MessageTriggers}). The item name is used to select the custom output
message in graph trigger notation.
\begin{myitemize}
\item {\em type:} string
\item {\em default:} (none)
\item{ \em examples:}
\end{myitemize}
\begin{lstlisting}
foo = "sea state products ready for []"
bar = "nwp restart files ready for [-PT6H]"
out1 = "sea state products ready"
out2 = "NWP restart files completed"
\end{lstlisting}
See~\ref{MessageTriggers} for more information.

\paragraph[{[[[}suite state polling{]]]}]{[runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow [[[suite state polling]]]}

Expand Down
32 changes: 0 additions & 32 deletions examples/cycling/InternalOutputs/suite.rc

This file was deleted.

27 changes: 11 additions & 16 deletions examples/message-triggers/suite.rc
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,18 @@ title = "test suite for cylc-6 message triggers"
final cycle point = 20141201T00
[[dependencies]]
[[[P2M]]]
graph = """
# bar triggers off message 'x' emitted by foo:
foo:x => bar
# baz triggers off message 'y' emitted by the previous instance of foo:
foo[-P2M]:y => baz
"""
graph = """foo:out1 => bar
foo[-P2M]:out2 => baz"""
[runtime]
[[foo]]
script = """
echo HELLO
sleep 2
TARGET_POINT=$CYLC_TASK_CYCLE_POINT
cylc message "file 1 for $TARGET_POINT done"
sleep 2
TARGET_POINT=$(cylc cycle-point --offset P2M)
cylc message "file 2 for $TARGET_POINT done"
sleep 2"""
sleep 5
cylc message "file 1 done"
sleep 10
cylc message "file 2 done"
sleep 10"""
[[[outputs]]]
x = "file 1 for [] done"
y = "file 2 for [P2M] done"
out1 = "file 1 done"
out2 = "file 2 done"
[[bar, baz]]
script = sleep 10
19 changes: 6 additions & 13 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
from isodatetime.data import Calendar
from envvar import check_varnames, expandvars
from copy import deepcopy, copy
from output import output
from message_output import MessageOutput
from graphnode import graphnode, GraphNodeError
from print_tree import print_tree
from cylc.prerequisite import TriggerExpressionError
from regpath import RegPath
from trigger import trigger
from task_trigger import TaskTrigger
from parsec.util import replicate
from cylc.task_id import TaskID
from C3MRO import C3
Expand Down Expand Up @@ -1698,16 +1698,9 @@ def generate_taskdefs(self, line, left_nodes, right, section, seq,
if self.run_mode == 'live':
# Record message outputs.
for lbl, msg in self.cfg['runtime'][name]['outputs'].items():
outp = output(msg, base_interval)
# Check for a cycle offset placeholder.
if not re.search(r'\[[^\]]*\]', msg):
print >> sys.stderr, (
"Message outputs require an "
"offset placeholder (e.g. '[]' or '[-P2M]'):")
print >> sys.stderr, " %s = %s" % (lbl, msg)
raise SuiteConfigError(
'ERROR: bad message output string')
self.taskdefs[name].outputs.append(outp)
outp = MessageOutput(msg, base_interval)
if outp not in self.taskdefs[name].outputs:
self.taskdefs[name].outputs.append(outp)

def generate_triggers(self, lexpression, left_nodes, right, seq, suicide):
if not right or not left_nodes:
Expand Down Expand Up @@ -1743,7 +1736,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, suicide):
offset_tuple = (lnode.offset_string, None)
ltaskdef.intercycle_offsets.append(offset_tuple)

trig = trigger(
trig = TaskTrigger(
lnode.name, lnode.output, lnode.offset_string, cycle_point,
suicide, self.cfg['runtime'][lnode.name]['outputs'],
base_interval)
Expand Down
51 changes: 51 additions & 0 deletions lib/cylc/message_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2015 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import re
from cycling.loader import get_interval, get_interval_cls
from task_trigger import get_message_offset


class MessageOutput(object):
"""
A task message output.
Used to generate an output string for a message trigger at a cycle point.
TODO - these can be plain strings once the deprecated cycle point offset
placeholders are removed from cylc (see GitHub #1761).
"""

def __init__(self, msg, base_interval=None):
self.msg = msg
self.msg_offset = get_message_offset(msg, base_interval)

def get_string(self, point):
"""Return the message string for this cycle point.
Placeholders are replaced with the actual cycle point offset.
"""
new_point = point
if self.msg_offset:
new_point = point + self.msg_offset
return re.sub('\[.*\]', str(new_point), self.msg)

def __eq__(self, other):
return self.msg == other.msg and self.msg_offset == other.msg_offset
62 changes: 0 additions & 62 deletions lib/cylc/output.py

This file was deleted.

9 changes: 6 additions & 3 deletions lib/cylc/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
from cylc.conditional_simplifier import ConditionalSimplifier
from cylc.cycling.loader import get_point
from cylc.task_id import TaskID


"""A task prerequisite.
Expand Down Expand Up @@ -90,10 +91,12 @@ def set_condition(self, expr):
if k in drop_these:
continue
if self.start_point:
task = re.search(r'(.*).(.*) ', self.messages[k])
if task.group:
m = re.search(
r'(' + TaskID.NAME_RE + ')\.(' +
TaskID.POINT_RE + ') ', self.messages[k])
if m:
try:
foo = task.group().split(".")[1].rstrip()
foo = m.group().split(".")[1].rstrip()
if get_point(foo) < self.start_point:
drop_these.append(k)
except IndexError:
Expand Down
4 changes: 0 additions & 4 deletions lib/cylc/task_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class TaskMessage(object):
FAILED = "failed"
STARTED = "started"
SUCCEEDED = "succeeded"
STATUSES = (STARTED, SUCCEEDED, FAILED)

CYLC_JOB_PID = "CYLC_JOB_PID"
CYLC_JOB_INIT_TIME = "CYLC_JOB_INIT_TIME"
Expand Down Expand Up @@ -115,7 +114,6 @@ def __init__(self, priority=NORMAL):
def send(self, messages):
"""Send messages back to the suite."""
self._update_job_status_file(messages)

if self.mode != 'scheduler' or self.polling:
# no suite to communicate with, just print to stdout.
self._print_messages(messages)
Expand Down Expand Up @@ -310,8 +308,6 @@ def _update_job_status_file(self, messages):
job_status_file.write("%s=%s|%s|%s\n" % (
self.CYLC_MESSAGE, self.true_event_time, self.priority,
message))
if message in self.STATUSES:
messages[i] = "%s %s" % (self.task_id, message)
messages[i] += ' at ' + self.true_event_time
if job_status_file:
try:
Expand Down
Loading

0 comments on commit 7abb89d

Please sign in to comment.