Skip to content

Commit

Permalink
Merge pull request #1007 from benfitzpatrick/iso.suite-rc-intervals
Browse files Browse the repository at this point in the history
#119: accept suite.rc interval configuration in ISO 8601 formats
  • Loading branch information
hjoliver committed Jul 9, 2014
2 parents 6e26156 + 5ad7e46 commit 5abb581
Show file tree
Hide file tree
Showing 36 changed files with 1,264 additions and 247 deletions.
220 changes: 114 additions & 106 deletions doc/suiterc.tex

Large diffs are not rendered by default.

82 changes: 63 additions & 19 deletions lib/cylc/cfgspec/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@
import re

from parsec.validate import validator as vdr
from parsec.validate import coercers, _strip_and_unquote, IllegalValueError
from parsec.validate import (
coercers, _strip_and_unquote, _strip_and_unquote_list, _expand_list,
IllegalValueError
)
from parsec.upgrade import upgrader, converter
from parsec.fileparse import parse
from parsec.config import config
from isodatetime.dumpers import TimePointDumper
from isodatetime.data import TimePoint
from isodatetime.parsers import TimePointParser
from isodatetime.data import TimePoint, SECONDS_IN_DAY
from isodatetime.parsers import TimePointParser, TimeIntervalParser


"Define all legal items and values for cylc suite definition files."

interval_parser = TimeIntervalParser()


def _coerce_cycletime( value, keys, args ):
"""Coerce value to a cycle point."""
value = _strip_and_unquote( keys, value )
Expand Down Expand Up @@ -101,10 +108,46 @@ def _coerce_cycletime_time_zone( value, keys, args ):
return value


def _coerce_interval( value, keys, args, back_comp_unit_factor=1 ):
"""Coerce an ISO 8601 interval (or number: back-comp) into seconds."""
value = _strip_and_unquote( keys, value )
try:
return float(value) * back_comp_unit_factor
except (TypeError, ValueError):
pass
try:
interval = interval_parser.parse(value)
except ValueError:
raise IllegalValueError("ISO 8601 interval", keys, value)
days, seconds = interval.get_days_and_seconds()
seconds += days * SECONDS_IN_DAY
return seconds


def _coerce_interval_list( value, keys, args, back_comp_unit_factor=1 ):
"""Coerce a list of intervals (or numbers: back-comp) into seconds."""
values_list = _strip_and_unquote_list( keys, value )
type_converter = (
lambda v: _coerce_interval(
v, keys, args,
back_comp_unit_factor=back_comp_unit_factor
)
)
seconds_list = _expand_list( values_list, keys, type_converter, True )
return seconds_list


coercers['cycletime'] = _coerce_cycletime
coercers['cycletime_format'] = _coerce_cycletime_format
coercers['cycletime_time_zone'] = _coerce_cycletime_time_zone

coercers['interval'] = _coerce_interval
coercers['interval_minutes'] = lambda *a: _coerce_interval(
*a, back_comp_unit_factor=60)
coercers['interval_seconds'] = _coerce_interval
coercers['interval_list'] = _coerce_interval_list
coercers['interval_minutes_list'] = lambda *a: _coerce_interval_list(
*a, back_comp_unit_factor=60)
coercers['interval_seconds_list'] = _coerce_interval_list

