Skip to content

Commit

Permalink
log_vc_info: handle long command output (#5821)
Browse files Browse the repository at this point in the history
* Log the commands run by log_vc_info (debug level).
* Handle the risk of large command output clogging up the buffer causing
  commands to hang.
  • Loading branch information
oliver-sanders authored Nov 23, 2023
1 parent e692a06 commit 8b2feae
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 9 deletions.
1 change: 1 addition & 0 deletions changes.d/5821.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed issue where large uncommitted changes could cause `cylc install` to hang.
29 changes: 20 additions & 9 deletions cylc/flow/install_plugins/log_vc_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,22 @@
from pathlib import Path
from subprocess import Popen, DEVNULL, PIPE
from typing import (
Any, Dict, Iterable, List, Optional, TYPE_CHECKING, TextIO, Union, overload
Any,
Dict,
Iterable,
List,
Optional,
TYPE_CHECKING,
TextIO,
Union,
overload,
)

from cylc.flow import LOG as _LOG, LoggerAdaptor
from cylc.flow.exceptions import CylcError
import cylc.flow.flags
from cylc.flow.pipe_poller import pipe_poller
from cylc.flow.util import format_cmd
from cylc.flow.workflow_files import WorkflowFiles

if TYPE_CHECKING:
Expand Down Expand Up @@ -171,7 +181,7 @@ def get_vc_info(path: Union[Path, str]) -> Optional[Dict[str, Any]]:
):
LOG.debug(f"Source dir {path} is not a {vcs} repository")
elif cylc.flow.flags.verbosity > -1:
LOG.warning(f"$ {vcs} {' '.join(args)}\n{exc}")
LOG.warning(f"$ {vcs} {format_cmd(args)}\n{exc}")
continue

info['version control system'] = vcs
Expand Down Expand Up @@ -217,9 +227,7 @@ def _run_cmd(
args: The args to pass to the version control command.
cwd: Directory to run the command in.
stdout: Where to redirect output (either PIPE or a
text stream/file object). Note: only use PIPE for
commands that will not generate a large output, otherwise
the pipe might get blocked.
text stream/file object).
Returns:
Stdout output if stdout=PIPE, else None as the output has been
Expand All @@ -231,6 +239,7 @@ def _run_cmd(
OSError: Non-zero return code for VCS command.
"""
cmd = [vcs, *args]
LOG.debug(f'$ {format_cmd(cmd)}')
try:
proc = Popen( # nosec
cmd,
Expand All @@ -245,13 +254,15 @@ def _run_cmd(
# This will only be raised if the VCS command is not installed,
# otherwise Popen() will succeed with a non-zero return code
raise VCSNotInstalledError(vcs, exc)
ret_code = proc.wait()
out, err = proc.communicate()
if ret_code:
if stdout == PIPE:
out, err = pipe_poller(proc, proc.stdout, proc.stderr)
else:
out, err = proc.communicate()
if proc.returncode:
if any(err.lower().startswith(msg) for msg in NO_BASE_ERRS[vcs]):
# No base commit in repo
raise VCSMissingBaseError(vcs, cwd)
raise OSError(ret_code, err)
raise OSError(proc.returncode, err)
return out


Expand Down
73 changes: 73 additions & 0 deletions cylc/flow/pipe_poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Utility for preventing pipes from getting clogged up.
If you're reading files from Popen (i.e. to extract command output) where the
command output has the potential to be long-ish, then you should use this
function to protect against the buffer filling up.
Note, there is a more advanced version of this baked into the subprocpool.
"""

from select import select


def pipe_poller(proc, *files, chunk_size=4096):
"""Read from a process without hitting buffer issues.
Standin for subprocess.Popen.communicate.
When PIPE'ing from subprocesses, the output goes into a buffer. If the
buffer gets full, the subprocess will hang trying to write to it.
This function polls the process, reading output from the buffers into
memory to prevent them from filling up.
Args:
proc:
The process to poll.
files:
The files you want to read from, likely anything you've directed to
PIPE.
chunk_size:
The amount of text to read from the buffer on each pass.
Returns:
tuple - The text read from each of the files in the order they were
specified.
"""
_files = {
file: b'' if 'b' in getattr(file, 'mode', 'r') else ''
for file in files
}

def _read(timeout=1.0):
# read any data from files
nonlocal chunk_size, files
for file in select(list(files), [], [], timeout)[0]:
buffer = file.read(chunk_size)
if len(buffer) > 0:
_files[file] += buffer

while proc.poll() is None:
# read from the buffers
_read()
# double check the buffers now that the process has finished
_read(timeout=0.01)

return tuple(_files.values())
33 changes: 33 additions & 0 deletions tests/unit/test_pipe_poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from subprocess import Popen, PIPE

from cylc.flow.pipe_poller import pipe_poller


def test_pipe_poller_str():
proc = Popen(['echo', 'Hello World!'], stdout=PIPE, text=True)
(stdout,) = pipe_poller(proc, proc.stdout)
assert proc.returncode == 0
assert stdout == 'Hello World!\n'


def test_pipe_poller_bytes():
proc = Popen(['echo', 'Hello World!'], stdout=PIPE, text=False)
(stdout,) = pipe_poller(proc, proc.stdout)
assert proc.returncode == 0
assert stdout == b'Hello World!\n'

0 comments on commit 8b2feae

Please sign in to comment.