Skip to content

Commit

Permalink
Merge pull request #1011 from benfitzpatrick/119.restore-runahead-limit
Browse files Browse the repository at this point in the history
#983: restore runahead limit
  • Loading branch information
hjoliver committed Jul 23, 2014
2 parents 2ef7019 + a13ee12 commit fff9d25
Show file tree
Hide file tree
Showing 41 changed files with 560 additions and 253 deletions.
4 changes: 4 additions & 0 deletions bin/cylc-test-battery
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ machines this may result in some test failures due to timeouts intended
to catch problems that can prevent a suite from shutting down normally.
Use the "-j N" option to change the amount of concurrency.
To output stderr from failed tests to the terminal, export
CYLC_TEST_DEBUG=true before running this command. To perform comparison
tests using `xxdiff -D`, export CYLC_TEST_DEBUG_CMP=true.
For more information see "Reference Tests" in the User Guide.
Options:
Expand Down
49 changes: 49 additions & 0 deletions dev/suites/busy/suite.rc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!jinja2
title = Suite-1000-x3
description = A suite of 1000 tasks per cycle
[cylc]
UTC mode = True # Ignore DST
[scheduling]
initial cycle point = 20130101T00
final cycle point = 20130103T00
runahead limit = PT12H
[[dependencies]]
[[[T00]]]
graph="""
{% for i0 in range(10) -%}
{% for i1 in range(10) -%}
{% for i2 in range(1) -%}
t{{i0}}{{i1}}{{i2}}9[-P1D] => t{{i0}}{{i1}}{{i2}}0
t{{i0}}{{i1}}{{i2}}0 => t{{i0}}{{i1}}{{i2}}1 => \
t{{i0}}{{i1}}{{i2}}2 => t{{i0}}{{i1}}{{i2}}3
t{{i0}}{{i1}}{{i2}}3 => t{{i0}}{{i1}}{{i2}}4 => \
t{{i0}}{{i1}}{{i2}}5 => t{{i0}}{{i1}}{{i2}}6
t{{i0}}{{i1}}{{i2}}6 => t{{i0}}{{i1}}{{i2}}7 => \
t{{i0}}{{i1}}{{i2}}8 => t{{i0}}{{i1}}{{i2}}9
{% endfor -%}
{% endfor -%}
{% endfor -%}
"""
[runtime]
[[root]]
command scripting = sleep 1
[[[job submission]]]
method = at
[[[event hooks]]]
succeeded handler = true
failed handler = true
retry handler = true
submission failed handler = true
submission timeout handler = true
execution timeout handler = true
execution timeout = PT6M
submission timeout = PT1M
{% for i0 in range(10) -%}
{% for i1 in range(10) -%}
{% for i2 in range(1) -%}
{% for i3 in range(10) -%}
[[t{{i0}}{{i1}}{{i2}}{{i3}}]]
{% endfor -%}
{% endfor -%}
{% endfor -%}
{% endfor -%}
2 changes: 1 addition & 1 deletion dev/suites/integer/one/suite.rc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[scheduling]
initial cycle point = 1
final cycle point = 16
runahead factor = 4
runahead limit = 12
cycling mode = integer
[[special tasks]]
sequential = seq
Expand Down
48 changes: 42 additions & 6 deletions doc/suiterc.tex
Original file line number Diff line number Diff line change
Expand Up @@ -506,19 +506,55 @@ \subsection{[scheduling]}
\end{myitemize}


\subsubsection[runahead limit]{[scheduling] $\rightarrow$ runahead factor}
\subsubsection[runahead limit]{[scheduling] $\rightarrow$ runahead limit}
\label{runahead limit}

The suite runahead limit prevents the fastest tasks in a suite from
Runahead limiting prevents the fastest tasks in a suite from
getting too far ahead of the slowest ones, as documented in
Section~\ref{RunaheadLimit}. Tasks exceeding the limit are put into
a special runahead held state until slower tasks have caught up
sufficiently. The limit computed as this factor of the suite's minimum
cycling interval.
sufficiently.

The default behaviour is to limit the number of tasks
by only allowing up to \lstinline=N= (default 3) consecutive cycle
points to be active at any time, up-adjusted if necessary for any
future triggering.

