cpython/Lib/test/test_monitoring.py

"""Test suite for the sys.monitoring."""

import collections
import dis
import functools
import math
import operator
import sys
import textwrap
import types
import unittest

import test.support
from test.support import requires_specialization, script_helper
from test.support.import_helper import import_module

_testcapi = test.support.import_helper.import_module("_testcapi")

PAIR = (0,1)

def f1():
    pass

def f2():
    len([])
    sys.getsizeof(0)

def floop():
    for item in PAIR:
        pass

def gen():
    yield
    yield

def g1():
    for _ in gen():
        pass

TEST_TOOL = 2
TEST_TOOL2 = 3
TEST_TOOL3 = 4

def nth_line(func, offset):
    return func.__code__.co_firstlineno + offset

class MonitoringBasicTest(unittest.TestCase):

    def test_has_objects(self):
        m = sys.monitoring
        m.events
        m.use_tool_id
        m.free_tool_id
        m.get_tool
        m.get_events
        m.set_events
        m.get_local_events
        m.set_local_events
        m.register_callback
        m.restart_events
        m.DISABLE
        m.MISSING
        m.events.NO_EVENTS

    def test_tool(self):
        sys.monitoring.use_tool_id(TEST_TOOL, "MonitoringTest.Tool")
        self.assertEqual(sys.monitoring.get_tool(TEST_TOOL), "MonitoringTest.Tool")
        sys.monitoring.set_events(TEST_TOOL, 15)
        self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 15)
        sys.monitoring.set_events(TEST_TOOL, 0)
        with self.assertRaises(ValueError):
            sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.C_RETURN)
        with self.assertRaises(ValueError):
            sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.C_RAISE)
        sys.monitoring.free_tool_id(TEST_TOOL)
        self.assertEqual(sys.monitoring.get_tool(TEST_TOOL), None)
        with self.assertRaises(ValueError):
            sys.monitoring.set_events(TEST_TOOL, sys.monitoring.events.CALL)


class MonitoringTestBase:

    def setUp(self):
        # Check that a previous test hasn't left monitoring on.
        for tool in range(6):
            self.assertEqual(sys.monitoring.get_events(tool), 0)
        self.assertIs(sys.monitoring.get_tool(TEST_TOOL), None)
        self.assertIs(sys.monitoring.get_tool(TEST_TOOL2), None)
        self.assertIs(sys.monitoring.get_tool(TEST_TOOL3), None)
        sys.monitoring.use_tool_id(TEST_TOOL, "test " + self.__class__.__name__)
        sys.monitoring.use_tool_id(TEST_TOOL2, "test2 " + self.__class__.__name__)
        sys.monitoring.use_tool_id(TEST_TOOL3, "test3 " + self.__class__.__name__)

    def tearDown(self):
        # Check that test hasn't left monitoring on.
        for tool in range(6):
            self.assertEqual(sys.monitoring.get_events(tool), 0)
        sys.monitoring.free_tool_id(TEST_TOOL)
        sys.monitoring.free_tool_id(TEST_TOOL2)
        sys.monitoring.free_tool_id(TEST_TOOL3)


class MonitoringCountTest(MonitoringTestBase, unittest.TestCase):

    def check_event_count(self, func, event, expected):

        class Counter:
            def __init__(self):
                self.count = 0
            def __call__(self, *args):
                self.count += 1

        counter = Counter()
        sys.monitoring.register_callback(TEST_TOOL, event, counter)
        if event == E.C_RETURN or event == E.C_RAISE:
            sys.monitoring.set_events(TEST_TOOL, E.CALL)
        else:
            sys.monitoring.set_events(TEST_TOOL, event)
        self.assertEqual(counter.count, 0)
        counter.count = 0
        func()
        self.assertEqual(counter.count, expected)
        prev = sys.monitoring.register_callback(TEST_TOOL, event, None)
        counter.count = 0
        func()
        self.assertEqual(counter.count, 0)
        self.assertEqual(prev, counter)
        sys.monitoring.set_events(TEST_TOOL, 0)

    def test_start_count(self):
        self.check_event_count(f1, E.PY_START, 1)

    def test_resume_count(self):
        self.check_event_count(g1, E.PY_RESUME, 2)

    def test_return_count(self):
        self.check_event_count(f1, E.PY_RETURN, 1)

    def test_call_count(self):
        self.check_event_count(f2, E.CALL, 3)

    def test_c_return_count(self):
        self.check_event_count(f2, E.C_RETURN, 2)


E = sys.monitoring.events

INSTRUMENTED_EVENTS = [
    (E.PY_START, "start"),
    (E.PY_RESUME, "resume"),
    (E.PY_RETURN, "return"),
    (E.PY_YIELD, "yield"),
    (E.JUMP, "jump"),
    (E.BRANCH, "branch"),
]

EXCEPT_EVENTS = [
    (E.RAISE, "raise"),
    (E.PY_UNWIND, "unwind"),
    (E.EXCEPTION_HANDLED, "exception_handled"),
]

SIMPLE_EVENTS = INSTRUMENTED_EVENTS + EXCEPT_EVENTS + [
    (E.C_RAISE, "c_raise"),
    (E.C_RETURN, "c_return"),
]


SIMPLE_EVENT_SET = functools.reduce(operator.or_, [ev for (ev, _) in SIMPLE_EVENTS], 0) | E.CALL


def just_pass():
    pass

just_pass.events = [
    "py_call",
    "start",
    "return",
]

def just_raise():
    raise Exception

just_raise.events = [
    'py_call',
    "start",
    "raise",
    "unwind",
]

def just_call():
    len([])

just_call.events = [
    'py_call',
    "start",
    "c_call",
    "c_return",
    "return",
]

def caught():
    try:
        1/0
    except Exception:
        pass

caught.events = [
    'py_call',
    "start",
    "raise",
    "exception_handled",
    "branch",
    "return",
]

def nested_call():
    just_pass()

nested_call.events = [
    "py_call",
    "start",
    "py_call",
    "start",
    "return",
    "return",
]

PY_CALLABLES = (types.FunctionType, types.MethodType)

class MonitoringEventsBase(MonitoringTestBase):

    def gather_events(self, func):
        events = []
        for event, event_name in SIMPLE_EVENTS:
            def record(*args, event_name=event_name):
                events.append(event_name)
            sys.monitoring.register_callback(TEST_TOOL, event, record)
        def record_call(code, offset, obj, arg):
            if isinstance(obj, PY_CALLABLES):
                events.append("py_call")
            else:
                events.append("c_call")
        sys.monitoring.register_callback(TEST_TOOL, E.CALL, record_call)
        sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET)
        events = []
        try:
            func()
        except:
            pass
        sys.monitoring.set_events(TEST_TOOL, 0)
        #Remove the final event, the call to `sys.monitoring.set_events`
        events = events[:-1]
        return events

    def check_events(self, func, expected=None):
        events = self.gather_events(func)
        if expected is None:
            expected = func.events
        self.assertEqual(events, expected)

