diff --git a/changes.d/5821.fix.md b/changes.d/5821.fix.md new file mode 100644 index 00000000000..0c6c8b7918d --- /dev/null +++ b/changes.d/5821.fix.md @@ -0,0 +1 @@ +Fixed issue where large uncommitted changes could cause `cylc install` to hang. diff --git a/cylc/flow/install_plugins/log_vc_info.py b/cylc/flow/install_plugins/log_vc_info.py index d2379c5cc0d..29d861f7654 100644 --- a/cylc/flow/install_plugins/log_vc_info.py +++ b/cylc/flow/install_plugins/log_vc_info.py @@ -63,12 +63,22 @@ from pathlib import Path from subprocess import Popen, DEVNULL, PIPE from typing import ( - Any, Dict, Iterable, List, Optional, TYPE_CHECKING, TextIO, Union, overload + Any, + Dict, + Iterable, + List, + Optional, + TYPE_CHECKING, + TextIO, + Union, + overload, ) from cylc.flow import LOG as _LOG, LoggerAdaptor from cylc.flow.exceptions import CylcError import cylc.flow.flags +from cylc.flow.pipe_poller import pipe_poller +from cylc.flow.util import format_cmd from cylc.flow.workflow_files import WorkflowFiles if TYPE_CHECKING: @@ -171,7 +181,7 @@ def get_vc_info(path: Union[Path, str]) -> Optional[Dict[str, Any]]: ): LOG.debug(f"Source dir {path} is not a {vcs} repository") elif cylc.flow.flags.verbosity > -1: - LOG.warning(f"$ {vcs} {' '.join(args)}\n{exc}") + LOG.warning(f"$ {vcs} {format_cmd(args)}\n{exc}") continue info['version control system'] = vcs @@ -217,9 +227,7 @@ def _run_cmd( args: The args to pass to the version control command. cwd: Directory to run the command in. stdout: Where to redirect output (either PIPE or a - text stream/file object). Note: only use PIPE for - commands that will not generate a large output, otherwise - the pipe might get blocked. + text stream/file object). Returns: Stdout output if stdout=PIPE, else None as the output has been @@ -231,6 +239,7 @@ def _run_cmd( OSError: Non-zero return code for VCS command. """ cmd = [vcs, *args] + LOG.debug(f'$ {format_cmd(cmd)}') try: proc = Popen( # nosec cmd, @@ -245,13 +254,15 @@ def _run_cmd( # This will only be raised if the VCS command is not installed, # otherwise Popen() will succeed with a non-zero return code raise VCSNotInstalledError(vcs, exc) - ret_code = proc.wait() - out, err = proc.communicate() - if ret_code: + if stdout == PIPE: + out, err = pipe_poller(proc, proc.stdout, proc.stderr) + else: + out, err = proc.communicate() + if proc.returncode: if any(err.lower().startswith(msg) for msg in NO_BASE_ERRS[vcs]): # No base commit in repo raise VCSMissingBaseError(vcs, cwd) - raise OSError(ret_code, err) + raise OSError(proc.returncode, err) return out diff --git a/cylc/flow/pipe_poller.py b/cylc/flow/pipe_poller.py new file mode 100644 index 00000000000..d769f77c469 --- /dev/null +++ b/cylc/flow/pipe_poller.py @@ -0,0 +1,73 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Utility for preventing pipes from getting clogged up. + +If you're reading files from Popen (i.e. to extract command output) where the +command output has the potential to be long-ish, then you should use this +function to protect against the buffer filling up. + +Note, there is a more advanced version of this baked into the subprocpool. +""" + +from select import select + + +def pipe_poller(proc, *files, chunk_size=4096): + """Read from a process without hitting buffer issues. + + Standin for subprocess.Popen.communicate. + + When PIPE'ing from subprocesses, the output goes into a buffer. If the + buffer gets full, the subprocess will hang trying to write to it. + + This function polls the process, reading output from the buffers into + memory to prevent them from filling up. + + Args: + proc: + The process to poll. + files: + The files you want to read from, likely anything you've directed to + PIPE. + chunk_size: + The amount of text to read from the buffer on each pass. + + Returns: + tuple - The text read from each of the files in the order they were + specified. + + """ + _files = { + file: b'' if 'b' in getattr(file, 'mode', 'r') else '' + for file in files + } + + def _read(timeout=1.0): + # read any data from files + nonlocal chunk_size, files + for file in select(list(files), [], [], timeout)[0]: + buffer = file.read(chunk_size) + if len(buffer) > 0: + _files[file] += buffer + + while proc.poll() is None: + # read from the buffers + _read() + # double check the buffers now that the process has finished + _read(timeout=0.01) + + return tuple(_files.values()) diff --git a/tests/unit/test_pipe_poller.py b/tests/unit/test_pipe_poller.py new file mode 100644 index 00000000000..a41e9635c6b --- /dev/null +++ b/tests/unit/test_pipe_poller.py @@ -0,0 +1,33 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from subprocess import Popen, PIPE + +from cylc.flow.pipe_poller import pipe_poller + + +def test_pipe_poller_str(): + proc = Popen(['echo', 'Hello World!'], stdout=PIPE, text=True) + (stdout,) = pipe_poller(proc, proc.stdout) + assert proc.returncode == 0 + assert stdout == 'Hello World!\n' + + +def test_pipe_poller_bytes(): + proc = Popen(['echo', 'Hello World!'], stdout=PIPE, text=False) + (stdout,) = pipe_poller(proc, proc.stdout) + assert proc.returncode == 0 + assert stdout == b'Hello World!\n'