Skip to content

Commit

Permalink
cylc#983: first draft of runahead-by-num-cycle-points
Browse files Browse the repository at this point in the history
  • Loading branch information
benfitzpatrick committed Jul 14, 2014
1 parent 52ab258 commit 06b3150
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 136 deletions.
42 changes: 37 additions & 5 deletions doc/suiterc.tex
Original file line number Diff line number Diff line change
Expand Up @@ -583,20 +583,52 @@ \subsection{[scheduling]}


\subsubsection[runahead limit]{[scheduling] $\rightarrow$ runahead limit}
\label{runahead limit}

The suite runahead limit prevents the fastest tasks in a suite from
Runahead limiting prevents the fastest tasks in a suite from
getting too far ahead of the slowest ones, as documented in
Section~\ref{RunaheadLimit}. Tasks exceeding the limit are put into
a special runahead held state until slower tasks have caught up
sufficiently. The default limit is computed as a factor (2) of the
suite's minimum cycling interval, and is up-adjusted from there as
necessary for any longer-offset future triggering.
sufficiently.

The default behaviour is to limit the number of tasks
by only allowing up to \lstinline=N= (default 3) active cycle points at
any time, up-adjusted if necessary for any future triggering.

\lstinline=runahead limit= is deprecated in favour of specifying this
maximum number of active cycle points (\ref{max active cycle points}).

\begin{myitemize}
\item {\em type:} Cycle interval string e.g. \lstinline=PT12H=
for a 12 hour limit under ISO 8601 cycling.
\item {\em default:} (twice the minimum cycling interval in the suite)
\item {\em default:} (none)
\end{myitemize}


\subsubsection[max active cycle points]{[scheduling] $\rightarrow$
max active cycle points}
\label{max active cycle points}

Runahead limiting prevents the fastest tasks in a suite from
getting too far ahead of the slowest ones, as documented in
Section~\ref{RunaheadLimit}. Tasks exceeding the limit are put into
a special runahead held state until slower tasks have caught up
sufficiently.

The default behaviour is to limit the number of tasks
by only allowing up to \lstinline=N= (default 3) active cycle points at
any time, up-adjusted if necessary for any future triggering.

This input allows you to change \lstinline=N=, the maximum
number of cycle points that can be active at once. This supersedes
\ref{runahead limit}.

\begin{myitemize}
\item {\em type:} integer
\item {\em default:} 3
\end{myitemize}


\subsubsection[{[[}queues{]]}]{[scheduling] $\rightarrow$ [[queues]]}