class MonitoringEventsTest(MonitoringEventsBase, unittest.TestCase):

    def test_just_pass(self):
        self.check_events(just_pass)

    def test_just_raise(self):
        try:
            self.check_events(just_raise)
        except Exception:
            pass
        self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 0)

    def test_just_call(self):
        self.check_events(just_call)

    def test_caught(self):
        self.check_events(caught)

    def test_nested_call(self):
        self.check_events(nested_call)

UP_EVENTS = (E.C_RETURN, E.C_RAISE, E.PY_RETURN, E.PY_UNWIND, E.PY_YIELD)
DOWN_EVENTS = (E.PY_START, E.PY_RESUME)

from test.profilee import testfunc

class SimulateProfileTest(MonitoringEventsBase, unittest.TestCase):

    def test_balanced(self):
        events = self.gather_events(testfunc)
        c = collections.Counter(events)
        self.assertEqual(c["c_call"], c["c_return"])
        self.assertEqual(c["start"], c["return"] + c["unwind"])
        self.assertEqual(c["raise"], c["exception_handled"] + c["unwind"])

    def test_frame_stack(self):
        self.maxDiff = None
        stack = []
        errors = []
        seen = set()
        def up(*args):
            frame = sys._getframe(1)
            if not stack:
                errors.append("empty")
            else:
                expected = stack.pop()
                if frame != expected:
                    errors.append(f" Popping {frame} expected {expected}")
        def down(*args):
            frame = sys._getframe(1)
            stack.append(frame)
            seen.add(frame.f_code)
        def call(code, offset, callable, arg):
            if not isinstance(callable, PY_CALLABLES):
                stack.append(sys._getframe(1))
        for event in UP_EVENTS:
            sys.monitoring.register_callback(TEST_TOOL, event, up)
        for event in DOWN_EVENTS:
            sys.monitoring.register_callback(TEST_TOOL, event, down)
        sys.monitoring.register_callback(TEST_TOOL, E.CALL, call)
        sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET)
        testfunc()
        sys.monitoring.set_events(TEST_TOOL, 0)
        self.assertEqual(errors, [])
        self.assertEqual(stack, [sys._getframe()])
        self.assertEqual(len(seen), 9)


class CounterWithDisable:

    def __init__(self):
        self.disable = False
        self.count = 0

    def __call__(self, *args):
        self.count += 1
        if self.disable:
            return sys.monitoring.DISABLE


class RecorderWithDisable:

    def __init__(self, events):
        self.disable = False
        self.events = events

    def __call__(self, code, event):
        self.events.append(event)
        if self.disable:
            return sys.monitoring.DISABLE


class MontoringDisableAndRestartTest(MonitoringTestBase, unittest.TestCase):

    def test_disable(self):
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            self.assertEqual(counter.count, 0)
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            counter.disable = True
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 0)
            sys.monitoring.set_events(TEST_TOOL, 0)
        finally:
            sys.monitoring.restart_events()

    def test_restart(self):
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            counter.disable = True
            f1()
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 0)
            sys.monitoring.restart_events()
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            sys.monitoring.set_events(TEST_TOOL, 0)
        finally:
            sys.monitoring.restart_events()


class MultipleMonitorsTest(MonitoringTestBase, unittest.TestCase):

    def test_two_same(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)})
            counter1.count = 0
            counter2.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_three_same(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            counter3 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.register_callback(TEST_TOOL3, E.PY_START, counter3)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL3, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL3), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2) | (1 << TEST_TOOL3)})
            counter1.count = 0
            counter2.count = 0
            counter3.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            count3 = counter3.count
            self.assertEqual((count1, count2, count3), (1, 1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.set_events(TEST_TOOL3, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL3, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_two_different(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_RETURN, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_RETURN)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_RETURN)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': 1 << TEST_TOOL, 'PY_RETURN': 1 << TEST_TOOL2})
            counter1.count = 0
            counter2.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_RETURN, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_two_with_disable(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter1)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, counter2)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
            self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)})
            counter1.count = 0
            counter2.count = 0
            counter1.disable = True
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (1, 1))
            counter1.count = 0
            counter2.count = 0
            f1()
            count1 = counter1.count
            count2 = counter2.count
            self.assertEqual((count1, count2), (0, 1))
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    def test_with_instruction_event(self):
        """Test that the second tool can set events with instruction events set by the first tool."""
        def f():
            pass
        code = f.__code__

        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.set_local_events(TEST_TOOL, code, E.INSTRUCTION | E.LINE)
            sys.monitoring.set_local_events(TEST_TOOL2, code, E.LINE)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            self.assertEqual(sys.monitoring._all_events(), {})


class LineMonitoringTest(MonitoringTestBase, unittest.TestCase):

    def test_lines_single(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.set_events(TEST_TOOL, E.LINE)
            f1()
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            start = nth_line(LineMonitoringTest.test_lines_single, 0)
            self.assertEqual(events, [start+7, nth_line(f1, 1), start+8])
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    def test_lines_loop(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.set_events(TEST_TOOL, E.LINE)
            floop()
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            start = nth_line(LineMonitoringTest.test_lines_loop, 0)
            floop_1 = nth_line(floop, 1)
            floop_2 = nth_line(floop, 2)
            self.assertEqual(
                events,
                [start+7, floop_1, floop_2, floop_1, floop_2, floop_1, start+8]
            )
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    def test_lines_two(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            events2 = []
            recorder2 = RecorderWithDisable(events2)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, recorder2)
            sys.monitoring.set_events(TEST_TOOL, E.LINE); sys.monitoring.set_events(TEST_TOOL2, E.LINE)
            f1()
            sys.monitoring.set_events(TEST_TOOL, 0); sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None)
            start = nth_line(LineMonitoringTest.test_lines_two, 0)
            expected = [start+10, nth_line(f1, 1), start+11]
            self.assertEqual(events, expected)
            self.assertEqual(events2, expected)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    def check_lines(self, func, expected, tool=TEST_TOOL):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(tool, E.LINE, recorder)
            sys.monitoring.set_events(tool, E.LINE)
            func()
            sys.monitoring.set_events(tool, 0)
            sys.monitoring.register_callback(tool, E.LINE, None)
            lines = [ line - func.__code__.co_firstlineno for line in events[1:-1] ]
            self.assertEqual(lines, expected)
        finally:
            sys.monitoring.set_events(tool, 0)


    def test_linear(self):

        def func():
            line = 1
            line = 2
            line = 3
            line = 4
            line = 5

        self.check_lines(func, [1,2,3,4,5])

    def test_branch(self):
        def func():
            if "true".startswith("t"):
                line = 2
                line = 3
            else:
                line = 5
            line = 6

        self.check_lines(func, [1,2,3,6])

    def test_try_except(self):

        def func1():
            try:
                line = 2
                line = 3
            except:
                line = 5
            line = 6

        self.check_lines(func1, [1,2,3,6])

        def func2():
            try:
                line = 2
                raise 3
            except:
                line = 5
            line = 6

        self.check_lines(func2, [1,2,3,4,5,6])

    def test_generator_with_line(self):

        def f():
            def a():
                yield
            def b():
                yield from a()
            next(b())

        self.check_lines(f, [1,3,5,4,2,4])

class TestDisable(MonitoringTestBase, unittest.TestCase):

    def gen(self, cond):
        for i in range(10):
            if cond:
                yield 1
            else:
                yield 2

    def raise_handle_reraise(self):
        try:
            1/0
        except:
            raise

    def test_disable_legal_events(self):
        for event, name in INSTRUMENTED_EVENTS:
            try:
                counter = CounterWithDisable()
                counter.disable = True
                sys.monitoring.register_callback(TEST_TOOL, event, counter)
                sys.monitoring.set_events(TEST_TOOL, event)
                for _ in self.gen(1):
                    pass
                self.assertLess(counter.count, 4)
            finally:
                sys.monitoring.set_events(TEST_TOOL, 0)
                sys.monitoring.register_callback(TEST_TOOL, event, None)


    def test_disable_illegal_events(self):
        for event, name in EXCEPT_EVENTS:
            try:
                counter = CounterWithDisable()
                counter.disable = True
                sys.monitoring.register_callback(TEST_TOOL, event, counter)
                sys.monitoring.set_events(TEST_TOOL, event)
                with self.assertRaises(ValueError):
                    self.raise_handle_reraise()
            finally:
                sys.monitoring.set_events(TEST_TOOL, 0)
                sys.monitoring.register_callback(TEST_TOOL, event, None)


class ExceptionRecorder:

    event_type = E.RAISE

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, exc):
        self.events.append(("raise", type(exc)))

