Skip to content

Commit

Permalink
cylc#119: auto-detect minimum task repetition
Browse files Browse the repository at this point in the history
  • Loading branch information
benfitzpatrick committed Jul 4, 2014
1 parent d8d7213 commit 3b8e15d
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 37 deletions.
105 changes: 81 additions & 24 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
checking, then construct task proxy objects and graph structures.
"""

AUTO_RUNAHEAD_FACTOR = 2
AUTO_RUNAHEAD_FACTOR = 2 # Factor to apply to the minimum cycling interval.
CLOCK_OFFSET_RE = re.compile('(\w+)\s*\(\s*([-+]*\s*[\d.]+)\s*\)')
NUM_RUNAHEAD_SEQ_POINTS = 5 # Number of cycle points to look at per sequence.
TRIGGER_TYPES = [ 'submit', 'submit-fail', 'start', 'succeed', 'fail', 'finish' ]

try:
Expand Down Expand Up @@ -554,17 +555,67 @@ def compute_runahead_limit( self ):
self.runahead_limit = self.custom_runahead_limit
return

rlim = None
intervals = []
offsets = []
for seq in self.sequences:
i = seq.get_interval()
if i:
intervals.append( i )
offsets.append( seq.get_offset() )
initial_point = get_point(
self.cfg['scheduling']['initial cycle point'])

offsets = set([])
seq_points = {}
point_tasks = {}

if intervals:
rlim = min( intervals ) * AUTO_RUNAHEAD_FACTOR
# Loop through all sequences and extract the first few points.
for seq in self.sequences:
seq_points[seq] = []
seq_point_0 = seq.get_first_point(initial_point)
if seq_point_0 is None:
continue
seq_points[seq].append(seq_point_0)
point_tasks.setdefault(seq_point_0, set())
iter_point = seq_point_0
for i in range(NUM_RUNAHEAD_SEQ_POINTS - 1):
next_point = seq.get_next_point(iter_point)
if next_point is None:
break
seq_points[seq].append(next_point)
point_tasks.setdefault(next_point, set())
iter_point = next_point

# Loop through all tasks and log their sequences.
for name, taskdef in self.taskdefs.items():
if taskdef.min_intercycle_offset is not None:
if taskdef.min_intercycle_offset:
offsets.add(taskdef.min_intercycle_offset)
for seq in taskdef.sequences:
for point in seq_points.get(seq, []):
point_tasks[point].add(name)

points = sorted(point_tasks.keys())
min_interval = None

# Loop through all point pairs with common tasks to get the interval.
for i, point in enumerate(points):
tasks = point_tasks[point]
for other_point in points[i + 1:]:
other_tasks = point_tasks[other_point]
if point == other_point:
# Technically, hash-different points could compare equal.
continue
common_tasks = tasks.intersection(other_tasks)
if common_tasks:
# These points share the same task or tasks.
interval = abs(other_point - point)
if min_interval is None or interval < min_interval:
min_interval = interval
min_interval_task_example = common_tasks.pop()
min_interval_points = [point, other_point]

if min_interval is None:
rlim = get_interval_cls().get_null() # Null interval.
description = "(null)"
else:
rlim = min_interval * AUTO_RUNAHEAD_FACTOR
description = "from %d * %s (min interval)" % (
AUTO_RUNAHEAD_FACTOR, min_interval
)
if offsets:
min_offset = min( offsets )
if min_offset < get_interval_cls().get_null():
Expand All @@ -573,11 +624,12 @@ def compute_runahead_limit( self ):
#... that extend past the default rl
# set to offsets plus one minimum interval
rlim = abs(min_offset) + rlim
description += " + %s (future trigger)" % (
abs(min_offset))
if flags.verbose:
print "Runahead limit: %s: %s" % (rlim, description)

self.runahead_limit = rlim

if flags.verbose:
print "Runahead limit:", self.runahead_limit

def get_runahead_limit( self ):
# may be None (no cycling tasks)
Expand Down Expand Up @@ -1282,6 +1334,8 @@ def generate_taskdefs( self, line, left_nodes, right, ttype, section, seq,
offset_seq_map[str(offset)] = seq_offset
self.taskdefs[name].add_sequence(
seq_offset, is_implicit=True)
if seq_offset not in self.sequences:
self.sequences.append(seq_offset)
# We don't handle implicit cycling in new-style cycling.
else:
self.taskdefs[ name ].add_sequence(seq)
Expand Down Expand Up @@ -1313,20 +1367,23 @@ def generate_triggers( self, lexpression, left_nodes, right, seq, suicide ):
# (GraphNodeError checked above)
cycle_point = None
lnode = graphnode(left, base_interval=base_interval)
ltaskdef = self.taskdefs[lnode.name]

if lnode.intercycle:
self.taskdefs[lnode.name].intercycle = True
if (self.taskdefs[lnode.name].intercycle_offset is None or (
lnode.offset is not None and
lnode.offset >
self.taskdefs[lnode.name].intercycle_offset)):
self.taskdefs[lnode.name].intercycle_offset = lnode.offset
ltaskdef.intercycle = True
if (ltaskdef.max_intercycle_offset is None or
lnode.offset > ltaskdef.max_intercycle_offset):
ltaskdef.max_intercycle_offset = lnode.offset
if (ltaskdef.min_intercycle_offset is None or
lnode.offset < ltaskdef.min_intercycle_offset):
ltaskdef.min_intercycle_offset = lnode.offset

if lnode.offset_is_from_ict:
last_point = seq.get_stop_point()
first_point = self.taskdefs[lnode.name].ict - lnode.offset
first_point = ltaskdef.ict - lnode.offset
if first_point and last_point is not None:
self.taskdefs[lnode.name].intercycle_offset = (last_point - first_point)
else:
self.taskdefs[lnode.name].intercycle_offset = None
offset = (last_point - first_point)
ltaskdef.max_intercycle_offset = offset
cycle_point = first_point
trigger = self.set_trigger( lnode.name, right, lnode.output, lnode.offset, cycle_point, suicide, seq.get_interval() )
if not trigger:
Expand Down
2 changes: 1 addition & 1 deletion lib/cylc/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def get_next_point_on_sequence(self, point):
return result

def get_first_point( self, point):
"""Return the first point >= to poing, or None if out of bounds."""
"""Return the first point >= to point, or None if out of bounds."""
try:
return ISO8601Point(self._cached_first_point_values[point.value])
except KeyError:
Expand Down
15 changes: 9 additions & 6 deletions lib/cylc/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def __init__( self, name, rtcfg, run_mode, ict ):

# some defaults
self.intercycle = False
self.intercycle_offset = get_interval_cls().get_null()
self.max_intercycle_offset = get_interval_cls().get_null()
self.min_intercycle_offset = get_interval_cls().get_null()
self.sequential = False
self.cycling = False
self.modifiers = []
Expand Down Expand Up @@ -270,7 +271,7 @@ def tclass_init( sself, start_point, initial_state, stop_c_time=None,
sself.startup = startup
sself.submit_num = submit_num
sself.exists=exists
sself.intercycle_offset = self.intercycle_offset
sself.max_intercycle_offset = self.max_intercycle_offset

if self.cycling and startup:
# adjust up to the first on-sequence cycle point
Expand All @@ -282,10 +283,11 @@ def tclass_init( sself, start_point, initial_state, stop_c_time=None,
adjusted.append( adj )
if adjusted:
sself.tag = min( adjusted )
if sself.intercycle_offset is None:
if sself.max_intercycle_offset is None:
sself.cleanup_cutoff = None
else:
sself.cleanup_cutoff = sself.tag + sself.intercycle_offset
sself.cleanup_cutoff = (
sself.tag + sself.max_intercycle_offset)
sself.id = TaskID.get( sself.name, str(sself.tag) )
else:
sself.tag = None
Expand All @@ -294,10 +296,11 @@ def tclass_init( sself, start_point, initial_state, stop_c_time=None,
return
else:
sself.tag = start_point
if sself.intercycle_offset is None:
if sself.max_intercycle_offset is None:
sself.cleanup_cutoff = None
else:
sself.cleanup_cutoff = sself.tag + sself.intercycle_offset
sself.cleanup_cutoff = (
sself.tag + sself.max_intercycle_offset)
sself.id = TaskID.get( sself.name, str(sself.tag) )

sself.c_time = sself.tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
#-------------------------------------------------------------------------------
set_test_number 4
#-------------------------------------------------------------------------------
install_suite $TEST_NAME_BASE default
install_suite $TEST_NAME_BASE default-simple
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-validate
run_ok $TEST_NAME cylc validate $SUITE_NAME
run_ok $TEST_NAME cylc validate -v $SUITE_NAME
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-run
run_fail $TEST_NAME cylc run --debug $SUITE_NAME
Expand All @@ -32,7 +32,7 @@ TEST_NAME=$TEST_NAME_BASE-check-fail
DB=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/cylc-suite.db
RUNAHEAD=$(sqlite3 $DB "select max(cycle) from task_states")
# manual comparison for the test
if [[ "$RUNAHEAD" == "20100101T0600Z" ]]; then
if [[ "$RUNAHEAD" == "20100101T1200Z" ]]; then
ok $TEST_NAME
else
fail $TEST_NAME
Expand Down
46 changes: 46 additions & 0 deletions tests/runahead/02-check-default-complex.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/bin/bash
#C: THIS FILE IS PART OF THE CYLC SUITE ENGINE.
#C: Copyright (C) 2008-2014 Hilary Oliver, NIWA
#C:
#C: This program is free software: you can redistribute it and/or modify
#C: it under the terms of the GNU General Public License as published by
#C: the Free Software Foundation, either version 3 of the License, or
#C: (at your option) any later version.
#C:
#C: This program is distributed in the hope that it will be useful,
#C: but WITHOUT ANY WARRANTY; without even the implied warranty of
#C: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#C: GNU General Public License for more details.
#C:
#C: You should have received a copy of the GNU General Public License
#C: along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
#C: Test that the automatically computed default runahead limit is PT4H
#C: for the "default-complex" suite, and that the suite run stalls at the
#C: expected cycle point.
. $(dirname $0)/test_header
#-------------------------------------------------------------------------------
# Five checks follow: validate, grep of the validate output, run,
# maximum cycle in the suite database, and the timeout message in the log.
set_test_number 5
#-------------------------------------------------------------------------------
install_suite $TEST_NAME_BASE default-complex
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-validate
# Validate verbosely and check the reported runahead limit in the captured
# output. NOTE(review): presumably -v is what makes validate print the
# "Runahead limit:" line - confirm.
run_ok $TEST_NAME cylc validate -v $SUITE_NAME
grep_ok "Runahead limit: PT4H" $TEST_NAME.stdout
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-run
# The run itself is asserted to fail (run_fail).
run_fail $TEST_NAME cylc run --debug $SUITE_NAME
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-check-fail
# Find the latest cycle recorded in the suite's task_states table.
DB=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/cylc-suite.db
RUNAHEAD=$(sqlite3 $DB "select max(cycle) from task_states")
# Compare by hand: the latest cycle reached must be exactly 20100101T0400Z.
if [[ "$RUNAHEAD" == "20100101T0400Z" ]]; then
ok $TEST_NAME
else
fail $TEST_NAME
fi
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-check-timeout
# The suite log must contain the abort-on-timeout message.
LOG=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/log/suite/log
run_ok $TEST_NAME grep 'Abort on suite timeout is set' $LOG
#-------------------------------------------------------------------------------
purge_suite $SUITE_NAME
48 changes: 48 additions & 0 deletions tests/runahead/03-check-default-future.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/bin/bash
#C: THIS FILE IS PART OF THE CYLC SUITE ENGINE.
#C: Copyright (C) 2008-2014 Hilary Oliver, NIWA
#C:
#C: This program is free software: you can redistribute it and/or modify
#C: it under the terms of the GNU General Public License as published by
#C: the Free Software Foundation, either version 3 of the License, or
#C: (at your option) any later version.
#C:
#C: This program is distributed in the hope that it will be useful,
#C: but WITHOUT ANY WARRANTY; without even the implied warranty of
#C: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#C: GNU General Public License for more details.
#C:
#C: You should have received a copy of the GNU General Public License
#C: along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
#C: Test that a future trigger extends the automatically computed default
#C: runahead limit: the "default-future" suite is expected to report PT8H
#C: and to stall at the corresponding cycle point.
. $(dirname $0)/test_header
#-------------------------------------------------------------------------------
# Five checks follow: validate, grep of the validate output, run,
# maximum cycle in the suite database, and the timeout message in the log.
set_test_number 5
#-------------------------------------------------------------------------------
install_suite $TEST_NAME_BASE default-future
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-validate
# Validate verbosely and check the reported runahead limit in the captured
# output. NOTE(review): presumably -v is what makes validate print the
# "Runahead limit:" line - confirm.
run_ok $TEST_NAME cylc validate -v $SUITE_NAME
grep_ok "Runahead limit: PT8H" $TEST_NAME.stdout
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-run
# The run itself is asserted to fail (run_fail).
run_fail $TEST_NAME cylc run --debug $SUITE_NAME
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-check-fail
# Find the latest cycle recorded in the suite's task_states table.
DB=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/cylc-suite.db
RUNAHEAD=$(sqlite3 $DB "select max(cycle) from task_states")
# Compare by hand: the latest cycle reached must be exactly 20100101T0800Z.
if [[ "$RUNAHEAD" == "20100101T0800Z" ]]; then
    ok $TEST_NAME
else
    fail $TEST_NAME
fi
#-------------------------------------------------------------------------------
TEST_NAME=$TEST_NAME_BASE-check-timeout
# The suite log must contain the abort-on-timeout message.
LOG=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/log/suite/log
run_ok $TEST_NAME grep 'Abort on suite timeout is set' $LOG
#-------------------------------------------------------------------------------
purge_suite $SUITE_NAME
File renamed without changes.
24 changes: 24 additions & 0 deletions tests/runahead/default-complex/suite.rc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[cylc]
UTC mode = True
[[event hooks]]
timeout = 0.1
abort on timeout = True
[scheduling]
initial cycle point = 20100101T00
final cycle point = 20100105T00
[[dependencies]]
# T00, T07, T14, ...
[[[PT7H]]]
graph = "foo => bar"
# T00, T12, T18, ... - foo at T12 is only 2 hours before foo at T14 (PT7H above).
[[[T00, T12, T18]]]
graph = "foo"
[[[T04]]]
graph = "run_ok"
[[[T05]]]
graph = "never_run"
[runtime]
[[foo]]
command scripting = false
[[bar]]
command scripting = true
23 changes: 23 additions & 0 deletions tests/runahead/default-future/suite.rc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[cylc]
UTC mode = True
[[event hooks]]
timeout = 0.1
abort on timeout = True
[scheduling]
initial cycle point = 20100101T00
final cycle point = 20100105T00
[[dependencies]]
# Every hour, so would expect a 2 hour runahead limit.
[[[PT1H]]]
graph = "foo => bar"
# A 6 hour future trigger, => 2 + 6 = 8 hour runahead limit.
[[[T04/PT6H]]]
graph = """
baz[+PT6H] => bar
baz
"""
[runtime]
[[foo]]
command scripting = false
[[bar]]
command scripting = true
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
timeout = 0.1
abort on timeout = True
[scheduling]
initial cycle time = 20100101T00
final cycle time = 20100105T00
initial cycle point = 20100101T00
final cycle point = 20100105T00
[[dependencies]]
[[[PT3H]]]
# Intervals are all 24 hours, but we really have a 6 hour repetition.
[[[T00, T06, T12, T18]]]
graph = "foo => bar"
[runtime]
[[foo]]
Expand Down

0 comments on commit 3b8e15d

Please sign in to comment.