Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Platforms.eval platform cmd #3791

Merged
merged 6 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
('job', 'batch submit command template', [None])
)

# Regex to check whether a string is a command
REC_COMMAND = re.compile(r'(`|\$\()\s*(.*)\s*([`)])$')


def get_platform(task_conf=None, task_id='unknown task', warn_only=False):
"""Get a platform.
Expand All @@ -52,6 +55,9 @@ def get_platform(task_conf=None, task_id='unknown task', warn_only=False):
Actually it returns either get_platform() or
platform_from_job_info(), but to the user these look the same.
When working in warn_only mode, warnings are returned as strings.

TODO:
At Cylc 9 remove all Cylc7 upgrade logic.
"""

if task_conf is None:
Expand All @@ -63,10 +69,16 @@ def get_platform(task_conf=None, task_id='unknown task', warn_only=False):
output = platform_from_name(task_conf)

elif 'platform' in task_conf and task_conf['platform']:
if REC_COMMAND.match(task_conf['platform']) and warn_only:
# In warning mode this function might have been passed an
# un-expanded platform string - warn that they won't deal with
# this until job submit.
return None

# Check whether task has conflicting Cylc7 items.
fail_if_platform_and_host_conflict(task_conf, task_id)

# If platform name exists and doesn't clash with Cylc7 Config
# items:
# If platform name exists and doesn't clash with Cylc7 Config items.
output = platform_from_name(task_conf['platform'])

else:
Expand Down
122 changes: 88 additions & 34 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@
TASK_STATUS_SUBMIT_RETRYING, TASK_STATUS_RETRYING)
from cylc.flow.wallclock import get_current_time_string, get_utc_mode
from cylc.flow.remote import construct_platform_ssh_cmd
from cylc.flow.exceptions import PlatformLookupError, TaskRemoteMgmtError
from cylc.flow.exceptions import (
PlatformLookupError, SuiteConfigError, TaskRemoteMgmtError
)


class TaskJobManager:
Expand Down Expand Up @@ -201,7 +203,7 @@ def submit_task_jobs(self, suite, itasks, curve_auth,
prepared_tasks, bad_tasks = self.prep_submit_task_jobs(suite, itasks)

# Reset consumed host selection results
self.task_remote_mgr.remote_host_select_reset()
self.task_remote_mgr.subshell_eval_reset()

if not prepared_tasks:
return bad_tasks
Expand Down Expand Up @@ -693,14 +695,15 @@ def _run_job_cmd(self, cmd_key, suite, itasks, callback):
SubProcContext(cmd_key, cmd), callback, [suite, itasks])

@staticmethod
def _set_retry_timers(itask, rtconfig=None):
def _set_retry_timers(itask, rtconfig=None, retry=True):
"""Set try number and retry delays."""
if rtconfig is None:
rtconfig = itask.tdef.rtconfig
try:
_ = rtconfig[itask.tdef.run_mode + ' mode']['disable retries']
except KeyError:
retry = True
if (
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be faster since in most cases the try was going to fail.

itask.tdef.run_mode + ' mode' in rtconfig and
'disable retries' in rtconfig[itask.tdef.run_mode + ' mode']
):
retry = False
if retry:
if rtconfig['job']['submission retry delays']:
submit_delays = rtconfig['job']['submission retry delays']
Expand Down Expand Up @@ -798,29 +801,86 @@ def _prep_submit_task_job(self, suite, itask, check_syntax=True):
else:
rtconfig = itask.tdef.rtconfig

# Determine task host settings now, just before job submission,
# because dynamic host selection may be used.
# TODO - remove host logic at Cylc 9
# Determine task host or platform now, just before job submission,
# because dynamic host/platform selection may be used.
# cases:
# - Platform exists, host does = throw error here:
# Although errors of this sort should ideally be caught on config
# load this cannot be done because inheritance may create conflicts
# which appear later. Although this error is also raised
# by the platforms module it's probably worth putting it here too
# to prevent trying to run the remote_host/platform_select logic for
# tasks which will fail anyway later.
# - Platform exists, host doesn't = eval platform_n
# - host exists - eval host_n
if (
rtconfig['platform'] is not None and
rtconfig['remote']['host'] is not None
):
raise SuiteConfigError(
"A mixture of Cylc 7 (host) and Cylc 8 (platform) "
"logic should not be used. In this case for the task "
f"\"{itask.identity}\" the following are not compatible:\n"
)

