cpython/Lib/test/test_gc.py

import unittest
import unittest.mock
from test import support
from test.support import (verbose, refcount_test,
                          cpython_only, requires_subprocess,
                          requires_gil_enabled,
                          Py_GIL_DISABLED)
from test.support.import_helper import import_module
from test.support.os_helper import temp_dir, TESTFN, unlink
from test.support.script_helper import assert_python_ok, make_script
from test.support import threading_helper, gc_threshold

import gc
import sys
import sysconfig
import textwrap
import threading
import time
import weakref

try:
    import _testcapi
    from _testcapi import with_tp_del
    from _testcapi import ContainerNoGC
except ImportError:
    _testcapi = None
    def with_tp_del(cls):
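        # Fallback when _testcapi is unavailable: substitute a class whose
        # construction raises SkipTest, so decorated tests are skipped
        # instead of failing.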
        class C(object):
            def __new__(cls, *args, **kwargs):
                raise unittest.SkipTest('requires _testcapi.with_tp_del')
        return C
    ContainerNoGC = None

try:
    import _testinternalcapi
except ImportError:
    _testinternalcapi = None

### Support code
###############################################################################

# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
# cyclic gc.

# An instance of C1055820 has a self-loop, so becomes cyclic trash when
# unreachable.
class C1055820(object):
    def __init__(self, i):
        self.i = i
        self.loop = self

class GC_Detector(object):
    # Create an instance I.  Then gc hasn't happened again so long as
    # I.gc_happened is false.

    def __init__(self):
        self.gc_happened = False

        def it_happened(ignored):
            self.gc_happened = True

        # Create a piece of cyclic trash that triggers it_happened when
        # gc collects it.
        self.wr = weakref.ref(C1055820(666), it_happened)

@with_tp_del
class Uncollectable(object):
    """Create a reference cycle with multiple __del__ methods.

    An object in a reference cycle will never have zero references,
    and so must be garbage collected.  If one or more objects in the
    cycle have __del__ methods, the gc refuses to guess an order,
    and leaves the cycle uncollected."""
    def __init__(self, partner=None):
        if partner is None:
            self.partner = Uncollectable(partner=self)
        else:
            self.partner = partner
    def __tp_del__(self):
        pass

if sysconfig.get_config_vars().get('PY_CFLAGS', ''):
    BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS'])
else:
    # Usually, sys.gettotalrefcount() is only present if Python has been
    # compiled in debug mode. If it's missing, assume Python was built in
    # release mode, with NDEBUG defined.
    BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount'))

### Tests
###############################################################################

class GCTests(unittest.TestCase):
    def test_list(self):
        l = []
        l.append(l)
        gc.collect()
        del l
        self.assertEqual(gc.collect(), 1)

    def test_dict(self):
        d = {}
        d[1] = d
        gc.collect()
        del d
        self.assertEqual(gc.collect(), 1)

    def test_tuple(self):
        # since tuples are immutable we close the loop with a list
        l = []
        t = (l,)
        l.append(t)
        gc.collect()
        del t
        del l
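        # Both the tuple and the list are unreachable, so collect() finds 2.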
        self.assertEqual(gc.collect(), 2)

    def test_class(self):
        class A:
            pass
        A.a = A
        gc.collect()
        del A
        self.assertNotEqual(gc.collect(), 0)

    def test_newstyleclass(self):
        class A(object):
            pass
        gc.collect()
        del A
        self.assertNotEqual(gc.collect(), 0)

    def test_instance(self):
        class A:
            pass
        a = A()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)

    def test_newinstance(self):
        class A(object):
            pass
        a = A()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)
        class B(list):
            pass
        class C(B, A):
            pass
        a = C()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)
        del B, C
        self.assertNotEqual(gc.collect(), 0)
        A.a = A()
        del A
        self.assertNotEqual(gc.collect(), 0)
        self.assertEqual(gc.collect(), 0)

    def test_method(self):
        # Tricky: self.__init__ is a bound method; it references the instance.
        class A:
            def __init__(self):
                self.init = self.__init__
        a = A()
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)

    @cpython_only
    def test_legacy_finalizer(self):
        # A() is uncollectable if it is part of a cycle; make sure it shows up
        # in gc.garbage.
        @with_tp_del
        class A:
            def __tp_del__(self): pass
        class B:
            pass
        a = A()
        a.a = a
        id_a = id(a)
        b = B()
        b.b = b
        gc.collect()
        del a
        del b
        self.assertNotEqual(gc.collect(), 0)
        for obj in gc.garbage:
            if id(obj) == id_a:
                del obj.a
                break
        else:
            self.fail("didn't find obj in garbage (finalizer)")
        gc.garbage.remove(obj)

    @cpython_only
    def test_legacy_finalizer_newclass(self):
        # A() is uncollectable if it is part of a cycle; make sure it shows up
        # in gc.garbage.
        @with_tp_del
        class A(object):
            def __tp_del__(self): pass
        class B(object):
            pass
        a = A()
        a.a = a
        id_a = id(a)
        b = B()
        b.b = b
        gc.collect()
        del a
        del b
        self.assertNotEqual(gc.collect(), 0)
        for obj in gc.garbage:
            if id(obj) == id_a:
                del obj.a
                break
        else:
            self.fail("didn't find obj in garbage (finalizer)")
        gc.garbage.remove(obj)

    def test_function(self):
        # Tricky: f -> d -> f, code should call d.clear() after the exec to
        # break the cycle.
        d = {}
        exec("def f(): pass\n", d)
        gc.collect()
        del d
        # In the free-threaded build, the count returned by `gc.collect()`
        # is 3 because it includes f's code object.
        self.assertIn(gc.collect(), (2, 3))

    def test_function_tp_clear_leaves_consistent_state(self):
        # https://github.com/python/cpython/issues/91636
        code = """if 1:

        import gc
        import weakref

        class LateFin:
            __slots__ = ('ref',)

            def __del__(self):

                # 8. Now `latefin`'s finalizer is called. Here we
                #    obtain a reference to `func`, which is currently
                #    undergoing `tp_clear`.
                global func
                func = self.ref()

        class Cyclic(tuple):
            __slots__ = ()

            # 4. The finalizers of all garbage objects are called. In
            #    this case this is only us as `func` doesn't have a
            #    finalizer.
            def __del__(self):

                # 5. Create a weakref to `func` now. If we had created
                #    it earlier, it would have been cleared by the
                #    garbage collector before calling the finalizers.
                self[1].ref = weakref.ref(self[0])

                # 6. Drop the global reference to `latefin`. The only
                #    remaining reference is the one we have.
                global latefin
                del latefin

            # 7. Now `func` is `tp_clear`-ed. This drops the last
            #    reference to `Cyclic`, which gets `tp_dealloc`-ed.
            #    This drops the last reference to `latefin`.

        latefin = LateFin()
        def func():
            pass
        cyc = tuple.__new__(Cyclic, (func, latefin))

        # 1. Create a reference cycle of `cyc` and `func`.
        func.__module__ = cyc

        # 2. Make the cycle unreachable, but keep the global reference
        #    to `latefin` so that it isn't detected as garbage. This
        #    way its finalizer will not be called immediately.
        del func, cyc

        # 3. Invoke garbage collection,
        #    which will find `cyc` and `func` as garbage.
        gc.collect()

        # 9. Previously, this would crash because `func_qualname`
        #    had been NULL-ed out by func_clear().
        print(f"{func=}")
        """
        # We're mostly just checking that this doesn't crash.
        rc, stdout, stderr = assert_python_ok("-c", code)
        self.assertEqual(rc, 0)
        self.assertRegex(stdout, rb"""\A\s*func=<function  at \S+>\s*\Z""")
        self.assertFalse(stderr)

    @refcount_test
    def test_frame(self):
        def f():
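            # The frame stores itself in the local variable "frame",
            # creating a reference cycle that only cyclic GC can break.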
            frame = sys._getframe()
        gc.collect()
        f()
        self.assertEqual(gc.collect(), 1)

    def test_saveall(self):
        # Verify that cyclic garbage like lists show up in gc.garbage if the
        # SAVEALL option is enabled.

        # First make sure we don't save away other stuff that just happens to
        # be waiting for collection.
        gc.collect()
        # if this fails, someone else created immortal trash
        self.assertEqual(gc.garbage, [])

        L = []
        L.append(L)
        id_L = id(L)

        debug = gc.get_debug()
        gc.set_debug(debug | gc.DEBUG_SAVEALL)
        del L
        gc.collect()
        gc.set_debug(debug)

        self.assertEqual(len(gc.garbage), 1)
        obj = gc.garbage.pop()
        self.assertEqual(id(obj), id_L)

    def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

    def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

    # The following two tests are fragile:
    # They precisely count the number of allocations,
    # which is highly implementation-dependent.
    # For example, disposed tuples are not freed, but reused.
    # To minimize variations, though, we first store the get_count() results
    # and check them at the end.
    @refcount_test
    @requires_gil_enabled('needs precise allocation counts')
    def test_get_count(self):
        gc.collect()
        a, b, c = gc.get_count()
        x = []
        d, e, f = gc.get_count()
        self.assertEqual((b, c), (0, 0))
        self.assertEqual((e, f), (0, 0))
        # This is less fragile than asserting that a equals 0.
        self.assertLess(a, 5)
        # Between the two calls to get_count(), at least one object was
        # created (the list).
        self.assertGreater(d, a)

    @refcount_test
    def test_collect_generations(self):
        gc.collect()
        # This object will "trickle" into generation N + 1 after
        # each call to collect(N)
        x = []
        gc.collect(0)
        # x is now in the old gen
        a, b, c = gc.get_count()
        # We don't check a since its exact value depends on
        # internal implementation details of the interpreter.
        self.assertEqual((b, c), (1, 0))

    def test_trashcan(self):
        class Ouch:
            n = 0
            def __del__(self):
                Ouch.n = Ouch.n + 1
                if Ouch.n % 17 == 0:
                    gc.collect()

        # "trashcan" is a hack to prevent stack overflow when deallocating
        # very deeply nested tuples etc.  It works in part by abusing the
        # type pointer and refcount fields, and that can yield horrible
        # problems when gc tries to traverse the structures.
        # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
        # most likely die via segfault.

        # Note:  In 2.3 the possibility for compiling without cyclic gc was
        # removed, and that in turn allows the trashcan mechanism to work
        # via much simpler means (e.g., it never abuses the type pointer or
        # refcount fields anymore).  Since it's much less likely to cause a
        # problem now, the various constants in this expensive (we force a lot
        # of full collections) test are cut back from the 2.2 version.
        gc.enable()
        N = 150
        for count in range(2):
            t = []
            for i in range(N):
                t = [t, Ouch()]
            u = []
            for i in range(N):
                u = [u, Ouch()]
            v = {}
            for i in range(N):
                v = {1: v, 2: Ouch()}
        gc.disable()

    @threading_helper.requires_working_threading()
    def test_trashcan_threads(self):
        # Issue #13992: trashcan mechanism should be thread-safe
        NESTING = 60
        N_THREADS = 2

        def sleeper_gen():
            """A generator that releases the GIL when closed or dealloc'ed."""
            try:
                yield
            finally:
                time.sleep(0.000001)

        class C(list):
            # Appending to a list is atomic, which avoids the use of a lock.
            inits = []
            dels = []
            def __init__(self, alist):
                self[:] = alist
                C.inits.append(None)
            def __del__(self):
                # This __del__ is called by subtype_dealloc().
                C.dels.append(None)
                # `g` will release the GIL when garbage-collected.  This
                # helps assert subtype_dealloc's behaviour when threads
                # switch in the middle of it.
                g = sleeper_gen()
                next(g)
                # Now that __del__ is finished, subtype_dealloc will proceed
                # to call list_dealloc, which also uses the trashcan mechanism.

        def make_nested():
            """Create a sufficiently nested container object so that the
            trashcan mechanism is invoked when deallocating it."""
            x = C([])
            for i in range(NESTING):
                x = [C([x])]
            del x

        def run_thread():
            """Exercise make_nested() in a loop."""
            while not exit:
                make_nested()

        old_switchinterval = sys.getswitchinterval()
        support.setswitchinterval(1e-5)
        try:
            exit = []
            threads = []
            for i in range(N_THREADS):
                t = threading.Thread(target=run_thread)
                threads.append(t)
            with threading_helper.start_threads(threads, lambda: exit.append(1)):
                time.sleep(1.0)
        finally:
            sys.setswitchinterval(old_switchinterval)
        gc.collect()
        self.assertEqual(len(C.inits), len(C.dels))

    def test_boom(self):
        class Boom:
            def __getattr__(self, someattribute):
                del self.attr
                raise AttributeError

        a = Boom()
        b = Boom()
        a.attr = b
        b.attr = a

        gc.collect()
        garbagelen = len(gc.garbage)
        del a, b
        # a<->b are in a trash cycle now.  Collection will invoke
        # Boom.__getattr__ (to see whether a and b have __del__ methods), and
        # __getattr__ deletes the internal "attr" attributes as a side effect.
        # That causes the trash cycle to get reclaimed via refcounts falling to
        # 0, thus mutating the trash graph as a side effect of merely asking
        # whether __del__ exists.  This used to (before 2.3b1) crash Python.
        # Now __getattr__ isn't called.
        self.assertEqual(gc.collect(), 2)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_boom2(self):
        class Boom2:
            def __init__(self):
                self.x = 0

            def __getattr__(self, someattribute):
                self.x += 1
                if self.x > 1:
                    del self.attr
                raise AttributeError

        a = Boom2()
        b = Boom2()
        a.attr = b
        b.attr = a

        gc.collect()
        garbagelen = len(gc.garbage)
        del a, b
        # Much like test_boom(), except that __getattr__ doesn't break the
        # cycle until the second time gc checks for __del__.  As of 2.3b1,
        # there isn't a second time, so this simply cleans up the trash cycle.
        # We expect a and b (2 objects) to get reclaimed this way.
        self.assertEqual(gc.collect(), 2)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_get_referents(self):
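        # get_referents() returns the objects directly referenced by its
        # arguments, as reported by their tp_traverse slots.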
        alist = [1, 3, 5]
        got = gc.get_referents(alist)
        got.sort()
        self.assertEqual(got, alist)

        atuple = tuple(alist)
        got = gc.get_referents(atuple)
        got.sort()
        self.assertEqual(got, alist)

        adict = {1: 3, 5: 7}
        expected = [1, 3, 5, 7]
        got = gc.get_referents(adict)
        got.sort()
        self.assertEqual(got, expected)

        got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
        got.sort()
        self.assertEqual(got, [0, 0] + list(range(5)))

        self.assertEqual(gc.get_referents(1, 'a', 4j), [])

    def test_is_tracked(self):
        # Atomic built-in types are not tracked; user-defined objects and
        # mutable containers are.
        # NOTE: types with special optimizations (e.g. tuple) have tests
        # in their own test files instead.
        self.assertFalse(gc.is_tracked(None))
        self.assertFalse(gc.is_tracked(1))
        self.assertFalse(gc.is_tracked(1.0))
        self.assertFalse(gc.is_tracked(1.0 + 5.0j))
        self.assertFalse(gc.is_tracked(True))
        self.assertFalse(gc.is_tracked(False))
        self.assertFalse(gc.is_tracked(b"a"))
        self.assertFalse(gc.is_tracked("a"))
        self.assertFalse(gc.is_tracked(bytearray(b"a")))
        self.assertFalse(gc.is_tracked(type))
        self.assertFalse(gc.is_tracked(int))
        self.assertFalse(gc.is_tracked(object))
        self.assertFalse(gc.is_tracked(object()))

        class UserClass:
            pass

        class UserInt(int):
            pass

        # Base class is object; no extra fields.
        class UserClassSlots:
            __slots__ = ()

        # Base class is fixed size larger than object; no extra fields.
        class UserFloatSlots(float):
            __slots__ = ()

        # Base class is variable size; no extra fields.
        class UserIntSlots(int):
            __slots__ = ()

        self.assertTrue(gc.is_tracked(gc))
        self.assertTrue(gc.is_tracked(UserClass))
        self.assertTrue(gc.is_tracked(UserClass()))
        self.assertTrue(gc.is_tracked(UserInt()))
        self.assertTrue(gc.is_tracked([]))
        self.assertTrue(gc.is_tracked(set()))
        self.assertTrue(gc.is_tracked(UserClassSlots()))
        self.assertTrue(gc.is_tracked(UserFloatSlots()))
        self.assertTrue(gc.is_tracked(UserIntSlots()))

    def test_is_finalized(self):
        # Objects not tracked by the gc always return false
        self.assertFalse(gc.is_finalized(3))

        storage = []
        class Lazarus:
            def __del__(self):
                storage.append(self)

        lazarus = Lazarus()
        self.assertFalse(gc.is_finalized(lazarus))

        del lazarus
        gc.collect()

        lazarus = storage.pop()
        self.assertTrue(gc.is_finalized(lazarus))

    def test_bug1055820b(self):
        # Corresponds to temp2b.py in the bug report.

        ouch = []
        def callback(ignored):
            ouch[:] = [wr() for wr in WRs]

        Cs = [C1055820(i) for i in range(2)]
        WRs = [weakref.ref(c, callback) for c in Cs]
        c = None

        gc.collect()
        self.assertEqual(len(ouch), 0)
        # Make the two instances trash, and collect again.  The bug was that
        # the callback materialized a strong reference to an instance, but gc
        # cleared the instance's dict anyway.
        Cs = None
        gc.collect()
        self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
        for x in ouch:
            # If the callback resurrected one of these guys, the instance
            # would be damaged, with an empty __dict__.
            self.assertEqual(x, None)

    def test_bug21435(self):
        # This is a poor test - its only virtue is that it happened to
        # segfault on Tim's Windows box before the patch for 21435 was
        # applied.  That's a nasty bug relying on specific pieces of cyclic
        # trash appearing in exactly the right order in finalize_garbage()'s
        # input list.
        # But there's no reliable way to force that order from Python code,
        # so over time chances are good this test won't really be testing much
        # of anything anymore.  Still, if it blows up, there's _some_
        # problem ;-)
        gc.collect()

        class A:
            pass

        class B:
            def __init__(self, x):
                self.x = x

            def __del__(self):
                self.attr = None

        def do_work():
            a = A()
            b = B(A())

            a.attr = b
            b.attr = a

        do_work()
        gc.collect() # this blows up (bad C pointer) when it fails

    @cpython_only
    @requires_subprocess()
    @unittest.skipIf(_testcapi is None, "requires _testcapi")
    def test_garbage_at_shutdown(self):
        import subprocess
        code = """if 1:
            import gc
            import _testcapi
            @_testcapi.with_tp_del
            class X:
                def __init__(self, name):
                    self.name = name
                def __repr__(self):
                    return "<X %%r>" %% self.name
                def __tp_del__(self):
                    pass

            x = X('first')
            x.x = x
            x.y = X('second')
            del x
            gc.set_debug(%s)
        """
        def run_command(code):
            p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            p.stdout.close()
            p.stderr.close()
            self.assertEqual(p.returncode, 0)
            self.assertEqual(stdout, b"")
            return stderr

        stderr = run_command(code % "0")
        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
                      b"shutdown; use", stderr)
        self.assertNotIn(b"<X 'first'>", stderr)
        # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
        stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
                      b"shutdown", stderr)
        self.assertTrue(
            (b"[<X 'first'>, <X 'second'>]" in stderr) or
            (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
        # With DEBUG_SAVEALL, no additional message should get printed
        # (because gc.garbage also contains normally reclaimable cyclic
        # references, and its elements get printed at runtime anyway).
        stderr = run_command(code % "gc.DEBUG_SAVEALL")
        self.assertNotIn(b"uncollectable objects at shutdown", stderr)

    def test_gc_main_module_at_shutdown(self):
        # Create a reference cycle through the __main__ module and check
        # it gets collected at interpreter shutdown.
        code = """if 1:
            class C:
                def __del__(self):
                    print('__del__ called')
            l = [C()]
            l.append(l)
            """
        rc, out, err = assert_python_ok('-c', code)
        self.assertEqual(out.strip(), b'__del__ called')

    def test_gc_ordinary_module_at_shutdown(self):
        # Same as above, but with a non-__main__ module.
        with temp_dir() as script_dir:
            module = """if 1:
                class C:
                    def __del__(self):
                        print('__del__ called')
                l = [C()]
                l.append(l)
                """
            code = """if 1:
                import sys
                sys.path.insert(0, %r)
                import gctest
                """ % (script_dir,)
            make_script(script_dir, 'gctest', module)
            rc, out, err = assert_python_ok('-c', code)
            self.assertEqual(out.strip(), b'__del__ called')

    def test_global_del_SystemExit(self):
        code = """if 1:
            class ClassWithDel:
                def __del__(self):
                    print('__del__ called')
            a = ClassWithDel()
            a.link = a
            raise SystemExit(0)"""
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w', encoding="utf-8") as script:
            script.write(code)
        rc, out, err = assert_python_ok(TESTFN)
        self.assertEqual(out.strip(), b'__del__ called')

    def test_get_stats(self):
        stats = gc.get_stats()
        self.assertEqual(len(stats), 3)
        for st in stats:
            self.assertIsInstance(st, dict)
            self.assertEqual(set(st),
                             {"collected", "collections", "uncollectable"})
            self.assertGreaterEqual(st["collected"], 0)
            self.assertGreaterEqual(st["collections"], 0)
            self.assertGreaterEqual(st["uncollectable"], 0)
        # Check that collection counts are incremented correctly
        if gc.isenabled():
            self.addCleanup(gc.enable)
            gc.disable()
        old = gc.get_stats()
        gc.collect(0)
        new = gc.get_stats()
        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
        self.assertEqual(new[1]["collections"], old[1]["collections"])
        self.assertEqual(new[2]["collections"], old[2]["collections"])
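        # A full collection is credited only to the oldest generation's
        # "collections" counter; generation 0 keeps the +1 from the
        # collect(0) call above.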
        gc.collect(2)
        new = gc.get_stats()
        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
        self.assertEqual(new[1]["collections"], old[1]["collections"])
        self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)

    def test_freeze(self):
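        # freeze() moves all tracked objects into a permanent generation
        # that is ignored by collections; get_freeze_count() reports how
        # many objects are currently frozen.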
        gc.freeze()
        self.assertGreater(gc.get_freeze_count(), 0)
        gc.unfreeze()
        self.assertEqual(gc.get_freeze_count(), 0)

    def test_get_objects(self):
        gc.collect()
        l = []
        l.append(l)
        self.assertTrue(
                any(l is element for element in gc.get_objects())
        )

    @requires_gil_enabled('need generational GC')
    def test_get_objects_generations(self):
        gc.collect()
        l = []
        l.append(l)
        self.assertTrue(
                any(l is element for element in gc.get_objects(generation=0))
        )
        gc.collect()
        self.assertFalse(
                any(l is element for element in gc.get_objects(generation=0))
        )
        del l
        gc.collect()

    def test_get_objects_arguments(self):
        gc.collect()
        self.assertEqual(len(gc.get_objects()),
                         len(gc.get_objects(generation=None)))

        self.assertRaises(ValueError, gc.get_objects, 1000)
        self.assertRaises(ValueError, gc.get_objects, -1000)
        self.assertRaises(TypeError, gc.get_objects, "1")
        self.assertRaises(TypeError, gc.get_objects, 1.234)

    def test_resurrection_only_happens_once_per_object(self):
        class A:  # simple self-loop
            def __init__(self):
                self.me = self

        class Lazarus(A):
            resurrected = 0
            resurrected_instances = []

            def __del__(self):
                Lazarus.resurrected += 1
                Lazarus.resurrected_instances.append(self)

        gc.collect()
        gc.disable()

        # We start with 0 resurrections
        laz = Lazarus()
        self.assertEqual(Lazarus.resurrected, 0)

        # Deleting the instance and triggering a collection
        # resurrects the object
        del laz
        gc.collect()
        self.assertEqual(Lazarus.resurrected, 1)
        self.assertEqual(len(Lazarus.resurrected_instances), 1)

        # Clearing the references and forcing a collection
        # should not resurrect the object again.
        Lazarus.resurrected_instances.clear()
        self.assertEqual(Lazarus.resurrected, 1)
        gc.collect()
        self.assertEqual(Lazarus.resurrected, 1)

        gc.enable()

    def test_resurrection_is_transitive(self):
        class Cargo:
            def __init__(self):
                self.me = self

        class Lazarus:
            resurrected_instances = []

            def __del__(self):
                Lazarus.resurrected_instances.append(self)

        gc.collect()
        gc.disable()

        laz = Lazarus()
        cargo = Cargo()
        cargo_id = id(cargo)

        # Create a cycle between cargo and laz
        laz.cargo = cargo
        cargo.laz = laz

        # Drop the references, force a collection and check that
        # everything was resurrected.
        del laz, cargo
        gc.collect()
        self.assertEqual(len(Lazarus.resurrected_instances), 1)
        instance = Lazarus.resurrected_instances.pop()
        self.assertTrue(hasattr(instance, "cargo"))
        self.assertEqual(id(instance.cargo), cargo_id)

        gc.collect()
        gc.enable()

    def test_resurrection_does_not_block_cleanup_of_other_objects(self):

        # When a finalizer resurrects objects, stats were reporting them as
        # having been collected.  This affected both collect()'s return
        # value and the dicts returned by get_stats().
        N = 100

        class A:  # simple self-loop
            def __init__(self):
                self.me = self

        class Z(A):  # resurrecting __del__
            def __del__(self):
                zs.append(self)

        zs = []

        def getstats():
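            # gc.collect() performs a full collection, so its results are
            # recorded in the stats of the oldest generation.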
            d = gc.get_stats()[-1]
            return d['collected'], d['uncollectable']

        gc.collect()
        gc.disable()

        # No problems if just collecting A() instances.
        oldc, oldnc = getstats()
        for i in range(N):
            A()
        t = gc.collect()
        c, nc = getstats()
        self.assertEqual(t, N) # instance objects
        self.assertEqual(c - oldc, N)
        self.assertEqual(nc - oldnc, 0)

        # But Z() is not actually collected.
        oldc, oldnc = c, nc
        Z()
        # Nothing is collected - Z() is merely resurrected.
        t = gc.collect()
        c, nc = getstats()
        self.assertEqual(t, 0)
        self.assertEqual(c - oldc, 0)
        self.assertEqual(nc - oldnc, 0)

        # Z() should not prevent anything else from being collected.
        oldc, oldnc = c, nc
        for i in range(N):
            A()
        Z()
        t = gc.collect()
        c, nc = getstats()
        self.assertEqual(t, N)
        self.assertEqual(c - oldc, N)
        self.assertEqual(nc - oldnc, 0)

        # The A() trash should have been reclaimed already but the
        # 2 copies of Z are still in zs (and the associated dicts).
        oldc, oldnc = c, nc
        zs.clear()
        t = gc.collect()
        c, nc = getstats()
        self.assertEqual(t, 2)
        self.assertEqual(c - oldc, 2)
        self.assertEqual(nc - oldnc, 0)

        gc.enable()

    @unittest.skipIf(ContainerNoGC is None,
                     'requires ContainerNoGC extension type')
    def test_trash_weakref_clear(self):
        # Test that trash weakrefs are properly cleared (bpo-38006).
        #
        # Structure we are creating:
        #
        #   Z <- Y <- A--+--> WZ -> C
        #             ^  |
        #             +--+
        # where:
        #   WZ is a weakref to Z with callback C
        #   Y doesn't implement tp_traverse
        #   A contains a reference to itself, Y and WZ
        #
        # A, Y, Z, WZ are all trash.  The GC doesn't know that Z is trash
        # because Y does not implement tp_traverse.  To show the bug, WZ needs
        # to live long enough so that Z is deallocated before it.  Then, if
        # gcmodule is buggy, when Z is being deallocated, C will run.
        #
        # To ensure WZ lives long enough, we put it in a second reference
        # cycle.  That trick only works due to the ordering of the GC prev/next
        # linked lists.  So, this test is a bit fragile.
        #
        # The bug reported in bpo-38006 is caused because the GC did not
        # clear WZ before starting the process of calling tp_clear on the
        # trash.  Normally, handle_weakrefs() would find the weakref via Z and
        # clear it.  However, since the GC cannot find Z, WZ is not cleared and
        # it can execute during delete_garbage().  That can lead to disaster
        # since the callback might tinker with objects that have already had
        # tp_clear called on them (leaving them in possibly invalid states).

        callback = unittest.mock.Mock()

        class A:
            __slots__ = ['a', 'y', 'wz']

        class Z:
            pass

        # setup required object graph, as described above
        a = A()
        a.a = a
        a.y = ContainerNoGC(Z())
        a.wz = weakref.ref(a.y.value, callback)
        # create second cycle to keep WZ alive longer
        wr_cycle = [a.wz]
        wr_cycle.append(wr_cycle)
        # ensure trash unrelated to this test is gone
        gc.collect()
        gc.disable()
        # release references and create trash
        del a, wr_cycle
        gc.collect()
        # if called, it means there is a bug in the GC.  The weakref should be
        # cleared before Z dies.
        callback.assert_not_called()
        gc.enable()

    @cpython_only
    def test_get_referents_on_capsule(self):
        # gh-124538: Calling gc.get_referents() on an untracked capsule must not crash.
        import _datetime
        import _socket
        untracked_capsule = _datetime.datetime_CAPI
        tracked_capsule = _socket.CAPI

        # For whoever sees this in the future: if this is failing
        # after making datetime's capsule tracked, that's fine -- this isn't something
        # users are relying on. Just find a different capsule that is untracked.
        self.assertFalse(gc.is_tracked(untracked_capsule))
        self.assertTrue(gc.is_tracked(tracked_capsule))

        self.assertEqual(len(gc.get_referents(untracked_capsule)), 0)
        gc.get_referents(tracked_capsule)

    @cpython_only
    def test_get_objects_during_gc(self):
        # gh-125859: Calling gc.get_objects() or gc.get_referrers() during a
        # collection should not crash.
        test = self
        collected = False

        class GetObjectsOnDel:
            def __del__(self):
                nonlocal collected
                collected = True
                objs = gc.get_objects()
                # NB: can't use "in" here because some objects override __eq__
                for obj in objs:
                    test.assertTrue(obj is not self)
                test.assertEqual(gc.get_referrers(self), [])

        obj = GetObjectsOnDel()
        obj.cycle = obj
        del obj

        gc.collect()
        self.assertTrue(collected)

    def test_traverse_frozen_objects(self):
        # See GH-126312: Objects that were not frozen could traverse over
        # a frozen object on the free-threaded build, which would cause
        # a negative reference count.
        x = [1, 2, 3]
        gc.freeze()
        y = [x]
        y.append(y)
        del y
        gc.collect()
        gc.unfreeze()

    def test_deferred_refcount_frozen(self):
        # Also from GH-126312: objects that use deferred reference counting
        # weren't ignored if they were frozen. Unfortunately, it's pretty
        # difficult to come up with a case that triggers this.
        #
        # Calling gc.collect() while the garbage collector is frozen doesn't
        # trigger this normally, but it *does* if it's inside unittest for whatever
        # reason. We can't call unittest from inside a test, so it has to be
        # in a subprocess.
        source = textwrap.dedent("""
        import gc
        import unittest


        class Test(unittest.TestCase):
            def test_something(self):
                gc.freeze()
                gc.collect()
                gc.unfreeze()


        if __name__ == "__main__":
            unittest.main()
        """)
        assert_python_ok("-c", source)


class IncrementalGCTests(unittest.TestCase):

    def setUp(self):
        # Reenable GC as it is disabled module-wide
        gc.enable()

    def tearDown(self):
        gc.disable()

    @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi")
    @requires_gil_enabled("Free threading does not support incremental GC")
    # Use small increments to emulate longer running process in a shorter time
    @gc_threshold(200, 10)
    def test_incremental_gc_handles_fast_cycle_creation(self):

        class LinkedList:

            # Use slots to reduce the number of implicit objects
            __slots__ = "next", "prev", "surprise"

            def __init__(self, next=None, prev=None):
                self.next = next
                if next is not None:
                    next.prev = self
                self.prev = prev
                if prev is not None:
                    prev.next = self

        def make_ll(depth):
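            # Build a chain of `depth` linked nodes; the next/prev links
            # form reference cycles, so the chain can only be reclaimed by
            # the cyclic collector.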
            head = LinkedList()
            for i in range(depth):
                head = LinkedList(head, head.prev)
            return head

        head = make_ll(1000)
        count = 1000

        # There will be some objects we aren't counting, e.g. the gc stats
        # dicts.  This test checks that the counts don't grow, so we try to
        # correct for the uncounted objects.  This is just an estimate.
        CORRECTION = 20

        enabled = gc.isenabled()
        gc.enable()
        olds = []
        initial_heap_size = _testinternalcapi.get_tracked_heap_size()
        for i in range(20_000):
            newhead = make_ll(20)
            count += 20
            newhead.surprise = head
            olds.append(newhead)
            if len(olds) == 20:
                new_objects = _testinternalcapi.get_tracked_heap_size() - initial_heap_size
                self.assertLess(new_objects, 27_000, f"Heap growing. Reached limit after {i} iterations")
                del olds[:]
        if not enabled:
            gc.disable()


class GCCallbackTests(unittest.TestCase):
    def setUp(self):
        # Save gc state and disable it.
        self.enabled = gc.isenabled()
        gc.disable()
        self.debug = gc.get_debug()
        gc.set_debug(0)
        gc.callbacks.append(self.cb1)
        gc.callbacks.append(self.cb2)
        self.othergarbage = []

    def tearDown(self):
        # Restore gc state
        del self.visit
        gc.callbacks.remove(self.cb1)
        gc.callbacks.remove(self.cb2)
        gc.set_debug(self.debug)
        if self.enabled:
            gc.enable()
        # destroy any uncollectables
        gc.collect()
        for obj in gc.garbage:
            if isinstance(obj, Uncollectable):
                obj.partner = None
        del gc.garbage[:]
        del self.othergarbage
        gc.collect()

    def preclean(self):
        # Remove all fluff from the system.  Invoke this function
        # manually rather than through self.setUp() for maximum
        # safety.
        self.visit = []
        gc.collect()
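        # Move the current contents of gc.garbage aside (into othergarbage)
        # so the assertions in the tests start from an empty garbage list.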
        garbage, gc.garbage[:] = gc.garbage[:], []
        self.othergarbage.append(garbage)
        self.visit = []

    def cb1(self, phase, info):
        self.visit.append((1, phase, dict(info)))

    def cb2(self, phase, info):
        self.visit.append((2, phase, dict(info)))
        if phase == "stop" and hasattr(self, "cleanup"):
            # Clean Uncollectable from garbage
            uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
            gc.garbage[:] = [e for e in gc.garbage
                             if not isinstance(e, Uncollectable)]
            for e in uc:
                e.partner = None

    def test_collect(self):
        self.preclean()
        gc.collect()
        # Algorithmically verify the contents of self.visit
        # because it is long and tortuous.

        # Count the number of visits to each callback
        n = [v[0] for v in self.visit]
        n1 = [i for i in n if i == 1]
        n2 = [i for i in n if i == 2]
        self.assertEqual(n1, [1]*2)
        self.assertEqual(n2, [2]*2)

        # Count that we got the right number of start and stop callbacks.
        n = [v[1] for v in self.visit]
        n1 = [i for i in n if i == "start"]
        n2 = [i for i in n if i == "stop"]
        self.assertEqual(n1, ["start"]*2)
        self.assertEqual(n2, ["stop"]*2)

        # Check that we got the right info dict for all callbacks
        for v in self.visit:
            info = v[2]
            self.assertTrue("generation" in info)
            self.assertTrue("collected" in info)
            self.assertTrue("uncollectable" in info)

    def test_collect_generation(self):
        self.preclean()
        gc.collect(2)
        for v in self.visit:
            info = v[2]
            self.assertEqual(info["generation"], 2)

    @cpython_only
    def test_collect_garbage(self):
        self.preclean()
        # Each of these cause two objects to be garbage:
        Uncollectable()
        Uncollectable()
        C1055820(666)
        gc.collect()
        for v in self.visit:
            if v[1] != "stop":
                continue
            info = v[2]
            self.assertEqual(info["collected"], 1)
            self.assertEqual(info["uncollectable"], 4)

        # We should now have the Uncollectables in gc.garbage
        self.assertEqual(len(gc.garbage), 4)
        for e in gc.garbage:
            self.assertIsInstance(e, Uncollectable)

        # Now, let our callback handle the Uncollectable instances
        self.cleanup=True
        self.visit = []
        gc.garbage[:] = []
        gc.collect()
        for v in self.visit:
            if v[1] != "stop":
                continue
            info = v[2]
            self.assertEqual(info["collected"], 0)
            self.assertEqual(info["uncollectable"], 2)

        # Uncollectables should be gone
        self.assertEqual(len(gc.garbage), 0)


    @requires_subprocess()
    @unittest.skipIf(BUILD_WITH_NDEBUG,
                     'built with -DNDEBUG')
    def test_refcount_errors(self):
        self.preclean()
        # Verify the "handling" of objects with broken refcounts

        # Skip the test if ctypes is not available
        import_module("ctypes")

        import subprocess
        code = textwrap.dedent('''
            from test.support import gc_collect, SuppressCrashReport

            a = [1, 2, 3]
            b = [a, a]
            a.append(b)

            # Avoid coredump when Py_FatalError() calls abort()
            SuppressCrashReport().__enter__()

            # Simulate the refcount of "a" being too low (compared to the
            # references held on it by live data), but keeping it above zero
            # (to avoid deallocating it):
            import ctypes
            ctypes.pythonapi.Py_DecRef(ctypes.py_object(a))
            del a
            del b

            # The garbage collector should now have a fatal error
            # when it reaches the broken object
            gc_collect()
        ''')
        p = subprocess.Popen([sys.executable, "-c", code],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        p.stdout.close()
        p.stderr.close()
        # Verify that stderr has a useful error message:
        self.assertRegex(stderr,
            br'gc.*\.c:[0-9]+: .*: Assertion "gc_get_refs\(.+\) .*" failed.')
        self.assertRegex(stderr,
            br'refcount is too small')
        # "address : 0x7fb5062efc18"
        # "address : 7FB5062EFC18"
        address_regex = br'[0-9a-fA-Fx]+'
        self.assertRegex(stderr,
            br'object address  : ' + address_regex)
        self.assertRegex(stderr,
            br'object refcount : 1')
        self.assertRegex(stderr,
            br'object type     : ' + address_regex)
        self.assertRegex(stderr,
            br'object type name: list')
        self.assertRegex(stderr,
            br'object repr     : \[1, 2, 3, \[\[...\], \[...\]\]\]')


class GCTogglingTests(unittest.TestCase):
    def setUp(self):
        gc.enable()

    def tearDown(self):
        gc.disable()

    def test_bug1055820c(self):
        # Corresponds to temp2c.py in the bug report.  This is pretty
        # elaborate.

        c0 = C1055820(0)
        # Move c0 into generation 2.
        gc.collect()

        c1 = C1055820(1)
        c1.keep_c0_alive = c0
        del c0.loop # now only c1 keeps c0 alive

        c2 = C1055820(2)
        c2wr = weakref.ref(c2) # no callback!

        ouch = []
        def callback(ignored):
            ouch[:] = [c2wr()]

        # The callback gets associated with a wr on an object in generation 2.
        c0wr = weakref.ref(c0, callback)

        c0 = c1 = c2 = None

        # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
        # generation 2.  The only thing keeping it alive is that c1 points to
        # it. c1 and c2 are in generation 0, and are in self-loops.  There's a
        # global weakref to c2 (c2wr), but that weakref has no callback.
        # There's also a global weakref to c0 (c0wr), and that does have a
        # callback, and that callback references c2 via c2wr().
        #
        #               c0 has a wr with callback, which references c2wr
        #               ^
        #               |
        #               |     Generation 2 above dots
        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
        #               |     Generation 0 below dots
        #               |
        #               |
        #            ^->c1   ^->c2 has a wr but no callback
        #            |  |    |  |
        #            <--v    <--v
        #
        # So this is the nightmare:  when generation 0 gets collected, we see
        # that c2 has a callback-free weakref, and c1 doesn't even have a
        # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
        # the only object that has a weakref with a callback.  gc clears c1
        # and c2.  Clearing c1 has the side effect of dropping the refcount on
        # c0 to 0, so c0 goes away (despite that it's in an older generation)
        # and c0's wr callback triggers.  That in turn materializes a reference
        # to c2 via c2wr(), but c2 gets cleared anyway by gc.

        # We want to let gc happen "naturally", to preserve the distinction
        # between generations.
        junk = []
        i = 0
        detector = GC_Detector()
        if Py_GIL_DISABLED:
            # The free-threaded build doesn't have multiple generations, so
            # just trigger a GC manually.
            gc.collect()
        while not detector.gc_happened:
            i += 1
            if i > 10000:
                self.fail("gc didn't happen after 10000 iterations")
            self.assertEqual(len(ouch), 0)
            junk.append([])  # this will eventually trigger gc

        self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
        for x in ouch:
            # If the callback resurrected c2, the instance would be damaged,
            # with an empty __dict__.
            self.assertEqual(x, None)

    @gc_threshold(1000, 0, 0)
    def test_bug1055820d(self):
        # Corresponds to temp2d.py in the bug report.  This is very much like
        # test_bug1055820c, but uses a __del__ method instead of a weakref
        # callback to sneak in a resurrection of cyclic trash.

        ouch = []
        class D(C1055820):
            def __del__(self):
                ouch[:] = [c2wr()]

        d0 = D(0)
        # Move all the above into generation 2.
        gc.collect()

        c1 = C1055820(1)
        c1.keep_d0_alive = d0
        del d0.loop # now only c1 keeps d0 alive

        c2 = C1055820(2)
        c2wr = weakref.ref(c2) # no callback!

        d0 = c1 = c2 = None

        # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
        # generation 2.  The only thing keeping it alive is that c1 points to
        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
        # a global weakref to c2 (c2wr), but that weakref has no callback.
        # There are no other weakrefs.
        #
        #               d0 has a __del__ method that references c2wr
        #               ^
        #               |
        #               |     Generation 2 above dots
        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
        #               |     Generation 0 below dots
        #               |
        #               |
        #            ^->c1   ^->c2 has a wr but no callback
        #            |  |    |  |
        #            <--v    <--v
        #
        # So this is the nightmare:  when generation 0 gets collected, we see
        # that c2 has a callback-free weakref, and c1 doesn't even have a
        # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
        # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
        # on d0 to 0, so d0 goes away (despite that it's in an older
        # generation) and d0's __del__ triggers.  That in turn materializes
        # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.

        # We want to let gc happen "naturally", to preserve the distinction
        # between generations.
        detector = GC_Detector()
        junk = []
        i = 0
        if Py_GIL_DISABLED:
            # The free-threaded build doesn't have multiple generations, so
            # just trigger a GC manually.
            gc.collect()
        while not detector.gc_happened:
            i += 1
            if i > 10000:
                self.fail("gc didn't happen after 10000 iterations")
            self.assertEqual(len(ouch), 0)
            junk.append([])  # this will eventually trigger gc

        self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
        for x in ouch:
            # If __del__ resurrected c2, the instance would be damaged, with an
            # empty __dict__.
            self.assertEqual(x, None)

    @gc_threshold(1000, 0, 0)
    def test_indirect_calls_with_gc_disabled(self):
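        # First confirm that repeated allocations trigger an automatic
        # collection, then check that the same loop does not trigger one
        # while gc is disabled.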
        junk = []
        i = 0
        detector = GC_Detector()
        while not detector.gc_happened:
            i += 1
            if i > 10000:
                self.fail("gc didn't happen after 10000 iterations")
            junk.append([])  # this will eventually trigger gc

        try:
            gc.disable()
            junk = []
            i = 0
            detector = GC_Detector()
            while not detector.gc_happened:
                i += 1
                if i > 10000:
                    break
                junk.append([])  # this may eventually trigger gc (if it is enabled)

            self.assertEqual(i, 10001)
        finally:
            gc.enable()


class PythonFinalizationTests(unittest.TestCase):
    def test_ast_fini(self):
        # bpo-44184: Regression test for subtype_dealloc() when deallocating
        # an AST instance also destroy its AST type: subtype_dealloc() must
        # not access the type memory after deallocating the instance, since
        # the type memory can be freed as well. The test is also related to
        # _PyAST_Fini() which clears references to AST types.
        code = textwrap.dedent("""
            import ast
            import codecs
            from test import support

            # Small AST tree to keep their AST types alive
            tree = ast.parse("def f(x, y): return 2*x-y")

            # Store the tree somewhere to survive until the last GC collection
            support.late_deletion(tree)
        """)
        assert_python_ok("-c", code)


def setUpModule():
    global enabled, debug
    enabled = gc.isenabled()
    gc.disable()
    assert not gc.isenabled()
    debug = gc.get_debug()
    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
    gc.collect() # Delete 2nd generation garbage


def tearDownModule():
    gc.set_debug(debug)
    # test gc.enable() even if GC is disabled by default
    if verbose:
        print("restoring automatic collection")
    # make sure to always test gc.enable()
    gc.enable()
    assert gc.isenabled()
    if not enabled:
        gc.disable()


if __name__ == "__main__":
    unittest.main()