SPEC = {
'title' : vdr( vtype='string', default="" ),
Expand All @@ -120,11 +163,11 @@ def _coerce_cycletime_time_zone( value, keys, args ):
'log resolved dependencies' : vdr( vtype='boolean', default=False ),
'job submission' : {
'batch size' : vdr( vtype='integer', vmin=1, default=10 ),
'delay between batches' : vdr( vtype='integer', vmin=0, default=0 ),
'delay between batches' : vdr( vtype='interval_seconds', vmin=0, default=0 ),
},
'event handler submission' : {
'batch size' : vdr( vtype='integer', vmin=1, default=10 ),
'delay between batches' : vdr( vtype='integer', vmin=0, default=0 ),
'delay between batches' : vdr( vtype='interval_seconds', vmin=0, default=0 ),
},
'poll and kill command submission' : {
'batch size' : vdr( vtype='integer', vmin=1, default=10 ),
Expand All @@ -141,7 +184,7 @@ def _coerce_cycletime_time_zone( value, keys, args ):
'startup handler' : vdr( vtype='string_list', default=[] ),
'timeout handler' : vdr( vtype='string_list', default=[] ),
'shutdown handler' : vdr( vtype='string_list', default=[] ),
'timeout' : vdr( vtype='float' ),
'timeout' : vdr( vtype='interval_minutes' ),
'reset timer' : vdr( vtype='boolean', default=True ),
'abort if startup handler fails' : vdr( vtype='boolean', default=False ),
'abort if shutdown handler fails' : vdr( vtype='boolean', default=False ),
Expand All @@ -159,9 +202,9 @@ def _coerce_cycletime_time_zone( value, keys, args ):
'required run mode' : vdr( vtype='string', options=[ 'live','simulation','dummy'] ),
'allow task failures' : vdr( vtype='boolean', default=False ),
'expected task failures' : vdr( vtype='string_list', default=[] ),
'live mode suite timeout' : vdr( vtype='float', default=1.0 ),
'dummy mode suite timeout' : vdr( vtype='float', default=1.0 ),
'simulation mode suite timeout' : vdr( vtype='float', default=1.0 ),
'live mode suite timeout' : vdr( vtype='interval_minutes', default=60 ),
'dummy mode suite timeout' : vdr( vtype='interval_minutes', default=60 ),
'simulation mode suite timeout' : vdr( vtype='interval_minutes', default=60 ),
},
},
'scheduling' : {
Expand Down Expand Up @@ -204,19 +247,19 @@ def _coerce_cycletime_time_zone( value, keys, args ):
'pre-command scripting' : vdr( vtype='string' ),
'command scripting' : vdr( vtype='string', default='echo Default command scripting; sleep $(cylc rnd 1 16)'),
'post-command scripting' : vdr( vtype='string' ),
'retry delays' : vdr( vtype='float_list', default=[] ),
'retry delays' : vdr( vtype='interval_minutes_list', default=[] ),
'manual completion' : vdr( vtype='boolean', default=False ),
'extra log files' : vdr( vtype='string_list', default=[] ),
'enable resurrection' : vdr( vtype='boolean', default=False ),
'work sub-directory' : vdr( vtype='string', default='$CYLC_TASK_ID' ),
'submission polling intervals' : vdr( vtype='float_list', default=[] ),
'execution polling intervals' : vdr( vtype='float_list', default=[] ),
'submission polling intervals' : vdr( vtype='interval_minutes_list', default=[] ),
'execution polling intervals' : vdr( vtype='interval_minutes_list', default=[] ),
'environment filter' : {
'include' : vdr( vtype='string_list' ),
'exclude' : vdr( vtype='string_list' ),
},
'simulation mode' : {
'run time range' : vdr( vtype='integer_list', default=[1,16]),
'run time range' : vdr( vtype='interval_seconds_list', default=[1, 16]),
'simulate failure' : vdr( vtype='boolean', default=False ),
'disable task event hooks' : vdr( vtype='boolean', default=True ),
'disable retries' : vdr( vtype='boolean', default=True ),
Expand All @@ -232,7 +275,7 @@ def _coerce_cycletime_time_zone( value, keys, args ):
'method' : vdr( vtype='string', default='background' ),
'command template' : vdr( vtype='string' ),
'shell' : vdr( vtype='string', default='/bin/bash' ),
'retry delays' : vdr( vtype='float_list', default=[] ),
'retry delays' : vdr( vtype='interval_minutes_list', default=[] ),
},
'remote' : {
'host' : vdr( vtype='string' ),
Expand All @@ -249,15 +292,15 @@ def _coerce_cycletime_time_zone( value, keys, args ):
'retry handler' : vdr( vtype='string_list', default=[] ),
'submission retry handler' : vdr( vtype='string_list', default=[] ),
'submission timeout handler' : vdr( vtype='string_list', default=[] ),
'submission timeout' : vdr( vtype='float' ),
'submission timeout' : vdr( vtype='interval_minutes' ),
'execution timeout handler' : vdr( vtype='string_list', default=[] ),
'execution timeout' : vdr( vtype='float'),
'execution timeout' : vdr( vtype='interval_minutes'),
'reset timer' : vdr( vtype='boolean', default=False ),
},
'suite state polling' : {
'user' : vdr( vtype='string' ),
'host' : vdr( vtype='string' ),
'interval' : vdr( vtype='integer' ),
'interval' : vdr( vtype='interval_seconds' ),
'max-polls' : vdr( vtype='integer' ),
'run-dir' : vdr( vtype='string' ),
'verbose mode' : vdr( vtype='boolean' ),
Expand Down Expand Up @@ -328,9 +371,11 @@ def upg( cfg, descr ):
class sconfig( config ):
pass


suitecfg = None
cfpath = None


def get_suitecfg( fpath, force=False, tvars=[], tvars_file=None, write_proc=False ):
global suitecfg, cfpath
if not suitecfg or fpath != cfpath or force:
Expand All @@ -339,4 +384,3 @@ def get_suitecfg( fpath, force=False, tvars=[], tvars_file=None, write_proc=Fals
suitecfg = sconfig( SPEC, upg, tvars=tvars, tvars_file=tvars_file, write_proc=write_proc )
suitecfg.loadcfg( fpath, "suite definition", strict=True )
return suitecfg

1 change: 0 additions & 1 deletion lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1750,4 +1750,3 @@ def get_task_proxy_raw( self, name, tag, state, stoptag, startup, submit_num, ex

def get_task_class( self, name ):
return self.taskdefs[name].get_task_class()

2 changes: 1 addition & 1 deletion lib/cylc/cycling/iso8601.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _wrapper(*args):
results = function(*args)
if len(inputs_results) > MEMOIZE_LIMIT:
# Full up, no more room.
return results
inputs_results.popitem()
inputs_results[args] = results
return results
return _wrapper
Expand Down
28 changes: 19 additions & 9 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
from Queue import Queue, Empty
from batch_submit import event_batcher, poll_and_kill_batcher
import subprocess
from wallclock import now, get_current_time_string
from wallclock import (
now, get_current_time_string, get_seconds_as_interval_string)
from cycling.loader import get_point
import isodatetime.data
import isodatetime.parsers
Expand Down Expand Up @@ -581,9 +582,15 @@ def command_set_runahead( self, *args ):
#___________________________________________________________________

def set_suite_timer( self, reset=False ):
self.suite_timer_start = time.time()
self.suite_timer_timeout = time.time() + (
self.config.cfg['cylc']['event hooks']['timeout']
)
if flags.verbose:
print str(self.config.cfg['cylc']['event hooks']['timeout']) + " minute suite timer starts NOW:", get_current_time_string()
print "%s suite timer starts NOW: %s" % (
get_seconds_as_interval_string(
self.config.cfg['cylc']['event hooks']['timeout']),
get_current_time_string()
)


def reconfigure( self ):
Expand Down Expand Up @@ -716,7 +723,8 @@ def configure_suite( self, reconfigure=False ):
'Event Handlers', self.event_queue,
self.config.cfg['cylc']['event handler submission']['batch size'],
self.config.cfg['cylc']['event handler submission']['delay between batches'],
self.suite )
self.suite
)
self.eventq_worker.start()

self.poll_and_kill_queue = Queue()
Expand All @@ -725,7 +733,8 @@ def configure_suite( self, reconfigure=False ):
'Poll & Kill Commands', self.poll_and_kill_queue,
self.config.cfg['cylc']['poll and kill command submission']['batch size'],
self.config.cfg['cylc']['poll and kill command submission']['delay between batches'],
self.suite )
self.suite
)
self.pollkq_worker.start()

