Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#119: warm start support #1043

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/cylc-restart
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ where they got to while the suite was down."""

scheduler.parse_commandline( self )

def get_state_start_string( self ):
"""Return the start string specified in the state file, if any."""
def get_state_initial_point_string( self ):
"""Return the initial point string from the state file, if any."""
return self.get_state_file_info()[0]

def get_state_file_path( self ):
Expand Down
106 changes: 29 additions & 77 deletions bin/cylc-run
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,15 @@ Suites run in daemon mode unless -n/--no-detach or --debug is used.
The following are all equivalent if no intercycle dependence exists:
1/ Cold start (default) : use special cold-start tasks
2/ Warm start (-w,--warm) : assume a previous cycle
3/ Raw start (-r,--raw) : assume nothing

1/ COLD START -- any designated cold-start tasks will be inserted in the
waiting state. The variable $CYLC_SUITE_INITIAL_CYCLE_POINT will be set
to the initial cycle point, in task environments.
1/ COLD START -- start at the initial cycle point of the suite,
specified via the suite.rc or via the first argument (ARGS). Any
dependencies on earlier cycles will be ignored.

2/ WARM START -- any designated cold-start tasks will be inserted in the
succeeded state, to stand in for a previous cycle. The variable
$CYLC_SUITE_INITIAL_CYCLE_POINT will be set to 'None' in task environments
unless '--ict' is used.

3/ RAW START -- do not insert any cold-start tasks (mainly for testing).
2/ WARM START -- start from a cycle point later than the initial cycle
point, supplied via the first argument (ARGS). The initial cycle point
should be specified via the suite.rc. Any dependencies on earlier
cycles will be ignored.

In task environments, $CYLC_SUITE_FINAL_CYCLE_POINT is always set to the
final cycle point if one is set (by suite.rc file or command line). The
Expand All @@ -76,9 +73,6 @@ initial and final cycle point variables persists across suite restarts."""
self.parser.add_option( "-w", "--warm", help="Warm start the suite",
action="store_true", default=False, dest="warm" )

self.parser.add_option( "-r", "--raw", help="Raw start the suite",
action="store_true", default=False, dest="raw" )

self.parser.add_option( "--ict",
help="Set $CYLC_SUITE_INITIAL_CYCLE_POINT to the initial "
"cycle point even in a warm start (as for cold starts).",
Expand All @@ -97,27 +91,31 @@ initial and final cycle point variables persists across suite restarts."""
sys.path.append( os.path.join( self.suite_dir, 'python' ))

if len( self.args ) == 2:
self._cli_start_string = self.args[1]
if self._cli_start_string == "now":
start_point_string = self.args[1]
if start_point_string == "now":
# TODO ISO: will it ever be useful to have this in minutes?
self._cli_start_string = (
start_point_string = (
datetime.datetime.utcnow().strftime("%Y%m%dT%HZ")
)
if self.options.warm:
self._cli_start_point_string = start_point_string
else:
self._cli_initial_point_string = start_point_string
elif self.options.warm:
# No warm-start cycle point supplied.
sys.exit(self.parser.get_usage())

scheduler.parse_commandline( self )

if self.options.warm:
self.load_tasks = self.load_tasks_warm
elif self.options.raw:
self.load_tasks = self.load_tasks_raw
else:
self.load_tasks = self.load_tasks_cold

def load_tasks_cold( self ):
def load_tasks( self ):
if self.start_point is not None:
self.log.info( 'Cold Start ' + str(self.start_point) )
if self.options.warm:
self.log.info( 'Warm Start %s' % self.start_point)
else:
self.log.info( 'Cold Start %s' % self.start_point )

task_list = self.filter_initial_task_list( self.config.get_task_name_list() )
coldstart_tasks = self.config.get_coldstart_task_list()

for name in task_list:
if self.start_point is not None:
Expand All @@ -130,64 +128,18 @@ initial and final cycle point variables persists across suite restarts."""
submit_num=0, exists=False
)

if itask.point:
self.pool.add_to_runahead_pool( itask )
else:
self.log.info( "Not loading " + name + " (out of sequence bounds)" )
del itask

def load_tasks_warm( self ):
if self.start_point is not None:
self.log.info( 'Warm Start ' + str(self.start_point) )

