Skip to content

Commit

Permalink
Merge branch '8.1.x' into 4955
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim authored Apr 21, 2023
2 parents ee1e481 + 62f5ef5 commit ef235c3
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 117 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ ones in. -->
better error messages if [sections] and settings are mixed up in a
configuration.

[5445](https://github.com/cylc/cylc-flow/pull/5445) - Fix remote tidy
bug where install target is not explicit in platform definition.

[5398](https://github.com/cylc/cylc-flow/pull/5398) - Fix platform from
group selection order bug.

Expand Down
25 changes: 21 additions & 4 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,29 @@ def __str__(self):


class NoPlatformsError(PlatformLookupError):
"""None of the platforms of a given group were reachable."""
def __init__(self, platform_group):
self.platform_group = platform_group
"""None of the platforms of a given set were reachable.
Args:
identity: The name of the platform group or install target
set_type: Whether the set of platforms is a platform group or an
install target
place: Where the attempt to get the platform failed.
"""
def __init__(
self, identity: str, set_type: str = 'group', place: str = ''
):
self.identity = identity
self.type = set_type
if place:
self.place = f' during {place}.'
else:
self.place = '.'

def __str__(self):
return f'Unable to find a platform from group {self.platform_group}.'
return (
f'Unable to find a platform from {self.type} {self.identity}'
f'{self.place}'
)


class CylcVersionError(CylcError):
Expand Down
50 changes: 15 additions & 35 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import re
from copy import deepcopy
from typing import (
TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union, overload
TYPE_CHECKING, Any, Dict, Iterable,
List, Optional, Set, Union, overload
)

from cylc.flow import LOG
Expand Down Expand Up @@ -630,18 +631,29 @@ def get_install_target_from_platform(platform: Dict[str, Any]) -> str:


def get_install_target_to_platforms_map(
platform_names: Iterable[str]
platform_names: Iterable[str],
quiet: bool = False
) -> Dict[str, List[Dict[str, Any]]]:
"""Get a dictionary of unique install targets and the platforms which use
them.
Args:
platform_names: List of platform names to look up in the global config.
quiet: Supress PlatformNotFound Errors
Return {install_target_1: [platform_1_dict, platform_2_dict, ...], ...}
"""
platform_names = set(platform_names)
platforms = [platform_from_name(p_name) for p_name in platform_names]
platforms: List[Dict[str, Any]] = []
for p_name in platform_names:
try:
platform = platform_from_name(p_name)
except PlatformLookupError as exc:
if not quiet:
raise exc
else:
platforms.append(platform)

install_targets = {
get_install_target_from_platform(platform)
for platform in platforms
Expand All @@ -667,38 +679,6 @@ def is_platform_with_target_in_list(
)


def get_all_platforms_for_install_target(
install_target: str
) -> List[Dict[str, Any]]:
"""Return list of platform dictionaries for given install target."""
platforms: List[Dict[str, Any]] = []
all_platforms = glbl_cfg(cached=True).get(['platforms'], sparse=False)
for k, v in all_platforms.iteritems(): # noqa: B301 (iteritems valid here)
if (v.get('install target', k) == install_target):
v_copy = deepcopy(v)
v_copy['name'] = k
platforms.append(v_copy)
return platforms


def get_random_platform_for_install_target(
install_target: str
) -> Dict[str, Any]:
"""Return a randomly selected platform (dict) for given install target.
Raises:
PlatformLookupError: We can't get a platform for this install target.
"""
platforms = get_all_platforms_for_install_target(install_target)
try:
return random.choice(platforms) # nosec (not crypto related)
except IndexError:
# No platforms to choose from
raise PlatformLookupError(
f'Could not select platform for install target: {install_target}'
)


def get_localhost_install_target() -> str:
"""Returns the install target of localhost platform"""
localhost = get_platform()
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.bad_hosts = bad_hosts
self.bad_hosts_to_clear = set()
self.task_remote_mgr = TaskRemoteMgr(
workflow, proc_pool, self.bad_hosts)
workflow, proc_pool, self.bad_hosts, self.workflow_db_mgr)

def check_task_jobs(self, workflow, task_pool):
"""Check submission and execution timeout and polling timers.
Expand Down
111 changes: 80 additions & 31 deletions cylc/flow/task_remote_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
from time import sleep, time
from typing import (
Any, Deque, Dict, TYPE_CHECKING, List,
NamedTuple, Optional, Tuple
NamedTuple, Optional, Set, Tuple
)

from cylc.flow import LOG
from cylc.flow.exceptions import PlatformError
from cylc.flow.exceptions import (
PlatformError, PlatformLookupError, NoHostsError, NoPlatformsError
)
import cylc.flow.flags
from cylc.flow.network.client_factory import CommsMeth
from cylc.flow.pathutil import (
Expand All @@ -50,12 +52,10 @@
from cylc.flow.platforms import (
HOST_REC_COMMAND,
PLATFORM_REC_COMMAND,
NoHostsError,
PlatformLookupError,
get_host_from_platform,
get_install_target_from_platform,
get_install_target_to_platforms_map,
get_localhost_install_target,
get_random_platform_for_install_target,
log_platform_event,
)
from cylc.flow.remote import construct_rsync_over_ssh_cmd, construct_ssh_cmd
Expand Down Expand Up @@ -96,7 +96,7 @@ class RemoteTidyQueueTuple(NamedTuple):
class TaskRemoteMgr:
"""Manage task remote initialisation, tidy, selection."""

def __init__(self, workflow, proc_pool, bad_hosts):
def __init__(self, workflow, proc_pool, bad_hosts, db_mgr):
self.workflow = workflow
self.proc_pool = proc_pool
# self.remote_command_map = {command: host|PlatformError|None}
Expand All @@ -110,6 +110,7 @@ def __init__(self, workflow, proc_pool, bad_hosts):
self.bad_hosts = bad_hosts
self.is_reload = False
self.is_restart = False
self.db_mgr = db_mgr

def _subshell_eval(
self, eval_str: str, command_pattern: re.Pattern
Expand Down Expand Up @@ -300,44 +301,92 @@ def construct_remote_tidy_ssh_cmd(
cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s')
return cmd, host

@staticmethod
def _get_remote_tidy_targets(
platform_names: List[str],
install_targets: Set[str]
) -> Dict[str, List[Dict[str, Any]]]:
"""Finds valid platforms for install targets, warns about in invalid
install targets.
logs:
A list of install targets where no platform can be found.
returns:
A mapping of install targets to valid platforms only where
platforms are available.
"""
if install_targets and not platform_names:
install_targets_map: Dict[str, List[Dict[str, Any]]] = {
t: [] for t in install_targets}
unreachable_targets = install_targets
else:
install_targets_map = get_install_target_to_platforms_map(
platform_names, quiet=True)
# If we couldn't find a platform for a target, we cannot tidy it -
# raise an Error:
unreachable_targets = install_targets.difference(
install_targets_map)

if unreachable_targets:
msg = 'No platforms available to remote tidy install targets:'
for unreachable_target in unreachable_targets:
msg += f'\n * {unreachable_target}'
LOG.error(msg)

return install_targets_map

def remote_tidy(self) -> None:
"""Remove workflow contact files and keys from initialised remotes.
Call "cylc remote-tidy".
This method is called on workflow shutdown, so we want nothing to hang.
Timeout any incomplete commands after 10 seconds.
"""
# Get a list of all platforms used from workflow database:
platforms_used = (
self.db_mgr.get_pri_dao().select_task_job_platforms())
# For each install target compile a list of platforms:
install_targets = {
target for target, msg
in self.remote_init_map.items()
if msg == REMOTE_FILE_INSTALL_DONE
}
install_targets_map = self._get_remote_tidy_targets(
platforms_used, install_targets)

# Issue all SSH commands in parallel
queue: Deque[RemoteTidyQueueTuple] = deque()
for install_target, message in self.remote_init_map.items():
if message != REMOTE_FILE_INSTALL_DONE:
continue
for install_target, platforms in install_targets_map.items():
if install_target == get_localhost_install_target():
continue
try:
platform = get_random_platform_for_install_target(
install_target
)
cmd, host = self.construct_remote_tidy_ssh_cmd(platform)
except (NoHostsError, PlatformLookupError) as exc:
LOG.warning(
PlatformError(
f'{PlatformError.MSG_TIDY}\n{exc}',
platform['name'],
for platform in platforms:
try:
cmd, host = self.construct_remote_tidy_ssh_cmd(platform)
except NoHostsError as exc:
LOG.warning(
PlatformError(
f'{PlatformError.MSG_TIDY}\n{exc}',
platform['name'],
)
)
)
else:
log_platform_event('remote tidy', platform, host)
queue.append(
RemoteTidyQueueTuple(
platform,
host,
Popen( # nosec
cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL,
text=True
) # * command constructed by internal interface
else:
log_platform_event('remote tidy', platform, host)
queue.append(
RemoteTidyQueueTuple(
platform,
host,
Popen( # nosec
cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL,
text=True
) # * command constructed by internal interface
)
)
)
break
else:
LOG.error(
NoPlatformsError(
install_target, 'install target', 'remote tidy'))
# Wait for commands to complete for a max of 10 seconds
timeout = time() + 10.0
while queue and time() < timeout:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ platform: fake-platform - initialisation did not complete
platform: fake-platform - remote init (on localhost)
platform: fake-platform - remote file install (on localhost)
platform: fake-platform - initialisation did not complete
platform: fake-platform - remote tidy (on localhost)
__HERE__

purge
Expand Down
Loading

0 comments on commit ef235c3

Please sign in to comment.