host_n, platform_n = None, None
try:
platform = get_platform(rtconfig, itask.identity)
if rtconfig['remote']['host'] is not None:
host_n = self.task_remote_mgr.subshell_eval(
rtconfig['remote']['host']
)
else:
platform_n = self.task_remote_mgr.subshell_eval(
rtconfig['platform'], host_check=False
)
except TaskRemoteMgmtError as exc:
# Submit number not yet incremented
itask.submit_num += 1
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(suite, itask)
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
suite, itask, '(remote host select)', exc)
return False
else:
# TODO: re-instate when remote host selection upgraded
# if task_host is None: # host select not ready
# itask.set_summary_message(self.REMOTE_SELECT_MSG)
# return
itask.platform = platform
# Submit number not yet incremented
itask.submit_num += 1
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)
# host/platform select not ready
if host_n is None and platform_n is None:
itask.set_summary_message(self.REMOTE_SELECT_MSG)
return
elif host_n is None and rtconfig['platform'] != platform_n:
LOG.debug(
f"for task {itask.identity}: platform = "
f"{rtconfig['platform']} evaluated as {platform_n}"
)
rtconfig['platform'] = platform_n
elif platform_n is None and rtconfig['remote']['host'] != host_n:
LOG.debug(
f"for task {itask.identity}: host = "
f"{rtconfig['remote']['host']} evaluated as {host_n}"
)
rtconfig['remote']['host'] = host_n

try:
platform = get_platform(rtconfig)
except PlatformLookupError as exc:
# Submit number not yet incremented
itask.submit_num += 1
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(suite, itask)
self._set_retry_timers(itask, rtconfig, False)
self._prep_submit_task_job_error(
suite, itask, '(platform not defined)', exc
)
return False
else:
itask.platform = platform
# Submit number not yet incremented
itask.submit_num += 1
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)