task_list = self.filter_initial_task_list( self.config.get_task_name_list() )
coldstart_tasks = self.config.get_coldstart_task_list()
if len( coldstart_tasks ) == 0:
self.log.info( "This suite has not defined any cold start tasks" )
for name in task_list:
# (startup=True is only for cold start)
if self.start_point is not None:
point = self.start_point
else:
# no initial cycle point: we can't load cycling tasks
continue
itask = self.config.get_task_proxy(
name, point, 'waiting', stop_point=None, startup=False,
submit_num=0, exists=False
)
if name in coldstart_tasks:
itask.log( "NORMAL", "warm start: starting in succeeded state" )
if name in coldstart_tasks and self.options.warm:
itask.log("NORMAL",
"warm start: starting in succeeded state")
itask.state.set_status( 'succeeded' )
itask.prerequisites.set_all_satisfied()
itask.outputs.set_all_completed()

self.pool.add_to_runahead_pool( itask )

def load_tasks_raw( self ):
if self.start_point is not None:
self.log.info( 'Raw Start ' + str(self.start_point) )

task_list = self.filter_initial_task_list( self.config.get_task_name_list() )

coldstart_tasks = self.config.get_coldstart_task_list()

for name in task_list:
# startup=True only for cold start
if self.start_point is not None:
point = self.start_point
if itask.point:
self.pool.add_to_runahead_pool( itask )
else:
# no initial cycle point: we can't load cycling tasks
continue
itask = self.config.get_task_proxy(
name, point, 'waiting', stop_point=None, startup=False,
submit_num=0, exists=False
)
if name in coldstart_tasks:
itask.log( "NORMAL", "This is a raw start: I will self-destruct." )
self.log.info( "Not loading " + name + " (out of sequence bounds)" )
del itask
continue

self.pool.add_to_runahead_pool( itask )

if __name__ == '__main__':
main("run", start)
2 changes: 1 addition & 1 deletion bin/cylc-validate
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ if cylc.flags.verbose:

