diff --git a/doc/suiterc.tex b/doc/suiterc.tex index 12843f08aeb..3f6b75372e2 100644 --- a/doc/suiterc.tex +++ b/doc/suiterc.tex @@ -583,20 +583,52 @@ \subsection{[scheduling]} \subsubsection[runahead limit]{[scheduling] $\rightarrow$ runahead limit} +\label{runahead limit} -The suite runahead limit prevents the fastest tasks in a suite from +Runahead limiting prevents the fastest tasks in a suite from getting too far ahead of the slowest ones, as documented in Section~\ref{RunaheadLimit}. Tasks exceeding the limit are put into a special runahead held state until slower tasks have caught up -sufficiently. The default limit is computed as a factor (2) of the -suite's minimum cycling interval, and is up-adjusted from there as -necessary for any longer-offset future triggering. +sufficiently. + +The default behaviour is to limit the number of tasks +by only allowing up to \lstinline=N= (default 3) active cycle points at +any time, up-adjusted if necessary for any future triggering. + +\lstinline=runahead limit= is deprecated in favour of specifying this +maximum number of active cycle points (\ref{max active cycle points}). + \begin{myitemize} \item {\em type:} Cycle interval string e.g. \lstinline=PT12H= for a 12 hour limit under ISO 8601 cycling. - \item {\em default:} (twice the minimum cycling interval in the suite) + \item {\em default:} (none) +\end{myitemize} + + +\subsubsection[max active cycle points]{[scheduling] $\rightarrow$ + max active cycle points} +\label{max active cycle points} + +Runahead limiting prevents the fastest tasks in a suite from +getting too far ahead of the slowest ones, as documented in +Section~\ref{RunaheadLimit}. Tasks exceeding the limit are put into +a special runahead held state until slower tasks have caught up +sufficiently. + +The default behaviour is to limit the number of tasks +by only allowing up to \lstinline=N= (default 3) active cycle points at +any time, up-adjusted if necessary for any future triggering. + +This input allows you to change \lstinline=N=, the maximum +number of cycle points that can be active at once. This supersedes +\ref{runahead limit}. + +\begin{myitemize} + \item {\em type:} integer + \item {\em default:} 3 \end{myitemize} + \subsubsection[{[[}queues{]]}]{[scheduling] $\rightarrow$ [[queues]]} Configuration of internal queues, by which the number of simultaneously diff --git a/lib/cylc/cfgspec/suite.py b/lib/cylc/cfgspec/suite.py index 285a9965911..9ce9174ed7f 100644 --- a/lib/cylc/cfgspec/suite.py +++ b/lib/cylc/cfgspec/suite.py @@ -183,7 +183,8 @@ def _coerce_cycletime_time_zone( value, keys, args ): 'initial cycle point' : vdr(vtype='cycletime'), 'final cycle point' : vdr(vtype='cycletime'), 'cycling mode' : vdr(vtype='string', default="gregorian", options=["360day","gregorian","integer"] ), - 'runahead limit' : vdr(vtype='cycleinterval' ), + 'runahead limit' : vdr(vtype='cycleinterval' ), + 'max active cycle points' : vdr(vtype='integer', default=3), 'queues' : { 'default' : { 'limit' : vdr( vtype='integer', default=0), diff --git a/lib/cylc/config.py b/lib/cylc/config.py index b0c6312d64d..0ccd59b0e5f 100644 --- a/lib/cylc/config.py +++ b/lib/cylc/config.py @@ -118,7 +118,7 @@ def __init__( self, suite, fpath, template_vars=[], self.actual_first_ctime = None self.custom_runahead_limit = None - self.runahead_limit = None + self.max_num_active_cycle_points = None # runtime hierarchy dicts keyed by namespace name: self.runtime = { @@ -286,7 +286,7 @@ def __init__( self, suite, fpath, template_vars=[], if not self.graph_found: raise SuiteConfigError, 'No suite dependency graph defined.' - self.compute_runahead_limit() + self.compute_runahead_limits() self.configure_queues() @@ -309,8 +309,10 @@ def __init__( self, suite, fpath, template_vars=[], self.cfg['visualization']['initial cycle point'] = vict vict_rh = None - if vict and self.runahead_limit: - vict_rh = str( get_point( vict ) + self.runahead_limit ) + v_runahead_limit = ( + self.custom_runahead_limit or self.minimum_runahead_limit) + if vict and v_runahead_limit: + vict_rh = str( get_point( vict ) + v_runahead_limit ) vfct = self.cfg['visualization']['final cycle point'] or vict_rh or vict self.cfg['visualization']['final cycle point'] = vfct @@ -549,96 +551,55 @@ def print_inheritance(self): for item, val in self.runtime[foo].items(): print ' ', ' ', item, val - def compute_runahead_limit( self ): + def compute_runahead_limits( self ): + """Extract the custom and the minimum runahead limits.""" + + self.max_num_active_cycle_points = self.cfg['scheduling'][ + 'max active cycle points'] + limit = self.cfg['scheduling']['runahead limit'] if (limit is not None and limit.isdigit() and get_interval_cls().get_null().TYPE == ISO8601_CYCLING_TYPE): # Backwards-compatibility for raw number of hours. limit = "PT%sH" % limit - self.custom_runahead_limit = get_interval(limit) - if self.custom_runahead_limit is not None: - self.runahead_limit = self.custom_runahead_limit - return - initial_point = get_point( - self.cfg['scheduling']['initial cycle point']) - - offsets = set([]) - seq_points = {} - point_tasks = {} + # The custom runahead limit is None if not user-configured. + self.custom_runahead_limit = get_interval(limit) - # Loop through all sequences and extract the first few points. - for seq in self.sequences: - seq_points[seq] = [] - seq_point_0 = seq.get_first_point(initial_point) - if seq_point_0 is None: - continue - seq_points[seq].append(seq_point_0) - point_tasks.setdefault(seq_point_0, set()) - iter_point = seq_point_0 - for i in range(NUM_RUNAHEAD_SEQ_POINTS - 1): - next_point = seq.get_next_point(iter_point) - if next_point is None: - break - seq_points[seq].append(next_point) - point_tasks.setdefault(next_point, set()) - iter_point = next_point + # Find the minimum runahead limit necessary for any future triggers. + self.minimum_runahead_limit = None - # Loop through all tasks and log their sequences. + offsets = set() for name, taskdef in self.taskdefs.items(): - if taskdef.min_intercycle_offset is not None: - if taskdef.min_intercycle_offset: - offsets.add(taskdef.min_intercycle_offset) - for seq in taskdef.sequences: - for point in seq_points.get(seq, []): - point_tasks[point].add(name) - - points = sorted(point_tasks.keys()) - min_interval = None - - # Loop through all point pairs with common tasks to get the interval. - for i, point in enumerate(points): - tasks = point_tasks[point] - for other_point in points[i + 1:]: - other_tasks = point_tasks[other_point] - if point == other_point: - # Technically, hash-different points could compare equal. - continue - common_tasks = tasks.intersection(other_tasks) - if common_tasks: - # These points share the same task or tasks. - interval = abs(other_point - point) - if min_interval is None or interval < min_interval: - min_interval = interval - min_interval_task_example = common_tasks.pop() - min_interval_points = [point, other_point] - - if min_interval is None: - rlim = get_interval_cls().get_null() # Null interval. - description = "(null)" - else: - rlim = min_interval * AUTO_RUNAHEAD_FACTOR - description = "from %d * %s (min interval)" % ( - AUTO_RUNAHEAD_FACTOR, min_interval - ) + if taskdef.min_intercycle_offset: + offsets.add(taskdef.min_intercycle_offset) + if offsets: - min_offset = min( offsets ) - if min_offset < get_interval_cls().get_null(): - # future triggers... - if abs(min_offset) >= rlim: - #... that extend past the default rl - # set to offsets plus one minimum interval - rlim = abs(min_offset) + rlim - description += " + %s (future trigger)" % ( - abs(min_offset)) - if flags.verbose: - print "Runahead limit: %s: %s" % (rlim, description) + min_offset = min(offsets) + if min_offset < get_interval_cls().get_null(): + # A negative offset comes from future triggering. + self.minimum_runahead_limit = abs(min_offset) + if (self.custom_runahead_limit is not None and + self.custom_runahead_limit < + self.minimum_runahead_limit): + print >> sys.stderr, ( + ' WARNING, custom runahead limit of %s is less than ' + 'future triggering offset %s: suite may stall.' % + (self.custom_runahead_limit, + self.minimum_runahead_limit) + ) + + def get_custom_runahead_limit( self ): + """Return the custom runahead limit (may be None).""" + return self.custom_runahead_limit - self.runahead_limit = rlim + def get_max_num_active_cycle_points( self ): + """Return the maximum allowed number of pool cycle points.""" + return self.max_num_active_cycle_points - def get_runahead_limit( self ): - # may be None (no cycling tasks) - return self.runahead_limit + def get_minimum_runahead_limit( self ): + """Return the minimum runahead limit to apply.""" + return self.minimum_runahead_limit def get_config( self, args, sparse=False ): return self.pcfg.get( args, sparse ) diff --git a/lib/cylc/cycling/__init__.py b/lib/cylc/cycling/__init__.py index a16766290f6..9b84cab2113 100644 --- a/lib/cylc/cycling/__init__.py +++ b/lib/cylc/cycling/__init__.py @@ -117,7 +117,6 @@ def __abs__( self ): raise NotImplementedError() def __mul__( self, m ): - # the suite runahead limit is a multiple of the smallest sequence interval raise NotImplementedError() def __nonzero__(self): diff --git a/lib/cylc/cycling/integer.py b/lib/cylc/cycling/integer.py index b61d5dd5ea4..139c9868098 100755 --- a/lib/cylc/cycling/integer.py +++ b/lib/cylc/cycling/integer.py @@ -44,8 +44,7 @@ # absolute, or relative to some context, so a special character 'c' # is used to signify that context is required. '?' can be used for # the period in one-off (no-repeat) expressions, otherwise an arbitrary -# given value will be ignored (an arbitrary interval is not stored as -# it may affect the default runahead limit calculation). +# given value will be ignored. # # 1) REPEAT/START/PERIOD: R[n]/[c]i/Pi # missing n means repeat indefinitely @@ -131,7 +130,6 @@ def __int__(self): return int(self.value.replace("P", "")) def __mul__(self, m): - # the suite runahead limit is a multiple of the smallest sequence interval return IntegerInterval(int(self) * m) def __nonzero__(self): diff --git a/lib/cylc/cycling/iso8601.py b/lib/cylc/cycling/iso8601.py index 9ac8063d559..52d12950981 100755 --- a/lib/cylc/cycling/iso8601.py +++ b/lib/cylc/cycling/iso8601.py @@ -189,7 +189,6 @@ def __abs__(self): self._iso_interval_abs(self.value, self.NULL_INTERVAL_STRING)) def __mul__(self, m): - # the suite runahead limit is a multiple of the smallest sequence interval return ISO8601Interval(self._iso_interval_mul(self.value, m)) def __nonzero__(self): diff --git a/lib/cylc/task_pool.py b/lib/cylc/task_pool.py index 699bd07fe31..3b9e39798ce 100644 --- a/lib/cylc/task_pool.py +++ b/lib/cylc/task_pool.py @@ -59,9 +59,13 @@ def __init__( self, suite, db, stop_point, config, pyro, log, run_mode ): self.reconfiguring = False self.db = db - self.runahead_limit = config.get_runahead_limit() + self.custom_runahead_limit = config.get_custom_runahead_limit() + self.minimum_runahead_limit = config.get_minimum_runahead_limit() + self.max_num_active_cycle_points = ( + config.get_max_num_active_cycle_points()) self.config = config + self.pool = {} self.runahead_pool = {} self.myq = {} self.queues = {} @@ -139,7 +143,8 @@ def add_to_runahead_pool( self, itask ): itask.reset_state_held() # add to the runahead pool - self.runahead_pool[itask.id] = itask + self.runahead_pool.setdefault(itask.c_time, {}) + self.runahead_pool[itask.c_time][itask.id] = itask self.rhpool_changed = True return True @@ -157,42 +162,76 @@ def release_runahead_tasks( self ): if not self.runahead_pool: return - runahead_base = None - for itask in self.get_tasks(all=True): - if itask.state.is_currently('failed', 'succeeded'): - continue - if not runahead_base or itask.c_time < runahead_base: - runahead_base = itask.c_time - - # release tasks below the limit - if runahead_base: - for itask in self.runahead_pool.values(): - if not self.runahead_limit or \ - itask.c_time - self.runahead_limit <= runahead_base: - # release task to the appropriate queue - # (no runahead limit implies R1 tasks only) - queue = self.myq[itask.name] - if queue not in self.queues: - self.queues[queue] = {} - self.queues[queue][itask.id] = itask - self.pool_changed = True - flags.pflag = True - itask.log('DEBUG', "released to the task pool" ) - del self.runahead_pool[itask.id] - self.rhpool_changed = True - try: - self.pyro.connect( itask.message_queue, itask.id ) - except Exception, x: - if flags.debug: - raise - print >> sys.stderr, x - self.log.warning( itask.id + ' cannot be added (use --debug and see stderr)' ) - return False + points = set() + for point, itasks in self.get_tasks_by_point(all=True): + has_ok_itasks = False + for itask in itasks: + if not itask.state.is_currently('failed', 'succeeded'): + has_ok_itasks = True + break + if has_ok_itasks: + points.add(point) + + if not points: + return + + runahead_base_point = min(points) + + if self.custom_runahead_limit is None: + # Calculate which tasks to release based on a maximum number of + # active cycle points (active meaning non-finished tasks). + limit = self.max_num_active_cycle_points + latest_allowed_point = sorted(points)[:limit][-1] + if self.minimum_runahead_limit is not None: + latest_allowed_point = max([ + latest_allowed_point, + runahead_base_point + self.minimum_runahead_limit + ]) + else: + # Calculate which tasks to release based on a maximum duration + # measured from the oldest non-finished task. + latest_allowed_point = ( + runahead_base_point + self.custom_runahead_limit) + + for point, itask_id_map in self.runahead_pool.values(): + if point <= latest_allowed_point: + for itask in itask_id_map.values(): + self.release_runahead_task(itask) + + def release_runahead_task(itask): + """Release itask to the appropriate queue in the active pool.""" + queue = self.myq[itask.name] + if queue not in self.queues: + self.queues[queue] = {} + self.queues[queue][itask.id] = itask + self.pool.setdefault(itask.c_time, {}) + self.pool[itask.c_time][itask.id] = itask + self.pool_changed = True + flags.pflag = True + itask.log('DEBUG', "released to the task pool" ) + del self.runahead_pool[itask.c_time][itask.id] + if not self.runahead_pool[itask.c_time]: + self.runahead_pool.pop(itask.c_time) + self.rhpool_changed = True + try: + self.pyro.connect( itask.message_queue, itask.id ) + except Exception, x: + if flags.debug: + raise + print >> sys.stderr, x + self.log.warning( + '%s cannot be added (use --debug and see stderr)' % itask.id) + return False def remove( self, itask, reason=None ): - if itask.id in self.runahead_pool: - del self.runahead_pool[itask.id] + try: + del self.runahead_pool[itask.c_time][itask.id] + except KeyError: + pass + else: + if not self.runahead_pool[itask.c_time]: + self.runahead_pool.pop(itask.c_time) self.rhpool_changed = True return @@ -209,6 +248,9 @@ def remove( self, itask, reason=None ): # remove from queue queue = self.myq[itask.name] del self.queues[queue][itask.id] + del self.pool[itask.c_time][itask.id] + if not self.pool[itask.c_time]: + self.pool.pop(itask_c_time) self.pool_changed = True msg = "task proxy removed" if reason: @@ -235,17 +277,33 @@ def get_tasks( self, all=False ): if all: if self.rhpool_changed: self.rhpool_changed = False - self.rhpool_list = self.runahead_pool.values() + self.rhpool_list = [] + for itask_id_maps in self.runahead_pool.values(): + self.rhpool_list.extend(itask_id_maps.values()) return self.rhpool_list + self.pool_list else: return self.pool_list + def get_tasks_by_point( self, all=False ): + """Return a map of task proxies by cycle point.""" + point_itasks = {} + for point, itask_id_map in self.cycle_point_itask_map.items(): + point_itasks[point] = itask_id_map.values() + + if not all: + return point_itasks + + for point, itask_id_map in self.runahead_pool.items(): + point_itasks.setdefault(point, []) + point_itasks[point].extend(itask_id_map.values()) + return point_itasks def id_exists( self, id ): """Check if task id is in the runahead_pool or pool""" - if id in self.runahead_pool: - return True + for point, itask_ids in self.runahead_pool.items(): + if id in itask_ids: + return True for queue in self.queues: if id in self.queues[queue]: return True @@ -340,16 +398,16 @@ def set_runahead( self, interval=None ): if interval is None: # No limit self.log.warning( "setting NO runahead limit" ) - self.runahead_limit = None + self.custom_runahead_limit = None else: self.log.info( "setting runahead limit to " + str(interval) ) - self.runahead_limit = interval + self.custom_runahead_limit = interval self.release_runahead_tasks() def get_min_ctime( self ): """Return the minimum cycle currently in the pool.""" - cycles = [ t.c_time for t in self.get_tasks() ] + cycles = self.pool.keys() minc = None if cycles: minc = min(cycles) @@ -358,7 +416,7 @@ def get_min_ctime( self ): def get_max_ctime( self ): """Return the maximum cycle currently in the pool.""" - cycles = [ t.c_time for t in self.get_tasks() ] + cycles = self.pool.keys() maxc = None if cycles: maxc = max(cycles) @@ -369,7 +427,10 @@ def reconfigure( self, config, stop_point ): self.reconfiguring = True - self.runahead_limit = config.get_runahead_limit() + self.custom_runahead_limit = config.get_custom_runahead_limit() + self.minimum_runahead_limit = config.get_minimum_runahead_limit() + self.max_num_active_cycle_points = ( + config.get_max_num_active_cycle_points()) self.config = config self.stop_point = stop_point # TODO: Any point in using set_stop_point?