Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor restart job host init logic #2104

Merged
merged 1 commit into from
Jan 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions lib/cylc/job_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ def get_inst(cls):
return cls._INSTANCE

def __init__(self):
self.initialised_hosts = {} # user_at_host: should_unlink
self.initialised = {} # {(user, host): should_unlink, ...}
self.single_task_mode = False
self.suite_srv_files_mgr = SuiteSrvFilesManager()

def init_suite_run_dir(self, reg, user_at_host):
def init_suite_run_dir(self, reg, host, owner):
"""Initialise suite run dir on a user@host.

Create SUITE_RUN_DIR/log/job/ if necessary.
Expand All @@ -74,14 +74,14 @@ def init_suite_run_dir(self, reg, user_at_host):
Raise RemoteJobHostInitError if initialisation cannot complete.

"""
if '@' in user_at_host:
owner, host = user_at_host.split('@', 1)
else:
owner, host = None, user_at_host
if ((owner, host) in [(None, 'localhost'), (USER, 'localhost')] or
host in self.initialised_hosts or
self.single_task_mode):
if host is None:
host = 'localhost'
if ((host, owner) in [('localhost', None), ('localhost', USER)] or
(host, owner) in self.initialised or self.single_task_mode):
return
user_at_host = host
if owner:
user_at_host = owner + '@' + host

r_suite_run_dir = GLOBAL_CFG.get_derived_host_item(
reg, 'suite run directory', host, owner)
Expand All @@ -107,7 +107,7 @@ def init_suite_run_dir(self, reg, user_at_host):
stdout=PIPE, stderr=PIPE)
if proc.wait() == 0:
# Initialised, but no need to tidy up
self.initialised_hosts[user_at_host] = False
self.initialised[(host, owner)] = False
return
finally:
try:
Expand Down Expand Up @@ -153,7 +153,7 @@ def init_suite_run_dir(self, reg, user_at_host):
RemoteJobHostInitError.MSG_INIT,
user_at_host, ' '.join([quote(item) for item in cmd]),
proc.returncode, out, err)
self.initialised_hosts[user_at_host] = should_unlink
self.initialised[(host, owner)] = should_unlink
LOG.info('Initialised %s:%s' % (user_at_host, r_suite_run_dir))

def unlink_suite_contact_files(self, reg):
Expand All @@ -164,13 +164,12 @@ def unlink_suite_contact_files(self, reg):
"""
# Issue all SSH commands in parallel
procs = {}
for user_at_host, should_unlink in self.initialised_hosts.items():
for (host, owner), should_unlink in self.initialised.items():
if not should_unlink:
continue
if '@' in user_at_host:
owner, host = user_at_host.split('@', 1)
else:
owner, host = None, user_at_host
user_at_host = host
if owner:
user_at_host = owner + '@' + host
ssh_tmpl = GLOBAL_CFG.get_host_item(
'remote shell template', host, owner)
r_suite_contact_file = os.path.join(
Expand Down
26 changes: 12 additions & 14 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,6 @@ def __init__(self, is_restart, options, args):
self.suite_log = None
self.log = LOG

# FIXME: can this be a local variable?
self.old_user_at_host_set = set()

self.ref_test_allowed_failures = []

def start(self):
Expand Down Expand Up @@ -438,15 +435,6 @@ def configure(self):
pass
copytree(suite_py, suite_run_py)

# 2) restart only: copy to other accounts with still-running tasks
for user_at_host in self.old_user_at_host_set:
try:
RemoteJobHostManager.get_inst().init_suite_run_dir(
self.suite, user_at_host)
except RemoteJobHostInitError as exc:
self.log.error(str(exc))
self.old_user_at_host_set.clear()

self.already_timed_out = False
self.set_suite_timer()

Expand Down Expand Up @@ -495,6 +483,17 @@ def load_tasks_for_restart(self):
self.pri_dao.select_task_pool_for_restart(
self._load_task_pool, self.options.checkpoint)
self.pri_dao.select_task_action_timers(self._load_task_action_timers)
# Re-initialise the run directory on user@host for each submitted or
# running task.
# Note: tasks should all be in the runahead pool at this point.
for itask in self.pool.get_rh_tasks():
if itask.state.status in [
TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING]:
try:
RemoteJobHostManager.get_inst().init_suite_run_dir(
self.suite, itask.task_host, itask.task_owner)
except RemoteJobHostInitError as exc:
self.log.error(str(exc))
self.pool.poll_task_jobs()

def _load_broadcast_states(self, row_idx, row):
Expand Down Expand Up @@ -621,7 +620,6 @@ def _load_task_pool(self, row_idx, row):
except ValueError:
itask.task_owner = None
itask.task_host = user_at_host
self.old_user_at_host_set.add(user_at_host)

elif status in (TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_FAILED):
itask.state.set_prerequisites_all_satisfied()
Expand All @@ -634,7 +632,7 @@ def _load_task_pool(self, row_idx, row):
elif status in (TASK_STATUS_SUBMIT_RETRYING, TASK_STATUS_RETRYING):
itask.state.set_prerequisites_all_satisfied()

elif itask.state.status == TASK_STATUS_SUCCEEDED:
elif status == TASK_STATUS_SUCCEEDED:
itask.state.set_prerequisites_all_satisfied()
# TODO - just poll for outputs in the job status file.
itask.state.outputs.set_all_completed()
Expand Down
2 changes: 1 addition & 1 deletion lib/cylc/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ def _prep_submit_impl(self, overrides=None):
'execution time limit polling intervals', [60, 120, 420])))

RemoteJobHostManager.get_inst().init_suite_run_dir(
self.suite_name, user_at_host)
self.suite_name, self.task_host, self.task_owner)
self.db_updates_map[self.TABLE_TASK_JOBS].append({
"user_at_host": user_at_host,
"batch_sys_name": self.summary['batch_sys_name'],
Expand Down