class CheckEvents(MonitoringTestBase, unittest.TestCase):

    def get_events(self, func, tool, recorders):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            event_list = []
            all_events = 0
            for recorder in recorders:
                ev = recorder.event_type
                sys.monitoring.register_callback(tool, ev, recorder(event_list))
                all_events |= ev
            sys.monitoring.set_events(tool, all_events)
            func()
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)
            return event_list
        finally:
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)

    def check_events(self, func, expected, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
        events = self.get_events(func, tool, recorders)
        if events != expected:
            print(events, file = sys.stderr)
        self.assertEqual(events, expected)

    def check_balanced(self, func, recorders):
        events = self.get_events(func, TEST_TOOL, recorders)
        self.assertEqual(len(events)%2, 0)
        for r, h in zip(events[::2],events[1::2]):
            r0 = r[0]
            self.assertIn(r0, ("raise", "reraise"))
            h0 = h[0]
            self.assertIn(h0, ("handled", "unwind"))
            self.assertEqual(r[1], h[1])


class StopiterationRecorder(ExceptionRecorder):

    event_type = E.STOP_ITERATION

class ReraiseRecorder(ExceptionRecorder):

    event_type = E.RERAISE

    def __call__(self, code, offset, exc):
        self.events.append(("reraise", type(exc)))

class UnwindRecorder(ExceptionRecorder):

    event_type = E.PY_UNWIND

    def __call__(self, code, offset, exc):
        self.events.append(("unwind", type(exc), code.co_name))

class ExceptionHandledRecorder(ExceptionRecorder):

    event_type = E.EXCEPTION_HANDLED

    def __call__(self, code, offset, exc):
        self.events.append(("handled", type(exc)))

class ThrowRecorder(ExceptionRecorder):

    event_type = E.PY_THROW

    def __call__(self, code, offset, exc):
        self.events.append(("throw", type(exc)))

class CallRecorder:

    event_type = E.CALL

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, func, arg):
        self.events.append(("call", func.__name__, arg))

class ReturnRecorder:

    event_type = E.PY_RETURN

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, val):
        self.events.append(("return", code.co_name, val))

class ExceptionMonitoringTest(CheckEvents):

    exception_recorders = (
        ExceptionRecorder,
        ReraiseRecorder,
        ExceptionHandledRecorder,
        UnwindRecorder
    )

    def test_simple_try_except(self):

        def func1():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        self.check_events(func1, [("raise", KeyError)])

    def test_implicit_stop_iteration(self):
        """Generators are documented as raising a StopIteration
           when they terminate.
           However, we don't do that if we can avoid it, for speed.
           sys.monitoring handles that by injecting a STOP_ITERATION
           event when we would otherwise have skip the RAISE event.
           This test checks that both paths record an equivalent event.
           """

        def gen():
            yield 1
            return 2

        def implicit_stop_iteration(iterator=None):
            if iterator is None:
                iterator = gen()
            for _ in iterator:
                pass

        recorders=(ExceptionRecorder, StopiterationRecorder,)
        expected = [("raise", StopIteration)]

        # Make sure that the loop is unspecialized, and that it will not
        # re-specialize immediately, so that we can we can test the
        # unspecialized version of the loop first.
        # Note: this assumes that we don't specialize loops over sets.
        implicit_stop_iteration(set(range(100)))

        # This will record a RAISE event for the StopIteration.
        self.check_events(implicit_stop_iteration, expected, recorders=recorders)

        # Now specialize, so that we see a STOP_ITERATION event.
        for _ in range(100):
            implicit_stop_iteration()

        # This will record a STOP_ITERATION event for the StopIteration.
        self.check_events(implicit_stop_iteration, expected, recorders=recorders)

    initial = [
        ("raise", ZeroDivisionError),
        ("handled", ZeroDivisionError)
    ]

    reraise = [
        ("reraise", ZeroDivisionError),
        ("handled", ZeroDivisionError)
    ]

    def test_explicit_reraise(self):

        def func():
            try:
                try:
                    1/0
                except:
                    raise
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_explicit_reraise_named(self):

        def func():
            try:
                try:
                    1/0
                except Exception as ex:
                    raise
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_implicit_reraise(self):

        def func():
            try:
                try:
                    1/0
                except ValueError:
                    pass
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)


    def test_implicit_reraise_named(self):

        def func():
            try:
                try:
                    1/0
                except ValueError as ex:
                    pass
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_try_finally(self):

        def func():
            try:
                try:
                    1/0
                finally:
                    pass
            except:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_async_for(self):

        def func():

            async def async_generator():
                for i in range(1):
                    raise ZeroDivisionError
                    yield i

            async def async_loop():
                try:
                    async for item in async_generator():
                        pass
                except Exception:
                    pass

            try:
                async_loop().send(None)
            except StopIteration:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

    def test_throw(self):

        def gen():
            yield 1
            yield 2

        def func():
            try:
                g = gen()
                next(g)
                g.throw(IndexError)
            except IndexError:
                pass

        self.check_balanced(
            func,
            recorders = self.exception_recorders)

        events = self.get_events(
            func,
            TEST_TOOL,
            self.exception_recorders + (ThrowRecorder,)
        )
        self.assertEqual(events[0], ("throw", IndexError))

    @requires_specialization
    def test_no_unwind_for_shim_frame(self):

        class B:
            def __init__(self):
                raise ValueError()

        def f():
            try:
                return B()
            except ValueError:
                pass

        for _ in range(100):
            f()
        recorders = (
            ReturnRecorder,
            UnwindRecorder
        )
        events = self.get_events(f, TEST_TOOL, recorders)
        adaptive_insts = dis.get_instructions(f, adaptive=True)
        self.assertIn(
            "CALL_ALLOC_AND_ENTER_INIT",
            [i.opname for i in adaptive_insts]
        )
        #There should be only one unwind event
        expected = [
            ('unwind', ValueError, '__init__'),
            ('return', 'f', None),
        ]

        self.assertEqual(events, expected)

