Skip to content

Commit

Permalink
fix host check in subshell_eval
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Sep 15, 2020
1 parent 3175fee commit e6838a6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 32 deletions.
8 changes: 4 additions & 4 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def submit_task_jobs(self, suite, itasks, curve_auth,
prepared_tasks, bad_tasks = self.prep_submit_task_jobs(suite, itasks)

# Reset consumed host selection results
self.task_remote_mgr.remote_host_select_reset()
self.task_remote_mgr.subshell_eval_reset()

if not prepared_tasks:
return bad_tasks
Expand Down Expand Up @@ -827,12 +827,12 @@ def _prep_submit_task_job(self, suite, itask, check_syntax=True):
host_n, platform_n = None, None
try:
if rtconfig['remote']['host'] is not None:
host_n = self.task_remote_mgr.remote_host_select(
host_n = self.task_remote_mgr.subshell_eval(
rtconfig['remote']['host']
)
else:
platform_n = self.task_remote_mgr.remote_host_select(
rtconfig['platform']
platform_n = self.task_remote_mgr.subshell_eval(
rtconfig['platform'], host_check=False
)
except TaskRemoteMgmtError as exc:
# Submit number not yet incremented
Expand Down
66 changes: 38 additions & 28 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,74 +60,84 @@ class TaskRemoteMgr:
def __init__(self, suite, proc_pool):
self.suite = suite
self.proc_pool = proc_pool
# self.remote_host_str_map = {host_str: host|TaskRemoteMgmtError|None}
self.remote_host_str_map = {}
# self.remote_command_map = {command: host|TaskRemoteMgmtError|None}
self.remote_command_map = {}
# self.remote_init_map = {(host, owner): status, ...}
self.remote_init_map = {}
self.single_task_mode = False
self.uuid_str = None
self.ready = False

def remote_host_select(self, host_str):
"""Evaluate a task host string.
def subshell_eval(self, command, host_check=True):
"""Evaluate a task platform from a subshell string.
At Cylc 7, from a host string.
Arguments:
host_str (str):
command (str):
An explicit host name, a command in back-tick or $(command)
format, or an environment variable holding a hostname.
Return (str):
None if evaluate of host_str is still taking place.
'localhost' if host_str is not defined or if the evaluated host
name is equivalent to 'localhost'.
Otherwise, return the evaluated host name on success.
- None if evaluation of command is still taking place.
- If command is not defined or the evaluated name is equivelent
to 'localhost', _and_ host_check is set to True then
'localhost'
- Otherwise, return the evaluated host name on success.
TODO:
At Cylc 9, strip of all references to host.
Raise TaskRemoteMgmtError on error.
"""
if not host_str:
if not command:
return 'localhost'

# Host selection command: $(command) or `command`
match = REC_COMMAND.match(host_str)
match = REC_COMMAND.match(command)
if match:
cmd_str = match.groups()[1]
if cmd_str in self.remote_host_str_map:
if cmd_str in self.remote_command_map:
# Command recently launched
value = self.remote_host_str_map[cmd_str]
value = self.remote_command_map[cmd_str]
if isinstance(value, TaskRemoteMgmtError):
raise value # command failed
elif value is None:
return # command not yet ready
else:
host_str = value # command succeeded
command = value # command succeeded
else:
# Command not launched (or already reset)
self.proc_pool.put_command(
SubProcContext(
'remote-host-select',
['bash', '-c', cmd_str],
env=dict(os.environ)),
self._remote_host_select_callback, [cmd_str])
self.remote_host_str_map[cmd_str] = None
return self.remote_host_str_map[cmd_str]
self._subshell_eval_callback, [cmd_str])
self.remote_command_map[cmd_str] = None
return self.remote_command_map[cmd_str]

# Environment variable substitution
host_str = os.path.expandvars(host_str)
command = os.path.expandvars(command)
# Remote?
if is_remote_host(host_str):
return host_str
# TODO - Remove at Cylc 9 as this only makes sense with host logic
if host_check is True:
if is_remote_host(command):
return command
else:
return 'localhost'
else:
return 'localhost'
return command

def remote_host_select_reset(self):
"""Reset remote host select results.
def subshell_eval_reset(self):
"""Reset remote eval subshell results.
This is normally called after the results are consumed.
"""
for key, value in list(self.remote_host_str_map.copy().items()):
for key, value in list(self.remote_command_map.copy().items()):
if value is not None:
del self.remote_host_str_map[key]
del self.remote_command_map[key]

def remote_init(self, platform_name, curve_auth, client_pub_key_dir):
"""Initialise a remote [owner@]host if necessary.
Expand Down Expand Up @@ -285,17 +295,17 @@ def remote_tidy(self):
(host, owner), ' '.join(quote(item) for item in cmd),
proc.returncode, out, err))

def _remote_host_select_callback(self, proc_ctx, cmd_str):
def _subshell_eval_callback(self, proc_ctx, cmd_str):
"""Callback when host select command exits"""
self.ready = True
if proc_ctx.ret_code == 0 and proc_ctx.out:
# Good status
LOG.debug(proc_ctx)
self.remote_host_str_map[cmd_str] = proc_ctx.out.splitlines()[0]
self.remote_command_map[cmd_str] = proc_ctx.out.splitlines()[0]
else:
# Bad status
LOG.error(proc_ctx)
self.remote_host_str_map[cmd_str] = TaskRemoteMgmtError(
self.remote_command_map[cmd_str] = TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_SELECT, (cmd_str, None), cmd_str,
proc_ctx.ret_code, proc_ctx.out, proc_ctx.err)

Expand Down

0 comments on commit e6838a6

Please sign in to comment.