import os
import random
import re
import shlex
import sys
import sysconfig
import time
import trace
from test.support import (os_helper, MS_WINDOWS, flush_std_streams,
suppress_immortalization)
from .cmdline import _parse_args, Namespace
from .findtests import findtests, split_test_packages, list_cases
from .logger import Logger
from .pgo import setup_pgo_tests
from .result import State, TestResult
from .results import TestResults, EXITCODE_INTERRUPTED
from .runtests import RunTests, HuntRefleak
from .setup import setup_process, setup_test_dir
from .single import run_single_test, PROGRESS_MIN_TIME
from .tsan import setup_tsan_tests
from .utils import (
StrPath, StrJSON, TestName, TestList, TestTuple, TestFilter,
strip_py_suffix, count, format_duration,
printlist, get_temp_dir, get_work_dir, exit_timeout,
display_header, cleanup_temp_dir, print_warning,
is_cross_compiled, get_host_runner,
EXIT_TIMEOUT)
class Regrtest:
"""Execute a test suite.
This also parses command-line options and modifies its behavior
accordingly.
tests -- a list of strings containing test names (optional)
testdir -- the directory in which to look for tests (optional)
Users other than the Python test suite will certainly want to
specify testdir; if it's omitted, the directory containing the
Python test suite is searched for.
If the tests argument is omitted, the tests listed on the
command-line will be used. If that's empty, too, then all *.py
files beginning with test_ will be used.
The other default arguments (verbose, quiet, exclude,
single, randomize, use_resources, trace, coverdir,
print_slow, and random_seed) allow programmers calling main()
directly to set the values that would normally be set by flags
on the command line.
"""
def __init__(self, ns: Namespace, _add_python_opts: bool = False):
# Log verbosity
self.verbose: int = int(ns.verbose)
self.quiet: bool = ns.quiet
self.pgo: bool = ns.pgo
self.pgo_extended: bool = ns.pgo_extended
self.tsan: bool = ns.tsan
# Test results
self.results: TestResults = TestResults()
self.first_state: str | None = None
# Logger
self.logger = Logger(self.results, self.quiet, self.pgo)
# Actions
self.want_header: bool = ns.header
self.want_list_tests: bool = ns.list_tests
self.want_list_cases: bool = ns.list_cases
self.want_wait: bool = ns.wait
self.want_cleanup: bool = ns.cleanup
self.want_rerun: bool = ns.rerun
self.want_run_leaks: bool = ns.runleaks
self.want_bisect: bool = ns.bisect
self.ci_mode: bool = (ns.fast_ci or ns.slow_ci)
self.want_add_python_opts: bool = (_add_python_opts
and ns._add_python_opts)
# Select tests
self.match_tests: TestFilter = ns.match_tests
self.exclude: bool = ns.exclude
self.fromfile: StrPath | None = ns.fromfile
self.starting_test: TestName | None = ns.start
self.cmdline_args: TestList = ns.args
# Workers
self.single_process: bool = ns.single_process
if self.single_process or ns.use_mp is None:
num_workers = 0 # run sequentially in a single process
elif ns.use_mp <= 0:
num_workers = -1 # run in parallel, use the number of CPUs
else:
num_workers = ns.use_mp # run in parallel
self.num_workers: int = num_workers
self.worker_json: StrJSON | None = ns.worker_json
# Options to run tests
self.fail_fast: bool = ns.failfast
self.fail_env_changed: bool = ns.fail_env_changed
self.fail_rerun: bool = ns.fail_rerun
self.forever: bool = ns.forever
self.output_on_failure: bool = ns.verbose3
self.timeout: float | None = ns.timeout
if ns.huntrleaks:
warmups, runs, filename = ns.huntrleaks
filename = os.path.abspath(filename)
self.hunt_refleak: HuntRefleak | None = HuntRefleak(warmups, runs, filename)
else:
self.hunt_refleak = None
self.test_dir: StrPath | None = ns.testdir
self.junit_filename: StrPath | None = ns.xmlpath
self.memory_limit: str | None = ns.memlimit
self.gc_threshold: int | None = ns.threshold
self.use_resources: tuple[str, ...] = tuple(ns.use_resources)
if ns.python:
self.python_cmd: tuple[str, ...] | None = tuple(ns.python)
else:
self.python_cmd = None
self.coverage: bool = ns.trace
self.coverage_dir: StrPath | None = ns.coverdir
self.tmp_dir: StrPath | None = ns.tempdir
# Randomize
self.randomize: bool = ns.randomize
if ('SOURCE_DATE_EPOCH' in os.environ
# don't use the variable if empty
and os.environ['SOURCE_DATE_EPOCH']
):
self.randomize = False
# SOURCE_DATE_EPOCH should be an integer, but use a string to not
# fail if it's not integer. random.seed() accepts a string.
# https://reproducible-builds.org/docs/source-date-epoch/
self.random_seed: int | str = os.environ['SOURCE_DATE_EPOCH']
elif ns.random_seed is None:
self.random_seed = random.getrandbits(32)
else:
self.random_seed = ns.random_seed
# tests
self.first_runtests: RunTests | None = None
# used by --slowest
self.print_slowest: bool = ns.print_slow
# used to display the progress bar "[ 3/100]"
self.start_time = time.perf_counter()
# used by --single
self.single_test_run: bool = ns.single
self.next_single_test: TestName | None = None
self.next_single_filename: StrPath | None = None
def log(self, line=''):
self.logger.log(line)
def find_tests(self, tests: TestList | None = None) -> tuple[TestTuple, TestList | None]:
if self.single_test_run:
self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
try:
with open(self.next_single_filename, 'r') as fp:
next_test = fp.read().strip()
tests = [next_test]
except OSError:
pass
if self.fromfile:
tests = []
# regex to match 'test_builtin' in line:
# '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
with open(os.path.join(os_helper.SAVEDCWD, self.fromfile)) as fp:
for line in fp:
line = line.split('#', 1)[0]
line = line.strip()
match = regex.search(line)
if match is not None:
tests.append(match.group())
strip_py_suffix(tests)
if self.pgo:
# add default PGO tests if no tests are specified
setup_pgo_tests(self.cmdline_args, self.pgo_extended)
if self.tsan:
setup_tsan_tests(self.cmdline_args)
exclude_tests = set()
if self.exclude:
for arg in self.cmdline_args:
exclude_tests.add(arg)
self.cmdline_args = []
alltests = findtests(testdir=self.test_dir,
exclude=exclude_tests)
if not self.fromfile:
selected = tests or self.cmdline_args
if selected:
selected = split_test_packages(selected)
else:
selected = alltests
else:
selected = tests
if self.single_test_run:
selected = selected[:1]
try:
pos = alltests.index(selected[0])
self.next_single_test = alltests[pos + 1]
except IndexError:
pass
# Remove all the selected tests that precede start if it's set.
if self.starting_test:
try:
del selected[:selected.index(self.starting_test)]
except ValueError:
print(f"Cannot find starting test: {self.starting_test}")
sys.exit(1)
random.seed(self.random_seed)
if self.randomize:
random.shuffle(selected)
return (tuple(selected), tests)
@staticmethod
def list_tests(tests: TestTuple):
for name in tests:
print(name)
def _rerun_failed_tests(self, runtests: RunTests):
# Configure the runner to re-run tests
if self.num_workers == 0 and not self.single_process:
# Always run tests in fresh processes to have more deterministic
# initial state. Don't re-run tests in parallel but limit to a
# single worker process to have side effects (on the system load
# and timings) between tests.
self.num_workers = 1
tests, match_tests_dict = self.results.prepare_rerun()
# Re-run failed tests
runtests = runtests.copy(
tests=tests,
rerun=True,
verbose=True,
forever=False,
fail_fast=False,
match_tests_dict=match_tests_dict,
output_on_failure=False)
self.logger.set_tests(runtests)
msg = f"Re-running {len(tests)} failed tests in verbose mode"
if not self.single_process:
msg = f"{msg} in subprocesses"
self.log(msg)
self._run_tests_mp(runtests, self.num_workers)
else:
self.log(msg)
self.run_tests_sequentially(runtests)
return runtests
def rerun_failed_tests(self, runtests: RunTests):
if self.python_cmd:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
"Re-running failed tests is not supported with --python "
"host runner option."
)
return
self.first_state = self.get_state()
print()
rerun_runtests = self._rerun_failed_tests(runtests)
if self.results.bad:
print(count(len(self.results.bad), 'test'), "failed again:")
printlist(self.results.bad)
self.display_result(rerun_runtests)
def _run_bisect(self, runtests: RunTests, test: str, progress: str) -> bool:
print()
title = f"Bisect {test}"
if progress:
title = f"{title} ({progress})"
print(title)
print("#" * len(title))
print()
cmd = runtests.create_python_cmd()
cmd.extend([
"-u", "-m", "test.bisect_cmd",
# Limit to 25 iterations (instead of 100) to not abuse CI resources
"--max-iter", "25",
"-v",
# runtests.match_tests is not used (yet) for bisect_cmd -i arg
])
cmd.extend(runtests.bisect_cmd_args())
cmd.append(test)
print("+", shlex.join(cmd), flush=True)
flush_std_streams()
import subprocess
proc = subprocess.run(cmd, timeout=runtests.timeout)
exitcode = proc.returncode
title = f"{title}: exit code {exitcode}"
print(title)
print("#" * len(title))
print(flush=True)
if exitcode:
print(f"Bisect failed with exit code {exitcode}")
return False
return True
def run_bisect(self, runtests: RunTests) -> None:
tests, _ = self.results.prepare_rerun(clear=False)
for index, name in enumerate(tests, 1):
if len(tests) > 1:
progress = f"{index}/{len(tests)}"
else:
progress = ""
if not self._run_bisect(runtests, name, progress):
return
def display_result(self, runtests):
# If running the test suite for PGO then no one cares about results.
if runtests.pgo:
return
state = self.get_state()
print()
print(f"== Tests result: {state} ==")
self.results.display_result(runtests.tests,
self.quiet, self.print_slowest)
def run_test(
self, test_name: TestName, runtests: RunTests, tracer: trace.Trace | None
) -> TestResult:
if tracer is not None:
# If we're tracing code coverage, then we don't exit with status
# if on a false return value from main.
cmd = ('result = run_single_test(test_name, runtests)')
namespace = dict(locals())
tracer.runctx(cmd, globals=globals(), locals=namespace)
result = namespace['result']
result.covered_lines = list(tracer.counts)
else:
result = run_single_test(test_name, runtests)
self.results.accumulate_result(result, runtests)
return result
def run_tests_sequentially(self, runtests) -> None:
if self.coverage:
tracer = trace.Trace(trace=False, count=True)
else:
tracer = None
save_modules = set(sys.modules)
jobs = runtests.get_jobs()
if jobs is not None:
tests = count(jobs, 'test')
else:
tests = 'tests'
msg = f"Run {tests} sequentially in a single process"
if runtests.timeout:
msg += " (timeout: %s)" % format_duration(runtests.timeout)
self.log(msg)
previous_test = None
tests_iter = runtests.iter_tests()
for test_index, test_name in enumerate(tests_iter, 1):
start_time = time.perf_counter()
text = test_name
if previous_test:
text = '%s -- %s' % (text, previous_test)
self.logger.display_progress(test_index, text)
result = self.run_test(test_name, runtests, tracer)
# Unload the newly imported test modules (best effort finalization)
new_modules = [module for module in sys.modules
if module not in save_modules and
module.startswith(("test.", "test_"))]
for module in new_modules:
sys.modules.pop(module, None)
# Remove the attribute of the parent module.
parent, _, name = module.rpartition('.')
try:
delattr(sys.modules[parent], name)
except (KeyError, AttributeError):
pass
if result.must_stop(self.fail_fast, self.fail_env_changed):
break
previous_test = str(result)
test_time = time.perf_counter() - start_time
if test_time >= PROGRESS_MIN_TIME:
previous_test = "%s in %s" % (previous_test, format_duration(test_time))
elif result.state == State.PASSED:
# be quiet: say nothing if the test passed shortly
previous_test = None
if previous_test:
print(previous_test)
def get_state(self):
state = self.results.get_state(self.fail_env_changed)
if self.first_state:
state = f'{self.first_state} then {state}'
return state
def _run_tests_mp(self, runtests: RunTests, num_workers: int) -> None:
from .run_workers import RunWorkers
RunWorkers(num_workers, runtests, self.logger, self.results).run()
def finalize_tests(self, coverage: trace.CoverageResults | None) -> None:
if self.next_single_filename:
if self.next_single_test:
with open(self.next_single_filename, 'w') as fp:
fp.write(self.next_single_test + '\n')
else:
os.unlink(self.next_single_filename)
if coverage is not None:
# uses a new-in-Python 3.13 keyword argument that mypy doesn't know about yet:
coverage.write_results(show_missing=True, summary=True, # type: ignore[call-arg]
coverdir=self.coverage_dir,
ignore_missing_files=True)
if self.want_run_leaks:
os.system("leaks %d" % os.getpid())
if self.junit_filename:
self.results.write_junit(self.junit_filename)
def display_summary(self):
duration = time.perf_counter() - self.logger.start_time
filtered = bool(self.match_tests)
# Total duration
print()
print("Total duration: %s" % format_duration(duration))
self.results.display_summary(self.first_runtests, filtered)
# Result
state = self.get_state()
print(f"Result: {state}")
def create_run_tests(self, tests: TestTuple):
return RunTests(
tests,
fail_fast=self.fail_fast,
fail_env_changed=self.fail_env_changed,
match_tests=self.match_tests,
match_tests_dict=None,
rerun=False,
forever=self.forever,
pgo=self.pgo,
pgo_extended=self.pgo_extended,
output_on_failure=self.output_on_failure,
timeout=self.timeout,
verbose=self.verbose,
quiet=self.quiet,
hunt_refleak=self.hunt_refleak,
test_dir=self.test_dir,
use_junit=(self.junit_filename is not None),
coverage=self.coverage,
memory_limit=self.memory_limit,
gc_threshold=self.gc_threshold,
use_resources=self.use_resources,
python_cmd=self.python_cmd,
randomize=self.randomize,
random_seed=self.random_seed,
)
def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
if self.hunt_refleak and self.hunt_refleak.warmups < 3:
msg = ("WARNING: Running tests with --huntrleaks/-R and "
"less than 3 warmup repetitions can give false positives!")
print(msg, file=sys.stdout, flush=True)
if self.num_workers < 0:
# Use all CPUs + 2 extra worker processes for tests
# that like to sleep
#
# os.process.cpu_count() is new in Python 3.13;
# mypy doesn't know about it yet
self.num_workers = (os.process_cpu_count() or 1) + 2 # type: ignore[attr-defined]
# For a partial run, we do not need to clutter the output.
if (self.want_header
or not(self.pgo or self.quiet or self.single_test_run
or tests or self.cmdline_args)):
display_header(self.use_resources, self.python_cmd)
print("Using random seed:", self.random_seed)
runtests = self.create_run_tests(selected)
self.first_runtests = runtests
self.logger.set_tests(runtests)
setup_process()
if (runtests.hunt_refleak is not None) and (not self.num_workers):
# gh-109739: WindowsLoadTracker thread interfers with refleak check
use_load_tracker = False
else:
# WindowsLoadTracker is only needed on Windows
use_load_tracker = MS_WINDOWS
if use_load_tracker:
self.logger.start_load_tracker()
try:
if self.num_workers:
self._run_tests_mp(runtests, self.num_workers)
else:
# gh-117783: don't immortalize deferred objects when tracking
# refleaks. Only relevant for the free-threaded build.
with suppress_immortalization(runtests.hunt_refleak):
self.run_tests_sequentially(runtests)
coverage = self.results.get_coverage_results()
self.display_result(runtests)
if self.want_rerun and self.results.need_rerun():
self.rerun_failed_tests(runtests)
if self.want_bisect and self.results.need_rerun():
self.run_bisect(runtests)
finally:
if use_load_tracker:
self.logger.stop_load_tracker()
self.display_summary()
self.finalize_tests(coverage)
return self.results.get_exitcode(self.fail_env_changed,
self.fail_rerun)
def run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
os.makedirs(self.tmp_dir, exist_ok=True)
work_dir = get_work_dir(self.tmp_dir)
# Put a timeout on Python exit
with exit_timeout():
# Run the tests in a context manager that temporarily changes the
# CWD to a temporary and writable directory. If it's not possible
# to create or change the CWD, the original CWD will be used.
# The original CWD is available from os_helper.SAVEDCWD.
with os_helper.temp_cwd(work_dir, quiet=True):
# When using multiprocessing, worker processes will use
# work_dir as their parent temporary directory. So when the
# main process exit, it removes also subdirectories of worker
# processes.
return self._run_tests(selected, tests)
def _add_cross_compile_opts(self, regrtest_opts):
# WASM/WASI buildbot builders pass multiple PYTHON environment
# variables such as PYTHONPATH and _PYTHON_HOSTRUNNER.
keep_environ = bool(self.python_cmd)
environ = None
# Are we using cross-compilation?
cross_compile = is_cross_compiled()
# Get HOSTRUNNER
hostrunner = get_host_runner()
if cross_compile:
# emulate -E, but keep PYTHONPATH + cross compile env vars,
# so test executable can load correct sysconfigdata file.
keep = {
'_PYTHON_PROJECT_BASE',
'_PYTHON_HOST_PLATFORM',
'_PYTHON_SYSCONFIGDATA_NAME',
'PYTHONPATH'
}
old_environ = os.environ
new_environ = {
name: value for name, value in os.environ.items()
if not name.startswith(('PYTHON', '_PYTHON')) or name in keep
}
# Only set environ if at least one variable was removed
if new_environ != old_environ:
environ = new_environ
keep_environ = True
if cross_compile and hostrunner:
if self.num_workers == 0 and not self.single_process:
# For now use only two cores for cross-compiled builds;
# hostrunner can be expensive.
regrtest_opts.extend(['-j', '2'])
# If HOSTRUNNER is set and -p/--python option is not given, then
# use hostrunner to execute python binary for tests.
if not self.python_cmd:
buildpython = sysconfig.get_config_var("BUILDPYTHON")
python_cmd = f"{hostrunner} {buildpython}"
regrtest_opts.extend(["--python", python_cmd])
keep_environ = True
return (environ, keep_environ)
def _add_ci_python_opts(self, python_opts, keep_environ):
# --fast-ci and --slow-ci add options to Python:
# "-u -W default -bb -E"
# Unbuffered stdout and stderr
if not sys.stdout.write_through:
python_opts.append('-u')
# Add warnings filter 'default'
if 'default' not in sys.warnoptions:
python_opts.extend(('-W', 'default'))
# Error on bytes/str comparison
if sys.flags.bytes_warning < 2:
python_opts.append('-bb')
if not keep_environ:
# Ignore PYTHON* environment variables
if not sys.flags.ignore_environment:
python_opts.append('-E')
def _execute_python(self, cmd, environ):
# Make sure that messages before execv() are logged
sys.stdout.flush()
sys.stderr.flush()
cmd_text = shlex.join(cmd)
try:
print(f"+ {cmd_text}", flush=True)
if hasattr(os, 'execv') and not MS_WINDOWS:
os.execv(cmd[0], cmd)
# On success, execv() do no return.
# On error, it raises an OSError.
else:
import subprocess
with subprocess.Popen(cmd, env=environ) as proc:
try:
proc.wait()
except KeyboardInterrupt:
# There is no need to call proc.terminate(): on CTRL+C,
# SIGTERM is also sent to the child process.
try:
proc.wait(timeout=EXIT_TIMEOUT)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
sys.exit(EXITCODE_INTERRUPTED)
sys.exit(proc.returncode)
except Exception as exc:
print_warning(f"Failed to change Python options: {exc!r}\n"
f"Command: {cmd_text}")
# continue executing main()
def _add_python_opts(self):
python_opts = []
regrtest_opts = []
environ, keep_environ = self._add_cross_compile_opts(regrtest_opts)
if self.ci_mode:
self._add_ci_python_opts(python_opts, keep_environ)
if (not python_opts) and (not regrtest_opts) and (environ is None):
# Nothing changed: nothing to do
return
# Create new command line
cmd = list(sys.orig_argv)
if python_opts:
cmd[1:1] = python_opts
if regrtest_opts:
cmd.extend(regrtest_opts)
cmd.append("--dont-add-python-opts")
self._execute_python(cmd, environ)
def _init(self):
# Set sys.stdout encoder error handler to backslashreplace,
# similar to sys.stderr error handler, to avoid UnicodeEncodeError
# when printing a traceback or any other non-encodable character.
sys.stdout.reconfigure(errors="backslashreplace")
if self.junit_filename and not os.path.isabs(self.junit_filename):
self.junit_filename = os.path.abspath(self.junit_filename)
strip_py_suffix(self.cmdline_args)
self.tmp_dir = get_temp_dir(self.tmp_dir)
def main(self, tests: TestList | None = None):
if self.want_add_python_opts:
self._add_python_opts()
self._init()
if self.want_cleanup:
cleanup_temp_dir(self.tmp_dir)
sys.exit(0)
if self.want_wait:
input("Press any key to continue...")
setup_test_dir(self.test_dir)
selected, tests = self.find_tests(tests)
exitcode = 0
if self.want_list_tests:
self.list_tests(selected)
elif self.want_list_cases:
list_cases(selected,
match_tests=self.match_tests,
test_dir=self.test_dir)
else:
exitcode = self.run_tests(selected, tests)
sys.exit(exitcode)
def main(tests=None, _add_python_opts=False, **kwargs):
"""Run the Python suite."""
ns = _parse_args(sys.argv[1:], **kwargs)
Regrtest(ns, _add_python_opts=_add_python_opts).main(tests=tests)