class LineRecorder:

    event_type = E.LINE


    def __init__(self, events):
        self.events = events

    def __call__(self, code, line):
        self.events.append(("line", code.co_name, line - code.co_firstlineno))

class CEventRecorder:

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, func, arg):
        self.events.append((self.event_name, func.__name__, arg))

class CReturnRecorder(CEventRecorder):

    event_type = E.C_RETURN
    event_name = "C return"

class CRaiseRecorder(CEventRecorder):

    event_type = E.C_RAISE
    event_name = "C raise"

MANY_RECORDERS = ExceptionRecorder, CallRecorder, LineRecorder, CReturnRecorder, CRaiseRecorder

class TestManyEvents(CheckEvents):

    def test_simple(self):

        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        self.check_events(func1, recorders = MANY_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('call', 'func1', sys.monitoring.MISSING),
            ('line', 'func1', 1),
            ('line', 'func1', 2),
            ('line', 'func1', 3),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2)])

    def test_c_call(self):

        def func2():
            line1 = 1
            [].append(2)
            line3 = 3

        self.check_events(func2, recorders = MANY_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('call', 'func2', sys.monitoring.MISSING),
            ('line', 'func2', 1),
            ('line', 'func2', 2),
            ('call', 'append', [2]),
            ('C return', 'append', [2]),
            ('line', 'func2', 3),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2)])

    def test_try_except(self):

        def func3():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        self.check_events(func3, recorders = MANY_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('call', 'func3', sys.monitoring.MISSING),
            ('line', 'func3', 1),
            ('line', 'func3', 2),
            ('line', 'func3', 3),
            ('raise', KeyError),
            ('line', 'func3', 4),
            ('line', 'func3', 5),
            ('line', 'func3', 6),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2)])

class InstructionRecorder:

    event_type = E.INSTRUCTION

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset):
        # Filter out instructions in check_events to lower noise
        if code.co_name != "get_events":
            self.events.append(("instruction", code.co_name, offset))


LINE_AND_INSTRUCTION_RECORDERS = InstructionRecorder, LineRecorder

class TestLineAndInstructionEvents(CheckEvents):
    maxDiff = None

    def test_simple(self):

        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('line', 'func1', 1),
            ('instruction', 'func1', 2),
            ('instruction', 'func1', 4),
            ('line', 'func1', 2),
            ('instruction', 'func1', 6),
            ('instruction', 'func1', 8),
            ('line', 'func1', 3),
            ('instruction', 'func1', 10),
            ('instruction', 'func1', 12),
            ('instruction', 'func1', 14),
            ('line', 'get_events', 11)])

    def test_c_call(self):

        def func2():
            line1 = 1
            [].append(2)
            line3 = 3

        self.check_events(func2, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('line', 'func2', 1),
            ('instruction', 'func2', 2),
            ('instruction', 'func2', 4),
            ('line', 'func2', 2),
            ('instruction', 'func2', 6),
            ('instruction', 'func2', 8),
            ('instruction', 'func2', 28),
            ('instruction', 'func2', 30),
            ('instruction', 'func2', 38),
            ('line', 'func2', 3),
            ('instruction', 'func2', 40),
            ('instruction', 'func2', 42),
            ('instruction', 'func2', 44),
            ('line', 'get_events', 11)])

    def test_try_except(self):

        def func3():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        self.check_events(func3, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('line', 'func3', 1),
            ('instruction', 'func3', 2),
            ('line', 'func3', 2),
            ('instruction', 'func3', 4),
            ('instruction', 'func3', 6),
            ('line', 'func3', 3),
            ('instruction', 'func3', 8),
            ('instruction', 'func3', 18),
            ('instruction', 'func3', 20),
            ('line', 'func3', 4),
            ('instruction', 'func3', 22),
            ('line', 'func3', 5),
            ('instruction', 'func3', 24),
            ('instruction', 'func3', 26),
            ('instruction', 'func3', 28),
            ('line', 'func3', 6),
            ('instruction', 'func3', 30),
            ('instruction', 'func3', 32),
            ('instruction', 'func3', 34),
            ('line', 'get_events', 11)])

    def test_with_restart(self):
        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('line', 'func1', 1),
            ('instruction', 'func1', 2),
            ('instruction', 'func1', 4),
            ('line', 'func1', 2),
            ('instruction', 'func1', 6),
            ('instruction', 'func1', 8),
            ('line', 'func1', 3),
            ('instruction', 'func1', 10),
            ('instruction', 'func1', 12),
            ('instruction', 'func1', 14),
            ('line', 'get_events', 11)])

        sys.monitoring.restart_events()

        self.check_events(func1, recorders = LINE_AND_INSTRUCTION_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('line', 'func1', 1),
            ('instruction', 'func1', 2),
            ('instruction', 'func1', 4),
            ('line', 'func1', 2),
            ('instruction', 'func1', 6),
            ('instruction', 'func1', 8),
            ('line', 'func1', 3),
            ('instruction', 'func1', 10),
            ('instruction', 'func1', 12),
            ('instruction', 'func1', 14),
            ('line', 'get_events', 11)])

    def test_turn_off_only_instruction(self):
        """
        LINE events should be recorded after INSTRUCTION event is turned off
        """
        events = []
        def line(*args):
            events.append("line")
        sys.monitoring.set_events(TEST_TOOL, 0)
        sys.monitoring.register_callback(TEST_TOOL, E.LINE, line)
        sys.monitoring.register_callback(TEST_TOOL, E.INSTRUCTION, lambda *args: None)
        sys.monitoring.set_events(TEST_TOOL, E.LINE | E.INSTRUCTION)
        sys.monitoring.set_events(TEST_TOOL, E.LINE)
        events = []
        a = 0
        sys.monitoring.set_events(TEST_TOOL, 0)
        self.assertGreater(len(events), 0)

