import os
import sys
import warnings
from inspect import isabstract
from typing import Any
from test import support
from test.support import os_helper
from test.support import refleak_helper
from .runtests import HuntRefleak
from .utils import clear_caches
try:
from _abc import _get_dump
except ImportError:
import weakref
def _get_dump(cls):
# Reimplement _get_dump() for pure-Python implementation of
# the abc module (Lib/_py_abc.py)
registry_weakrefs = set(weakref.ref(obj) for obj in cls._abc_registry)
return (registry_weakrefs, cls._abc_cache,
cls._abc_negative_cache, cls._abc_negative_cache_version)
def save_support_xml(filename):
if support.junit_xml_list is None:
return
import pickle
with open(filename, 'xb') as fp:
pickle.dump(support.junit_xml_list, fp)
support.junit_xml_list = None
def restore_support_xml(filename):
try:
fp = open(filename, 'rb')
except FileNotFoundError:
return
import pickle
with fp:
xml_list = pickle.load(fp)
os.unlink(filename)
support.junit_xml_list = xml_list
def runtest_refleak(test_name, test_func,
hunt_refleak: HuntRefleak,
quiet: bool):
"""Run a test multiple times, looking for reference leaks.
Returns:
False if the test didn't leak references; True if we detected refleaks.
"""
# This code is hackish and inelegant, but it seems to do the job.
import copyreg
import collections.abc
if not hasattr(sys, 'gettotalrefcount'):
raise Exception("Tracking reference leaks requires a debug build "
"of Python")
# Avoid false positives due to various caches
# filling slowly with random data:
warm_caches()
# Save current values for dash_R_cleanup() to restore.
fs = warnings.filters[:]
ps = copyreg.dispatch_table.copy()
pic = sys.path_importer_cache.copy()
zdc: dict[str, Any] | None
try:
import zipimport
except ImportError:
zdc = None # Run unmodified on platforms without zipimport support
else:
# private attribute that mypy doesn't know about:
zdc = zipimport._zip_directory_cache.copy() # type: ignore[attr-defined]
abcs = {}
for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]:
if not isabstract(abc):
continue
for obj in abc.__subclasses__() + [abc]:
abcs[obj] = _get_dump(obj)[0]
# bpo-31217: Integer pool to get a single integer object for the same
# value. The pool is used to prevent false alarm when checking for memory
# block leaks. Fill the pool with values in -1000..1000 which are the most
# common (reference, memory block, file descriptor) differences.
int_pool = {value: value for value in range(-1000, 1000)}
def get_pooled_int(value):
return int_pool.setdefault(value, value)
warmups = hunt_refleak.warmups
runs = hunt_refleak.runs
filename = hunt_refleak.filename
repcount = warmups + runs
# Pre-allocate to ensure that the loop doesn't allocate anything new
rep_range = list(range(repcount))
rc_deltas = [0] * repcount
alloc_deltas = [0] * repcount
fd_deltas = [0] * repcount
getallocatedblocks = sys.getallocatedblocks
gettotalrefcount = sys.gettotalrefcount
getunicodeinternedsize = sys.getunicodeinternedsize
fd_count = os_helper.fd_count
# initialize variables to make pyflakes quiet
rc_before = alloc_before = fd_before = interned_immortal_before = 0
if not quiet:
print("beginning", repcount, "repetitions. Showing number of leaks "
"(. for 0 or less, X for 10 or more)",
file=sys.stderr)
numbers = ("1234567890"*(repcount//10 + 1))[:repcount]
numbers = numbers[:warmups] + ':' + numbers[warmups:]
print(numbers, file=sys.stderr, flush=True)
xml_filename = 'refleak-xml.tmp'
result = None
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
for i in rep_range:
current = refleak_helper._hunting_for_refleaks
refleak_helper._hunting_for_refleaks = True
try:
result = test_func()
finally:
refleak_helper._hunting_for_refleaks = current
save_support_xml(xml_filename)
dash_R_cleanup(fs, ps, pic, zdc, abcs)
support.gc_collect()
# Read memory statistics immediately after the garbage collection.
# Also, readjust the reference counts and alloc blocks by ignoring
# any strings that might have been interned during test_func. These
# strings will be deallocated at runtime shutdown
interned_immortal_after = getunicodeinternedsize(
# Use an internal-only keyword argument that mypy doesn't know yet
_only_immortal=True) # type: ignore[call-arg]
alloc_after = getallocatedblocks() - interned_immortal_after
rc_after = gettotalrefcount()
fd_after = fd_count()
rc_deltas[i] = get_pooled_int(rc_after - rc_before)
alloc_deltas[i] = get_pooled_int(alloc_after - alloc_before)
fd_deltas[i] = get_pooled_int(fd_after - fd_before)
if not quiet:
# use max, not sum, so total_leaks is one of the pooled ints
total_leaks = max(rc_deltas[i], alloc_deltas[i], fd_deltas[i])
if total_leaks <= 0:
symbol = '.'
elif total_leaks < 10:
symbol = (
'.', '1', '2', '3', '4', '5', '6', '7', '8', '9',
)[total_leaks]
else:
symbol = 'X'
if i == warmups:
print(' ', end='', file=sys.stderr, flush=True)
print(symbol, end='', file=sys.stderr, flush=True)
del total_leaks
del symbol
alloc_before = alloc_after
rc_before = rc_after
fd_before = fd_after
interned_immortal_before = interned_immortal_after
restore_support_xml(xml_filename)
if not quiet:
print(file=sys.stderr)
# These checkers return False on success, True on failure
def check_rc_deltas(deltas):
# Checker for reference counters and memory blocks.
#
# bpo-30776: Try to ignore false positives:
#
# [3, 0, 0]
# [0, 1, 0]
# [8, -8, 1]
#
# Expected leaks:
#
# [5, 5, 6]
# [10, 1, 1]
return all(delta >= 1 for delta in deltas)
def check_fd_deltas(deltas):
return any(deltas)
failed = False
for deltas, item_name, checker in [
(rc_deltas, 'references', check_rc_deltas),
(alloc_deltas, 'memory blocks', check_rc_deltas),
(fd_deltas, 'file descriptors', check_fd_deltas)
]:
# ignore warmup runs
deltas = deltas[warmups:]
failing = checker(deltas)
suspicious = any(deltas)
if failing or suspicious:
msg = '%s leaked %s %s, sum=%s' % (
test_name, deltas, item_name, sum(deltas))
print(msg, end='', file=sys.stderr)
if failing:
print(file=sys.stderr, flush=True)
with open(filename, "a", encoding="utf-8") as refrep:
print(msg, file=refrep)
refrep.flush()
failed = True
else:
print(' (this is fine)', file=sys.stderr, flush=True)
return (failed, result)
def dash_R_cleanup(fs, ps, pic, zdc, abcs):
import copyreg
import collections.abc
# Restore some original values.
warnings.filters[:] = fs
copyreg.dispatch_table.clear()
copyreg.dispatch_table.update(ps)
sys.path_importer_cache.clear()
sys.path_importer_cache.update(pic)
try:
import zipimport
except ImportError:
pass # Run unmodified on platforms without zipimport support
else:
zipimport._zip_directory_cache.clear()
zipimport._zip_directory_cache.update(zdc)
# Clear ABC registries, restoring previously saved ABC registries.
abs_classes = [getattr(collections.abc, a) for a in collections.abc.__all__]
abs_classes = filter(isabstract, abs_classes)
for abc in abs_classes:
for obj in abc.__subclasses__() + [abc]:
refs = abcs.get(obj, None)
if refs is not None:
obj._abc_registry_clear()
for ref in refs:
subclass = ref()
if subclass is not None:
obj.register(subclass)
obj._abc_caches_clear()
# Clear caches
clear_caches()
# Clear other caches last (previous function calls can re-populate them):
sys._clear_internal_caches()
def warm_caches():
# char cache
s = bytes(range(256))
for i in range(256):
s[i:i+1]
# unicode cache
[chr(i) for i in range(256)]
# int cache
list(range(-5, 257))