Configuration of internal queues, by which the number of simultaneously
Expand Down
3 changes: 2 additions & 1 deletion lib/cylc/cfgspec/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ def _coerce_cycletime_time_zone( value, keys, args ):
'initial cycle point' : vdr(vtype='cycletime'),
'final cycle point' : vdr(vtype='cycletime'),
'cycling mode' : vdr(vtype='string', default="gregorian", options=["360day","gregorian","integer"] ),
'runahead limit' : vdr(vtype='cycleinterval' ),
'runahead limit' : vdr(vtype='cycleinterval' ),
'max active cycle points' : vdr(vtype='integer', default=3),
'queues' : {
'default' : {
'limit' : vdr( vtype='integer', default=0),
Expand Down
125 changes: 43 additions & 82 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def __init__( self, suite, fpath, template_vars=[],
self.actual_first_ctime = None

self.custom_runahead_limit = None
self.runahead_limit = None
self.max_num_active_cycle_points = None

# runtime hierarchy dicts keyed by namespace name:
self.runtime = {
Expand Down Expand Up @@ -286,7 +286,7 @@ def __init__( self, suite, fpath, template_vars=[],
if not self.graph_found:
raise SuiteConfigError, 'No suite dependency graph defined.'

self.compute_runahead_limit()
self.compute_runahead_limits()

self.configure_queues()

Expand All @@ -309,8 +309,10 @@ def __init__( self, suite, fpath, template_vars=[],
self.cfg['visualization']['initial cycle point'] = vict

vict_rh = None
if vict and self.runahead_limit:
vict_rh = str( get_point( vict ) + self.runahead_limit )
v_runahead_limit = (
self.custom_runahead_limit or self.minimum_runahead_limit)
if vict and v_runahead_limit:
vict_rh = str( get_point( vict ) + v_runahead_limit )

vfct = self.cfg['visualization']['final cycle point'] or vict_rh or vict
self.cfg['visualization']['final cycle point'] = vfct
Expand Down Expand Up @@ -549,96 +551,55 @@ def print_inheritance(self):
for item, val in self.runtime[foo].items():
print ' ', ' ', item, val

def compute_runahead_limit( self ):
def compute_runahead_limits( self ):
"""Extract the custom and the minimum runahead limits."""

self.max_num_active_cycle_points = self.cfg['scheduling'][
'max active cycle points']

limit = self.cfg['scheduling']['runahead limit']
if (limit is not None and limit.isdigit() and
get_interval_cls().get_null().TYPE == ISO8601_CYCLING_TYPE):
# Backwards-compatibility for raw number of hours.
limit = "PT%sH" % limit
self.custom_runahead_limit = get_interval(limit)
if self.custom_runahead_limit is not None:
self.runahead_limit = self.custom_runahead_limit
return

initial_point = get_point(
self.cfg['scheduling']['initial cycle point'])

offsets = set([])
seq_points = {}
point_tasks = {}
# The custom runahead limit is None if not user-configured.
self.custom_runahead_limit = get_interval(limit)

# Loop through all sequences and extract the first few points.
for seq in self.sequences:
seq_points[seq] = []
seq_point_0 = seq.get_first_point(initial_point)
if seq_point_0 is None:
continue
seq_points[seq].append(seq_point_0)
point_tasks.setdefault(seq_point_0, set())
iter_point = seq_point_0
for i in range(NUM_RUNAHEAD_SEQ_POINTS - 1):
next_point = seq.get_next_point(iter_point)
if next_point is None:
break
seq_points[seq].append(next_point)
point_tasks.setdefault(next_point, set())
iter_point = next_point
# Find the minimum runahead limit necessary for any future triggers.
self.minimum_runahead_limit = None

# Loop through all tasks and log their sequences.
offsets = set()
for name, taskdef in self.taskdefs.items():
if taskdef.min_intercycle_offset is not None:
if taskdef.min_intercycle_offset:
offsets.add(taskdef.min_intercycle_offset)
for seq in taskdef.sequences:
for point in seq_points.get(seq, []):
point_tasks[point].add(name)

points = sorted(point_tasks.keys())
min_interval = None

# Loop through all point pairs with common tasks to get the interval.
for i, point in enumerate(points):
tasks = point_tasks[point]
for other_point in points[i + 1:]:
other_tasks = point_tasks[other_point]
if point == other_point:
# Technically, hash-different points could compare equal.
continue
common_tasks = tasks.intersection(other_tasks)
if common_tasks:
# These points share the same task or tasks.
interval = abs(other_point - point)
if min_interval is None or interval < min_interval:
min_interval = interval
min_interval_task_example = common_tasks.pop()
min_interval_points = [point, other_point]

if min_interval is None:
rlim = get_interval_cls().get_null() # Null interval.
description = "(null)"
else:
rlim = min_interval * AUTO_RUNAHEAD_FACTOR
description = "from %d * %s (min interval)" % (
AUTO_RUNAHEAD_FACTOR, min_interval
)
if taskdef.min_intercycle_offset:
offsets.add(taskdef.min_intercycle_offset)

if offsets:
min_offset = min( offsets )
if min_offset < get_interval_cls().get_null():
# future triggers...
if abs(min_offset) >= rlim:
#... that extend past the default rl
# set to offsets plus one minimum interval
rlim = abs(min_offset) + rlim
description += " + %s (future trigger)" % (
abs(min_offset))
if flags.verbose:
print "Runahead limit: %s: %s" % (rlim, description)
min_offset = min(offsets)
if min_offset < get_interval_cls().get_null():
# A negative offset comes from future triggering.
self.minimum_runahead_limit = abs(min_offset)
if (self.custom_runahead_limit is not None and
self.custom_runahead_limit <
self.minimum_runahead_limit):
print >> sys.stderr, (
' WARNING, custom runahead limit of %s is less than '
'future triggering offset %s: suite may stall.' %
(self.custom_runahead_limit,
self.minimum_runahead_limit)
)

def get_custom_runahead_limit( self ):
"""Return the custom runahead limit (may be None)."""
return self.custom_runahead_limit

self.runahead_limit = rlim
def get_max_num_active_cycle_points( self ):
"""Return the maximum allowed number of pool cycle points."""
return self.max_num_active_cycle_points

def get_runahead_limit( self ):
# may be None (no cycling tasks)
return self.runahead_limit
def get_minimum_runahead_limit( self ):
"""Return the minimum runahead limit to apply."""
return self.minimum_runahead_limit

def get_config( self, args, sparse=False ):
return self.pcfg.get( args, sparse )
Expand Down
1 change: 0 additions & 1 deletion lib/cylc/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def __abs__( self ):
raise NotImplementedError()

def __mul__( self, m ):
# the suite runahead limit is a multiple of the smallest sequence interval
raise NotImplementedError()

def __nonzero__(self):
Expand Down
4 changes: 1 addition & 3 deletions lib/cylc/cycling/integer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@
# absolute, or relative to some context, so a special character 'c'
# is used to signify that context is required. '?' can be used for
# the period in one-off (no-repeat) expressions, otherwise an arbitrary
# given value will be ignored (an arbitrary interval is not stored as
# it may affect the default runahead limit calculation).
# given value will be ignored.
#
# 1) REPEAT/START/PERIOD: R[n]/[c]i/Pi
# missing n means repeat indefinitely
Expand Down Expand Up @@ -131,7 +130,6 @@ def __int__(self):
return int(self.value.replace("P", ""))

def __mul__(self, m):
# the suite runahead limit is a multiple of the smallest sequence interval
return IntegerInterval(int(self) * m)

def __nonzero__(self):
Expand Down
1 change: 0 additions & 1 deletion lib/cylc/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ def __abs__(self):
self._iso_interval_abs(self.value, self.NULL_INTERVAL_STRING))

def __mul__(self, m):
# the suite runahead limit is a multiple of the smallest sequence interval
return ISO8601Interval(self._iso_interval_mul(self.value, m))

def __nonzero__(self):
Expand Down
Loading

0 comments on commit 06b3150

Please sign in to comment.