Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-120754: Add a strace helper and test set of syscalls for open().read(), Take 2 #123413

Merged
merged 12 commits into from
Nov 3, 2024
Merged
170 changes: 170 additions & 0 deletions Lib/test/support/strace_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import re
import sys
import textwrap
import unittest
from dataclasses import dataclass
from functools import cache
from test import support
from test.support.script_helper import run_python_until_end

_strace_binary = "/usr/bin/strace"
_syscall_regex = re.compile(
r"(?P<syscall>[^(]*)\((?P<args>[^)]*)\)\s*[=]\s*(?P<returncode>.+)")
_returncode_regex = re.compile(
br"\+\+\+ exited with (?P<returncode>\d+) \+\+\+")


@dataclass
class StraceEvent:
syscall: str
args: list[str]
returncode: str


@dataclass
class StraceResult:
strace_returncode: int
python_returncode: int

"""The event messages generated by strace. This is very similar to the
stderr strace produces with returncode marker section removed."""
event_bytes: bytes
stdout: bytes
stderr: bytes

def events(self):
"""Parse event_bytes data into system calls for easier processing.

This assumes the program under inspection doesn't print any non-utf8
strings which would mix into the strace output."""
decoded_events = self.event_bytes.decode('utf-8')
matches = [
_syscall_regex.match(event)
for event in decoded_events.splitlines()
]
return [
StraceEvent(match["syscall"],
[arg.strip() for arg in (match["args"].split(","))],
match["returncode"]) for match in matches if match
]

def sections(self):
"""Find all "MARK <X>" writes and use them to make groups of events.

This is useful to avoid variable / overhead events, like those at
interpreter startup or when opening a file so a test can verify just
the small case under study."""
current_section = "__startup"
sections = {current_section: []}
for event in self.events():
if event.syscall == 'write' and len(
event.args) > 2 and event.args[1].startswith("\"MARK "):
# Found a new section, don't include the write in the section
# but all events until next mark should be in that section
current_section = event.args[1].split(
" ", 1)[1].removesuffix('\\n"')
if current_section not in sections:
sections[current_section] = list()
else:
sections[current_section].append(event)

return sections


@support.requires_subprocess()
def strace_python(code, strace_flags, check=True):
"""Run strace and return the trace.

Sets strace_returncode and python_returncode to `-1` on error."""
res = None

def _make_error(reason, details):
return StraceResult(
strace_returncode=-1,
python_returncode=-1,
event_bytes=f"error({reason},details={details}) = -1".encode('utf-8'),
stdout=res.out if res else "",
cmaloney marked this conversation as resolved.
Show resolved Hide resolved
stderr=res.err if res else "")

# Run strace, and get out the raw text
try:
res, cmd_line = run_python_until_end(
"-c",
textwrap.dedent(code),
__run_using_command=[_strace_binary] + strace_flags)
except OSError as err:
return _make_error("Caught OSError", err)

if check and res.rc:
res.fail(cmd_line)

# Get out program returncode
stripped = res.err.strip()
output = stripped.rsplit(b"\n", 1)
if len(output) != 2:
return _make_error("Expected strace events and exit code line",
stripped[-50:])

returncode_match = _returncode_regex.match(output[1])
if not returncode_match:
return _make_error("Expected to find returncode in last line.",
output[1][:50])

python_returncode = int(returncode_match["returncode"])
if check and python_returncode:
res.fail(cmd_line)

return StraceResult(strace_returncode=res.rc,
python_returncode=python_returncode,
event_bytes=output[0],
stdout=res.out,
stderr=res.err)


def get_events(code, strace_flags, prelude, cleanup):
# NOTE: The flush is currently required to prevent the prints from getting
# buffered and done all at once at exit
prelude = textwrap.dedent(prelude)
code = textwrap.dedent(code)
cleanup = textwrap.dedent(cleanup)
to_run = f"""
print("MARK prelude", flush=True)
{prelude}
print("MARK code", flush=True)
{code}
print("MARK cleanup", flush=True)
{cleanup}
print("MARK __shutdown", flush=True)
"""
trace = strace_python(to_run, strace_flags)
all_sections = trace.sections()
return all_sections['code']


def get_syscalls(code, strace_flags, prelude="", cleanup=""):
"""Get the syscalls which a given chunk of python code generates"""
events = get_events(code, strace_flags, prelude=prelude, cleanup=cleanup)
return [ev.syscall for ev in events]


# Moderately expensive (spawns a subprocess), so share results when possible.
@cache
def _can_strace():
res = strace_python("import sys; sys.exit(0)", [], check=False)
assert res.events(), "Should have parsed multiple calls"

return res.strace_returncode == 0 and res.python_returncode == 0


def requires_strace():
if sys.platform != "linux":
return unittest.skip("Linux only, requires strace.")

