import re
import sys
import textwrap
import os
import unittest
from dataclasses import dataclass
from functools import cache
from test import support
from test.support.script_helper import run_python_until_end
_strace_binary = "/usr/bin/strace"
_syscall_regex = re.compile(
r"(?P<syscall>[^(]*)\((?P<args>[^)]*)\)\s*[=]\s*(?P<returncode>.+)")
_returncode_regex = re.compile(
br"\+\+\+ exited with (?P<returncode>\d+) \+\+\+")
@dataclass
class StraceEvent:
syscall: str
args: list[str]
returncode: str
@dataclass
class StraceResult:
strace_returncode: int
python_returncode: int
"""The event messages generated by strace. This is very similar to the
stderr strace produces with returncode marker section removed."""
event_bytes: bytes
stdout: bytes
stderr: bytes
def events(self):
"""Parse event_bytes data into system calls for easier processing.
This assumes the program under inspection doesn't print any non-utf8
strings which would mix into the strace output."""
decoded_events = self.event_bytes.decode('utf-8')
matches = [
_syscall_regex.match(event)
for event in decoded_events.splitlines()
]
return [
StraceEvent(match["syscall"],
[arg.strip() for arg in (match["args"].split(","))],
match["returncode"]) for match in matches if match
]
def sections(self):
"""Find all "MARK <X>" writes and use them to make groups of events.
This is useful to avoid variable / overhead events, like those at
interpreter startup or when opening a file so a test can verify just
the small case under study."""
current_section = "__startup"
sections = {current_section: []}
for event in self.events():
if event.syscall == 'write' and len(
event.args) > 2 and event.args[1].startswith("\"MARK "):
# Found a new section, don't include the write in the section
# but all events until next mark should be in that section
current_section = event.args[1].split(
" ", 1)[1].removesuffix('\\n"')
if current_section not in sections:
sections[current_section] = list()
else:
sections[current_section].append(event)
return sections
def _filter_memory_call(call):
# mmap can operate on a fd or "MAP_ANONYMOUS" which gives a block of memory.
# Ignore "MAP_ANONYMOUS + the "MAP_ANON" alias.
if call.syscall == "mmap" and "MAP_ANON" in call.args[3]:
return True
if call.syscall in ("munmap", "mprotect"):
return True
return False
def filter_memory(syscalls):
"""Filter out memory allocation calls from File I/O calls.
Some calls (mmap, munmap, etc) can be used on files or to just get a block
of memory. Use this function to filter out the memory related calls from
other calls."""
return [call for call in syscalls if not _filter_memory_call(call)]
@support.requires_subprocess()
def strace_python(code, strace_flags, check=True):
"""Run strace and return the trace.
Sets strace_returncode and python_returncode to `-1` on error."""
res = None
def _make_error(reason, details):
return StraceResult(
strace_returncode=-1,
python_returncode=-1,
event_bytes=f"error({reason},details={details}) = -1".encode('utf-8'),
stdout=res.out if res else b"",
stderr=res.err if res else b"")
# Run strace, and get out the raw text
try:
res, cmd_line = run_python_until_end(
"-c",
textwrap.dedent(code),
__run_using_command=[_strace_binary] + strace_flags,
)
except OSError as err:
return _make_error("Caught OSError", err)
if check and res.rc:
res.fail(cmd_line)
# Get out program returncode
stripped = res.err.strip()
output = stripped.rsplit(b"\n", 1)
if len(output) != 2:
return _make_error("Expected strace events and exit code line",
stripped[-50:])
returncode_match = _returncode_regex.match(output[1])
if not returncode_match:
return _make_error("Expected to find returncode in last line.",
output[1][:50])
python_returncode = int(returncode_match["returncode"])
if check and python_returncode:
res.fail(cmd_line)
return StraceResult(strace_returncode=res.rc,
python_returncode=python_returncode,
event_bytes=output[0],
stdout=res.out,
stderr=res.err)
def get_events(code, strace_flags, prelude, cleanup):
# NOTE: The flush is currently required to prevent the prints from getting
# buffered and done all at once at exit
prelude = textwrap.dedent(prelude)
code = textwrap.dedent(code)
cleanup = textwrap.dedent(cleanup)
to_run = f"""
print("MARK prelude", flush=True)
{prelude}
print("MARK code", flush=True)
{code}
print("MARK cleanup", flush=True)
{cleanup}
print("MARK __shutdown", flush=True)
"""
trace = strace_python(to_run, strace_flags)
all_sections = trace.sections()
return all_sections['code']
def get_syscalls(code, strace_flags, prelude="", cleanup="",
ignore_memory=True):
"""Get the syscalls which a given chunk of python code generates"""
events = get_events(code, strace_flags, prelude=prelude, cleanup=cleanup)
if ignore_memory:
events = filter_memory(events)
return [ev.syscall for ev in events]
# Moderately expensive (spawns a subprocess), so share results when possible.
@cache
def _can_strace():
res = strace_python("import sys; sys.exit(0)", [], check=False)
assert res.events(), "Should have parsed multiple calls"
return res.strace_returncode == 0 and res.python_returncode == 0
def requires_strace():
if sys.platform != "linux":
return unittest.skip("Linux only, requires strace.")
if "LD_PRELOAD" in os.environ:
# Distribution packaging (ex. Debian `fakeroot` and Gentoo `sandbox`)
# use LD_PRELOAD to intercept system calls, which changes the overall
# set of system calls which breaks tests expecting a specific set of
# system calls).
return unittest.skip("Not supported when LD_PRELOAD is intercepting system calls.")
if support.check_sanitizer(address=True, memory=True):
return unittest.skip("LeakSanitizer does not work under ptrace (strace, gdb, etc)")
return unittest.skipUnless(_can_strace(), "Requires working strace")
__all__ = ["filter_memory", "get_events", "get_syscalls", "requires_strace",
"strace_python", "StraceEvent", "StraceResult"]