Skip to content

Commit

Permalink
Merge pull request #2104 from matthewrmshin/refactor-restart-job-host-init
Browse files Browse the repository at this point in the history

Refactor restart job host init logic
  • Loading branch information
hjoliver authored Jan 9, 2017
2 parents e610be0 + 1c2692d commit cbebb62
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 31 deletions.
31 changes: 15 additions & 16 deletions lib/cylc/job_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ def get_inst(cls):
return cls._INSTANCE

def __init__(self):
self.initialised_hosts = {} # user_at_host: should_unlink
self.initialised = {} # {(user, host): should_unlink, ...}
self.single_task_mode = False
self.suite_srv_files_mgr = SuiteSrvFilesManager()

def init_suite_run_dir(self, reg, user_at_host):
def init_suite_run_dir(self, reg, host, owner):
"""Initialise suite run dir on a user@host.
Create SUITE_RUN_DIR/log/job/ if necessary.
Expand All @@ -74,14 +74,14 @@ def init_suite_run_dir(self, reg, user_at_host):
Raise RemoteJobHostInitError if initialisation cannot complete.
"""
if '@' in user_at_host:
owner, host = user_at_host.split('@', 1)
else:
owner, host = None, user_at_host
if ((owner, host) in [(None, 'localhost'), (USER, 'localhost')] or
host in self.initialised_hosts or
self.single_task_mode):
if host is None:
host = 'localhost'
if ((host, owner) in [('localhost', None), ('localhost', USER)] or
(host, owner) in self.initialised or self.single_task_mode):
return
user_at_host = host
if owner:
user_at_host = owner + '@' + host

r_suite_run_dir = GLOBAL_CFG.get_derived_host_item(
reg, 'suite run directory', host, owner)
Expand All @@ -107,7 +107,7 @@ def init_suite_run_dir(self, reg, user_at_host):
stdout=PIPE, stderr=PIPE)
if proc.wait() == 0:
# Initialised, but no need to tidy up
self.initialised_hosts[user_at_host] = False
self.initialised[(host, owner)] = False
return
finally:
try:
Expand Down Expand Up @@ -153,7 +153,7 @@ def init_suite_run_dir(self, reg, user_at_host):
RemoteJobHostInitError.MSG_INIT,
user_at_host, ' '.join([quote(item) for item in cmd]),
proc.returncode, out, err)
self.initialised_hosts[user_at_host] = should_unlink
self.initialised[(host, owner)] = should_unlink
LOG.info('Initialised %s:%s' % (user_at_host, r_suite_run_dir))

def unlink_suite_contact_files(self, reg):
Expand All @@ -164,13 +164,12 @@ def unlink_suite_contact_files(self, reg):
"""
# Issue all SSH commands in parallel
procs = {}
for user_at_host, should_unlink in self.initialised_hosts.items():
for (host, owner), should_unlink in self.initialised.items():
if not should_unlink:
continue
if '@' in user_at_host:
owner, host = user_at_host.split('@', 1)
else:
owner, host = None, user_at_host
user_at_host = host
if owner:
user_at_host = owner + '@' + host
ssh_tmpl = GLOBAL_CFG.get_host_item(
'remote shell template', host, owner)
r_suite_contact_file = os.path.join(
Expand Down
26 changes: 12 additions & 14 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,6 @@ def __init__(self, is_restart, options, args):
self.suite_log = None
self.log = LOG

# FIXME: can this be a local variable?
self.old_user_at_host_set = set()

self.ref_test_allowed_failures = []

def start(self):
Expand Down Expand Up @@ -438,15 +435,6 @@ def configure(self):
pass
copytree(suite_py, suite_run_py)

# 2) restart only: copy to other accounts with still-running tasks
for user_at_host in self.old_user_at_host_set:
try:
RemoteJobHostManager.get_inst().init_suite_run_dir(
self.suite, user_at_host)
except RemoteJobHostInitError as exc:
self.log.error(str(exc))
self.old_user_at_host_set.clear()

self.already_timed_out = False
self.set_suite_timer()

Expand Down Expand Up @@ -495,6 +483,17 @@ def load_tasks_for_restart(self):
self.pri_dao.select_task_pool_for_restart(
self._load_task_pool, self.options.checkpoint)
self.pri_dao.select_task_action_timers(self._load_task_action_timers)
# Re-initialise the run directory on user@host for each submitted or
# running task.
# Note: tasks should all be in the runahead pool at this point.
for itask in self.pool.get_rh_tasks():
if itask.state.status in [
TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING]:
try:
RemoteJobHostManager.get_inst().init_suite_run_dir(
self.suite, itask.task_host, itask.task_owner)
except RemoteJobHostInitError as exc:
self.log.error(str(exc))
self.pool.poll_task_jobs()

def _load_broadcast_states(self, row_idx, row):
Expand Down Expand Up @@ -621,7 +620,6 @@ def _load_task_pool(self, row_idx, row):
except ValueError:
itask.task_owner = None
itask.task_host = user_at_host
self.old_user_at_host_set.add(user_at_host)

elif status in (TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_FAILED):
itask.state.set_prerequisites_all_satisfied()
Expand All @@ -634,7 +632,7 @@ def _load_task_pool(self, row_idx, row):
elif status in (TASK_STATUS_SUBMIT_RETRYING, TASK_STATUS_RETRYING):
itask.state.set_prerequisites_all_satisfied()

elif itask.state.status == TASK_STATUS_SUCCEEDED:
elif status == TASK_STATUS_SUCCEEDED:
itask.state.set_prerequisites_all_satisfied()
# TODO - just poll for outputs in the job status file.
itask.state.outputs.set_all_completed()
Expand Down
2 changes: 1 addition & 1 deletion lib/cylc/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ def _prep_submit_impl(self, overrides=None):
'execution time limit polling intervals', [60, 120, 420])))

RemoteJobHostManager.get_inst().init_suite_run_dir(
self.suite_name, user_at_host)
self.suite_name, self.task_host, self.task_owner)
self.db_updates_map[self.TABLE_TASK_JOBS].append({
"user_at_host": user_at_host,
"batch_sys_name": self.summary['batch_sys_name'],
Expand Down

0 comments on commit cbebb62

Please sign in to comment.