Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up task output message handling #1761

Merged
merged 9 commits into from
Mar 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions doc/cug.tex
Original file line number Diff line number Diff line change
Expand Up @@ -3348,23 +3348,12 @@ \subsubsection{Trigger Types}
\paragraph{Message Triggers}
\label{MessageTriggers}

Message triggers allow triggering off custom messages emitted by a task as it
runs. {\em Message outputs} must be registered for the task in the suite
definition, and matching messages sent back to the suite daemon by the
task at the appropriate time. {\em Note that polling does not yet detect custom
message output completion.} Custom output messages should normally contain the
cycle point in order to
distinguish between the outputs of different instances of the same task.
The task implementation can use \lstinline=$CYLC_TASK_CYCLE_POINT= for
this, or \lstinline@cylc cycle-point --offset=P2M@ for an offset value.
The matching message string registered in the suite definition, however, does
not get interpreted by the shell, so a cycle point placeholder is used instead:
\lstinline=[]= for the current cycle point, or for an offset \lstinline=[-P2M]=.

The \lstinline=$CYLC_DIR/examples/message-triggers/= suite is a self-contained
example that illustrates message triggering. Note that the graph trigger
notation uses a label that selects the message registered under the task
\lstinline=[runtime]= section.
Tasks can also trigger off custom output messages. These must be registered in
the \lstinline=[runtime]= section of the emitting task, and reported using the
\lstinline=cylc message= command in task scripting. The graph trigger notation
refers to the item name of the registered output message.
The example suite \lstinline=$CYLC_DIR/examples/message-triggers= illustrates
message triggering.

\lstset{language=suiterc}
\lstinputlisting{../examples/message-triggers/suite.rc}
Expand Down
17 changes: 7 additions & 10 deletions doc/suiterc.tex
Original file line number Diff line number Diff line change
Expand Up @@ -1706,26 +1706,23 @@ \subsection{[runtime]}