if support.check_sanitizer(address=True, memory=True):
return unittest.skip("LeakSanitizer does not work under ptrace (strace, gdb, etc)")

return unittest.skipUnless(_can_strace(), "Requires working strace")


__all__ = ["get_events", "get_syscalls", "requires_strace", "strace_python",
"StraceEvent", "StraceResult"]
138 changes: 137 additions & 1 deletion Lib/test/test_fileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from test.support import (
cpython_only, swap_attr, gc_collect, is_emscripten, is_wasi,
infinite_recursion,
infinite_recursion, strace_helper
)
from test.support.os_helper import (
TESTFN, TESTFN_ASCII, TESTFN_UNICODE, make_bad_fd,
Expand All @@ -24,6 +24,9 @@
import _pyio # Python implementation of io


_strace_flags=["--trace=%file,%desc"]


class AutoFileTests:
# file tests for which a test file is automatically set up

Expand Down Expand Up @@ -359,6 +362,139 @@ def testErrnoOnClosedReadinto(self, f):
a = array('b', b'x'*10)
f.readinto(a)

@strace_helper.requires_strace()
def test_syscalls_read(self):
"""Check that the set of system calls produced by the I/O stack is what
is expected for various read cases.

It's expected as bits of the I/O implementation change, this will need
to change. The goal is to catch changes that unintentionally add
additional systemcalls (ex. additional calls have been looked at in
bpo-21679 and gh-120754).
"""
self.f.write(b"Hello, World!")
self.f.close()


def check_readall(name, code, prelude="", cleanup="",
extra_checks=None):
with self.subTest(name=name):
syscalls = strace_helper.get_events(code, _strace_flags,
prelude=prelude,
cleanup=cleanup)

# The first call should be an open that returns a
# file descriptor (fd). Afer that calls may vary. Once the file
# is opened, check calls refer to it by fd as the filename
# could be removed from the filesystem, renamed, etc. See:
# Time-of-check time-of-use (TOCTOU) software bug class.
#
# There are a number of related but distinct open system calls
# so not checking precise name here.
self.assertGreater(
len(syscalls),
1,
f"Should have had at least an open call|calls={syscalls}")
fd_str = syscalls[0].returncode

# All other calls should contain the fd in their argument set.
for ev in syscalls[1:]:
self.assertIn(
fd_str,
ev.args,
f"Looking for file descriptor in arguments|ev={ev}"
)

# There are a number of related syscalls used to implement
# behaviors in a libc (ex. fstat, newfstatat, statx, open, openat).
# Allow any that use the same substring.
def count_similarname(name):
return len([ev for ev in syscalls if name in ev.syscall])

checks = [
# Should open and close the file exactly once
("open", 1),
("close", 1),
# There should no longer be an isatty call (All files being
# tested are block devices / not character devices).
('ioctl', 0),
# Should only have one fstat (bpo-21679, gh-120754)
# note: It's important this uses a fd rather than filename,
# That is validated by the `fd` check above.
# note: fstat, newfstatat, and statx have all been observed
# here in the underlying C library implementations.
("stat", 1)
]

if extra_checks:
checks += extra_checks

for call, count in checks:
self.assertEqual(
count_similarname(call),
count,
msg=f"call={call}|count={count}|syscalls={syscalls}"
)

# "open, read, close" file using different common patterns.
check_readall(
"open builtin with default options",
f"""
f = open('{TESTFN}')
f.read()
f.close()
"""
)

check_readall(
"open in binary mode",
f"""
f = open('{TESTFN}', 'rb')
f.read()
f.close()
"""
)

check_readall(
"open in text mode",
f"""
f = open('{TESTFN}', 'rt')
f.read()
f.close()
""",
# GH-122111: read_text uses BufferedIO which requires looking up
# position in file. `read_bytes` disables that buffering and avoids
# these calls which is tested the `pathlib read_bytes` case.
extra_checks=[("seek", 1)]
)

check_readall(
"pathlib read_bytes",
"p.read_bytes()",
prelude=f"""from pathlib import Path; p = Path("{TESTFN}")""",
# GH-122111: Buffering is disabled so these calls are avoided.
extra_checks=[("seek", 0)]
)

check_readall(
"pathlib read_text",
"p.read_text()",
prelude=f"""from pathlib import Path; p = Path("{TESTFN}")"""
)

# Focus on just `read()`.
calls = strace_helper.get_syscalls(
prelude=f"f = open('{TESTFN}')",
code="f.read()",
cleanup="f.close()",
strace_flags=_strace_flags
)
# One to read all the bytes
# One to read the EOF and get a size 0 return.
self.assertEqual(calls.count("read"), 2)



class CAutoFileTests(AutoFileTests, unittest.TestCase):
FileIO = _io.FileIO
modulename = '_io'
Expand Down
Loading
Loading