\lstinline=runahead limit= is deprecated in favour of specifying this
maximum number of active cycle points (\ref{max active cycle points}).

\begin{myitemize}
\item {\em type:} integer multiple of the suite's minimum cycling interval
\item {\em default:} $2$
\item {\em type:} Cycle interval string e.g. \lstinline=PT12H=
for a 12 hour limit under ISO 8601 cycling.
\item {\em default:} (none)
\end{myitemize}


\subsubsection[max active cycle points]{[scheduling] $\rightarrow$
max active cycle points}
\label{max active cycle points}

Runahead limiting prevents the fastest tasks in a suite from
getting too far ahead of the slowest ones, as documented in
Section~\ref{RunaheadLimit}. Tasks exceeding the limit are put into
a special runahead held state until slower tasks have caught up
sufficiently.

The default behaviour is to limit the number of tasks
by only allowing up to \lstinline=N= (default 3) consecutive cycle
points to be active at any time, up-adjusted if necessary for any
future triggering.

This input allows you to change \lstinline=N=, the maximum
number of consecutive cycle points that can be active at once. This
supersedes \ref{runahead limit}.

\begin{myitemize}
\item {\em type:} integer
\item {\em default:} 3
\end{myitemize}


\subsubsection[{[[}queues{]]}]{[scheduling] $\rightarrow$ [[queues]]}

Configuration of internal queues, by which the number of simultaneously
Expand Down
4 changes: 2 additions & 2 deletions examples/future-trigger/suite.rc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

[scheduling]
initial cycle point = 2010080800
runahead factor = 4
initial cycle point = 20100808T00
runahead limit = P1D
[[special tasks]]
cold-start = cold
[[dependencies]]
Expand Down
20 changes: 16 additions & 4 deletions lib/cylc/cfgspec/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,22 @@
from isodatetime.data import Calendar, TimePoint
from isodatetime.parsers import TimePointParser, TimeIntervalParser


"Define all legal items and values for cylc suite definition files."

interval_parser = TimeIntervalParser()

def _coerce_cycleinterval( value, keys, args ):
"""Coerce value to a cycle interval."""
value = _strip_and_unquote( keys, value )
if value.isdigit():
# Old runahead limit format.
return value
parser = TimeIntervalParser()
try:
parser.parse(value)
except ValueError:
raise IllegalValueError("interval", keys, value)
return value

def _coerce_cycletime( value, keys, args ):
"""Coerce value to a cycle point."""
Expand Down Expand Up @@ -140,6 +151,7 @@ def _coerce_interval_list( value, keys, args, back_comp_unit_factor=1 ):
coercers['cycletime'] = _coerce_cycletime
coercers['cycletime_format'] = _coerce_cycletime_format
coercers['cycletime_time_zone'] = _coerce_cycletime_time_zone
coercers['cycleinterval'] = _coerce_cycleinterval
coercers['interval'] = _coerce_interval
coercers['interval_minutes'] = lambda *a: _coerce_interval(
*a, back_comp_unit_factor=60)
Expand All @@ -149,6 +161,7 @@ def _coerce_interval_list( value, keys, args, back_comp_unit_factor=1 ):
*a, back_comp_unit_factor=60)
coercers['interval_seconds_list'] = _coerce_interval_list