self.info_interface = info_interface( self.info_commands )
Expand Down Expand Up @@ -1020,11 +1029,12 @@ def process_resolved( self, tasks ):
def check_suite_timer( self ):
if self.already_timed_out:
return
timeout = self.suite_timer_start + 60 * (
self.config.cfg['cylc']['event hooks']['timeout'])
if time.time() > timeout:
if time.time() > self.suite_timer_timeout:
self.already_timed_out = True
message = 'suite timed out after ' + str( self.config.cfg['cylc']['event hooks']['timeout']) + ' minutes'
message = 'suite timed out after %s' % (
get_seconds_as_interval_string(
self.config.cfg['cylc']['event hooks']['timeout'])
)
self.log.warning( message )
abort = self.config.cfg['cylc']['event hooks']['abort if timeout handler fails']
self.run_event_handlers( 'timeout', abort, message )
Expand Down
6 changes: 4 additions & 2 deletions lib/cylc/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,14 @@ def reload_taskdefs( self ):
if itask.state.is_currently( 'retrying' ):
new_task.retry_delay = itask.retry_delay
new_task.retry_delays = itask.retry_delays
new_task.retry_delay_timer_start = itask.retry_delay_timer_start
new_task.retry_delay_timer_timeout = (
itask.retry_delay_timer_timeout)
elif itask.state.is_currently( 'submit-retrying' ):
new_task.sub_retry_delay = itask.sub_retry_delay
new_task.sub_retry_delays = itask.sub_retry_delays
new_task.sub_retry_delays_orig = itask.sub_retry_delays_orig
new_task.sub_retry_delay_timer_start = itask.sub_retry_delay_timer_start
new_task.sub_retry_delay_timer_timeout = (
itask.sub_retry_delay_timer_timeout)

new_task.try_number = itask.try_number
new_task.sub_try_number = itask.sub_try_number
Expand Down
Loading

0 comments on commit 5abb581

Please sign in to comment.