class TestInstallIncrementally(MonitoringTestBase, unittest.TestCase):

    def check_events(self, func, must_include, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            event_list = []
            all_events = 0
            for recorder in recorders:
                all_events |= recorder.event_type
                sys.monitoring.set_events(tool, all_events)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, recorder(event_list))
            func()
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)
            for line in must_include:
                self.assertIn(line, event_list)
        finally:
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)

    @staticmethod
    def func1():
        line1 = 1

    MUST_INCLUDE_LI = [
            ('instruction', 'func1', 2),
            ('line', 'func1', 2),
            ('instruction', 'func1', 4),
            ('instruction', 'func1', 6)]

    def test_line_then_instruction(self):
        recorders = [ LineRecorder, InstructionRecorder ]
        self.check_events(self.func1,
                          recorders = recorders, must_include = self.MUST_INCLUDE_LI)

    def test_instruction_then_line(self):
        recorders = [ InstructionRecorder, LineRecorder ]
        self.check_events(self.func1,
                          recorders = recorders, must_include = self.MUST_INCLUDE_LI)

    @staticmethod
    def func2():
        len(())

    MUST_INCLUDE_CI = [
            ('instruction', 'func2', 2),
            ('call', 'func2', sys.monitoring.MISSING),
            ('call', 'len', ()),
            ('instruction', 'func2', 12),
            ('instruction', 'func2', 14)]



    def test_call_then_instruction(self):
        recorders = [ CallRecorder, InstructionRecorder ]
        self.check_events(self.func2,
                          recorders = recorders, must_include = self.MUST_INCLUDE_CI)

    def test_instruction_then_call(self):
        recorders = [ InstructionRecorder, CallRecorder ]
        self.check_events(self.func2,
                          recorders = recorders, must_include = self.MUST_INCLUDE_CI)

LOCAL_RECORDERS = CallRecorder, LineRecorder, CReturnRecorder, CRaiseRecorder

class TestLocalEvents(MonitoringTestBase, unittest.TestCase):

    def check_events(self, func, expected, tool=TEST_TOOL, recorders=()):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            event_list = []
            all_events = 0
            for recorder in recorders:
                ev = recorder.event_type
                sys.monitoring.register_callback(tool, ev, recorder(event_list))
                all_events |= ev
            sys.monitoring.set_local_events(tool, func.__code__, all_events)
            func()
            sys.monitoring.set_local_events(tool, func.__code__, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)
            self.assertEqual(event_list, expected)
        finally:
            sys.monitoring.set_local_events(tool, func.__code__, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)


    def test_simple(self):

        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        self.check_events(func1, recorders = LOCAL_RECORDERS, expected = [
            ('line', 'func1', 1),
            ('line', 'func1', 2),
            ('line', 'func1', 3)])

    def test_c_call(self):

        def func2():
            line1 = 1
            [].append(2)
            line3 = 3

        self.check_events(func2, recorders = LOCAL_RECORDERS, expected = [
            ('line', 'func2', 1),
            ('line', 'func2', 2),
            ('call', 'append', [2]),
            ('C return', 'append', [2]),
            ('line', 'func2', 3)])

    def test_try_except(self):

        def func3():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        self.check_events(func3, recorders = LOCAL_RECORDERS, expected = [
            ('line', 'func3', 1),
            ('line', 'func3', 2),
            ('line', 'func3', 3),
            ('line', 'func3', 4),
            ('line', 'func3', 5),
            ('line', 'func3', 6)])

    def test_set_non_local_event(self):
        with self.assertRaises(ValueError):
            sys.monitoring.set_local_events(TEST_TOOL, just_call.__code__, E.RAISE)

def line_from_offset(code, offset):
    for start, end, line in code.co_lines():
        if start <= offset < end:
            if line is None:
                return f"[offset={offset}]"
            return line - code.co_firstlineno
    return -1

class JumpRecorder:

    event_type = E.JUMP
    name = "jump"

    def __init__(self, events):
        self.events = events

    def __call__(self, code, from_, to):
        from_line = line_from_offset(code, from_)
        to_line = line_from_offset(code, to)
        self.events.append((self.name, code.co_name, from_line, to_line))


class BranchRecorder(JumpRecorder):

    event_type = E.BRANCH
    name = "branch"



class JumpOffsetRecorder:

    event_type = E.JUMP
    name = "jump"

    def __init__(self, events, offsets=False):
        self.events = events

    def __call__(self, code, from_, to):
        self.events.append((self.name, code.co_name, from_, to))

class BranchOffsetRecorder(JumpOffsetRecorder):

    event_type = E.BRANCH
    name = "branch"


JUMP_AND_BRANCH_RECORDERS = JumpRecorder, BranchRecorder
JUMP_BRANCH_AND_LINE_RECORDERS = JumpRecorder, BranchRecorder, LineRecorder
FLOW_AND_LINE_RECORDERS = JumpRecorder, BranchRecorder, LineRecorder, ExceptionRecorder, ReturnRecorder
BRANCH_OFFSET_RECORDERS = BranchOffsetRecorder,

class TestBranchAndJumpEvents(CheckEvents):
    maxDiff = None

    def test_loop(self):

        def func():
            x = 1
            for a in range(2):
                if a:
                    x = 4
                else:
                    x = 6
            7

        self.check_events(func, recorders = JUMP_AND_BRANCH_RECORDERS, expected = [
            ('branch', 'func', 2, 2),
            ('branch', 'func', 3, 6),
            ('jump', 'func', 6, 2),
            ('branch', 'func', 2, 2),
            ('branch', 'func', 3, 4),
            ('jump', 'func', 4, 2),
            ('branch', 'func', 2, 7)])

        self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('line', 'func', 1),
            ('line', 'func', 2),
            ('branch', 'func', 2, 2),
            ('line', 'func', 3),
            ('branch', 'func', 3, 6),
            ('line', 'func', 6),
            ('jump', 'func', 6, 2),
            ('line', 'func', 2),
            ('branch', 'func', 2, 2),
            ('line', 'func', 3),
            ('branch', 'func', 3, 4),
            ('line', 'func', 4),
            ('jump', 'func', 4, 2),
            ('line', 'func', 2),
            ('branch', 'func', 2, 7),
            ('line', 'func', 7),
            ('line', 'get_events', 11)])

    def test_except_star(self):

        class Foo:
            def meth(self):
                pass

        def func():
            try:
                try:
                    raise KeyError
                except* Exception as e:
                    f = Foo(); f.meth()
            except KeyError:
                pass


        self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('line', 'func', 1),
            ('line', 'func', 2),
            ('line', 'func', 3),
            ('line', 'func', 4),
            ('branch', 'func', 4, 4),
            ('line', 'func', 5),
            ('line', 'meth', 1),
            ('jump', 'func', 5, '[offset=118]'),
            ('branch', 'func', '[offset=122]', '[offset=126]'),
            ('line', 'get_events', 11)])

        self.check_events(func, recorders = FLOW_AND_LINE_RECORDERS, expected = [
            ('line', 'get_events', 10),
            ('line', 'func', 1),
            ('line', 'func', 2),
            ('line', 'func', 3),
            ('raise', KeyError),
            ('line', 'func', 4),
            ('branch', 'func', 4, 4),
            ('line', 'func', 5),
            ('line', 'meth', 1),
            ('return', 'meth', None),
            ('jump', 'func', 5, '[offset=118]'),
            ('branch', 'func', '[offset=122]', '[offset=126]'),
            ('return', 'func', None),
            ('line', 'get_events', 11)])

    def test_while_offset_consistency(self):

        def foo(n=0):
            while n<4:
                pass
                n += 1
            return None

        in_loop = ('branch', 'foo', 10, 14)
        exit_loop = ('branch', 'foo', 10, 30)
        self.check_events(foo, recorders = BRANCH_OFFSET_RECORDERS, expected = [
            in_loop,
            in_loop,
            in_loop,
            in_loop,
            exit_loop])


