From 62f5ef556a8c6d558041dace332130803634f0b8 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Fri, 21 Apr 2023 10:00:49 +0100 Subject: [PATCH] Fix.platform is regex remote tidy fail (#5445) platforms: fix issue with remote_tidy and platforms with hosts defined by regex patterns Closes #5429 --- CHANGES.md | 3 + cylc/flow/exceptions.py | 25 ++- cylc/flow/platforms.py | 50 ++---- cylc/flow/task_job_mgr.py | 2 +- cylc/flow/task_remote_mgr.py | 111 ++++++++---- .../01-periodic-clear-badhosts.t | 1 + tests/integration/test_task_remote_mgr.py | 167 ++++++++++++++++++ tests/unit/test_platforms.py | 54 +----- tests/unit/test_task_remote_mgr.py | 111 +++++++++++- 9 files changed, 407 insertions(+), 117 deletions(-) create mode 100644 tests/integration/test_task_remote_mgr.py diff --git a/CHANGES.md b/CHANGES.md index 014dc887841..7644721d777 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -14,6 +14,9 @@ ones in. --> ### Fixes +[5445](https://github.com/cylc/cylc-flow/pull/5445) - Fix remote tidy + bug where install target is not explicit in platform definition. + [5398](https://github.com/cylc/cylc-flow/pull/5398) - Fix platform from group selection order bug. diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index 073c8261b29..dd59fb9eec8 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -436,12 +436,29 @@ def __str__(self): class NoPlatformsError(PlatformLookupError): - """None of the platforms of a given group were reachable.""" - def __init__(self, platform_group): - self.platform_group = platform_group + """None of the platforms of a given set were reachable. + + Args: + identity: The name of the platform group or install target + set_type: Whether the set of platforms is a platform group or an + install target + place: Where the attempt to get the platform failed. 
+ """ + def __init__( + self, identity: str, set_type: str = 'group', place: str = '' + ): + self.identity = identity + self.type = set_type + if place: + self.place = f' during {place}.' + else: + self.place = '.' def __str__(self): - return f'Unable to find a platform from group {self.platform_group}.' + return ( + f'Unable to find a platform from {self.type} {self.identity}' + f'{self.place}' + ) class CylcVersionError(CylcError): diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py index 86806bd26b7..d4fc812187c 100644 --- a/cylc/flow/platforms.py +++ b/cylc/flow/platforms.py @@ -20,7 +20,8 @@ import re from copy import deepcopy from typing import ( - TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Union, overload + TYPE_CHECKING, Any, Dict, Iterable, + List, Optional, Set, Union, overload ) from cylc.flow import LOG @@ -630,18 +631,29 @@ def get_install_target_from_platform(platform: Dict[str, Any]) -> str: def get_install_target_to_platforms_map( - platform_names: Iterable[str] + platform_names: Iterable[str], + quiet: bool = False ) -> Dict[str, List[Dict[str, Any]]]: """Get a dictionary of unique install targets and the platforms which use them. Args: platform_names: List of platform names to look up in the global config. 
+ quiet: Suppress PlatformLookupError exceptions Return {install_target_1: [platform_1_dict, platform_2_dict, ...], ...} """ platform_names = set(platform_names) - platforms = [platform_from_name(p_name) for p_name in platform_names] + platforms: List[Dict[str, Any]] = [] + for p_name in platform_names: + try: + platform = platform_from_name(p_name) + except PlatformLookupError as exc: + if not quiet: + raise exc + else: + platforms.append(platform) + install_targets = { get_install_target_from_platform(platform) for platform in platforms @@ -667,38 +679,6 @@ def is_platform_with_target_in_list( ) -def get_all_platforms_for_install_target( - install_target: str -) -> List[Dict[str, Any]]: - """Return list of platform dictionaries for given install target.""" - platforms: List[Dict[str, Any]] = [] - all_platforms = glbl_cfg(cached=True).get(['platforms'], sparse=False) - for k, v in all_platforms.iteritems(): # noqa: B301 (iteritems valid here) - if (v.get('install target', k) == install_target): - v_copy = deepcopy(v) - v_copy['name'] = k - platforms.append(v_copy) - return platforms - - -def get_random_platform_for_install_target( - install_target: str -) -> Dict[str, Any]: - """Return a randomly selected platform (dict) for given install target. - - Raises: - PlatformLookupError: We can't get a platform for this install target. 
- """ - platforms = get_all_platforms_for_install_target(install_target) - try: - return random.choice(platforms) # nosec (not crypto related) - except IndexError: - # No platforms to choose from - raise PlatformLookupError( - f'Could not select platform for install target: {install_target}' - ) - - def get_localhost_install_target() -> str: """Returns the install target of localhost platform""" localhost = get_platform() diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 6c940d48842..358f3586847 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -155,7 +155,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr, self.bad_hosts = bad_hosts self.bad_hosts_to_clear = set() self.task_remote_mgr = TaskRemoteMgr( - workflow, proc_pool, self.bad_hosts) + workflow, proc_pool, self.bad_hosts, self.workflow_db_mgr) def check_task_jobs(self, workflow, task_pool): """Check submission and execution timeout and polling timers. diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index c38c2ce1e4c..56a51fe8677 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -34,11 +34,13 @@ from time import sleep, time from typing import ( Any, Deque, Dict, TYPE_CHECKING, List, - NamedTuple, Optional, Tuple + NamedTuple, Optional, Set, Tuple ) from cylc.flow import LOG -from cylc.flow.exceptions import PlatformError +from cylc.flow.exceptions import ( + PlatformError, PlatformLookupError, NoHostsError, NoPlatformsError +) import cylc.flow.flags from cylc.flow.network.client_factory import CommsMeth from cylc.flow.pathutil import ( @@ -50,12 +52,10 @@ from cylc.flow.platforms import ( HOST_REC_COMMAND, PLATFORM_REC_COMMAND, - NoHostsError, - PlatformLookupError, get_host_from_platform, get_install_target_from_platform, + get_install_target_to_platforms_map, get_localhost_install_target, - get_random_platform_for_install_target, log_platform_event, ) from cylc.flow.remote import 
construct_rsync_over_ssh_cmd, construct_ssh_cmd @@ -96,7 +96,7 @@ class RemoteTidyQueueTuple(NamedTuple): class TaskRemoteMgr: """Manage task remote initialisation, tidy, selection.""" - def __init__(self, workflow, proc_pool, bad_hosts): + def __init__(self, workflow, proc_pool, bad_hosts, db_mgr): self.workflow = workflow self.proc_pool = proc_pool # self.remote_command_map = {command: host|PlatformError|None} @@ -110,6 +110,7 @@ def __init__(self, workflow, proc_pool, bad_hosts): self.bad_hosts = bad_hosts self.is_reload = False self.is_restart = False + self.db_mgr = db_mgr def _subshell_eval( self, eval_str: str, command_pattern: re.Pattern @@ -300,6 +301,41 @@ def construct_remote_tidy_ssh_cmd( cmd = construct_ssh_cmd(cmd, platform, host, timeout='10s') return cmd, host + @staticmethod + def _get_remote_tidy_targets( + platform_names: List[str], + install_targets: Set[str] + ) -> Dict[str, List[Dict[str, Any]]]: + """Finds valid platforms for install targets, warns about invalid + install targets. + + logs: + A list of install targets where no platform can be found. + + returns: + A mapping of install targets to valid platforms only where + platforms are available. + """ + if install_targets and not platform_names: + install_targets_map: Dict[str, List[Dict[str, Any]]] = { + t: [] for t in install_targets} + unreachable_targets = install_targets + else: + install_targets_map = get_install_target_to_platforms_map( + platform_names, quiet=True) + # If we couldn't find a platform for a target, we cannot tidy it - + # log an error: + unreachable_targets = install_targets.difference( + install_targets_map) + + if unreachable_targets: + msg = 'No platforms available to remote tidy install targets:' + for unreachable_target in unreachable_targets: + msg += f'\n * {unreachable_target}' + LOG.error(msg) + + return install_targets_map + def remote_tidy(self) -> None: """Remove workflow contact files and keys from initialised remotes. 
@@ -307,37 +343,50 @@ def remote_tidy(self) -> None: This method is called on workflow shutdown, so we want nothing to hang. Timeout any incomplete commands after 10 seconds. """ + # Get a list of all platforms used from workflow database: + platforms_used = ( + self.db_mgr.get_pri_dao().select_task_job_platforms()) + # For each install target compile a list of platforms: + install_targets = { + target for target, msg + in self.remote_init_map.items() + if msg == REMOTE_FILE_INSTALL_DONE + } + install_targets_map = self._get_remote_tidy_targets( + platforms_used, install_targets) + # Issue all SSH commands in parallel queue: Deque[RemoteTidyQueueTuple] = deque() - for install_target, message in self.remote_init_map.items(): - if message != REMOTE_FILE_INSTALL_DONE: - continue + for install_target, platforms in install_targets_map.items(): if install_target == get_localhost_install_target(): continue - try: - platform = get_random_platform_for_install_target( - install_target - ) - cmd, host = self.construct_remote_tidy_ssh_cmd(platform) - except (NoHostsError, PlatformLookupError) as exc: - LOG.warning( - PlatformError( - f'{PlatformError.MSG_TIDY}\n{exc}', - platform['name'], + for platform in platforms: + try: + cmd, host = self.construct_remote_tidy_ssh_cmd(platform) + except NoHostsError as exc: + LOG.warning( + PlatformError( + f'{PlatformError.MSG_TIDY}\n{exc}', + platform['name'], + ) ) - ) - else: - log_platform_event('remote tidy', platform, host) - queue.append( - RemoteTidyQueueTuple( - platform, - host, - Popen( # nosec - cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL, - text=True - ) # * command constructed by internal interface + else: + log_platform_event('remote tidy', platform, host) + queue.append( + RemoteTidyQueueTuple( + platform, + host, + Popen( # nosec + cmd, stdout=PIPE, stderr=PIPE, stdin=DEVNULL, + text=True + ) # * command constructed by internal interface + ) ) - ) + break + else: + LOG.error( + NoPlatformsError( + install_target, 
'install target', 'remote tidy')) # Wait for commands to complete for a max of 10 seconds timeout = time() + 10.0 while queue and time() < timeout: diff --git a/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t b/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t index 07dc1c4b487..fef25c44fe0 100644 --- a/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t +++ b/tests/functional/intelligent-host-selection/01-periodic-clear-badhosts.t @@ -76,6 +76,7 @@ platform: fake-platform - initialisation did not complete platform: fake-platform - remote init (on localhost) platform: fake-platform - remote file install (on localhost) platform: fake-platform - initialisation did not complete +platform: fake-platform - remote tidy (on localhost) __HERE__ purge diff --git a/tests/integration/test_task_remote_mgr.py b/tests/integration/test_task_remote_mgr.py new file mode 100644 index 00000000000..0fd462115f7 --- /dev/null +++ b/tests/integration/test_task_remote_mgr.py @@ -0,0 +1,167 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
+ +import cylc +from cylc.flow.task_remote_mgr import ( + REMOTE_FILE_INSTALL_DONE, + REMOTE_FILE_INSTALL_FAILED +) + + +async def test_remote_tidy( + flow, + scheduler, + start, + mock_glbl_cfg, + one_conf, + monkeypatch +): + """Remote tidy gets platforms for install targets. + + In particular, referencing https://github.com/cylc/cylc-flow/issues/5429, + ensure that install targets defined implicitly by platform name are found. + + Mock remote init map: + - Include an install target (quiz) with + message != REMOTE_FILE_INSTALL_DONE to ensure that + this is picked out. + - Install targets where we can get a platform + - foo - Install target is implicitly the platform name. + - bar9 - The install target is implicitly the platform name, + and the platform name matches a platform regex. + - baz - Install target is set explicitly. + - An install target (qux) where we cannot get a platform: Ensure + that we get the desired error. + + Test that platforms with no good hosts (no host not in bad hosts). 
+ """ + # Monkeypatch away subprocess.Popen calls - prevent any interaction with + # remotes actually happening: + class MockProc: + def __init__(self, *args, **kwargs): + self.poll = lambda: True + if ( + 'baum' in args[0] + or 'bay' in args[0] + ): + self.returncode = 255 + else: + self.returncode = 0 + self.communicate = lambda: ('out', 'err') + + monkeypatch.setattr( + cylc.flow.task_remote_mgr, + 'Popen', + lambda *args, **kwargs: MockProc(*args, **kwargs) + ) + + # Monkeypatch function to add a sort order which we don't need in the + # real code but rely on to prevent the test becoming flaky: + def mock_get_install_target_platforms_map(*args, **kwargs): + """Add sort to original function to ensure test consistency""" + from cylc.flow.platforms import get_install_target_to_platforms_map + result = get_install_target_to_platforms_map(*args, **kwargs) + sorted_result = {} + for key in sorted(result): + sorted_result[key] = sorted( + result[key], key=lambda x: x['name'], reverse=True) + return sorted_result + + monkeypatch.setattr( + cylc.flow.task_remote_mgr, + 'get_install_target_to_platforms_map', + mock_get_install_target_platforms_map + ) + + # Set up global config + mock_glbl_cfg( + 'cylc.flow.platforms.glbl_cfg', + ''' + [platforms] + [[foo]] + # install target = foo (implicit) + # hosts = foo (implicit) + [[bar.]] + # install target = bar1 to bar9 (implicit) + # hosts = bar1 to bar9 (implicit) + [[baz]] + install target = baz + # baum and bay should be uncontactable: + hosts = baum, bay, baz + [[[selection]]] + method = definition order + [[notthisone]] + install target = baz + hosts = baum, bay + [[bay]] + ''', + ) + + # Get a scheduler: + id_ = flow(one_conf) + schd = scheduler(id_) + async with start(schd) as log: + # Write database with 5 task jobs, one per platform: + platforms = ['baz', 'bar9', 'foo', 'notthisone', 'bay'] + line = r"('', '', {}, 0, 1, '', '', 0,'', '', '', 0, '', '{}', 4, '')" + stmt = r"INSERT INTO task_jobs VALUES" + r','.join([ 
+ line.format(i, platform) for i, platform in enumerate(platforms) + ]) + schd.workflow_db_mgr.pri_dao.connect().execute(stmt) + schd.workflow_db_mgr.pri_dao.connect().commit() + + # Mock a remote init map. + schd.task_job_mgr.task_remote_mgr.remote_init_map = { + 'baz': REMOTE_FILE_INSTALL_DONE, # Should match platform baz + 'bar9': REMOTE_FILE_INSTALL_DONE, # Should match platform bar. + 'foo': REMOTE_FILE_INSTALL_DONE, # Should match platform foo + 'qux': REMOTE_FILE_INSTALL_DONE, # Should not match a platform + 'quiz': REMOTE_FILE_INSTALL_FAILED, # Should not be considered + 'bay': REMOTE_FILE_INSTALL_DONE, # Should return NoPlatforms + } + + # Clear the log, run the test: + log.clear() + schd.task_job_mgr.task_remote_mgr.bad_hosts.update(['baum', 'bay']) + schd.task_job_mgr.task_remote_mgr.remote_tidy() + pass + + records = [str(r.msg) for r in log.records] + + # We can't get qux, no defined platform has a matching install target: + qux_msg = 'No platforms available to remote tidy install targets:\n * qux' + assert qux_msg in records + + # We can get foo bar baz, and we try to remote tidy them. + # (This will ultimately fail, but past the point we are testing). 
+ for target in ['foo', 'bar9', 'baz']: + msg = f'platform: {target} - remote tidy (on {target})' + assert msg in records + + # We haven't done anything with Quiz because we're only looking + # at cases where platform == REMOTE_FILE_INSTALL_DONE + assert not [r for r in records if 'quiz' in r] + + notthisone_msg = ( + 'platform: notthisone - clean up did not complete' + '\nUnable to find valid host for notthisone' + ) + assert notthisone_msg in records + + bay_msg = ( + 'Unable to find a platform from install target' + ' bay during remote tidy.') + assert bay_msg in records diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py index 8bbe19ebbd4..68d08cb8d16 100644 --- a/tests/unit/test_platforms.py +++ b/tests/unit/test_platforms.py @@ -21,10 +21,9 @@ from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults from cylc.flow.platforms import ( - get_all_platforms_for_install_target, get_platform, get_platform_deprecated_settings, - get_random_platform_for_install_target, is_platform_definition_subshell, + is_platform_definition_subshell, platform_from_name, platform_name_from_job_info, get_install_target_from_platform, get_install_target_to_platforms_map, @@ -414,6 +413,7 @@ def test_get_install_target_from_platform(platform, expected): assert get_install_target_from_platform(platform) == expected +@pytest.mark.parametrize('quiet', [True, False]) @pytest.mark.parametrize( 'platform_names, expected_map, expected_err', [ @@ -450,14 +450,19 @@ def test_get_install_target_to_platforms_map( platform_names: List[str], expected_map: Dict[str, Any], expected_err: Type[Exception], - monkeypatch: pytest.MonkeyPatch): + quiet: bool, + monkeypatch: pytest.MonkeyPatch +): """Test that get_install_target_to_platforms_map works as expected.""" monkeypatch.setattr('cylc.flow.platforms.platform_from_name', lambda x: platform_from_name(x, PLATFORMS_TREK)) - if expected_err: + if expected_err and not quiet: with pytest.raises(expected_err): 
get_install_target_to_platforms_map(platform_names) + elif expected_err and quiet: + # No error should be raised in quiet mode. + assert get_install_target_to_platforms_map(platform_names, quiet=quiet) else: result = get_install_target_to_platforms_map(platform_names) # Sort the maps: @@ -511,47 +516,6 @@ def test_generic_items_match(platform, job, remote, expect): assert generic_items_match(platform, job, remote) == expect -def test_get_all_platforms_for_install_target(mock_glbl_cfg): - mock_glbl_cfg( - 'cylc.flow.platforms.glbl_cfg', - ''' - [platforms] - [[localhost]] - hosts = localhost - install target = localhost - [[olaf]] - hosts = snow, ice, sparkles - install target = arendelle - [[snow white]] - hosts = happy, sleepy, dopey - install target = forest - [[kristoff]] - hosts = anna, elsa, hans - install target = arendelle - [[belle]] - hosts = beast, maurice - install target = france - [[bambi]] - hosts = thumper, faline, flower - install target = forest - [[merida]] - hosts = angus, fergus - install target = forest - [[forest]] - hosts = fir, oak, elm - ''' - ) - actual = get_all_platforms_for_install_target('forest') - expected = ['snow white', 'bambi', 'merida', 'forest'] - for platform in actual: - assert platform['name'] in expected - arendelle_platforms = ['kristoff', 'olaf'] - assert get_random_platform_for_install_target( - 'arendelle')['name'] in arendelle_platforms - assert get_random_platform_for_install_target( - 'forest')['name'] not in arendelle_platforms - - @pytest.mark.parametrize( 'task_conf, expected', [ diff --git a/tests/unit/test_task_remote_mgr.py b/tests/unit/test_task_remote_mgr.py index bb7b8cea085..539349da3fe 100644 --- a/tests/unit/test_task_remote_mgr.py +++ b/tests/unit/test_task_remote_mgr.py @@ -112,7 +112,7 @@ def test_get_log_file_name(tmp_path: Path, install_target: str, load_type: Optional[str], expected: str): - task_remote_mgr = TaskRemoteMgr('some_workflow', None, None) + task_remote_mgr = 
TaskRemoteMgr('some_workflow', None, None, None) if load_type == 'restart': task_remote_mgr.is_restart = True elif load_type == 'reload': @@ -127,3 +127,112 @@ def test_get_log_file_name(tmp_path: Path, log_name = task_remote_mgr.get_log_file_name( install_target, install_log_dir=log_dir) assert log_name == expected + + +@pytest.mark.parametrize( + 'platform_names, install_targets, glblcfg, expect', + [ + pytest.param( + # Two platforms share an install target. Both are reachable. + ['sir_handel', 'peter_sam'], + ['mountain_railway'], + ''' + [platforms] + [[peter_sam, sir_handel]] + install target = mountain_railway + ''', + { + 'targets': {'mountain_railway': ['peter_sam', 'sir_handel']}, + 'unreachable': set() + }, + id='basic' + ), + pytest.param( + # Two platforms share an install target. Both are unreachable. + set(), + ['mountain_railway'], + ''' + [platforms] + [[peter_sam, sir_handel]] + install target = mountain_railway + ''', + { + 'targets': {'mountain_railway': []}, + 'unreachable': {'mountain_railway'} + }, + id='platform_unreachable' + ), + pytest.param( + # One of our install targets matches one of our platforms, + # but only implicitly; i.e. the platform name is the same as the + # install target name. + ['sir_handel'], + ['sir_handel'], + ''' + [platforms] + [[sir_handel]] + ''', + { + 'targets': {'sir_handel': ['sir_handel']}, + 'unreachable': set() + }, + id='implicit-target' + ), + pytest.param( + # One of our install targets matches one of our platforms, + # but only implicitly, and the platform name is defined using a + # regex. + ['sir_handel42'], + ['sir_handel42'], + ''' + [platforms] + [[sir_handel..]] + ''', + { + 'targets': {'sir_handel42': ['sir_handel42']}, + 'unreachable': set() + }, + id='implicit-target-regex' + ), + pytest.param( + # One of our install targets (rusty) has no defined platforms + # causing a PlatformLookupError. 
+ ['duncan', 'rusty'], + ['mountain_railway', 'rusty'], + ''' + [platforms] + [[duncan]] + install target = mountain_railway + ''', + { + 'targets': {'mountain_railway': ['duncan']}, + 'unreachable': {'rusty'} + }, + id='PlatformLookupError' + ) + ] +) +def test_map_platforms_used_for_install_targets( + mock_glbl_cfg, + platform_names, install_targets, glblcfg, expect, caplog +): + def flatten_install_targets_map(itm): + result = {} + for target, platforms in itm.items(): + result[target] = sorted([p['name'] for p in platforms]) + return result + + mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', glblcfg) + + install_targets_map = TaskRemoteMgr._get_remote_tidy_targets( + set(platform_names), set(install_targets)) + + assert ( + expect['targets'] == flatten_install_targets_map(install_targets_map)) + + if expect['unreachable']: + for unreachable in expect["unreachable"]: + assert ( + unreachable in caplog.records[0].msg) + else: + assert not caplog.records