Skip to content

Commit

Permalink
wip: lint,mypy,black
Browse files Browse the repository at this point in the history
  • Loading branch information
nedbat committed Jan 3, 2025
1 parent 1122561 commit eb5d728
Showing 1 changed file with 73 additions and 47 deletions.
120 changes: 73 additions & 47 deletions coverage/sysmon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from typing import (
Any,
Callable,
Iterable,
NewType,
Optional,
cast,
Expand Down Expand Up @@ -57,15 +58,18 @@
DISABLE_TYPE = NewType("DISABLE_TYPE", object)
MonitorReturn = Optional[DISABLE_TYPE]
DISABLE = cast(MonitorReturn, getattr(sys_monitoring, "DISABLE", None))
TOffset = int

ALWAYS_JUMPS = {
dis.opmap[name] for name in
["JUMP_FORWARD", "JUMP_BACKWARD", "JUMP_BACKWARD_NO_INTERRUPT"]
}
ALWAYS_JUMPS: set[int] = set()
RETURNS: set[int] = set()

RETURNS = {
dis.opmap[name] for name in ["RETURN_VALUE", "RETURN_GENERATOR"]
}
if env.PYBEHAVIOR.branch_right_left:
ALWAYS_JUMPS.update(
dis.opmap[name]
for name in ["JUMP_FORWARD", "JUMP_BACKWARD", "JUMP_BACKWARD_NO_INTERRUPT"]
)

RETURNS.update(dis.opmap[name] for name in ["RETURN_VALUE", "RETURN_GENERATOR"])


if LOG: # pragma: debugging
Expand Down Expand Up @@ -175,16 +179,26 @@ def _decorator(meth: AnyCallable) -> AnyCallable:


class InstructionWalker:
def __init__(self, code: CodeType):
"""Utility to step through trails of instructions."""

def __init__(self, code: CodeType) -> None:
self.code = code
self.insts: dict[int, dis.Instruction] = {}
self.insts: dict[TOffset, dis.Instruction] = {}

inst = None
for inst in dis.get_instructions(code):
self.insts[inst.offset] = inst

assert inst is not None
self.max_offset = inst.offset

def walk(self, *, start_at=0, follow_jumps=True):
def walk(
self, *, start_at: TOffset = 0, follow_jumps: bool = True
) -> Iterable[dis.Instruction]:
"""
Yield instructions starting from `start_at`. Follow unconditional
jumps if `follow_jumps` is true.
"""
seen = set()
offset = start_at
while offset < self.max_offset + 1:
Expand All @@ -199,52 +213,57 @@ def walk(self, *, start_at=0, follow_jumps=True):
offset += 2


def populate_branch_trails(code: CodeType, code_info: CodeInfo) -> tuple[list[int], TArc | None]:
def populate_branch_trails(code: CodeType, code_info: CodeInfo) -> None:
"""
Populate the `branch_trails` attribute on `code_info`.
"""
iwalker = InstructionWalker(code)
for inst in iwalker.walk(follow_jumps=False):
log(f"considering {inst=}")
if not inst.jump_target:
log(f"no jump_target")
log("no jump_target")
continue
if inst.opcode in ALWAYS_JUMPS:
log(f"always jumps")
log("always jumps")
continue

from_line = inst.line_number
assert from_line is not None

def walkabout(start_at, branch_kind):
insts = []
def walk_one_branch(
start_at: TOffset, branch_kind: str
) -> tuple[list[TOffset], TArc | None]:
# pylint: disable=cell-var-from-loop
inst_offsets: list[TOffset] = []
to_line = None
for inst2 in iwalker.walk(start_at=start_at):
insts.append(inst2.offset)
inst_offsets.append(inst2.offset)
if inst2.line_number and inst2.line_number != from_line:
to_line = inst2.line_number
break
elif inst2.jump_target and (inst2.opcode not in ALWAYS_JUMPS):
log(f"stop: {inst2.jump_target=}, {inst2.opcode=} ({dis.opname[inst2.opcode]}), {ALWAYS_JUMPS=}")
log(
f"stop: {inst2.jump_target=}, "
+ f"{inst2.opcode=} ({dis.opname[inst2.opcode]}), "
+ f"{ALWAYS_JUMPS=}"
)
break
elif inst2.opcode in RETURNS:
to_line = -code.co_firstlineno
break
# if to_line is None:
# import contextlib
# with open("/tmp/foo.out", "a") as f:
# with contextlib.redirect_stdout(f):
# print()
# print(f"{code = }")
# print(f"{from_line = }, {to_line = }, {start_at = }")
# dis.dis(code)
# 1/0
if to_line is not None:
log(f"possible branch from @{start_at}: {insts}, {(from_line, to_line)} {code}")
return insts, (from_line, to_line)
log(
f"possible branch from @{start_at}: "
+ f"{inst_offsets}, {(from_line, to_line)} {code}"
)
return inst_offsets, (from_line, to_line)
else:
log(f" no possible branch from @{start_at}: {insts}")
log(f" no possible branch from @{start_at}: {inst_offsets}")
return [], None

code_info.branch_trails[inst.offset] = (
walkabout(start_at=inst.offset + 2, branch_kind="not-taken"),
walkabout(start_at=inst.jump_target, branch_kind="taken"),
walk_one_branch(start_at=inst.offset + 2, branch_kind="not-taken"),
walk_one_branch(start_at=inst.jump_target, branch_kind="taken"),
)


Expand All @@ -254,7 +273,7 @@ class CodeInfo:

tracing: bool
file_data: TTraceFileData | None
byte_to_line: dict[int, int] | None
byte_to_line: dict[TOffset, TLineNo] | None
# Keys are start instruction offsets for branches.
# Values are two tuples:
# (
Expand All @@ -263,15 +282,15 @@ class CodeInfo:
# )
# Two possible trails from the branch point, left and right.
branch_trails: dict[
int,
TOffset,
tuple[
tuple[list[int], TArc] | None,
tuple[list[int], TArc] | None,
]
tuple[list[TOffset], TArc | None],
tuple[list[TOffset], TArc | None],
],
]