class TestLoadSuperAttr(CheckEvents):
    RECORDERS = CallRecorder, LineRecorder, CRaiseRecorder, CReturnRecorder

    def _exec(self, co):
        d = {}
        exec(co, d, d)
        return d

    def _exec_super(self, codestr, optimized=False):
        # The compiler checks for statically visible shadowing of the name
        # `super`, and declines to emit `LOAD_SUPER_ATTR` if shadowing is found.
        # So inserting `super = super` prevents the compiler from emitting
        # `LOAD_SUPER_ATTR`, and allows us to test that monitoring events for
        # `LOAD_SUPER_ATTR` are equivalent to those we'd get from the
        # un-optimized `LOAD_GLOBAL super; CALL; LOAD_ATTR` form.
        assignment = "x = 1" if optimized else "super = super"
        codestr = f"{assignment}\n{textwrap.dedent(codestr)}"
        co = compile(codestr, "<string>", "exec")
        # validate that we really do have a LOAD_SUPER_ATTR, only when optimized
        self.assertEqual(self._has_load_super_attr(co), optimized)
        return self._exec(co)

    def _has_load_super_attr(self, co):
        has = any(instr.opname == "LOAD_SUPER_ATTR" for instr in dis.get_instructions(co))
        if not has:
            has = any(
                isinstance(c, types.CodeType) and self._has_load_super_attr(c)
                for c in co.co_consts
            )
        return has

    def _super_method_call(self, optimized=False):
        codestr = """
            class A:
                def method(self, x):
                    return x

            class B(A):
                def method(self, x):
                    return super(
                    ).method(
                        x
                    )

            b = B()
            def f():
                return b.method(1)
        """
        d = self._exec_super(codestr, optimized)
        expected = [
            ('line', 'get_events', 10),
            ('call', 'f', sys.monitoring.MISSING),
            ('line', 'f', 1),
            ('call', 'method', d["b"]),
            ('line', 'method', 1),
            ('call', 'super', sys.monitoring.MISSING),
            ('C return', 'super', sys.monitoring.MISSING),
            ('line', 'method', 2),
            ('line', 'method', 3),
            ('line', 'method', 2),
            ('call', 'method', d["b"]),
            ('line', 'method', 1),
            ('line', 'method', 1),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2),
        ]
        return d["f"], expected

    def test_method_call(self):
        nonopt_func, nonopt_expected = self._super_method_call(optimized=False)
        opt_func, opt_expected = self._super_method_call(optimized=True)

        self.check_events(nonopt_func, recorders=self.RECORDERS, expected=nonopt_expected)
        self.check_events(opt_func, recorders=self.RECORDERS, expected=opt_expected)

    def _super_method_call_error(self, optimized=False):
        codestr = """
            class A:
                def method(self, x):
                    return x

            class B(A):
                def method(self, x):
                    return super(
                        x,
                        self,
                    ).method(
                        x
                    )

            b = B()
            def f():
                try:
                    return b.method(1)
                except TypeError:
                    pass
                else:
                    assert False, "should have raised TypeError"
        """
        d = self._exec_super(codestr, optimized)
        expected = [
            ('line', 'get_events', 10),
            ('call', 'f', sys.monitoring.MISSING),
            ('line', 'f', 1),
            ('line', 'f', 2),
            ('call', 'method', d["b"]),
            ('line', 'method', 1),
            ('line', 'method', 2),
            ('line', 'method', 3),
            ('line', 'method', 1),
            ('call', 'super', 1),
            ('C raise', 'super', 1),
            ('line', 'f', 3),
            ('line', 'f', 4),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2),
        ]
        return d["f"], expected

    def test_method_call_error(self):
        nonopt_func, nonopt_expected = self._super_method_call_error(optimized=False)
        opt_func, opt_expected = self._super_method_call_error(optimized=True)

        self.check_events(nonopt_func, recorders=self.RECORDERS, expected=nonopt_expected)
        self.check_events(opt_func, recorders=self.RECORDERS, expected=opt_expected)

    def _super_attr(self, optimized=False):
        codestr = """
            class A:
                x = 1

            class B(A):
                def method(self):
                    return super(
                    ).x

            b = B()
            def f():
                return b.method()
        """
        d = self._exec_super(codestr, optimized)
        expected = [
            ('line', 'get_events', 10),
            ('call', 'f', sys.monitoring.MISSING),
            ('line', 'f', 1),
            ('call', 'method', d["b"]),
            ('line', 'method', 1),
            ('call', 'super', sys.monitoring.MISSING),
            ('C return', 'super', sys.monitoring.MISSING),
            ('line', 'method', 2),
            ('line', 'method', 1),
            ('line', 'get_events', 11),
            ('call', 'set_events', 2)
        ]
        return d["f"], expected

    def test_attr(self):
        nonopt_func, nonopt_expected = self._super_attr(optimized=False)
        opt_func, opt_expected = self._super_attr(optimized=True)

        self.check_events(nonopt_func, recorders=self.RECORDERS, expected=nonopt_expected)
        self.check_events(opt_func, recorders=self.RECORDERS, expected=opt_expected)

    def test_vs_other_type_call(self):
        code_template = textwrap.dedent("""
            class C:
                def method(self):
                    return {cls}().__repr__{call}
            c = C()
            def f():
                return c.method()
        """)

        def get_expected(name, call_method, ns):
            repr_arg = 0 if name == "int" else sys.monitoring.MISSING
            return [
                ('line', 'get_events', 10),
                ('call', 'f', sys.monitoring.MISSING),
                ('line', 'f', 1),
                ('call', 'method', ns["c"]),
                ('line', 'method', 1),
                ('call', name, sys.monitoring.MISSING),
                ('C return', name, sys.monitoring.MISSING),
                *(
                    [
                        ('call', '__repr__', repr_arg),
                        ('C return', '__repr__', repr_arg),
                    ] if call_method else []
                ),
                ('line', 'get_events', 11),
                ('call', 'set_events', 2),
            ]

        for call_method in [True, False]:
            with self.subTest(call_method=call_method):
                call_str = "()" if call_method else ""
                code_super = code_template.format(cls="super", call=call_str)
                code_int = code_template.format(cls="int", call=call_str)
                co_super = compile(code_super, '<string>', 'exec')
                self.assertTrue(self._has_load_super_attr(co_super))
                ns_super = self._exec(co_super)
                ns_int = self._exec(code_int)

                self.check_events(
                    ns_super["f"],
                    recorders=self.RECORDERS,
                    expected=get_expected("super", call_method, ns_super)
                )
                self.check_events(
                    ns_int["f"],
                    recorders=self.RECORDERS,
                    expected=get_expected("int", call_method, ns_int)
                )


