Skip to content

Commit

Permalink
Clean up use of task ID prefix in task output messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Mar 14, 2016
1 parent 08aba1f commit 70a3fc4
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 109 deletions.
4 changes: 3 additions & 1 deletion lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1700,14 +1700,16 @@ def generate_taskdefs(self, line, left_nodes, right, section, seq,
for lbl, msg in self.cfg['runtime'][name]['outputs'].items():
outp = output(msg, base_interval)
# Check for a cycle offset placeholder.
# TODO - DEPRECATE OUTPUT OFFSET PLACEHOLDERS
if not re.search(r'\[[^\]]*\]', msg):
print >> sys.stderr, (
"Message outputs require an "
"offset placeholder (e.g. '[]' or '[-P2M]'):")
print >> sys.stderr, " %s = %s" % (lbl, msg)
raise SuiteConfigError(
'ERROR: bad message output string')
self.taskdefs[name].outputs.append(outp)
if outp not in self.taskdefs[name].outputs:
self.taskdefs[name].outputs.append(outp)

def generate_triggers(self, lexpression, left_nodes, right, seq, suicide):
if not right or not left_nodes:
Expand Down
3 changes: 3 additions & 0 deletions lib/cylc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,6 @@ def get(self, point):
if self.msg_offset:
new_point = point + self.msg_offset
return re.sub('\[.*\]', str(new_point), self.msg)

def __eq__(self, other):
return self.msg == other.msg and self.msg_offset == other.msg_offset
58 changes: 28 additions & 30 deletions lib/cylc/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,26 @@ def dump(self):
return res

def all_completed(self):
if len(self.not_completed) == 0:
return True
else:
return False
return len(self.not_completed) == 0

def is_completed(self, message):
if message in self.completed:
return True
else:
return False
def is_completed(self, msg):
return self._qualify(msg) in self.completed

def set_completed(self, message):
def _qualify(self, msg):
# Prefix a message string with task ID.
return "%s %s" % (self.owner_id, msg)

def set_completed(self, msg):
message = self._qualify(msg)
try:
del self.not_completed[message]
except:
pass
self.completed[message] = self.owner_id

def exists(self, message):
if message in self.completed or message in self.not_completed:
return True
else:
return False
def exists(self, msg):
message = self._qualify(msg)
return message in self.completed or message in self.not_completed

def set_all_incomplete(self):
for message in self.completed.keys():
Expand All @@ -86,8 +83,9 @@ def set_all_completed(self):
del self.not_completed[message]
self.completed[message] = self.owner_id

def add(self, message, completed=False):
# Add a new output message
def add(self, msg, completed=False):
# Add a new output message, prepend my task ID.
message = self._qualify(msg)
if message in self.completed or message in self.not_completed:
# duplicate output messages are an error.
print >> sys.stderr, (
Expand All @@ -97,17 +95,17 @@ def add(self, message, completed=False):
else:
self.completed[message] = self.owner_id

def remove(self, message, fail_silently=False):
if message in self.completed:
def remove(self, msg, fail_silently=False):
message = self._qualify(msg)
try:
del self.completed[message]
elif message in self.not_completed:
del self.not_completed[message]
elif not fail_silently:
print >> sys.stderr, 'WARNING: no such output to delete:'
print >> sys.stderr, ' => ', message

def register(self):
# automatically define special outputs common to all tasks
self.add(self.owner_id + ' submitted')
self.add(self.owner_id + ' started')
self.add(self.owner_id + ' succeeded')
except:
try:
del self.not_completed[message]
except:
pass

def add_standard(self):
# Add standard outputs common to all tasks.
for state in ['submitted', 'started', 'succeeded']:
self.add(state)
9 changes: 6 additions & 3 deletions lib/cylc/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
from cylc.conditional_simplifier import ConditionalSimplifier
from cylc.cycling.loader import get_point
from cylc.task_id import TaskID


"""A task prerequisite.
Expand Down Expand Up @@ -88,10 +89,12 @@ def set_condition(self, expr):
if k in drop_these:
continue
if self.start_point:
task = re.search(r'(.*).(.*) ', self.messages[k])
if task.group:
m = re.search(
r'(' + TaskID.NAME_RE + ')\.(' +
TaskID.POINT_RE + ') ', self.messages[k])
if m:
try:
foo = task.group().split(".")[1].rstrip()
foo = m.group().split(".")[1].rstrip()
if get_point(foo) < self.start_point:
drop_these.append(k)
except IndexError:
Expand Down
4 changes: 0 additions & 4 deletions lib/cylc/task_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class TaskMessage(object):
FAILED = "failed"
STARTED = "started"
SUCCEEDED = "succeeded"
STATUSES = (STARTED, SUCCEEDED, FAILED)

CYLC_JOB_PID = "CYLC_JOB_PID"
CYLC_JOB_INIT_TIME = "CYLC_JOB_INIT_TIME"
Expand Down Expand Up @@ -115,7 +114,6 @@ def __init__(self, priority=NORMAL):
def send(self, messages):
"""Send messages back to the suite."""
self._update_job_status_file(messages)

if self.mode != 'scheduler' or self.polling:
# no suite to communicate with, just print to stdout.
self._print_messages(messages)
Expand Down Expand Up @@ -310,8 +308,6 @@ def _update_job_status_file(self, messages):
job_status_file.write("%s=%s|%s|%s\n" % (
self.CYLC_MESSAGE, self.true_event_time, self.priority,
message))
if message in self.STATUSES:
messages[i] = "%s %s" % (self.task_id, message)
messages[i] += ' at ' + self.true_event_time
if job_status_file:
try:
Expand Down
Loading

0 comments on commit 70a3fc4

Please sign in to comment.