From 3b8e15de116adbaf7571f259a0c01139930f0248 Mon Sep 17 00:00:00 2001 From: Ben Fitzpatrick Date: Fri, 4 Jul 2014 15:20:26 +0100 Subject: [PATCH] #119: auto-detect minimum task repetition --- lib/cylc/config.py | 105 ++++++++++++++---- lib/cylc/cycling/iso8601.py | 2 +- lib/cylc/taskdef.py | 15 ++- ...ck-default.t => 01-check-default-simple.t} | 6 +- tests/runahead/02-check-default-complex.t | 46 ++++++++ tests/runahead/03-check-default-future.t | 48 ++++++++ ...2-no-final-cycle.t => 04-no-final-cycle.t} | 0 tests/runahead/default-complex/suite.rc | 24 ++++ tests/runahead/default-future/suite.rc | 23 ++++ .../{default => default-simple}/suite.rc | 7 +- 10 files changed, 239 insertions(+), 37 deletions(-) rename tests/runahead/{01-check-default.t => 01-check-default-simple.t} (93%) create mode 100644 tests/runahead/02-check-default-complex.t create mode 100644 tests/runahead/03-check-default-future.t rename tests/runahead/{02-no-final-cycle.t => 04-no-final-cycle.t} (100%) create mode 100644 tests/runahead/default-complex/suite.rc create mode 100644 tests/runahead/default-future/suite.rc rename tests/runahead/{default => default-simple}/suite.rc (59%) diff --git a/lib/cylc/config.py b/lib/cylc/config.py index d96591ab0ab..2dc06cfa609 100644 --- a/lib/cylc/config.py +++ b/lib/cylc/config.py @@ -43,8 +43,9 @@ checking, then construct task proxy objects and graph structures. """ -AUTO_RUNAHEAD_FACTOR = 2 +AUTO_RUNAHEAD_FACTOR = 2 # Factor to apply to the minimum cycling interval. CLOCK_OFFSET_RE = re.compile('(\w+)\s*\(\s*([-+]*\s*[\d.]+)\s*\)') +NUM_RUNAHEAD_SEQ_POINTS = 5 # Number of cycle points to look at per sequence. TRIGGER_TYPES = [ 'submit', 'submit-fail', 'start', 'succeed', 'fail', 'finish' ] try: @@ -554,17 +555,67 @@ def compute_runahead_limit( self ): self.runahead_limit = self.custom_runahead_limit return - rlim = None - intervals = [] - offsets = [] - for seq in self.sequences: - i = seq.get_interval() - if i: - intervals.append( i ) - offsets.append( seq.get_offset() ) + initial_point = get_point( + self.cfg['scheduling']['initial cycle point']) + + offsets = set([]) + seq_points = {} + point_tasks = {} - if intervals: - rlim = min( intervals ) * AUTO_RUNAHEAD_FACTOR + # Loop through all sequences and extract the first few points. + for seq in self.sequences: + seq_points[seq] = [] + seq_point_0 = seq.get_first_point(initial_point) + if seq_point_0 is None: + continue + seq_points[seq].append(seq_point_0) + point_tasks.setdefault(seq_point_0, set()) + iter_point = seq_point_0 + for i in range(NUM_RUNAHEAD_SEQ_POINTS - 1): + next_point = seq.get_next_point(iter_point) + if next_point is None: + break + seq_points[seq].append(next_point) + point_tasks.setdefault(next_point, set()) + iter_point = next_point + + # Loop through all tasks and log their sequences. + for name, taskdef in self.taskdefs.items(): + if taskdef.min_intercycle_offset is not None: + if taskdef.min_intercycle_offset: + offsets.add(taskdef.min_intercycle_offset) + for seq in taskdef.sequences: + for point in seq_points.get(seq, []): + point_tasks[point].add(name) + + points = sorted(point_tasks.keys()) + min_interval = None + + # Loop through all point pairs with common tasks to get the interval. + for i, point in enumerate(points): + tasks = point_tasks[point] + for other_point in points[i + 1:]: + other_tasks = point_tasks[other_point] + if point == other_point: + # Technically, hash-different points could compare equal. + continue + common_tasks = tasks.intersection(other_tasks) + if common_tasks: + # These points share the same task or tasks. + interval = abs(other_point - point) + if min_interval is None or interval < min_interval: + min_interval = interval + min_interval_task_example = common_tasks.pop() + min_interval_points = [point, other_point] + + if min_interval is None: + rlim = get_interval_cls().get_null() # Null interval. + description = "(null)" + else: + rlim = min_interval * AUTO_RUNAHEAD_FACTOR + description = "from %d * %s (min interval)" % ( + AUTO_RUNAHEAD_FACTOR, min_interval + ) if offsets: min_offset = min( offsets ) if min_offset < get_interval_cls().get_null(): @@ -573,11 +624,12 @@ def compute_runahead_limit( self ): #... that extend past the default rl # set to offsets plus one minimum interval rlim = abs(min_offset) + rlim + description += " + %s (future trigger)" % ( + abs(min_offset)) + if flags.verbose: + print "Runahead limit: %s: %s" % (rlim, description) self.runahead_limit = rlim - - if flags.verbose: - print "Runahead limit:", self.runahead_limit def get_runahead_limit( self ): # may be None (no cycling tasks) @@ -1282,6 +1334,8 @@ def generate_taskdefs( self, line, left_nodes, right, ttype, section, seq, offset_seq_map[str(offset)] = seq_offset self.taskdefs[name].add_sequence( seq_offset, is_implicit=True) + if seq_offset not in self.sequences: + self.sequences.append(seq_offset) # We don't handle implicit cycling in new-style cycling. else: self.taskdefs[ name ].add_sequence(seq) @@ -1313,20 +1367,23 @@ def generate_triggers( self, lexpression, left_nodes, right, seq, suicide ): # (GraphNodeError checked above) cycle_point = None lnode = graphnode(left, base_interval=base_interval) + ltaskdef = self.taskdefs[lnode.name] + if lnode.intercycle: - self.taskdefs[lnode.name].intercycle = True - if (self.taskdefs[lnode.name].intercycle_offset is None or ( - lnode.offset is not None and - lnode.offset > - self.taskdefs[lnode.name].intercycle_offset)): - self.taskdefs[lnode.name].intercycle_offset = lnode.offset + ltaskdef.intercycle = True + if (ltaskdef.max_intercycle_offset is None or + lnode.offset > ltaskdef.max_intercycle_offset): + ltaskdef.max_intercycle_offset = lnode.offset + if (ltaskdef.min_intercycle_offset is None or + lnode.offset < ltaskdef.min_intercycle_offset): + ltaskdef.min_intercycle_offset = lnode.offset + if lnode.offset_is_from_ict: last_point = seq.get_stop_point() - first_point = self.taskdefs[lnode.name].ict - lnode.offset + first_point = ltaskdef.ict - lnode.offset if first_point and last_point is not None: - self.taskdefs[lnode.name].intercycle_offset = (last_point - first_point) - else: - self.taskdefs[lnode.name].intercycle_offset = None + offset = (last_point - first_point) + ltaskdef.max_intercycle_offset = offset cycle_point = first_point trigger = self.set_trigger( lnode.name, right, lnode.output, lnode.offset, cycle_point, suicide, seq.get_interval() ) if not trigger: diff --git a/lib/cylc/cycling/iso8601.py b/lib/cylc/cycling/iso8601.py index 29d7d1539bd..9ac8063d559 100755 --- a/lib/cylc/cycling/iso8601.py +++ b/lib/cylc/cycling/iso8601.py @@ -381,7 +381,7 @@ def get_next_point_on_sequence(self, point): return result def get_first_point( self, point): - """Return the first point >= to poing, or None if out of bounds.""" + """Return the first point >= to point, or None if out of bounds.""" try: return ISO8601Point(self._cached_first_point_values[point.value]) except KeyError: diff --git a/lib/cylc/taskdef.py b/lib/cylc/taskdef.py index 87c5b868b84..e33e31d700d 100644 --- a/lib/cylc/taskdef.py +++ b/lib/cylc/taskdef.py @@ -63,7 +63,8 @@ def __init__( self, name, rtcfg, run_mode, ict ): # some defaults self.intercycle = False - self.intercycle_offset = get_interval_cls().get_null() + self.max_intercycle_offset = get_interval_cls().get_null() + self.min_intercycle_offset = get_interval_cls().get_null() self.sequential = False self.cycling = False self.modifiers = [] @@ -270,7 +271,7 @@ def tclass_init( sself, start_point, initial_state, stop_c_time=None, sself.startup = startup sself.submit_num = submit_num sself.exists=exists - sself.intercycle_offset = self.intercycle_offset + sself.max_intercycle_offset = self.max_intercycle_offset if self.cycling and startup: # adjust up to the first on-sequence cycle point @@ -282,10 +283,11 @@ def tclass_init( sself, start_point, initial_state, stop_c_time=None, adjusted.append( adj ) if adjusted: sself.tag = min( adjusted ) - if sself.intercycle_offset is None: + if sself.max_intercycle_offset is None: sself.cleanup_cutoff = None else: - sself.cleanup_cutoff = sself.tag + sself.intercycle_offset + sself.cleanup_cutoff = ( + sself.tag + sself.max_intercycle_offset) sself.id = TaskID.get( sself.name, str(sself.tag) ) else: sself.tag = None @@ -294,10 +296,11 @@ def tclass_init( sself, start_point, initial_state, stop_c_time=None, return else: sself.tag = start_point - if sself.intercycle_offset is None: + if sself.max_intercycle_offset is None: sself.cleanup_cutoff = None else: - sself.cleanup_cutoff = sself.tag + sself.intercycle_offset + sself.cleanup_cutoff = ( + sself.tag + sself.max_intercycle_offset) sself.id = TaskID.get( sself.name, str(sself.tag) ) sself.c_time = sself.tag diff --git a/tests/runahead/01-check-default.t b/tests/runahead/01-check-default-simple.t similarity index 93% rename from tests/runahead/01-check-default.t rename to tests/runahead/01-check-default-simple.t index 27571bdc943..98a428a62d3 100644 --- a/tests/runahead/01-check-default.t +++ b/tests/runahead/01-check-default-simple.t @@ -20,10 +20,10 @@ #------------------------------------------------------------------------------- set_test_number 4 #------------------------------------------------------------------------------- -install_suite $TEST_NAME_BASE default +install_suite $TEST_NAME_BASE default-simple #------------------------------------------------------------------------------- TEST_NAME=$TEST_NAME_BASE-validate -run_ok $TEST_NAME cylc validate $SUITE_NAME +run_ok $TEST_NAME cylc validate -v $SUITE_NAME #------------------------------------------------------------------------------- TEST_NAME=$TEST_NAME_BASE-run run_fail $TEST_NAME cylc run --debug $SUITE_NAME @@ -32,7 +32,7 @@ TEST_NAME=$TEST_NAME_BASE-check-fail DB=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/cylc-suite.db RUNAHEAD=$(sqlite3 $DB "select max(cycle) from task_states") # manual comparison for the test -if [[ "$RUNAHEAD" == "20100101T0600Z" ]]; then +if [[ "$RUNAHEAD" == "20100101T1200Z" ]]; then ok $TEST_NAME else fail $TEST_NAME diff --git a/tests/runahead/02-check-default-complex.t b/tests/runahead/02-check-default-complex.t new file mode 100644 index 00000000000..602adf6338b --- /dev/null +++ b/tests/runahead/02-check-default-complex.t @@ -0,0 +1,46 @@ +#!/bin/bash +#C: THIS FILE IS PART OF THE CYLC SUITE ENGINE. +#C: Copyright (C) 2008-2014 Hilary Oliver, NIWA +#C: +#C: This program is free software: you can redistribute it and/or modify +#C: it under the terms of the GNU General Public License as published by +#C: the Free Software Foundation, either version 3 of the License, or +#C: (at your option) any later version. +#C: +#C: This program is distributed in the hope that it will be useful, +#C: but WITHOUT ANY WARRANTY; without even the implied warranty of +#C: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +#C: GNU General Public License for more details. +#C: +#C: You should have received a copy of the GNU General Public License +#C: along with this program. If not, see . +#------------------------------------------------------------------------------- +#C: Test default runahead limit behaviour is still the same +. $(dirname $0)/test_header +#------------------------------------------------------------------------------- +set_test_number 5 +#------------------------------------------------------------------------------- +install_suite $TEST_NAME_BASE default-complex +#------------------------------------------------------------------------------- +TEST_NAME=$TEST_NAME_BASE-validate +run_ok $TEST_NAME cylc validate -v $SUITE_NAME +grep_ok "Runahead limit: PT4H" $TEST_NAME.stdout +#------------------------------------------------------------------------------- +TEST_NAME=$TEST_NAME_BASE-run +run_fail $TEST_NAME cylc run --debug $SUITE_NAME +#------------------------------------------------------------------------------- +TEST_NAME=$TEST_NAME_BASE-check-fail +DB=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/cylc-suite.db +RUNAHEAD=$(sqlite3 $DB "select max(cycle) from task_states") +# manual comparison for the test +if [[ "$RUNAHEAD" == "20100101T0400Z" ]]; then + ok $TEST_NAME +else + fail $TEST_NAME +fi +#------------------------------------------------------------------------------- +TEST_NAME=$TEST_NAME_BASE-check-timeout +LOG=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/log/suite/log +run_ok $TEST_NAME grep 'Abort on suite timeout is set' $LOG +#------------------------------------------------------------------------------- +purge_suite $SUITE_NAME diff --git a/tests/runahead/03-check-default-future.t b/tests/runahead/03-check-default-future.t new file mode 100644 index 00000000000..68f4ea53640 --- /dev/null +++ b/tests/runahead/03-check-default-future.t @@ -0,0 +1,48 @@ +#!/bin/bash +#C: THIS FILE IS PART OF THE CYLC SUITE ENGINE. +#C: Copyright (C) 2008-2014 Hilary Oliver, NIWA +#C: +#C: This program is free software: you can redistribute it and/or modify +#C: it under the terms of the GNU General Public License as published by +#C: the Free Software Foundation, either version 3 of the License, or +#C: (at your option) any later version. +#C: +#C: This program is distributed in the hope that it will be useful, +#C: but WITHOUT ANY WARRANTY; without even the implied warranty of +#C: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +#C: GNU General Public License for more details. +#C: +#C: You should have received a copy of the GNU General Public License +#C: along with this program. If not, see . +#------------------------------------------------------------------------------- +#C: Test default runahead limit behaviour is still the same +. $(dirname $0)/test_header +#------------------------------------------------------------------------------- +set_test_number 5 +#------------------------------------------------------------------------------- +install_suite $TEST_NAME_BASE default-future +#------------------------------------------------------------------------------- +TEST_NAME=$TEST_NAME_BASE-validate +run_ok $TEST_NAME cylc validate -v $SUITE_NAME +grep_ok "Runahead limit: PT8H" $TEST_NAME.stdout +cat $TEST_NAME.stdout >/dev/tty +cat $TEST_NAME.stderr >/dev/tty +#------------------------------------------------------------------------------- +TEST_NAME=$TEST_NAME_BASE-run +run_fail $TEST_NAME cylc run --debug $SUITE_NAME +#------------------------------------------------------------------------------- +TEST_NAME=$TEST_NAME_BASE-check-fail +DB=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/cylc-suite.db +RUNAHEAD=$(sqlite3 $DB "select max(cycle) from task_states") +# manual comparison for the test +if [[ "$RUNAHEAD" == "20100101T0800Z" ]]; then + ok $TEST_NAME +else + fail $TEST_NAME +fi +#------------------------------------------------------------------------------- +TEST_NAME=$TEST_NAME_BASE-check-timeout +LOG=$(cylc get-global-config --print-run-dir)/$SUITE_NAME/log/suite/log +run_ok $TEST_NAME grep 'Abort on suite timeout is set' $LOG +#------------------------------------------------------------------------------- +purge_suite $SUITE_NAME diff --git a/tests/runahead/02-no-final-cycle.t b/tests/runahead/04-no-final-cycle.t similarity index 100% rename from tests/runahead/02-no-final-cycle.t rename to tests/runahead/04-no-final-cycle.t diff --git a/tests/runahead/default-complex/suite.rc b/tests/runahead/default-complex/suite.rc new file mode 100644 index 00000000000..c1793219e77 --- /dev/null +++ b/tests/runahead/default-complex/suite.rc @@ -0,0 +1,24 @@ +[cylc] + UTC mode = True + [[event hooks]] + timeout = 0.1 + abort on timeout = True +[scheduling] + initial cycle point = 20100101T00 + final cycle point = 20100105T00 + [[dependencies]] + # T00, T07, T14, ... + [[[PT7H]]] + graph = "foo => bar" + # T00, T12, T18... 2 hours difference + [[[T00, T12, T18]]] + graph = "foo" + [[[T04]]] + graph = "run_ok" + [[[T05]]] + graph = "never_run" +[runtime] + [[foo]] + command scripting = false + [[bar]] + command scripting = true diff --git a/tests/runahead/default-future/suite.rc b/tests/runahead/default-future/suite.rc new file mode 100644 index 00000000000..396f4ddbf3d --- /dev/null +++ b/tests/runahead/default-future/suite.rc @@ -0,0 +1,23 @@ +[cylc] + UTC mode = True + [[event hooks]] + timeout = 0.1 + abort on timeout = True +[scheduling] + initial cycle point = 20100101T00 + final cycle point = 20100105T00 + [[dependencies]] + # Every hour, so would expect a 2 hour runahead limit. + [[[PT1H]]] + graph = "foo => bar" + # A 6 hour future trigger, => 2 + 6 = 8 hour runahead limit. + [[[T04/PT6H]]] + graph = """ + baz[+PT6H] => bar + baz + """ +[runtime] + [[foo]] + command scripting = false + [[bar]] + command scripting = true diff --git a/tests/runahead/default/suite.rc b/tests/runahead/default-simple/suite.rc similarity index 59% rename from tests/runahead/default/suite.rc rename to tests/runahead/default-simple/suite.rc index 0b1f05d85eb..f8af7de5c1d 100644 --- a/tests/runahead/default/suite.rc +++ b/tests/runahead/default-simple/suite.rc @@ -4,10 +4,11 @@ timeout = 0.1 abort on timeout = True [scheduling] - initial cycle time = 20100101T00 - final cycle time = 20100105T00 + initial cycle point = 20100101T00 + final cycle point = 20100105T00 [[dependencies]] - [[[PT3H]]] + # Intervals are all 24 hours, but we really have a 6 hour repetition. + [[[T00, T06, T12, T18]]] graph = "foo => bar" [runtime] [[foo]]