SPEC = {
'title' : vdr( vtype='string', default="" ),
'description' : vdr( vtype='string', default="" ),
Expand Down Expand Up @@ -200,7 +213,8 @@ def _coerce_interval_list( value, keys, args, back_comp_unit_factor=1 ):
'initial cycle point' : vdr(vtype='cycletime'),
'final cycle point' : vdr(vtype='cycletime'),
'cycling mode' : vdr(vtype='string', default=Calendar.MODE_GREGORIAN, options=Calendar.MODES.keys() + ["integer"] ),
'runahead factor' : vdr(vtype='integer', default=2 ),
'runahead limit' : vdr(vtype='cycleinterval' ),
'max active cycle points' : vdr(vtype='integer', default=3),
'queues' : {
'default' : {
'limit' : vdr( vtype='integer', default=0),
Expand Down Expand Up @@ -352,8 +366,6 @@ def upg( cfg, descr ):
['visualization', 'final cycle time'], ['visualization', 'final cycle point'],
converter( lambda x: x, 'changed naming to reflect non-date-time cycling' )
)
u.deprecate( '6.0.0', ['scheduling', 'runahead limit'], ['scheduling', 'runahead factor'],
converter( lambda x:'2', 'using default runahead factor' ))
u.obsolete('6.0.0', ['scheduling', 'dependencies', '__MANY__', 'daemon'])
u.obsolete('6.0.0', ['cylc', 'job submission'])
u.obsolete('6.0.0', ['cylc', 'event handler submission'])
Expand Down
120 changes: 72 additions & 48 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import re, os, sys
import taskdef
from cylc.cfgspec.suite import get_suitecfg
from cylc.cycling.loader import (get_point, get_interval_cls,
from cylc.cycling.loader import (get_point,
get_interval, get_interval_cls,
get_sequence, get_sequence_cls,
init_cyclers, INTEGER_CYCLING_TYPE,
ISO8601_CYCLING_TYPE,
get_backwards_compatibility_mode)
from envvar import check_varnames, expandvars
from copy import deepcopy, copy
Expand All @@ -42,7 +44,9 @@
checking, then construct task proxy objects and graph structures.
"""

AUTO_RUNAHEAD_FACTOR = 2 # Factor to apply to the minimum cycling interval.
CLOCK_OFFSET_RE = re.compile('(\w+)\s*\(\s*([-+]*\s*[\d.]+)\s*\)')
NUM_RUNAHEAD_SEQ_POINTS = 5 # Number of cycle points to look at per sequence.
TRIGGER_TYPES = [ 'submit', 'submit-fail', 'start', 'succeed', 'fail', 'finish' ]

try:
Expand Down Expand Up @@ -113,7 +117,8 @@ def __init__( self, suite, fpath, template_vars=[],
self.sequences = []
self.actual_first_ctime = None

self.runahead_limit = None
self.custom_runahead_limit = None
self.max_num_active_cycle_points = None

# runtime hierarchy dicts keyed by namespace name:
self.runtime = {
Expand Down Expand Up @@ -296,7 +301,7 @@ def __init__( self, suite, fpath, template_vars=[],
if not self.graph_found:
raise SuiteConfigError, 'No suite dependency graph defined.'

self.compute_runahead_limit()
self.compute_runahead_limits()

self.configure_queues()

Expand All @@ -319,8 +324,10 @@ def __init__( self, suite, fpath, template_vars=[],
self.cfg['visualization']['initial cycle point'] = vict

vict_rh = None
if vict and self.runahead_limit:
vict_rh = str( get_point( vict ) + self.runahead_limit )
v_runahead_limit = (
self.custom_runahead_limit or self.minimum_runahead_limit)
if vict and v_runahead_limit:
vict_rh = str( get_point( vict ) + v_runahead_limit )

vfct = self.cfg['visualization']['final cycle point'] or vict_rh or vict
self.cfg['visualization']['final cycle point'] = vfct
Expand Down Expand Up @@ -559,43 +566,55 @@ def print_inheritance(self):
for item, val in self.runtime[foo].items():
print ' ', ' ', item, val

def compute_runahead_limit( self ):
rfactor = self.cfg['scheduling']['runahead factor']
if not rfactor:
# no runahead limit!
return
try:
rfactor = int( rfactor )
except ValueError:
raise SuiteConfigError, "ERROR, illegal runahead limit: " + str(rfactor)
def compute_runahead_limits( self ):
"""Extract the custom and the minimum runahead limits."""

rlim = None
intervals = []
offsets = []
for seq in self.sequences:
i = seq.get_interval()
if i:
intervals.append( i )
offsets.append( seq.get_offset() )
self.max_num_active_cycle_points = self.cfg['scheduling'][
'max active cycle points']

limit = self.cfg['scheduling']['runahead limit']
if (limit is not None and limit.isdigit() and
get_interval_cls().get_null().TYPE == ISO8601_CYCLING_TYPE):
# Backwards-compatibility for raw number of hours.
limit = "PT%sH" % limit

# The custom runahead limit is None if not user-configured.
self.custom_runahead_limit = get_interval(limit)

# Find the minimum runahead limit necessary for any future triggers.
self.minimum_runahead_limit = None

offsets = set()
for name, taskdef in self.taskdefs.items():
if taskdef.min_intercycle_offset:
offsets.add(taskdef.min_intercycle_offset)

if intervals:
rlim = min( intervals ) * rfactor
if offsets:
min_offset = min( offsets )
if min_offset < get_interval_cls().get_null():
# future triggers...
if abs(min_offset) >= rlim:
#... that extend past the default rl
# set to offsets plus one minimum interval
rlim = abs(min_offset) + rlim

self.runahead_limit = rlim
if flags.verbose:
print "Runahead limit:", self.runahead_limit
min_offset = min(offsets)
if min_offset < get_interval_cls().get_null():
# A negative offset comes from future triggering.
self.minimum_runahead_limit = abs(min_offset)
if (self.custom_runahead_limit is not None and
self.custom_runahead_limit <
self.minimum_runahead_limit):
print >> sys.stderr, (
' WARNING, custom runahead limit of %s is less than '
'future triggering offset %s: suite may stall.' %
(self.custom_runahead_limit,
self.minimum_runahead_limit)
)

def get_custom_runahead_limit( self ):
"""Return the custom runahead limit (may be None)."""
return self.custom_runahead_limit

def get_max_num_active_cycle_points( self ):
"""Return the maximum allowed number of pool cycle points."""
return self.max_num_active_cycle_points

def get_runahead_limit( self ):
# may be None (no cycling tasks)
return self.runahead_limit
def get_minimum_runahead_limit( self ):
"""Return the minimum runahead limit to apply."""
return self.minimum_runahead_limit

def get_config( self, args, sparse=False ):
return self.pcfg.get( args, sparse )
Expand Down Expand Up @@ -1295,6 +1314,8 @@ def generate_taskdefs( self, line, left_nodes, right, ttype, section, seq,
offset_seq_map[str(offset)] = seq_offset
self.taskdefs[name].add_sequence(
seq_offset, is_implicit=True)
if seq_offset not in self.sequences:
self.sequences.append(seq_offset)
# We don't handle implicit cycling in new-style cycling.
else:
self.taskdefs[ name ].add_sequence(seq)
Expand Down Expand Up @@ -1326,20 +1347,23 @@ def generate_triggers( self, lexpression, left_nodes, right, seq, suicide ):
# (GraphNodeError checked above)
cycle_point = None
lnode = graphnode(left, base_interval=base_interval)
ltaskdef = self.taskdefs[lnode.name]

if lnode.intercycle:
self.taskdefs[lnode.name].intercycle = True
if (self.taskdefs[lnode.name].intercycle_offset is None or (
lnode.offset is not None and
lnode.offset >
self.taskdefs[lnode.name].intercycle_offset)):
self.taskdefs[lnode.name].intercycle_offset = lnode.offset
ltaskdef.intercycle = True
if (ltaskdef.max_intercycle_offset is None or
lnode.offset > ltaskdef.max_intercycle_offset):
ltaskdef.max_intercycle_offset = lnode.offset
if (ltaskdef.min_intercycle_offset is None or
lnode.offset < ltaskdef.min_intercycle_offset):
ltaskdef.min_intercycle_offset = lnode.offset

if lnode.offset_is_from_ict:
last_point = seq.get_stop_point()
first_point = self.taskdefs[lnode.name].ict - lnode.offset
first_point = ltaskdef.ict - lnode.offset
if first_point and last_point is not None:
self.taskdefs[lnode.name].intercycle_offset = (last_point - first_point)
else:
self.taskdefs[lnode.name].intercycle_offset = None
offset = (last_point - first_point)
ltaskdef.max_intercycle_offset = offset
cycle_point = first_point
trigger = self.set_trigger( lnode.name, right, lnode.output, lnode.offset, cycle_point, suicide, seq.get_interval() )
if not trigger:
Expand Down
1 change: 0 additions & 1 deletion lib/cylc/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ def __abs__( self ):
raise NotImplementedError()

def __mul__( self, m ):
# the suite runahead limit is a multiple of the smallest sequence interval
raise NotImplementedError()

def __nonzero__(self):
Expand Down
Loading

0 comments on commit fff9d25

Please sign in to comment.