try:
config( suite, suiterc,
cli_start_string=options.ict,
cli_initial_point_string=options.ict,
template_vars=options.templatevars,
template_vars_file=options.templatevars_file,
validation=True, strict=options.strict,
Expand Down
2 changes: 1 addition & 1 deletion dev/ToDo/ToDo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ messages such as remote error tracebacks to be sent to the handler.
* a 'nudge' that just updates task state info, without going through
the full task processing loop?

* warm, cold, and raw start should insert tasks beyond the optional
* warm and cold start should insert tasks beyond the optional
final task in the 'held' state, not just delete them? (see 'STOPPING'
in bin/_run)

Expand Down
22 changes: 18 additions & 4 deletions lib/cylc/LogDiagnosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ def __init__( self, log ):
self.lines = h.readlines()
h.close()

def get_start_string( self ):
def get_initial_point_string( self ):
found = False
for line in self.lines:
m = re.search( 'Start point: (.*)$',line)
m = re.search( 'Initial point: (.*)$',line)
if m:
found = True
point_string = m.groups()[0]
Expand All @@ -35,10 +35,24 @@ def get_start_string( self ):
else:
raise LogAnalyserError( "ERROR: logged start point not found" )

def get_stop_string( self ):
def get_start_point_string( self ):
    """Return the logged start point string, or None if unavailable.

    Scans self.lines in order and uses the first line containing
    'Start point: <value>'. The text after the label (up to the end of
    the line) is returned, except that the literal text "None" is
    translated to the None object.

    Returns None as well when no line contains the label, so a missing
    start point is not treated as an error here.
    """
    for line in self.lines:
        m = re.search( r'Start point: (.*)$', line )
        if not m:
            continue
        point_string = m.group(1)
        # The label may be followed by the string "None" when no start
        # point was set; report that as a real None.
        if point_string == "None":
            return None
        return point_string
    # No 'Start point:' line present.
    return None

def get_final_point_string( self ):
found = False
for line in self.lines:
m = re.search( 'Stop point: (.*)$',line)
m = re.search( 'Final point: (.*)$',line)
if m:
found = True
point_string = m.groups()[0]
Expand Down
44 changes: 27 additions & 17 deletions lib/cylc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class config( object ):
def __init__( self, suite, fpath, template_vars=[],
template_vars_file=None, owner=None, run_mode='live',
validation=False, strict=False, collapsed=[],
cli_start_string=None, is_restart=False, is_reload=False,
cli_initial_point_string=None, cli_start_point_string=None,
is_restart=False, is_reload=False,
write_proc=True ):

self.suite = suite # suite name
Expand All @@ -106,7 +107,10 @@ def __init__( self, suite, fpath, template_vars=[],
self.edges = []
self.taskdefs = {}
self.validation = validation
self._cli_start_string = cli_start_string
self.initial_point = None
self.start_point = None
self._cli_initial_point_string = cli_initial_point_string
self._cli_start_point_string = cli_start_point_string
self.is_restart = is_restart
self.first_graph = True
self.clock_offsets = {}
Expand Down Expand Up @@ -147,9 +151,9 @@ def __init__( self, suite, fpath, template_vars=[],
write_proc=write_proc )
self.cfg = self.pcfg.get(sparse=True)

if self._cli_start_string is not None:
if self._cli_initial_point_string is not None:
self.cfg['scheduling']['initial cycle point'] = (
self._cli_start_string)
self._cli_initial_point_string)

if 'cycling mode' not in self.cfg['scheduling']:
# Auto-detect integer cycling for pure async graph suites.
Expand Down Expand Up @@ -225,6 +229,7 @@ def __init__( self, suite, fpath, template_vars=[],
# after the call to init_cyclers, we can start getting proper points.
init_cyclers(self.cfg)

initial_point = None
if self.cfg['scheduling']['initial cycle point'] is not None:
initial_point = get_point(
self.cfg['scheduling']['initial cycle point']).standardise()
Expand All @@ -235,9 +240,18 @@ def __init__( self, suite, fpath, template_vars=[],
self.cfg['scheduling']['final cycle point']).standardise()
self.cfg['scheduling']['final cycle point'] = str(final_point)

self.cli_start_point = get_point(self._cli_start_string)
if self.cli_start_point is not None:
self.cli_start_point.standardise()
self.cli_initial_point = get_point(self._cli_initial_point_string)
if self.cli_initial_point is not None:
self.cli_initial_point.standardise()

self.initial_point = self.cli_initial_point or initial_point
if self.initial_point is not None:
self.initial_point.standardise()

self.start_point = (
get_point(self._cli_start_point_string) or self.initial_point)
if self.start_point is not None:
self.start_point.standardise()

flags.backwards_compat_cycling = (
get_backwards_compat_mode())
Expand Down Expand Up @@ -328,8 +342,7 @@ def __init__( self, suite, fpath, template_vars=[],

# initial and final cycles for visualization
vict = self.cfg['visualization']['initial cycle point'] or \
str(self.get_actual_first_point(
self.cfg['scheduling']['initial cycle point']))
str(self.get_actual_first_point(self.start_point))
self.cfg['visualization']['initial cycle point'] = vict

vict_rh = None
Expand Down Expand Up @@ -868,10 +881,9 @@ def check_tasks( self ):
for name in self.taskdefs.keys():
type = self.taskdefs[name].type
# TODO ISO - THIS DOES NOT GET ALL GRAPH SECTIONS:
start_point = get_point( self.cfg['scheduling']['initial cycle point'] )
try:
# instantiate a task
itask = self.taskdefs[name].get_task_class()( start_point, 'waiting', None, True, validate=True )
itask = self.taskdefs[name].get_task_class()( self.start_point, 'waiting', None, True, validate=True )
except TypeError, x:
# This should not happen as we now explicitly catch use
# of synchronous special tasks in an asynchronous graph.
Expand All @@ -884,7 +896,7 @@ def check_tasks( self ):
'ERROR, failed to instantiate task %s: %s' % (name, x))
if itask.point is None:
if flags.verbose:
print " + Task out of bounds for " + str(start_point) + ": " + itask.name
print " + Task out of bounds for " + str(self.start_point) + ": " + itask.name
continue

# warn for purely-implicit-cycling tasks (these are deprecated).
Expand Down Expand Up @@ -1338,7 +1350,7 @@ def generate_triggers( self, lexpression, left_nodes, right, seq, suicide ):

if lnode.offset_is_from_ict:
first_point = get_point_relative(
lnode.offset_string, ltaskdef.ict)
lnode.offset_string, self.initial_point)
last_point = seq.get_stop_point()
if last_point is None:
# This dependency persists for the whole suite run.
Expand Down Expand Up @@ -1761,14 +1773,12 @@ def get_taskdef( self, name ):
rtcfg = self.cfg['runtime'][name]
except KeyError:
raise SuiteConfigError, "Task not found: " + name

ict_point = (self.cli_start_point or
get_point(self.cfg['scheduling']['initial cycle point']))
# We may want to put in some handling for cases of changing the
# initial cycle via restart (accidentally or otherwise).

# Get the taskdef object for generating the task proxy class
taskd = taskdef.taskdef( name, rtcfg, self.run_mode, ict_point )
taskd = taskdef.taskdef(
name, rtcfg, self.run_mode, self.start_point)

# TODO - put all taskd.foo items in a single config dict
# SET COLD-START TASK INDICATORS
Expand Down
2 changes: 1 addition & 1 deletion lib/cylc/gui/GraphUpdater.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def update_graph(self):
if start_time == None or oldest > start_time:
rawx = True
else:
# (show cold start tasks) - TODO - actual raw start
# (show cold start tasks)
rawx = False

extra_node_ids = {}
Expand Down
Loading