\paragraph[{[[[}outputs{]]]}]{[runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow [[[outputs]]]}

This section is for registering custom message outputs that other tasks can
trigger off instead of the standard triggers. The task implementation must send
corresponding messages using the \lstinline=cylc task message= command at the
appropriate time. See~\ref{MessageTriggers} for more information.
Register custom task outputs for use in message triggering in this section
(\ref{MessageTriggers})

\subparagraph[\_\_OUTPUT\_\_ ]{[runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow [[[outputs]]] \textrightarrow \_\_OUTPUT\_\_}

Replace \_\_OUTPUT\_\_ with one or more labelled output messages, and use the
labels in graph trigger notation. Messages should contain a placeholder for
the current cycle point (\lstinline=[]=) or some offset from it (e.g.\ \lstinline=[-P2M]=).
Replace \_\_OUTPUT\_\_ with one or more custom task output messages
(\ref{MessageTriggers}). The item name is used to select the custom output
message in graph trigger notation.
\begin{myitemize}
\item {\em type:} string
\item {\em default:} (none)
\item{ \em examples:}
\end{myitemize}
\begin{lstlisting}
foo = "sea state products ready for []"
bar = "nwp restart files ready for [-PT6H]"
out1 = "sea state products ready"
out2 = "NWP restart files completed"
\end{lstlisting}
See~\ref{MessageTriggers} for more information.

\paragraph[{[[[}suite state polling{]]]}]{[runtime] \textrightarrow [[\_\_NAME\_\_]] \textrightarrow [[[suite state polling]]]}

Expand Down
32 changes: 0 additions & 32 deletions examples/cycling/InternalOutputs/suite.rc

This file was deleted.

27 changes: 11 additions & 16 deletions examples/message-triggers/suite.rc
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,18 @@ title = "test suite for cylc-6 message triggers"
final cycle point = 20141201T00
[[dependencies]]
[[[P2M]]]
graph = """
# bar triggers off message 'x' emitted by foo:
foo:x => bar
# baz triggers off message 'y' emitted by the previous instance of foo:
foo[-P2M]:y => baz
"""
graph = """foo:out1 => bar
foo[-P2M]:out2 => baz"""
[runtime]
[[foo]]
script = """
echo HELLO
sleep 2
TARGET_POINT=$CYLC_TASK_CYCLE_POINT
cylc message "file 1 for $TARGET_POINT done"
sleep 2
TARGET_POINT=$(cylc cycle-point --offset P2M)
cylc message "file 2 for $TARGET_POINT done"
sleep 2"""
sleep 5
cylc message "file 1 done"
sleep 10
cylc message "file 2 done"
sleep 10"""
[[[outputs]]]
x = "file 1 for [] done"
y = "file 2 for [P2M] done"
out1 = "file 1 done"
out2 = "file 2 done"
[[bar, baz]]
script = sleep 10
19 changes: 6 additions & 13 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
from isodatetime.data import Calendar
from envvar import check_varnames, expandvars
from copy import deepcopy, copy
from output import output
from message_output import MessageOutput
from graphnode import graphnode, GraphNodeError
from print_tree import print_tree
from cylc.prerequisite import TriggerExpressionError
from regpath import RegPath
from trigger import trigger
from task_trigger import TaskTrigger
from parsec.util import replicate
from cylc.task_id import TaskID
from C3MRO import C3
Expand Down Expand Up @@ -1698,16 +1698,9 @@ def generate_taskdefs(self, line, left_nodes, right, section, seq,
if self.run_mode == 'live':
# Record message outputs.
for lbl, msg in self.cfg['runtime'][name]['outputs'].items():
outp = output(msg, base_interval)
# Check for a cycle offset placeholder.
if not re.search(r'\[[^\]]*\]', msg):
print >> sys.stderr, (
"Message outputs require an "
"offset placeholder (e.g. '[]' or '[-P2M]'):")
print >> sys.stderr, " %s = %s" % (lbl, msg)
raise SuiteConfigError(
'ERROR: bad message output string')
self.taskdefs[name].outputs.append(outp)
outp = MessageOutput(msg, base_interval)
if outp not in self.taskdefs[name].outputs:
self.taskdefs[name].outputs.append(outp)

def generate_triggers(self, lexpression, left_nodes, right, seq, suicide):
if not right or not left_nodes:
Expand Down Expand Up @@ -1743,7 +1736,7 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, suicide):
offset_tuple = (lnode.offset_string, None)
ltaskdef.intercycle_offsets.append(offset_tuple)

trig = trigger(
trig = TaskTrigger(
lnode.name, lnode.output, lnode.offset_string, cycle_point,
suicide, self.cfg['runtime'][lnode.name]['outputs'],
base_interval)
Expand Down
51 changes: 51 additions & 0 deletions lib/cylc/message_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2015 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import re
from cycling.loader import get_interval, get_interval_cls
from task_trigger import get_message_offset


class MessageOutput(object):
"""
A task message output.

Used to generate an output string for a message trigger at a cycle point.

TODO - these can be plain strings once the deprecated cycle point offset
placeholders are removed from cylc (see GitHub #1761).

"""

def __init__(self, msg, base_interval=None):
self.msg = msg
self.msg_offset = get_message_offset(msg, base_interval)

def get_string(self, point):
"""Return the message string for this cycle point.

Placeholders are replaced with the actual cycle point offset.

"""
new_point = point
if self.msg_offset:
new_point = point + self.msg_offset
return re.sub('\[.*\]', str(new_point), self.msg)

def __eq__(self, other):
return self.msg == other.msg and self.msg_offset == other.msg_offset
62 changes: 0 additions & 62 deletions lib/cylc/output.py

This file was deleted.

9 changes: 6 additions & 3 deletions lib/cylc/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
from cylc.conditional_simplifier import ConditionalSimplifier
from cylc.cycling.loader import get_point
from cylc.task_id import TaskID


"""A task prerequisite.
Expand Down Expand Up @@ -88,10 +89,12 @@ def set_condition(self, expr):
if k in drop_these:
continue
if self.start_point:
task = re.search(r'(.*).(.*) ', self.messages[k])
if task.group:
m = re.search(
r'(' + TaskID.NAME_RE + ')\.(' +
TaskID.POINT_RE + ') ', self.messages[k])
if m:
try:
foo = task.group().split(".")[1].rstrip()
foo = m.group().split(".")[1].rstrip()
if get_point(foo) < self.start_point:
drop_these.append(k)
except IndexError:
Expand Down
4 changes: 0 additions & 4 deletions lib/cylc/task_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class TaskMessage(object):
FAILED = "failed"
STARTED = "started"
SUCCEEDED = "succeeded"
STATUSES = (STARTED, SUCCEEDED, FAILED)

CYLC_JOB_PID = "CYLC_JOB_PID"
CYLC_JOB_INIT_TIME = "CYLC_JOB_INIT_TIME"
Expand Down Expand Up @@ -115,7 +114,6 @@ def __init__(self, priority=NORMAL):
def send(self, messages):
"""Send messages back to the suite."""
self._update_job_status_file(messages)

if self.mode != 'scheduler' or self.polling:
# no suite to communicate with, just print to stdout.
self._print_messages(messages)
Expand Down Expand Up @@ -310,8 +308,6 @@ def _update_job_status_file(self, messages):
job_status_file.write("%s=%s|%s|%s\n" % (
self.CYLC_MESSAGE, self.true_event_time, self.priority,
message))
if message in self.STATUSES:
messages[i] = "%s %s" % (self.task_id, message)
messages[i] += ' at ' + self.true_event_time
if job_status_file:
try:
Expand Down
Loading