class TestSetGetEvents(MonitoringTestBase, unittest.TestCase):

    def test_global(self):
        sys.monitoring.set_events(TEST_TOOL, E.PY_START)
        self.assertEqual(sys.monitoring.get_events(TEST_TOOL), E.PY_START)
        sys.monitoring.set_events(TEST_TOOL2, E.PY_START)
        self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), E.PY_START)
        sys.monitoring.set_events(TEST_TOOL, 0)
        self.assertEqual(sys.monitoring.get_events(TEST_TOOL), 0)
        sys.monitoring.set_events(TEST_TOOL2,0)
        self.assertEqual(sys.monitoring.get_events(TEST_TOOL2), 0)

    def test_local(self):
        code = f1.__code__
        sys.monitoring.set_local_events(TEST_TOOL, code, E.PY_START)
        self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), E.PY_START)
        sys.monitoring.set_local_events(TEST_TOOL2, code, E.PY_START)
        self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL2, code), E.PY_START)
        sys.monitoring.set_local_events(TEST_TOOL, code, 0)
        self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), 0)
        sys.monitoring.set_local_events(TEST_TOOL2, code, 0)
        self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL2, code), 0)

class TestUninitialized(unittest.TestCase, MonitoringTestBase):

    @staticmethod
    def f():
        pass

    def test_get_local_events_uninitialized(self):
        self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, self.f.__code__), 0)

class TestRegressions(MonitoringTestBase, unittest.TestCase):

    def test_105162(self):
        caught = None

        def inner():
            nonlocal caught
            try:
                yield
            except Exception:
                caught = "inner"
                yield

        def outer():
            nonlocal caught
            try:
                yield from inner()
            except Exception:
                caught = "outer"
                yield

        def run():
            gen = outer()
            gen.send(None)
            gen.throw(Exception)
        run()
        self.assertEqual(caught, "inner")
        caught = None
        try:
            sys.monitoring.set_events(TEST_TOOL, E.PY_RESUME)
            run()
            self.assertEqual(caught, "inner")
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)

    def test_108390(self):

        class Foo:
            def __init__(self, set_event):
                if set_event:
                    sys.monitoring.set_events(TEST_TOOL, E.PY_RESUME)

        def make_foo_optimized_then_set_event():
            for i in range(100):
                Foo(i == 99)

        try:
            make_foo_optimized_then_set_event()
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)

    def test_gh108976(self):
        sys.monitoring.use_tool_id(0, "test")
        self.addCleanup(sys.monitoring.free_tool_id, 0)
        sys.monitoring.set_events(0, 0)
        sys.monitoring.register_callback(0, E.LINE, lambda *args: sys.monitoring.set_events(0, 0))
        sys.monitoring.register_callback(0, E.INSTRUCTION, lambda *args: 0)
        sys.monitoring.set_events(0, E.LINE | E.INSTRUCTION)
        sys.monitoring.set_events(0, 0)

    def test_call_function_ex(self):
        def f(a=1, b=2):
            return a + b
        args = (1, 2)
        empty_args = []

        call_data = []
        sys.monitoring.use_tool_id(0, "test")
        self.addCleanup(sys.monitoring.free_tool_id, 0)
        sys.monitoring.set_events(0, 0)
        sys.monitoring.register_callback(0, E.CALL, lambda code, offset, callable, arg0: call_data.append((callable, arg0)))
        sys.monitoring.set_events(0, E.CALL)
        f(*args)
        f(*empty_args)
        sys.monitoring.set_events(0, 0)
        self.assertEqual(call_data[0], (f, 1))
        self.assertEqual(call_data[1], (f, sys.monitoring.MISSING))

    def test_instruction_explicit_callback(self):
        # gh-122247
        # Calling the instruction event callback explicitly should not
        # crash CPython
        def callback(code, instruction_offset):
            pass

        sys.monitoring.use_tool_id(0, "test")
        self.addCleanup(sys.monitoring.free_tool_id, 0)
        sys.monitoring.register_callback(0, sys.monitoring.events.INSTRUCTION, callback)
        sys.monitoring.set_events(0, sys.monitoring.events.INSTRUCTION)
        callback(None, 0)  # call the *same* handler while it is registered
        sys.monitoring.restart_events()
        sys.monitoring.set_events(0, 0)


class TestOptimizer(MonitoringTestBase, unittest.TestCase):

    def setUp(self):
        _testinternalcapi = import_module("_testinternalcapi")
        if hasattr(_testinternalcapi, "get_optimizer"):
            self.old_opt = _testinternalcapi.get_optimizer()
            opt = _testinternalcapi.new_counter_optimizer()
            _testinternalcapi.set_optimizer(opt)
        super(TestOptimizer, self).setUp()

    def tearDown(self):
        super(TestOptimizer, self).tearDown()
        import _testinternalcapi
        if hasattr(_testinternalcapi, "get_optimizer"):
            _testinternalcapi.set_optimizer(self.old_opt)

    def test_for_loop(self):
        def test_func(x):
            i = 0
            while i < x:
                i += 1

        code = test_func.__code__
        sys.monitoring.set_local_events(TEST_TOOL, code, E.PY_START)
        self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), E.PY_START)
        test_func(1000)
        sys.monitoring.set_local_events(TEST_TOOL, code, 0)
        self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), 0)

class TestTier2Optimizer(CheckEvents):

    def test_monitoring_already_opimized_loop(self):
        def test_func(recorder):
            set_events = sys.monitoring.set_events
            line = E.LINE
            i = 0
            for i in range(551):
                # Turn on events without branching once i reaches 500.
                set_events(TEST_TOOL, line * int(i >= 500))
                pass
                pass
                pass

        self.assertEqual(sys.monitoring._all_events(), {})
        events = []
        recorder = LineRecorder(events)
        sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
        try:
            test_func(recorder)
        finally:
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.set_events(TEST_TOOL, 0)
        self.assertGreater(len(events), 250)

class TestMonitoringAtShutdown(unittest.TestCase):

    def test_monitoring_live_at_shutdown(self):
        # gh-115832: An object destructor running during the final GC of
        # interpreter shutdown triggered an infinite loop in the
        # instrumentation code.
        script = test.support.findfile("_test_monitoring_shutdown.py")
        script_helper.run_test_script(script)