try:
job_conf = self._prep_submit_task_job_impl(suite, itask, rtconfig)
Expand All @@ -847,21 +907,15 @@ def _prep_submit_task_job(self, suite, itask, check_syntax=True):
def _prep_submit_task_job_error(self, suite, itask, action, exc):
"""Helper for self._prep_submit_task_job. On error."""
LOG.debug("submit_num %s" % itask.submit_num)
if type(exc) == PlatformLookupError:
LOG.error(
f"{itask.identity} cannot find platform to match Cylc 7 "
"settings:"
)
else:
LOG.debug(traceback.format_exc())
LOG.error(exc)
log_task_job_activity(
SubProcContext(self.JOBS_SUBMIT, action, err=exc, ret_code=1),
suite,
itask.point,
itask.tdef.name,
submit_num=itask.submit_num
)
LOG.debug(traceback.format_exc())
LOG.error(exc)
log_task_job_activity(
SubProcContext(self.JOBS_SUBMIT, action, err=exc, ret_code=1),
suite,
itask.point,
itask.tdef.name,
submit_num=itask.submit_num
)
# Persist
self.suite_db_mgr.put_insert_task_jobs(itask, {
'is_manual_submit': itask.is_manual_submit,
Expand Down
66 changes: 38 additions & 28 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,74 +60,84 @@ class TaskRemoteMgr:
def __init__(self, suite, proc_pool):
self.suite = suite
self.proc_pool = proc_pool
# self.remote_host_str_map = {host_str: host|TaskRemoteMgmtError|None}
self.remote_host_str_map = {}
# self.remote_command_map = {command: host|TaskRemoteMgmtError|None}
self.remote_command_map = {}
# self.remote_init_map = {(host, owner): status, ...}
self.remote_init_map = {}
self.single_task_mode = False
self.uuid_str = None
self.ready = False

def remote_host_select(self, host_str):
"""Evaluate a task host string.
def subshell_eval(self, command, host_check=True):
"""Evaluate a task platform from a subshell string.

At Cylc 7, from a host string.

Arguments:
host_str (str):
command (str):
An explicit host name, a command in back-tick or $(command)
format, or an environment variable holding a hostname.

Return (str):
None if evaluate of host_str is still taking place.
'localhost' if host_str is not defined or if the evaluated host
name is equivalent to 'localhost'.
Otherwise, return the evaluated host name on success.
- None if evaluation of command is still taking place.
- If command is not defined or the evaluated name is equivalent
to 'localhost', _and_ host_check is set to True then
'localhost'
- Otherwise, return the evaluated host name on success.

TODO:
At Cylc 9, strip out all references to host.

Raise TaskRemoteMgmtError on error.

"""
if not host_str:
if not command:
return 'localhost'

# Host selection command: $(command) or `command`
match = REC_COMMAND.match(host_str)
match = REC_COMMAND.match(command)
if match:
cmd_str = match.groups()[1]
if cmd_str in self.remote_host_str_map:
if cmd_str in self.remote_command_map:
# Command recently launched
value = self.remote_host_str_map[cmd_str]
value = self.remote_command_map[cmd_str]
if isinstance(value, TaskRemoteMgmtError):
raise value # command failed
elif value is None:
return # command not yet ready
else:
host_str = value # command succeeded
command = value # command succeeded
else:
# Command not launched (or already reset)
self.proc_pool.put_command(
SubProcContext(
'remote-host-select',
['bash', '-c', cmd_str],
env=dict(os.environ)),
self._remote_host_select_callback, [cmd_str])
self.remote_host_str_map[cmd_str] = None
return self.remote_host_str_map[cmd_str]
self._subshell_eval_callback, [cmd_str])
self.remote_command_map[cmd_str] = None
return self.remote_command_map[cmd_str]

# Environment variable substitution
host_str = os.path.expandvars(host_str)
command = os.path.expandvars(command)
# Remote?
if is_remote_host(host_str):
return host_str
# TODO - Remove at Cylc 9 as this only makes sense with host logic
if host_check is True:
if is_remote_host(command):
return command
else:
return 'localhost'
else:
return 'localhost'
return command

def remote_host_select_reset(self):
"""Reset remote host select results.
def subshell_eval_reset(self):
"""Reset remote eval subshell results.

This is normally called after the results are consumed.
"""
for key, value in list(self.remote_host_str_map.copy().items()):
for key, value in list(self.remote_command_map.copy().items()):
if value is not None:
del self.remote_host_str_map[key]
del self.remote_command_map[key]

def remote_init(self, platform_name, curve_auth, client_pub_key_dir):
"""Initialise a remote [owner@]host if necessary.
Expand Down Expand Up @@ -285,17 +295,17 @@ def remote_tidy(self):
(host, owner), ' '.join(quote(item) for item in cmd),
proc.returncode, out, err))

def _remote_host_select_callback(self, proc_ctx, cmd_str):
def _subshell_eval_callback(self, proc_ctx, cmd_str):
"""Callback when host select command exits"""
self.ready = True
if proc_ctx.ret_code == 0 and proc_ctx.out:
# Good status
LOG.debug(proc_ctx)
self.remote_host_str_map[cmd_str] = proc_ctx.out.splitlines()[0]
self.remote_command_map[cmd_str] = proc_ctx.out.splitlines()[0]
else:
# Bad status
LOG.error(proc_ctx)
self.remote_host_str_map[cmd_str] = TaskRemoteMgmtError(
self.remote_command_map[cmd_str] = TaskRemoteMgmtError(
TaskRemoteMgmtError.MSG_SELECT, (cmd_str, None), cmd_str,
proc_ctx.ret_code, proc_ctx.out, proc_ctx.err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,8 @@
#-------------------------------------------------------------------------------
# Test recovery of a failed host select command for a group of tasks.
. "$(dirname "$0")/test_header"
skip_all "TODO replace this test with one checking that garbage-platform commands are handled ok"
set_test_number 2

create_test_global_config "
[platforms]
[[test platform]]
hosts = $(my-host-select)
"

install_suite "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}"
suite_run_ok "${TEST_NAME_BASE}-run" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
[runtime]
[[foo<i>]]
script = true
platform = test platform
platform = $(my-host-select)
[[[job]]]
submission retry delays = PT10S
Loading