def bytes_to_lines(code: CodeType) -> dict[int, int]:
def bytes_to_lines(code: CodeType) -> dict[TOffset, TLineNo]:
"""Make a dict mapping byte code offsets to line numbers."""
b2l = {}
for bstart, bend, lineno in code.co_lines():
Expand Down Expand Up @@ -335,15 +354,21 @@ def start(self) -> None:
sys_monitoring.use_tool_id(self.myid, "coverage.py")
register = functools.partial(sys_monitoring.register_callback, self.myid)
events = sys.monitoring.events
import contextlib

sys_monitoring.set_events(self.myid, events.PY_START)
register(events.PY_START, self.sysmon_py_start)
if self.trace_arcs:
register(events.PY_RETURN, self.sysmon_py_return)
register(events.LINE, self.sysmon_line_arcs)
register(events.BRANCH_RIGHT, self.sysmon_branch_either) # type:ignore[attr-defined]
register(events.BRANCH_LEFT, self.sysmon_branch_either) # type:ignore[attr-defined]
if env.PYBEHAVIOR.branch_right_left:
register(
events.BRANCH_RIGHT, # type:ignore[attr-defined]
self.sysmon_branch_either,
)
register(
events.BRANCH_LEFT, # type:ignore[attr-defined]
self.sysmon_branch_either,
)
else:
register(events.LINE, self.sysmon_line_lines)
sys_monitoring.restart_events()
Expand Down Expand Up @@ -385,7 +410,7 @@ def get_stats(self) -> dict[str, int] | None:

@panopticon("code", "@")
def sysmon_py_start( # pylint: disable=useless-return
self, code: CodeType, instruction_offset: int
self, code: CodeType, instruction_offset: TOffset
) -> MonitorReturn:
"""Handle sys.monitoring.events.PY_START events."""
# Entering a new frame. Decide if we should trace in this file.
Expand Down Expand Up @@ -433,7 +458,7 @@ def sysmon_py_start( # pylint: disable=useless-return
branch_trails={},
)
self.code_infos[id(code)] = code_info
populate_branch_trails(code, code_info) # TODO: should be a method?
populate_branch_trails(code, code_info) # TODO: should be a method?
self.code_objects.append(code)

if tracing_code:
Expand All @@ -445,7 +470,8 @@ def sysmon_py_start( # pylint: disable=useless-return
if self.trace_arcs:
assert env.PYBEHAVIOR.branch_right_left
local_events |= (
events.BRANCH_RIGHT | events.BRANCH_LEFT # type:ignore[attr-defined]
events.BRANCH_RIGHT # type:ignore[attr-defined]
| events.BRANCH_LEFT # type:ignore[attr-defined]
)
sys_monitoring.set_local_events(self.myid, code, local_events)
# 111963:
Expand All @@ -457,7 +483,7 @@ def sysmon_py_start( # pylint: disable=useless-return
def sysmon_py_return( # pylint: disable=useless-return
self,
code: CodeType,
instruction_offset: int,
instruction_offset: TOffset,
retval: object,
) -> MonitorReturn:
"""Handle sys.monitoring.events.PY_RETURN events for branch coverage."""
Expand All @@ -472,7 +498,7 @@ def sysmon_py_return( # pylint: disable=useless-return
return None

@panopticon("code", "line")
def sysmon_line_lines(self, code: CodeType, line_number: int) -> MonitorReturn:
def sysmon_line_lines(self, code: CodeType, line_number: TLineNo) -> MonitorReturn:
"""Handle sys.monitoring.events.LINE events for line coverage."""
code_info = self.code_infos[id(code)]
if code_info.file_data is not None:
Expand All @@ -481,7 +507,7 @@ def sysmon_line_lines(self, code: CodeType, line_number: int) -> MonitorReturn:
return DISABLE

@panopticon("code", "line")
def sysmon_line_arcs(self, code: CodeType, line_number: int) -> MonitorReturn:
def sysmon_line_arcs(self, code: CodeType, line_number: TLineNo) -> MonitorReturn:
"""Handle sys.monitoring.events.LINE events for branch coverage."""
code_info = self.code_infos[id(code)]
if code_info.file_data is not None:
Expand All @@ -492,7 +518,7 @@ def sysmon_line_arcs(self, code: CodeType, line_number: int) -> MonitorReturn:

@panopticon("code", "@", "@")
def sysmon_branch_either(
self, code: CodeType, instruction_offset: int, destination_offset: int
self, code: CodeType, instruction_offset: TOffset, destination_offset: TOffset
) -> MonitorReturn:
"""Handle BRANCH_RIGHT and BRANCH_LEFT events."""
code_info = self.code_infos[id(code)]
Expand Down

0 comments on commit eb5d728

Please sign in to comment.