class TestCApiEventGeneration(MonitoringTestBase, unittest.TestCase):

    class Scope:
        def __init__(self, *args):
            self.args = args

        def __enter__(self):
            _testcapi.monitoring_enter_scope(*self.args)

        def __exit__(self, *args):
            _testcapi.monitoring_exit_scope()

    def setUp(self):
        super(TestCApiEventGeneration, self).setUp()

        capi = _testcapi

        self.codelike = capi.CodeLike(2)

        self.cases = [
            # (Event, function, *args)
            ( 1, E.PY_START, capi.fire_event_py_start),
            ( 1, E.PY_RESUME, capi.fire_event_py_resume),
            ( 1, E.PY_YIELD, capi.fire_event_py_yield, 10),
            ( 1, E.PY_RETURN, capi.fire_event_py_return, 20),
            ( 2, E.CALL, capi.fire_event_call, callable, 40),
            ( 1, E.JUMP, capi.fire_event_jump, 60),
            ( 1, E.BRANCH, capi.fire_event_branch, 70),
            ( 1, E.PY_THROW, capi.fire_event_py_throw, ValueError(1)),
            ( 1, E.RAISE, capi.fire_event_raise, ValueError(2)),
            ( 1, E.EXCEPTION_HANDLED, capi.fire_event_exception_handled, ValueError(5)),
            ( 1, E.PY_UNWIND, capi.fire_event_py_unwind, ValueError(6)),
            ( 1, E.STOP_ITERATION, capi.fire_event_stop_iteration, 7),
            ( 1, E.STOP_ITERATION, capi.fire_event_stop_iteration, StopIteration(8)),
        ]

        self.EXPECT_RAISED_EXCEPTION = [E.PY_THROW, E.RAISE, E.EXCEPTION_HANDLED, E.PY_UNWIND]


    def check_event_count(self, event, func, args, expected, callback_raises=None):
        class Counter:
            def __init__(self, callback_raises):
                self.callback_raises = callback_raises
                self.count = 0

            def __call__(self, *args):
                self.count += 1
                if self.callback_raises:
                    exc = self.callback_raises
                    self.callback_raises = None
                    raise exc

        try:
            counter = Counter(callback_raises)
            sys.monitoring.register_callback(TEST_TOOL, event, counter)
            if event == E.C_RETURN or event == E.C_RAISE:
                sys.monitoring.set_events(TEST_TOOL, E.CALL)
            else:
                sys.monitoring.set_events(TEST_TOOL, event)
            event_value = int(math.log2(event))
            with self.Scope(self.codelike, event_value):
                counter.count = 0
                try:
                    func(*args)
                except ValueError as e:
                    self.assertIsInstance(expected, ValueError)
                    self.assertEqual(str(e), str(expected))
                    return
                else:
                    self.assertEqual(counter.count, expected)

            prev = sys.monitoring.register_callback(TEST_TOOL, event, None)
            with self.Scope(self.codelike, event_value):
                counter.count = 0
                func(*args)
                self.assertEqual(counter.count, 0)
                self.assertEqual(prev, counter)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)

    def test_fire_event(self):
        for expected, event, function, *args in self.cases:
            offset = 0
            self.codelike = _testcapi.CodeLike(1)
            with self.subTest(function.__name__):
                args_ = (self.codelike, offset) + tuple(args)
                self.check_event_count(event, function, args_, expected)

    def test_missing_exception(self):
        for _, event, function, *args in self.cases:
            if event not in self.EXPECT_RAISED_EXCEPTION:
                continue
            assert args and isinstance(args[-1], BaseException)
            offset = 0
            self.codelike = _testcapi.CodeLike(1)
            with self.subTest(function.__name__):
                args_ = (self.codelike, offset) + tuple(args[:-1]) + (None,)
                evt = int(math.log2(event))
                expected = ValueError(f"Firing event {evt} with no exception set")
                self.check_event_count(event, function, args_, expected)

    def test_fire_event_failing_callback(self):
        for expected, event, function, *args in self.cases:
            offset = 0
            self.codelike = _testcapi.CodeLike(1)
            with self.subTest(function.__name__):
                args_ = (self.codelike, offset) + tuple(args)
                exc = OSError(42)
                with self.assertRaises(type(exc)):
                    self.check_event_count(event, function, args_, expected,
                                           callback_raises=exc)


    CANNOT_DISABLE = { E.PY_THROW, E.RAISE, E.RERAISE,
                       E.EXCEPTION_HANDLED, E.PY_UNWIND }

    def check_disable(self, event, func, args, expected):
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, event, counter)
            if event == E.C_RETURN or event == E.C_RAISE:
                sys.monitoring.set_events(TEST_TOOL, E.CALL)
            else:
                sys.monitoring.set_events(TEST_TOOL, event)
            event_value = int(math.log2(event))
            with self.Scope(self.codelike, event_value):
                counter.count = 0
                func(*args)
                self.assertEqual(counter.count, expected)
                counter.disable = True
                if event in self.CANNOT_DISABLE:
                    # use try-except rather then assertRaises to avoid
                    # events from framework code
                    try:
                        counter.count = 0
                        func(*args)
                        self.assertEqual(counter.count, expected)
                    except ValueError:
                        pass
                    else:
                        self.Error("Expected a ValueError")
                else:
                    counter.count = 0
                    func(*args)
                    self.assertEqual(counter.count, expected)
                    counter.count = 0
                    func(*args)
                    self.assertEqual(counter.count, expected - 1)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)

    def test_disable_event(self):
        for expected, event, function, *args in self.cases:
            offset = 0
            self.codelike = _testcapi.CodeLike(2)
            with self.subTest(function.__name__):
                args_ = (self.codelike, 0) + tuple(args)
                self.check_disable(event, function, args_, expected)

    def test_enter_scope_two_events(self):
        try:
            yield_counter = CounterWithDisable()
            unwind_counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_YIELD, yield_counter)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_UNWIND, unwind_counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_YIELD | E.PY_UNWIND)

            yield_value = int(math.log2(E.PY_YIELD))
            unwind_value = int(math.log2(E.PY_UNWIND))
            cl = _testcapi.CodeLike(2)
            common_args = (cl, 0)
            with self.Scope(cl, yield_value, unwind_value):
                yield_counter.count = 0
                unwind_counter.count = 0

                _testcapi.fire_event_py_unwind(*common_args, ValueError(42))
                assert(yield_counter.count == 0)
                assert(unwind_counter.count == 1)

                _testcapi.fire_event_py_yield(*common_args, ValueError(42))
                assert(yield_counter.count == 1)
                assert(unwind_counter.count == 1)

                yield_counter.disable = True
                _testcapi.fire_event_py_yield(*common_args, ValueError(42))
                assert(yield_counter.count == 2)
                assert(unwind_counter.count == 1)

                _testcapi.fire_event_py_yield(*common_args, ValueError(42))
                assert(yield_counter.count == 2)
                assert(unwind_counter.count == 1)

        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)