cpython/Lib/test/test_sys_settrace.py

# Testing the line trace facility.

from test import support
import unittest
import sys
import difflib
import gc
from functools import wraps
import asyncio
from test.support import import_helper, requires_subprocess
import contextlib
import os
import tempfile
import textwrap
import subprocess
import warnings
try:
    import _testinternalcapi
except ImportError:
    _testinternalcapi = None

support.requires_working_socket(module=True)

class tracecontext:
    """Context manager that traces its enter and exit."""
    def __init__(self, output, value):
        self.output = output
        self.value = value

    def __enter__(self):
        self.output.append(self.value)

    def __exit__(self, *exc_info):
        self.output.append(-self.value)

class asynctracecontext:
    """Asynchronous context manager that traces its aenter and aexit."""
    def __init__(self, output, value):
        self.output = output
        self.value = value

    async def __aenter__(self):
        self.output.append(self.value)

    async def __aexit__(self, *exc_info):
        self.output.append(-self.value)

async def asynciter(iterable):
    """Convert an iterable to an asynchronous iterator."""
    for x in iterable:
        yield x

def clean_asynciter(test):
    @wraps(test)
    async def wrapper(*args, **kwargs):
        cleanups = []
        def wrapped_asynciter(iterable):
            it = asynciter(iterable)
            cleanups.append(it.aclose)
            return it
        try:
            return await test(*args, **kwargs, asynciter=wrapped_asynciter)
        finally:
            while cleanups:
                await cleanups.pop()()
    return wrapper

# A very basic example.  If this fails, we're in deep trouble.
def basic():
    return 1

basic.events = [(0, 'call'),
                (1, 'line'),
                (1, 'return')]

# Many of the tests below are tricky because they involve pass statements.
# If there is implicit control flow around a pass statement (in an except
# clause or else clause) under what conditions do you set a line number
# following that clause?


# Some constructs like "while 0:", "if 0:" or "if 1:...else:..." could be optimized
# away.  Make sure that those lines aren't skipped.
def arigo_example0():
    x = 1
    del x
    while 0:
        pass
    x = 1

arigo_example0.events = [(0, 'call'),
                        (1, 'line'),
                        (2, 'line'),
                        (3, 'line'),
                        (5, 'line'),
                        (5, 'return')]

def arigo_example1():
    x = 1
    del x
    if 0:
        pass
    x = 1

arigo_example1.events = [(0, 'call'),
                        (1, 'line'),
                        (2, 'line'),
                        (3, 'line'),
                        (5, 'line'),
                        (5, 'return')]

def arigo_example2():
    x = 1
    del x
    if 1:
        x = 1
    else:
        pass
    return None

arigo_example2.events = [(0, 'call'),
                        (1, 'line'),
                        (2, 'line'),
                        (3, 'line'),
                        (4, 'line'),
                        (7, 'line'),
                        (7, 'return')]


# check that lines consisting of just one instruction get traced:
def one_instr_line():
    x = 1
    del x
    x = 1

one_instr_line.events = [(0, 'call'),
                         (1, 'line'),
                         (2, 'line'),
                         (3, 'line'),
                         (3, 'return')]

def no_pop_tops():      # 0
    x = 1               # 1
    for a in range(2):  # 2
        if a:           # 3
            x = 1       # 4
        else:           # 5
            x = 1       # 6

no_pop_tops.events = [(0, 'call'),
                      (1, 'line'),
                      (2, 'line'),
                      (3, 'line'),
                      (6, 'line'),
                      (2, 'line'),
                      (3, 'line'),
                      (4, 'line'),
                      (2, 'line'),
                      (2, 'return')]

def no_pop_blocks():
    y = 1
    while not y:
        bla
    x = 1

no_pop_blocks.events = [(0, 'call'),
                        (1, 'line'),
                        (2, 'line'),
                        (4, 'line'),
                        (4, 'return')]

def called(): # line -3
    x = 1

def call():   # line 0
    called()

call.events = [(0, 'call'),
               (1, 'line'),
               (-3, 'call'),
               (-2, 'line'),
               (-2, 'return'),
               (1, 'return')]

def raises():
    raise Exception

def test_raise():
    try:
        raises()
    except Exception:
        pass

test_raise.events = [(0, 'call'),
                     (1, 'line'),
                     (2, 'line'),
                     (-3, 'call'),
                     (-2, 'line'),
                     (-2, 'exception'),
                     (-2, 'return'),
                     (2, 'exception'),
                     (3, 'line'),
                     (4, 'line'),
                     (4, 'return')]

def _settrace_and_return(tracefunc):
    sys.settrace(tracefunc)
    sys._getframe().f_back.f_trace = tracefunc
def settrace_and_return(tracefunc):
    _settrace_and_return(tracefunc)

settrace_and_return.events = [(1, 'return')]

def _settrace_and_raise(tracefunc):
    sys.settrace(tracefunc)
    sys._getframe().f_back.f_trace = tracefunc
    raise RuntimeError
def settrace_and_raise(tracefunc):
    try:
        _settrace_and_raise(tracefunc)
    except RuntimeError:
        pass

settrace_and_raise.events = [(2, 'exception'),
                             (3, 'line'),
                             (4, 'line'),
                             (4, 'return')]

# implicit return example
# This test is interesting because of the else: pass
# part of the code.  The code generate for the true
# part of the if contains a jump past the else branch.
# The compiler then generates an implicit "return None"
# Internally, the compiler visits the pass statement
# and stores its line number for use on the next instruction.
# The next instruction is the implicit return None.
def ireturn_example():
    a = 5
    b = 5
    if a == b:
        b = a+1
    else:
        pass

ireturn_example.events = [(0, 'call'),
                          (1, 'line'),
                          (2, 'line'),
                          (3, 'line'),
                          (4, 'line'),
                          (4, 'return')]

# Tight loop with while(1) example (SF #765624)
def tightloop_example():
    items = range(0, 3)
    try:
        i = 0
        while 1:
            b = items[i]; i+=1
    except IndexError:
        pass

tightloop_example.events = [(0, 'call'),
                            (1, 'line'),
                            (2, 'line'),
                            (3, 'line'),
                            (4, 'line'),
                            (5, 'line'),
                            (4, 'line'),
                            (5, 'line'),
                            (4, 'line'),
                            (5, 'line'),
                            (4, 'line'),
                            (5, 'line'),
                            (5, 'exception'),
                            (6, 'line'),
                            (7, 'line'),
                            (7, 'return')]

def tighterloop_example():
    items = range(1, 4)
    try:
        i = 0
        while 1: i = items[i]
    except IndexError:
        pass

tighterloop_example.events = [(0, 'call'),
                            (1, 'line'),
                            (2, 'line'),
                            (3, 'line'),
                            (4, 'line'),
                            (4, 'line'),
                            (4, 'line'),
                            (4, 'line'),
                            (4, 'exception'),
                            (5, 'line'),
                            (6, 'line'),
                            (6, 'return')]

def generator_function():
    try:
        yield True
        "continued"
    finally:
        "finally"
def generator_example():
    # any() will leave the generator before its end
    x = any(generator_function())

    # the following lines were not traced
    for x in range(10):
        y = x

generator_example.events = ([(0, 'call'),
                             (2, 'line'),
                             (-6, 'call'),
                             (-5, 'line'),
                             (-4, 'line'),
                             (-4, 'return'),
                             (-4, 'call'),
                             (-4, 'exception'),
                             (-1, 'line'),
                             (-1, 'return')] +
                            [(5, 'line'), (6, 'line')] * 10 +
                            [(5, 'line'), (5, 'return')])


def lineno_matches_lasti(frame):
    last_line = None
    for start, end, line in frame.f_code.co_lines():
        if start <= frame.f_lasti < end:
            last_line = line
    return last_line == frame.f_lineno

class Tracer:
    def __init__(self, trace_line_events=None, trace_opcode_events=None):
        self.trace_line_events = trace_line_events
        self.trace_opcode_events = trace_opcode_events
        self.events = []

    def _reconfigure_frame(self, frame):
        if self.trace_line_events is not None:
            frame.f_trace_lines = self.trace_line_events
        if self.trace_opcode_events is not None:
            frame.f_trace_opcodes = self.trace_opcode_events

    def trace(self, frame, event, arg):
        assert lineno_matches_lasti(frame)
        self._reconfigure_frame(frame)
        self.events.append((frame.f_lineno, event))
        return self.trace

    def traceWithGenexp(self, frame, event, arg):
        self._reconfigure_frame(frame)
        (o for o in [1])
        self.events.append((frame.f_lineno, event))
        return self.trace


class TraceTestCase(unittest.TestCase):

    # Disable gc collection when tracing, otherwise the
    # deallocators may be traced as well.
    def setUp(self):
        self.using_gc = gc.isenabled()
        gc.disable()
        self.addCleanup(sys.settrace, sys.gettrace())

    def tearDown(self):
        if self.using_gc:
            gc.enable()

    @staticmethod
    def make_tracer():
        """Helper to allow test subclasses to configure tracers differently"""
        return Tracer()

    def compare_events(self, line_offset, events, expected_events):
        events = [(l - line_offset if l is not None else None, e) for (l, e) in events]
        if events != expected_events:
            self.fail(
                "events did not match expectation:\n" +
                "\n".join(difflib.ndiff([str(x) for x in expected_events],
                                        [str(x) for x in events])))

    def run_and_compare(self, func, events):
        tracer = self.make_tracer()
        sys.settrace(tracer.trace)
        func()
        sys.settrace(None)
        self.compare_events(func.__code__.co_firstlineno,
                            tracer.events, events)

    def run_test(self, func):
        self.run_and_compare(func, func.events)

    def run_test2(self, func):
        tracer = self.make_tracer()
        func(tracer.trace)
        sys.settrace(None)
        self.compare_events(func.__code__.co_firstlineno,
                            tracer.events, func.events)

    def test_set_and_retrieve_none(self):
        sys.settrace(None)
        assert sys.gettrace() is None

    def test_set_and_retrieve_func(self):
        def fn(*args):
            pass

        sys.settrace(fn)
        try:
            assert sys.gettrace() is fn
        finally:
            sys.settrace(None)

    def test_01_basic(self):
        self.run_test(basic)
    def test_02_arigo0(self):
        self.run_test(arigo_example0)
    def test_02_arigo1(self):
        self.run_test(arigo_example1)
    def test_02_arigo2(self):
        self.run_test(arigo_example2)
    def test_03_one_instr(self):
        self.run_test(one_instr_line)
    def test_04_no_pop_blocks(self):
        self.run_test(no_pop_blocks)
    def test_05_no_pop_tops(self):
        self.run_test(no_pop_tops)
    def test_06_call(self):
        self.run_test(call)
    def test_07_raise(self):
        self.run_test(test_raise)

    def test_08_settrace_and_return(self):
        self.run_test2(settrace_and_return)
    def test_09_settrace_and_raise(self):
        self.run_test2(settrace_and_raise)
    def test_10_ireturn(self):
        self.run_test(ireturn_example)
    def test_11_tightloop(self):
        self.run_test(tightloop_example)
    def test_12_tighterloop(self):
        self.run_test(tighterloop_example)

    def test_13_genexp(self):
        self.run_test(generator_example)
        # issue1265: if the trace function contains a generator,
        # and if the traced function contains another generator
        # that is not completely exhausted, the trace stopped.
        # Worse: the 'finally' clause was not invoked.
        tracer = self.make_tracer()
        sys.settrace(tracer.traceWithGenexp)
        generator_example()
        sys.settrace(None)
        self.compare_events(generator_example.__code__.co_firstlineno,
                            tracer.events, generator_example.events)

    def test_14_onliner_if(self):
        def onliners():
            if True: x=False
            else: x=True
            return 0
        self.run_and_compare(
            onliners,
            [(0, 'call'),
             (1, 'line'),
             (3, 'line'),
             (3, 'return')])

    def test_15_loops(self):
        # issue1750076: "while" expression is skipped by debugger
        def for_example():
            for x in range(2):
                pass
        self.run_and_compare(
            for_example,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (1, 'line'),
             (2, 'line'),
             (1, 'line'),
             (1, 'return')])

        def while_example():
            # While expression should be traced on every loop
            x = 2
            while x > 0:
                x -= 1
        self.run_and_compare(
            while_example,
            [(0, 'call'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (3, 'line'),
             (4, 'line'),
             (3, 'line'),
             (3, 'return')])

    def test_16_blank_lines(self):
        namespace = {}
        exec("def f():\n" + "\n" * 256 + "    pass", namespace)
        self.run_and_compare(
            namespace["f"],
            [(0, 'call'),
             (257, 'line'),
             (257, 'return')])

    def test_17_none_f_trace(self):
        # Issue 20041: fix TypeError when f_trace is set to None.
        def func():
            sys._getframe().f_trace = None
            lineno = 2
        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line')])

    def test_18_except_with_name(self):
        def func():
            try:
                try:
                    raise Exception
                except Exception as e:
                    raise
                    x = "Something"
                    y = "Something"
            except Exception:
                pass

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (4, 'line'),
             (5, 'line'),
             (8, 'line'),
             (9, 'line'),
             (9, 'return')])

    def test_19_except_with_finally(self):
        def func():
            try:
                try:
                    raise Exception
                finally:
                    y = "Something"
            except Exception:
                b = 23

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (5, 'line'),
             (6, 'line'),
             (7, 'line'),
             (7, 'return')])

    def test_20_async_for_loop(self):
        class AsyncIteratorWrapper:
            def __init__(self, obj):
                self._it = iter(obj)

            def __aiter__(self):
                return self

            async def __anext__(self):
                try:
                    return next(self._it)
                except StopIteration:
                    raise StopAsyncIteration

        async def doit_async():
            async for letter in AsyncIteratorWrapper("abc"):
                x = letter
            y = 42

        def run(tracer):
            x = doit_async()
            try:
                sys.settrace(tracer)
                x.send(None)
            finally:
                sys.settrace(None)

        tracer = self.make_tracer()
        events = [
                (0, 'call'),
                (1, 'line'),
                (-12, 'call'),
                (-11, 'line'),
                (-11, 'return'),
                (-9, 'call'),
                (-8, 'line'),
                (-8, 'return'),
                (-6, 'call'),
                (-5, 'line'),
                (-4, 'line'),
                (-4, 'return'),
                (1, 'exception'),
                (2, 'line'),
                (1, 'line'),
                (-6, 'call'),
                (-5, 'line'),
                (-4, 'line'),
                (-4, 'return'),
                (1, 'exception'),
                (2, 'line'),
                (1, 'line'),
                (-6, 'call'),
                (-5, 'line'),
                (-4, 'line'),
                (-4, 'return'),
                (1, 'exception'),
                (2, 'line'),
                (1, 'line'),
                (-6, 'call'),
                (-5, 'line'),
                (-4, 'line'),
                (-4, 'exception'),
                (-3, 'line'),
                (-2, 'line'),
                (-2, 'exception'),
                (-2, 'return'),
                (1, 'exception'),
                (3, 'line'),
                (3, 'return')]
        try:
            run(tracer.trace)
        except Exception:
            pass
        self.compare_events(doit_async.__code__.co_firstlineno,
                            tracer.events, events)

    def test_async_for_backwards_jump_has_no_line(self):
        async def arange(n):
            for i in range(n):
                yield i
        async def f():
            async for i in arange(3):
                if i > 100:
                    break # should never be traced

        tracer = self.make_tracer()
        coro = f()
        try:
            sys.settrace(tracer.trace)
            coro.send(None)
        except Exception:
            pass
        finally:
            sys.settrace(None)

        events = [
            (0, 'call'),
            (1, 'line'),
            (-3, 'call'),
            (-2, 'line'),
            (-1, 'line'),
            (-1, 'return'),
            (1, 'exception'),
            (2, 'line'),
            (1, 'line'),
            (-1, 'call'),
            (-2, 'line'),
            (-1, 'line'),
            (-1, 'return'),
            (1, 'exception'),
            (2, 'line'),
            (1, 'line'),
            (-1, 'call'),
            (-2, 'line'),
            (-1, 'line'),
            (-1, 'return'),
            (1, 'exception'),
            (2, 'line'),
            (1, 'line'),
            (-1, 'call'),
            (-2, 'line'),
            (-2, 'return'),
            (1, 'exception'),
            (1, 'return'),
        ]
        self.compare_events(f.__code__.co_firstlineno,
                            tracer.events, events)

    def test_21_repeated_pass(self):
        def func():
            pass
            pass

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (2, 'return')])

    def test_loop_in_try_except(self):
        # https://bugs.python.org/issue41670

        def func():
            try:
                for i in []: pass
                return 1
            except:
                return 2

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'return')])

    def test_try_except_no_exception(self):

        def func():
            try:
                2
            except:
                4
            else:
                6
                if False:
                    8
                else:
                    10
                if func.__name__ == 'Fred':
                    12
            finally:
                14

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (6, 'line'),
             (7, 'line'),
             (10, 'line'),
             (11, 'line'),
             (14, 'line'),
             (14, 'return')])

    def test_try_exception_in_else(self):

        def func():
            try:
                try:
                    3
                except:
                    5
                else:
                    7
                    raise Exception
                finally:
                    10
            except:
                12
            finally:
                14

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (7, 'line'),
             (8, 'line'),
             (8, 'exception'),
             (10, 'line'),
             (11, 'line'),
             (12, 'line'),
             (14, 'line'),
             (14, 'return')])

    def test_nested_loops(self):

        def func():
            for i in range(2):
                for j in range(2):
                    a = i + j
            return a == 1

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (2, 'line'),
             (3, 'line'),
             (2, 'line'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (2, 'line'),
             (3, 'line'),
             (2, 'line'),
             (1, 'line'),
             (4, 'line'),
             (4, 'return')])

    def test_if_break(self):

        def func():
            seq = [1, 0]
            while seq:
                n = seq.pop()
                if n:
                    break   # line 5
            else:
                n = 99
            return n        # line 8

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (8, 'line'),
             (8, 'return')])

    def test_break_through_finally(self):

        def func():
            a, c, d, i = 1, 1, 1, 99
            try:
                for i in range(3):
                    try:
                        a = 5
                        if i > 0:
                            break                   # line 7
                        a = 8
                    finally:
                        c = 10
            except:
                d = 12                              # line 12
            assert a == 5 and c == 10 and d == 1    # line 13

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (6, 'line'),
             (8, 'line'),
             (10, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (6, 'line'),
             (7, 'line'),
             (10, 'line')] +
             ([(13, 'line'), (13, 'return')] if __debug__ else [(10, 'return')]))

    def test_continue_through_finally(self):

        def func():
            a, b, c, d, i = 1, 1, 1, 1, 99
            try:
                for i in range(2):
                    try:
                        a = 5
                        if i > 0:
                            continue                # line 7
                        b = 8
                    finally:
                        c = 10
            except:
                d = 12                              # line 12
            assert (a, b, c, d) == (5, 8, 10, 1)    # line 13

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (6, 'line'),
             (8, 'line'),
             (10, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (6, 'line'),
             (7, 'line'),
             (10, 'line'),
             (3, 'line')] +
             ([(13, 'line'), (13, 'return')] if __debug__ else [(3, 'return')]))

    def test_return_through_finally(self):

        def func():
            try:
                return 2
            finally:
                4

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (4, 'line'),
             (4, 'return')])

    def test_try_except_with_wrong_type(self):

        def func():
            try:
                2/0
            except IndexError:
                4
            finally:
                return 6

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (2, 'exception'),
             (3, 'line'),
             (6, 'line'),
             (6, 'return')])

    def test_finally_with_conditional(self):

        # See gh-105658
        condition = True
        def func():
            try:
                try:
                    raise Exception
                finally:
                    if condition:
                        result = 1
                result = 2
            except:
                result = 3
            return result

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (5, 'line'),
             (6, 'line'),
             (8, 'line'),
             (9, 'line'),
             (10, 'line'),
             (10, 'return')])

    def test_break_to_continue1(self):

        def func():
            TRUE = 1
            x = [1]
            while x:
                x.pop()
                while TRUE:
                    break
                continue

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (6, 'line'),
             (7, 'line'),
             (3, 'line'),
             (3, 'return')])

    def test_break_to_continue2(self):

        def func():
            TRUE = 1
            x = [1]
            while x:
                x.pop()
                while TRUE:
                    break
                else:
                    continue

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (6, 'line'),
             (3, 'line'),
             (3, 'return')])

    def test_break_to_break(self):

        def func():
            TRUE = 1
            while TRUE:
                while TRUE:
                    break
                break

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (5, 'return')])

    def test_nested_ifs(self):

        def func():
            a = b = 1
            if a == 1:
                if b == 1:
                    x = 4
                else:
                    y = 6
            else:
                z = 8

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (4, 'return')])

    def test_nested_ifs_with_and(self):

        def func():
            if A:
                if B:
                    if C:
                        if D:
                            return False
                else:
                    return False
            elif E and F:
                return True

        A = B = True
        C = False

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'return')])

    def test_nested_try_if(self):

        def func():
            x = "hello"
            try:
                3/0
            except ZeroDivisionError:
                if x == 'raise':
                    raise ValueError()   # line 6
            f = 7

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (4, 'line'),
             (5, 'line'),
             (7, 'line'),
             (7, 'return')])

    def test_if_false_in_with(self):

        class C:
            def __enter__(self):
                return self
            def __exit__(*args):
                pass

        def func():
            with C():
                if False:
                    pass

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (-5, 'call'),
             (-4, 'line'),
             (-4, 'return'),
             (2, 'line'),
             (1, 'line'),
             (-3, 'call'),
             (-2, 'line'),
             (-2, 'return'),
             (1, 'return')])

    def test_if_false_in_try_except(self):

        def func():
            try:
                if False:
                    pass
            except Exception:
                X

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (2, 'return')])

    def test_implicit_return_in_class(self):

        def func():
            class A:
                if 3 < 9:
                    a = 1
                else:
                    a = 2

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (1, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'return'),
             (1, 'return')])

    def test_try_in_try(self):
        def func():
            try:
                try:
                    pass
                except Exception as ex:
                    pass
            except Exception:
                pass

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'return')])

    def test_try_in_try_with_exception(self):

        def func():
            try:
                try:
                    raise TypeError
                except ValueError as ex:
                    5
            except TypeError:
                7

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (4, 'line'),
             (6, 'line'),
             (7, 'line'),
             (7, 'return')])

        def func():
            try:
                try:
                    raise ValueError
                except ValueError as ex:
                    5
            except TypeError:
                7

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (4, 'line'),
             (5, 'line'),
             (5, 'return')])

    def test_if_in_if_in_if(self):
        def func(a=0, p=1, z=1):
            if p:
                if a:
                    if z:
                        pass
                    else:
                        pass
            else:
                pass

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (2, 'return')])

    def test_early_exit_with(self):

        class C:
            def __enter__(self):
                return self
            def __exit__(*args):
                pass

        def func_break():
            for i in (1,2):
                with C():
                    break
            pass

        def func_return():
            with C():
                return

        self.run_and_compare(func_break,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (-5, 'call'),
             (-4, 'line'),
             (-4, 'return'),
             (3, 'line'),
             (2, 'line'),
             (-3, 'call'),
             (-2, 'line'),
             (-2, 'return'),
             (4, 'line'),
             (4, 'return')])

        self.run_and_compare(func_return,
            [(0, 'call'),
             (1, 'line'),
             (-11, 'call'),
             (-10, 'line'),
             (-10, 'return'),
             (2, 'line'),
             (1, 'line'),
             (-9, 'call'),
             (-8, 'line'),
             (-8, 'return'),
             (1, 'return')])

    def test_flow_converges_on_same_line(self):

        def foo(x):
            if x:
                try:
                    1/(x - 1)
                except ZeroDivisionError:
                    pass
            return x

        def func():
            for i in range(2):
                foo(i)

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (-8, 'call'),
             (-7, 'line'),
             (-2, 'line'),
             (-2, 'return'),
             (1, 'line'),
             (2, 'line'),
             (-8, 'call'),
             (-7, 'line'),
             (-6, 'line'),
             (-5, 'line'),
             (-5, 'exception'),
             (-4, 'line'),
             (-3, 'line'),
             (-2, 'line'),
             (-2, 'return'),
             (1, 'line'),
             (1, 'return')])

    def test_no_tracing_of_named_except_cleanup(self):

        def func():
            x = 0
            try:
                1/x
            except ZeroDivisionError as error:
                if x:
                    raise
            return "done"

        self.run_and_compare(func,
        [(0, 'call'),
            (1, 'line'),
            (2, 'line'),
            (3, 'line'),
            (3, 'exception'),
            (4, 'line'),
            (5, 'line'),
            (7, 'line'),
            (7, 'return')])

    def test_tracing_exception_raised_in_with(self):

        class NullCtx:
            def __enter__(self):
                return self
            def __exit__(self, *excinfo):
                pass

        def func():
            try:
                with NullCtx():
                    1/0
            except ZeroDivisionError:
                pass

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (-5, 'call'),
             (-4, 'line'),
             (-4, 'return'),
             (3, 'line'),
             (3, 'exception'),
             (2, 'line'),
             (-3, 'call'),
             (-2, 'line'),
             (-2, 'return'),
             (4, 'line'),
             (5, 'line'),
             (5, 'return')])

    def test_try_except_star_no_exception(self):

        def func():
            try:
                2
            except* Exception:
                4
            else:
                6
                if False:
                    8
                else:
                    10
                if func.__name__ == 'Fred':
                    12
            finally:
                14

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (6, 'line'),
             (7, 'line'),
             (10, 'line'),
             (11, 'line'),
             (14, 'line'),
             (14, 'return')])

    def test_try_except_star_named_no_exception(self):

        def func():
            try:
                2
            except* Exception as e:
                4
            else:
                6
            finally:
                8

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (6, 'line'),
             (8, 'line'),
             (8, 'return')])

    def test_try_except_star_exception_caught(self):

        def func():
            try:
                raise ValueError(2)
            except* ValueError:
                4
            else:
                6
            finally:
                8

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (2, 'exception'),
             (3, 'line'),
             (4, 'line'),
             (8, 'line'),
             (8, 'return')])

    def test_try_except_star_named_exception_caught(self):

        def func():
            try:
                raise ValueError(2)
            except* ValueError as e:
                4
            else:
                6
            finally:
                8

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (2, 'exception'),
             (3, 'line'),
             (4, 'line'),
             (8, 'line'),
             (8, 'return')])

    def test_try_except_star_exception_not_caught(self):

        def func():
            try:
                try:
                    raise ValueError(3)
                except* TypeError:
                    5
            except ValueError:
                7

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (4, 'line'),
             (6, 'line'),
             (7, 'line'),
             (7, 'return')])

    def test_try_except_star_named_exception_not_caught(self):

        def func():
            try:
                try:
                    raise ValueError(3)
                except* TypeError as e:
                    5
            except ValueError:
                7

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (4, 'line'),
             (6, 'line'),
             (7, 'line'),
             (7, 'return')])

    def test_try_except_star_nested(self):

        def func():
            try:
                try:
                    raise ExceptionGroup(
                        'eg',
                        [ValueError(5), TypeError('bad type')])
                except* TypeError as e:
                    7
                except* OSError:
                    9
                except* ValueError:
                    raise
            except* ValueError:
                try:
                    raise TypeError(14)
                except* OSError:
                    16
                except* TypeError as e:
                    18
            return 0

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (4, 'line'),
             (5, 'line'),
             (3, 'line'),
             (3, 'exception'),
             (6, 'line'),
             (7, 'line'),
             (8, 'line'),
             (10, 'line'),
             (11, 'line'),
             (12, 'line'),
             (13, 'line'),
             (14, 'line'),
             (14, 'exception'),
             (15, 'line'),
             (17, 'line'),
             (18, 'line'),
             (19, 'line'),
             (19, 'return')])

    def test_notrace_lambda(self):
        #Regression test for issue 46314

        def func():
            1
            lambda x: 2
            3

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'return')])

    def test_class_creation_with_docstrings(self):

        def func():
            class Class_1:
                ''' the docstring. 2'''
                def __init__(self):
                    ''' Another docstring. 4'''
                    self.a = 5

        self.run_and_compare(func,
            [(0, 'call'),
             (1, 'line'),
             (1, 'call'),
             (1, 'line'),
             (2, 'line'),
             (3, 'line'),
             (3, 'return'),
             (1, 'return')])

    def test_class_creation_with_decorator(self):
        def func():
            def decorator(arg):
                def _dec(c):
                    return c
                return _dec

            @decorator(6)
            @decorator(
                len([8]),
            )
            class MyObject:
                pass

        self.run_and_compare(func, [
            (0, 'call'),
            (1, 'line'),
            (6, 'line'),
            (1, 'call'),
            (2, 'line'),
            (4, 'line'),
            (4, 'return'),
            (7, 'line'),
            (8, 'line'),
            (7, 'line'),
            (1, 'call'),
            (2, 'line'),
            (4, 'line'),
            (4, 'return'),
            (10, 'line'),
            (6, 'call'),
            (6, 'line'),
            (11, 'line'),
            (11, 'return'),
            (7, 'line'),
            (2, 'call'),
            (3, 'line'),
            (3, 'return'),
            (6, 'line'),
            (2, 'call'),
            (3, 'line'),
            (3, 'return'),
            (10, 'line'),
            (10, 'return'),
        ])

    @support.cpython_only
    def test_no_line_event_after_creating_generator(self):
        # Spurious line events before call events only show up with C tracer

        # Skip this test if the _testcapi module isn't available.
        _testcapi = import_helper.import_module('_testcapi')

        def gen():
            yield 1

        def func():
            for _ in (
                gen()
            ):
                pass

        EXPECTED_EVENTS = [
            (0, 'call'),
            (2, 'line'),
            (-3, 'call'),
            (-2, 'line'),
            (-2, 'return'),
            (1, 'line'),
            (4, 'line'),
            (2, 'line'),
            (-2, 'call'),
            (-2, 'return'),
            (2, 'return'),
        ]

        # C level events should be the same as expected and the same as Python level.

        events = []
        # Turning on and off tracing must be on same line to avoid unwanted LINE events.
        _testcapi.settrace_to_record(events); func(); sys.settrace(None)
        start_line = func.__code__.co_firstlineno
        events = [
            (line-start_line, EVENT_NAMES[what])
            for (what, line, arg) in events
        ]
        self.assertEqual(events, EXPECTED_EVENTS)

        self.run_and_compare(func, EXPECTED_EVENTS)

    def test_correct_tracing_quickened_call_class_init(self):

        class C:
            def __init__(self):
                self

        def func():
            C()

        EXPECTED_EVENTS = [
            (0, 'call'),
            (1, 'line'),
            (-3, 'call'),
            (-2, 'line'),
            (-2, 'return'),
            (1, 'return')]

        self.run_and_compare(func, EXPECTED_EVENTS)
        # Quicken
        for _ in range(100):
            func()
        self.run_and_compare(func, EXPECTED_EVENTS)

    def test_settrace_error(self):
        raised = False
        def error_once(frame, event, arg):
            nonlocal raised
            if not raised:
                raised = True
                raise Exception
            return error

        try:
            sys._getframe().f_trace = error_once
            sys.settrace(error_once)
            len([])
        except Exception as ex:
            count = 0
            tb = ex.__traceback__
            while tb:
                if tb.tb_frame.f_code.co_name == "test_settrace_error":
                    count += 1
                tb = tb.tb_next
            if count == 0:
                self.fail("Traceback is missing frame")
            elif count > 1:
                self.fail("Traceback has frame more than once")
        else:
            self.fail("No exception raised")
        finally:
            sys.settrace(None)

    @support.cpython_only
    def test_testcapi_settrace_error(self):

        # Skip this test if the _testcapi module isn't available.
        _testcapi = import_helper.import_module('_testcapi')

        try:
            _testcapi.settrace_to_error([])
            len([])
        except Exception as ex:
            count = 0
            tb = ex.__traceback__
            while tb:
                if tb.tb_frame.f_code.co_name == "test_testcapi_settrace_error":
                    count += 1
                tb = tb.tb_next
            if count == 0:
                self.fail("Traceback is missing frame")
            elif count > 1:
                self.fail("Traceback has frame more than once")
        else:
            self.fail("No exception raised")
        finally:
            sys.settrace(None)

    def test_very_large_function(self):
        # There is a separate code path when the number of lines > (1 << 15).
        d = {}
        exec("""def f():              # line 0
            x = 0                     # line 1
            y = 1                     # line 2
            %s                        # lines 3 through (1 << 16)
            x += 1                    #
            return""" % ('\n' * (1 << 16),), d)
        f = d['f']

        EXPECTED_EVENTS = [
            (0, 'call'),
            (1, 'line'),
            (2, 'line'),
            (65540, 'line'),
            (65541, 'line'),
            (65541, 'return'),
        ]

        self.run_and_compare(f, EXPECTED_EVENTS)


EVENT_NAMES = [
    'call',
    'exception',
    'line',
    'return'
]


class SkipLineEventsTraceTestCase(TraceTestCase):
    """Repeat the trace tests, but with per-line events skipped"""

    def compare_events(self, line_offset, events, expected_events):
        skip_line_events = [e for e in expected_events if e[1] != 'line']
        super().compare_events(line_offset, events, skip_line_events)

    @staticmethod
    def make_tracer():
        return Tracer(trace_line_events=False)


@support.cpython_only
class TraceOpcodesTestCase(TraceTestCase):
    """Repeat the trace tests, but with per-opcodes events enabled"""

    def compare_events(self, line_offset, events, expected_events):
        skip_opcode_events = [e for e in events if e[1] != 'opcode']
        if len(events) > 1:
            self.assertLess(len(skip_opcode_events), len(events),
                            msg="No 'opcode' events received by the tracer")
        super().compare_events(line_offset, skip_opcode_events, expected_events)

    @staticmethod
    def make_tracer():
        return Tracer(trace_opcode_events=True)

    @requires_subprocess()
    def test_trace_opcodes_after_settrace(self):
        """Make sure setting f_trace_opcodes after starting trace works even
        if it's the first time f_trace_opcodes is being set. GH-103615"""

        code = textwrap.dedent("""
            import sys

            def opcode_trace_func(frame, event, arg):
                if event == "opcode":
                    print("opcode trace triggered")
                return opcode_trace_func

            sys.settrace(opcode_trace_func)
            sys._getframe().f_trace = opcode_trace_func
            sys._getframe().f_trace_opcodes = True
            a = 1
        """)

        # We can't use context manager because Windows can't execute a file while
        # it's being written
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix='.py')
        tmp.write(code.encode('utf-8'))
        tmp.close()
        try:
            p = subprocess.Popen([sys.executable, tmp.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            p.wait()
            out = p.stdout.read()
        finally:
            os.remove(tmp.name)
            p.stdout.close()
            p.stderr.close()
        self.assertIn(b"opcode trace triggered", out)


class RaisingTraceFuncTestCase(unittest.TestCase):
    def setUp(self):
        self.addCleanup(sys.settrace, sys.gettrace())

    def trace(self, frame, event, arg):
        """A trace function that raises an exception in response to a
        specific trace event."""
        if event == self.raiseOnEvent:
            raise ValueError # just something that isn't RuntimeError
        else:
            return self.trace

    def f(self):
        """The function to trace; raises an exception if that's the case
        we're testing, so that the 'exception' trace event fires."""
        if self.raiseOnEvent == 'exception':
            x = 0
            y = 1/x
        else:
            return 1

    def run_test_for_event(self, event):
        """Tests that an exception raised in response to the given event is
        handled OK."""
        self.raiseOnEvent = event
        try:
            for i in range(sys.getrecursionlimit() + 1):
                sys.settrace(self.trace)
                try:
                    self.f()
                except ValueError:
                    pass
                else:
                    self.fail("exception not raised!")
        except RuntimeError:
            self.fail("recursion counter not reset")

    # Test the handling of exceptions raised by each kind of trace event.
    def test_call(self):
        self.run_test_for_event('call')
    def test_line(self):
        self.run_test_for_event('line')
    def test_return(self):
        self.run_test_for_event('return')
    def test_exception(self):
        self.run_test_for_event('exception')

    def test_trash_stack(self):
        def f():
            for i in range(5):
                print(i)  # line tracing will raise an exception at this line

        def g(frame, why, extra):
            if (why == 'line' and
                frame.f_lineno == f.__code__.co_firstlineno + 2):
                raise RuntimeError("i am crashing")
            return g

        sys.settrace(g)
        try:
            f()
        except RuntimeError:
            # the test is really that this doesn't segfault:
            import gc
            gc.collect()
        else:
            self.fail("exception not propagated")


    def test_exception_arguments(self):
        def f():
            x = 0
            # this should raise an error
            x.no_such_attr
        def g(frame, event, arg):
            if (event == 'exception'):
                type, exception, trace = arg
                self.assertIsInstance(exception, Exception)
            return g

        existing = sys.gettrace()
        try:
            sys.settrace(g)
            try:
                f()
            except AttributeError:
                # this is expected
                pass
        finally:
            sys.settrace(existing)

    def test_line_event_raises_before_opcode_event(self):
        exception = ValueError("BOOM!")
        def trace(frame, event, arg):
            if event == "line":
                raise exception
            frame.f_trace_opcodes = True
            return trace
        def f():
            pass
        with self.assertRaises(ValueError) as caught:
            sys.settrace(trace)
            f()
        self.assertIs(caught.exception, exception)


# 'Jump' tests: assigning to frame.f_lineno within a trace function
# moves the execution position - it's how debuggers implement a Jump
# command (aka. "Set next statement").

class JumpTracer:
    """Defines a trace function that jumps from one place to another."""

    def __init__(self, function, jumpFrom, jumpTo, event='line',
                 decorated=False):
        self.code = function.__code__
        self.jumpFrom = jumpFrom
        self.jumpTo = jumpTo
        self.event = event
        self.firstLine = None if decorated else self.code.co_firstlineno
        self.done = False

    def trace(self, frame, event, arg):
        if self.done:
            return
        assert lineno_matches_lasti(frame)
        # frame.f_code.co_firstlineno is the first line of the decorator when
        # 'function' is decorated and the decorator may be written using
        # multiple physical lines when it is too long. Use the first line
        # trace event in 'function' to find the first line of 'function'.
        if (self.firstLine is None and frame.f_code == self.code and
                event == 'line'):
            self.firstLine = frame.f_lineno - 1
        if (event == self.event and self.firstLine is not None and
                frame.f_lineno == self.firstLine + self.jumpFrom):
            f = frame
            while f is not None and f.f_code != self.code:
                f = f.f_back
            if f is not None:
                # Cope with non-integer self.jumpTo (because of
                # no_jump_to_non_integers below).
                try:
                    frame.f_lineno = self.firstLine + self.jumpTo
                except TypeError:
                    frame.f_lineno = self.jumpTo
                self.done = True
        return self.trace

# This verifies the line-numbers-must-be-integers rule.
def no_jump_to_non_integers(output):
    try:
        output.append(2)
    except ValueError as e:
        output.append('integer' in str(e))

# This verifies that you can't set f_lineno via _getframe or similar
# trickery.
def no_jump_without_trace_function():
    try:
        previous_frame = sys._getframe().f_back
        previous_frame.f_lineno = previous_frame.f_lineno
    except ValueError as e:
        # This is the exception we wanted; make sure the error message
        # talks about trace functions.
        if 'trace' not in str(e):
            raise
    else:
        # Something's wrong - the expected exception wasn't raised.
        raise AssertionError("Trace-function-less jump failed to fail")


class JumpTestCase(unittest.TestCase):
    unbound_locals = r"assigning None to [0-9]+ unbound local"

    def setUp(self):
        self.addCleanup(sys.settrace, sys.gettrace())
        sys.settrace(None)

    def compare_jump_output(self, expected, received):
        if received != expected:
            self.fail( "Outputs don't match:\n" +
                       "Expected: " + repr(expected) + "\n" +
                       "Received: " + repr(received))

    def run_test(self, func, jumpFrom, jumpTo, expected, error=None,
                 event='line', decorated=False, warning=None):
        wrapped = func
        while hasattr(wrapped, '__wrapped__'):
            wrapped = wrapped.__wrapped__

        tracer = JumpTracer(wrapped, jumpFrom, jumpTo, event, decorated)
        sys.settrace(tracer.trace)
        output = []

        with contextlib.ExitStack() as stack:
            if error is not None:
                stack.enter_context(self.assertRaisesRegex(*error))
            if warning is not None:
                stack.enter_context(self.assertWarnsRegex(*warning))
            else:
                stack.enter_context(warnings.catch_warnings())
                warnings.simplefilter('error')
            func(output)

        sys.settrace(None)
        self.compare_jump_output(expected, output)

    def run_async_test(self, func, jumpFrom, jumpTo, expected, error=None,
                 event='line', decorated=False, warning=None):
        wrapped = func
        while hasattr(wrapped, '__wrapped__'):
            wrapped = wrapped.__wrapped__

        tracer = JumpTracer(wrapped, jumpFrom, jumpTo, event, decorated)
        sys.settrace(tracer.trace)
        output = []

        with contextlib.ExitStack() as stack:
            if error is not None:
                stack.enter_context(self.assertRaisesRegex(*error))
            if warning is not None:
                stack.enter_context(self.assertWarnsRegex(*warning))
            asyncio.run(func(output))

        sys.settrace(None)
        asyncio.set_event_loop_policy(None)
        self.compare_jump_output(expected, output)

    def jump_test(jumpFrom, jumpTo, expected, error=None, event='line', warning=None):
        """Decorator that creates a test that makes a jump
        from one place to another in the following code.
        """
        def decorator(func):
            @wraps(func)
            def test(self):
                self.run_test(func, jumpFrom, jumpTo, expected,
                              error=error, event=event, decorated=True, warning=warning)
            return test
        return decorator

    def async_jump_test(jumpFrom, jumpTo, expected, error=None, event='line', warning=None):
        """Decorator that creates a test that makes a jump
        from one place to another in the following asynchronous code.
        """
        def decorator(func):
            @wraps(func)
            def test(self):
                self.run_async_test(func, jumpFrom, jumpTo, expected,
                              error=error, event=event, decorated=True, warning=warning)
            return test
        return decorator

    ## The first set of 'jump' tests are for things that are allowed:

    @jump_test(1, 3, [3])
    def test_jump_simple_forwards(output):
        output.append(1)
        output.append(2)
        output.append(3)

    @jump_test(2, 1, [1, 1, 2])
    def test_jump_simple_backwards(output):
        output.append(1)
        output.append(2)

    @jump_test(1, 4, [5], warning=(RuntimeWarning, unbound_locals))
    def test_jump_is_none_forwards(output):
        x = None
        if x is None:
            output.append(3)
        else:
            output.append(5)

    @jump_test(6, 5, [3, 5, 6])
    def test_jump_is_none_backwards(output):
        x = None
        if x is None:
            output.append(3)
        else:
            output.append(5)
        output.append(6)

    @jump_test(2, 4, [5])
    def test_jump_is_not_none_forwards(output):
        x = None
        if x is not None:
            output.append(3)
        else:
            output.append(5)

    @jump_test(6, 5, [5, 5, 6])
    def test_jump_is_not_none_backwards(output):
        x = None
        if x is not None:
            output.append(3)
        else:
            output.append(5)
        output.append(6)

    @jump_test(3, 5, [2, 5], warning=(RuntimeWarning, unbound_locals))
    def test_jump_out_of_block_forwards(output):
        for i in 1, 2:
            output.append(2)
            for j in [3]:  # Also tests jumping over a block
                output.append(4)
        output.append(5)

    @jump_test(6, 1, [1, 3, 5, 1, 3, 5, 6, 7])
    def test_jump_out_of_block_backwards(output):
        output.append(1)
        for i in [1]:
            output.append(3)
            for j in [2]:  # Also tests jumping over a block
                output.append(5)
            output.append(6)
        output.append(7)

    @async_jump_test(4, 5, [3, 5])
    @clean_asynciter
    async def test_jump_out_of_async_for_block_forwards(output, asynciter):
        for i in [1]:
            async for i in asynciter([1, 2]):
                output.append(3)
                output.append(4)
            output.append(5)

    @async_jump_test(5, 2, [2, 4, 2, 4, 5, 6])
    @clean_asynciter
    async def test_jump_out_of_async_for_block_backwards(output, asynciter):
        for i in [1]:
            output.append(2)
            async for i in asynciter([1]):
                output.append(4)
                output.append(5)
            output.append(6)

    @jump_test(1, 2, [3])
    def test_jump_to_codeless_line(output):
        output.append(1)
        # Jumping to this line should skip to the next one.
        output.append(3)

    @jump_test(2, 2, [1, 2, 3])
    def test_jump_to_same_line(output):
        output.append(1)
        output.append(2)
        output.append(3)

    # Tests jumping within a finally block, and over one.
    @jump_test(4, 9, [2, 9])
    def test_jump_in_nested_finally(output):
        try:
            output.append(2)
        finally:
            output.append(4)
            try:
                output.append(6)
            finally:
                output.append(8)
            output.append(9)

    @jump_test(6, 7, [2, 7], (ZeroDivisionError, ''))
    def test_jump_in_nested_finally_2(output):
        try:
            output.append(2)
            1/0
            return
        finally:
            output.append(6)
            output.append(7)
        output.append(8)

    @jump_test(6, 11, [2, 11], (ZeroDivisionError, ''))
    def test_jump_in_nested_finally_3(output):
        try:
            output.append(2)
            1/0
            return
        finally:
            output.append(6)
            try:
                output.append(8)
            finally:
                output.append(10)
            output.append(11)
        output.append(12)

    @jump_test(5, 11, [2, 4], (ValueError, 'comes after the current code block'))
    def test_no_jump_over_return_try_finally_in_finally_block(output):
        try:
            output.append(2)
        finally:
            output.append(4)
            output.append(5)
            return
            try:
                output.append(8)
            finally:
                output.append(10)
            pass
        output.append(12)

    @jump_test(3, 4, [1], (ValueError, 'after'))
    def test_no_jump_infinite_while_loop(output):
        output.append(1)
        while True:
            output.append(3)
        output.append(4)

    @jump_test(2, 4, [4, 4])
    def test_jump_forwards_into_while_block(output):
        i = 1
        output.append(2)
        while i <= 2:
            output.append(4)
            i += 1

    @jump_test(5, 3, [3, 3, 3, 5])
    def test_jump_backwards_into_while_block(output):
        i = 1
        while i <= 2:
            output.append(3)
            i += 1
        output.append(5)

    @jump_test(2, 3, [1, 3])
    def test_jump_forwards_out_of_with_block(output):
        with tracecontext(output, 1):
            output.append(2)
        output.append(3)

    @async_jump_test(2, 3, [1, 3])
    async def test_jump_forwards_out_of_async_with_block(output):
        async with asynctracecontext(output, 1):
            output.append(2)
        output.append(3)

    @jump_test(3, 1, [1, 2, 1, 2, 3, -2])
    def test_jump_backwards_out_of_with_block(output):
        output.append(1)
        with tracecontext(output, 2):
            output.append(3)

    @async_jump_test(3, 1, [1, 2, 1, 2, 3, -2])
    async def test_jump_backwards_out_of_async_with_block(output):
        output.append(1)
        async with asynctracecontext(output, 2):
            output.append(3)

    @jump_test(2, 5, [5])
    def test_jump_forwards_out_of_try_finally_block(output):
        try:
            output.append(2)
        finally:
            output.append(4)
        output.append(5)

    @jump_test(3, 1, [1, 1, 3, 5])
    def test_jump_backwards_out_of_try_finally_block(output):
        output.append(1)
        try:
            output.append(3)
        finally:
            output.append(5)

    @jump_test(2, 6, [6])
    def test_jump_forwards_out_of_try_except_block(output):
        try:
            output.append(2)
        except:
            output.append(4)
            raise
        output.append(6)

    @jump_test(3, 1, [1, 1, 3])
    def test_jump_backwards_out_of_try_except_block(output):
        output.append(1)
        try:
            output.append(3)
        except:
            output.append(5)
            raise

    @jump_test(5, 7, [4, 7, 8])
    def test_jump_between_except_blocks(output):
        try:
            1/0
        except ZeroDivisionError:
            output.append(4)
            output.append(5)
        except FloatingPointError:
            output.append(7)
        output.append(8)

    @jump_test(5, 7, [4, 7, 8])
    def test_jump_from_except_to_finally(output):
        try:
            1/0
        except ZeroDivisionError:
            output.append(4)
            output.append(5)
        finally:
            output.append(7)
        output.append(8)

    @jump_test(5, 6, [4, 6, 7])
    def test_jump_within_except_block(output):
        try:
            1/0
        except:
            output.append(4)
            output.append(5)
            output.append(6)
        output.append(7)

    @jump_test(6, 1, [1, 5, 1, 5], warning=(RuntimeWarning, unbound_locals))
    def test_jump_over_try_except(output):
        output.append(1)
        try:
            1 / 0
        except ZeroDivisionError as e:
            output.append(5)
        x = 42  # has to be a two-instruction block

    @jump_test(2, 4, [1, 4, 5, -4])
    def test_jump_across_with(output):
        output.append(1)
        with tracecontext(output, 2):
            output.append(3)
        with tracecontext(output, 4):
            output.append(5)

    @async_jump_test(2, 4, [1, 4, 5, -4])
    async def test_jump_across_async_with(output):
        output.append(1)
        async with asynctracecontext(output, 2):
            output.append(3)
        async with asynctracecontext(output, 4):
            output.append(5)

    @jump_test(4, 5, [1, 3, 5, 6])
    def test_jump_out_of_with_block_within_for_block(output):
        output.append(1)
        for i in [1]:
            with tracecontext(output, 3):
                output.append(4)
            output.append(5)
        output.append(6)

    @async_jump_test(4, 5, [1, 3, 5, 6])
    async def test_jump_out_of_async_with_block_within_for_block(output):
        output.append(1)
        for i in [1]:
            async with asynctracecontext(output, 3):
                output.append(4)
            output.append(5)
        output.append(6)

    @jump_test(4, 5, [1, 2, 3, 5, -2, 6])
    def test_jump_out_of_with_block_within_with_block(output):
        output.append(1)
        with tracecontext(output, 2):
            with tracecontext(output, 3):
                output.append(4)
            output.append(5)
        output.append(6)

    @async_jump_test(4, 5, [1, 2, 3, 5, -2, 6])
    async def test_jump_out_of_async_with_block_within_with_block(output):
        output.append(1)
        with tracecontext(output, 2):
            async with asynctracecontext(output, 3):
                output.append(4)
            output.append(5)
        output.append(6)

    @jump_test(5, 6, [2, 4, 6, 7])
    def test_jump_out_of_with_block_within_finally_block(output):
        try:
            output.append(2)
        finally:
            with tracecontext(output, 4):
                output.append(5)
            output.append(6)
        output.append(7)

    @async_jump_test(5, 6, [2, 4, 6, 7])
    async def test_jump_out_of_async_with_block_within_finally_block(output):
        try:
            output.append(2)
        finally:
            async with asynctracecontext(output, 4):
                output.append(5)
            output.append(6)
        output.append(7)

    @jump_test(8, 11, [1, 3, 5, 11, 12])
    def test_jump_out_of_complex_nested_blocks(output):
        output.append(1)
        for i in [1]:
            output.append(3)
            for j in [1, 2]:
                output.append(5)
                try:
                    for k in [1, 2]:
                        output.append(8)
                finally:
                    output.append(10)
            output.append(11)
        output.append(12)

    @jump_test(3, 5, [1, 2, 5], warning=(RuntimeWarning, unbound_locals))
    def test_jump_out_of_with_assignment(output):
        output.append(1)
        with tracecontext(output, 2) \
                as x:
            output.append(4)
        output.append(5)

    @async_jump_test(3, 5, [1, 2, 5], warning=(RuntimeWarning, unbound_locals))
    async def test_jump_out_of_async_with_assignment(output):
        output.append(1)
        async with asynctracecontext(output, 2) \
                as x:
            output.append(4)
        output.append(5)

    @jump_test(3, 6, [1, 6, 8, 9])
    def test_jump_over_return_in_try_finally_block(output):
        output.append(1)
        try:
            output.append(3)
            if not output: # always false
                return
            output.append(6)
        finally:
            output.append(8)
        output.append(9)

    @jump_test(5, 8, [1, 3, 8, 10, 11, 13])
    def test_jump_over_break_in_try_finally_block(output):
        output.append(1)
        while True:
            output.append(3)
            try:
                output.append(5)
                if not output: # always false
                    break
                output.append(8)
            finally:
                output.append(10)
            output.append(11)
            break
        output.append(13)

    @jump_test(1, 7, [7, 8], warning=(RuntimeWarning, unbound_locals))
    def test_jump_over_for_block_before_else(output):
        output.append(1)
        if not output:  # always false
            for i in [3]:
                output.append(4)
        else:
            output.append(6)
            output.append(7)
        output.append(8)

    @async_jump_test(1, 7, [7, 8], warning=(RuntimeWarning, unbound_locals))
    async def test_jump_over_async_for_block_before_else(output):
        output.append(1)
        if not output:  # always false
            async for i in asynciter([3]):
                output.append(4)
        else:
            output.append(6)
            output.append(7)
        output.append(8)

    # The second set of 'jump' tests are for things that are not allowed:

    @jump_test(2, 3, [1], (ValueError, 'after'))
    def test_no_jump_too_far_forwards(output):
        output.append(1)
        output.append(2)

    @jump_test(2, -2, [1], (ValueError, 'before'))
    def test_no_jump_too_far_backwards(output):
        output.append(1)
        output.append(2)

    # Test each kind of 'except' line.
    @jump_test(2, 3, [4], (ValueError, 'except'))
    def test_no_jump_to_except_1(output):
        try:
            output.append(2)
        except:
            output.append(4)
            raise

    @jump_test(2, 3, [4], (ValueError, 'except'))
    def test_no_jump_to_except_2(output):
        try:
            output.append(2)
        except ValueError:
            output.append(4)
            raise

    @jump_test(2, 3, [4], (ValueError, 'except'))
    def test_no_jump_to_except_3(output):
        try:
            output.append(2)
        except ValueError as e:
            output.append(4)
            raise e

    @jump_test(2, 3, [4], (ValueError, 'except'))
    def test_no_jump_to_except_4(output):
        try:
            output.append(2)
        except (ValueError, RuntimeError) as e:
            output.append(4)
            raise e

    @jump_test(1, 3, [], (ValueError, 'into'))
    def test_no_jump_forwards_into_for_block(output):
        output.append(1)
        for i in 1, 2:
            output.append(3)

    @async_jump_test(1, 3, [], (ValueError, 'into'))
    async def test_no_jump_forwards_into_async_for_block(output):
        output.append(1)
        async for i in asynciter([1, 2]):
            output.append(3)
        pass

    @jump_test(3, 2, [2, 2], (ValueError, 'into'))
    def test_no_jump_backwards_into_for_block(output):
        for i in 1, 2:
            output.append(2)
        output.append(3)


    @async_jump_test(3, 2, [2, 2], (ValueError, "can't jump into the body of a for loop"))
    async def test_no_jump_backwards_into_async_for_block(output):
        async for i in asynciter([1, 2]):
            output.append(2)
        output.append(3)

    @jump_test(1, 3, [], (ValueError, 'stack'))
    def test_no_jump_forwards_into_with_block(output):
        output.append(1)
        with tracecontext(output, 2):
            output.append(3)

    @async_jump_test(1, 3, [], (ValueError, 'stack'))
    async def test_no_jump_forwards_into_async_with_block(output):
        output.append(1)
        async with asynctracecontext(output, 2):
            output.append(3)

    @jump_test(3, 2, [1, 2, -1], (ValueError, 'stack'))
    def test_no_jump_backwards_into_with_block(output):
        with tracecontext(output, 1):
            output.append(2)
        output.append(3)

    @async_jump_test(3, 2, [1, 2, -1], (ValueError, 'stack'))
    async def test_no_jump_backwards_into_async_with_block(output):
        async with asynctracecontext(output, 1):
            output.append(2)
        output.append(3)

    @jump_test(1, 3, [3, 5])
    def test_jump_forwards_into_try_finally_block(output):
        output.append(1)
        try:
            output.append(3)
        finally:
            output.append(5)

    @jump_test(5, 2, [2, 4, 2, 4, 5])
    def test_jump_backwards_into_try_finally_block(output):
        try:
            output.append(2)
        finally:
            output.append(4)
        output.append(5)

    @jump_test(1, 3, [3])
    def test_jump_forwards_into_try_except_block(output):
        output.append(1)
        try:
            output.append(3)
        except:
            output.append(5)
            raise

    @jump_test(6, 2, [2, 2, 6])
    def test_jump_backwards_into_try_except_block(output):
        try:
            output.append(2)
        except:
            output.append(4)
            raise
        output.append(6)

    # 'except' with a variable creates an implicit finally block
    @jump_test(5, 7, [4, 7, 8], warning=(RuntimeWarning, unbound_locals))
    def test_jump_between_except_blocks_2(output):
        try:
            1/0
        except ZeroDivisionError:
            output.append(4)
            output.append(5)
        except FloatingPointError as e:
            output.append(7)
        output.append(8)

    @jump_test(1, 5, [5])
    def test_jump_into_finally_block(output):
        output.append(1)
        try:
            output.append(3)
        finally:
            output.append(5)

    @jump_test(3, 6, [2, 6, 7])
    def test_jump_into_finally_block_from_try_block(output):
        try:
            output.append(2)
            output.append(3)
        finally:  # still executed if the jump is failed
            output.append(5)
            output.append(6)
        output.append(7)

    @jump_test(5, 1, [1, 3, 1, 3, 5])
    def test_jump_out_of_finally_block(output):
        output.append(1)
        try:
            output.append(3)
        finally:
            output.append(5)

    @jump_test(1, 5, [], (ValueError, "can't jump into an 'except' block as there's no exception"))
    def test_no_jump_into_bare_except_block(output):
        output.append(1)
        try:
            output.append(3)
        except:
            output.append(5)

    @jump_test(1, 5, [], (ValueError, "can't jump into an 'except' block as there's no exception"))
    def test_no_jump_into_qualified_except_block(output):
        output.append(1)
        try:
            output.append(3)
        except Exception:
            output.append(5)

    @jump_test(3, 6, [2, 5, 6], (ValueError, "can't jump into an 'except' block as there's no exception"))
    def test_no_jump_into_bare_except_block_from_try_block(output):
        try:
            output.append(2)
            output.append(3)
        except:  # executed if the jump is failed
            output.append(5)
            output.append(6)
            raise
        output.append(8)

    @jump_test(3, 6, [2], (ValueError, "can't jump into an 'except' block as there's no exception"))
    def test_no_jump_into_qualified_except_block_from_try_block(output):
        try:
            output.append(2)
            output.append(3)
        except ZeroDivisionError:
            output.append(5)
            output.append(6)
            raise
        output.append(8)

    @jump_test(7, 1, [1, 3, 6, 1, 3, 6, 7])
    def test_jump_out_of_bare_except_block(output):
        output.append(1)
        try:
            output.append(3)
            1/0
        except:
            output.append(6)
            output.append(7)

    @jump_test(7, 1, [1, 3, 6, 1, 3, 6, 7])
    def test_jump_out_of_qualified_except_block(output):
        output.append(1)
        try:
            output.append(3)
            1/0
        except Exception:
            output.append(6)
            output.append(7)

    @jump_test(3, 5, [1, 2, 5, -2])
    def test_jump_between_with_blocks(output):
        output.append(1)
        with tracecontext(output, 2):
            output.append(3)
        with tracecontext(output, 4):
            output.append(5)

    @async_jump_test(3, 5, [1, 2, 5, -2])
    async def test_jump_between_async_with_blocks(output):
        output.append(1)
        async with asynctracecontext(output, 2):
            output.append(3)
        async with asynctracecontext(output, 4):
            output.append(5)

    @jump_test(5, 7, [2, 4], (ValueError, "after"))
    def test_no_jump_over_return_out_of_finally_block(output):
        try:
            output.append(2)
        finally:
            output.append(4)
            output.append(5)
            return
        output.append(7)

    @jump_test(7, 4, [1, 6], (ValueError, 'into'))
    def test_no_jump_into_for_block_before_else(output):
        output.append(1)
        if not output:  # always false
            for i in [3]:
                output.append(4)
        else:
            output.append(6)
            output.append(7)
        output.append(8)

    @async_jump_test(7, 4, [1, 6], (ValueError, 'into'))
    async def test_no_jump_into_async_for_block_before_else(output):
        output.append(1)
        if not output:  # always false
            async for i in asynciter([3]):
                output.append(4)
        else:
            output.append(6)
            output.append(7)
        output.append(8)

    def test_no_jump_to_non_integers(self):
        self.run_test(no_jump_to_non_integers, 2, "Spam", [True])

    def test_no_jump_without_trace_function(self):
        # Must set sys.settrace(None) in setUp(), else condition is not
        # triggered.
        no_jump_without_trace_function()

    def test_large_function(self):
        d = {}
        exec("""def f(output):        # line 0
            x = 0                     # line 1
            y = 1                     # line 2
            '''                       # line 3
            %s                        # lines 4-1004
            '''                       # line 1005
            x += 1                    # line 1006
            output.append(x)          # line 1007
            return""" % ('\n' * 1000,), d)
        f = d['f']
        self.run_test(f, 2, 1007, [0], warning=(RuntimeWarning, self.unbound_locals))

    def test_jump_to_firstlineno(self):
        # This tests that PDB can jump back to the first line in a
        # file.  See issue #1689458.  It can only be triggered in a
        # function call if the function is defined on a single line.
        code = compile("""
# Comments don't count.
output.append(2)  # firstlineno is here.
output.append(3)
output.append(4)
""", "<fake module>", "exec")
        class fake_function:
            __code__ = code
        tracer = JumpTracer(fake_function, 4, 1)
        sys.settrace(tracer.trace)
        namespace = {"output": []}
        exec(code, namespace)
        sys.settrace(None)
        self.compare_jump_output([2, 3, 2, 3, 4], namespace["output"])

    @jump_test(2, 3, [1], event='call', error=(ValueError, "can't jump from"
               " the 'call' trace event of a new frame"))
    def test_no_jump_from_call(output):
        output.append(1)
        def nested():
            output.append(3)
        nested()
        output.append(5)

    @jump_test(2, 1, [1], event='return', error=(ValueError,
               "can only jump from a 'line' trace event"))
    def test_no_jump_from_return_event(output):
        output.append(1)
        return

    @jump_test(2, 1, [1], event='exception', error=(ValueError,
               "can only jump from a 'line' trace event"))
    def test_no_jump_from_exception_event(output):
        output.append(1)
        1 / 0

    @jump_test(3, 2, [2, 2, 5], event='return')
    def test_jump_from_yield(output):
        def gen():
            output.append(2)
            yield 3
        next(gen())
        output.append(5)

    @jump_test(2, 3, [1, 3], warning=(RuntimeWarning, unbound_locals))
    def test_jump_forward_over_listcomp(output):
        output.append(1)
        x = [i for i in range(10)]
        output.append(3)

    # checking for segfaults.
    # See https://github.com/python/cpython/issues/92311
    @jump_test(3, 1, [], warning=(RuntimeWarning, unbound_locals))
    def test_jump_backward_over_listcomp(output):
        a = 1
        x = [i for i in range(10)]
        c = 3

    @jump_test(8, 2, [2, 7, 2], warning=(RuntimeWarning, unbound_locals))
    def test_jump_backward_over_listcomp_v2(output):
        flag = False
        output.append(2)
        if flag:
            return
        x = [i for i in range(5)]
        flag = 6
        output.append(7)
        output.append(8)

    @async_jump_test(2, 3, [1, 3], warning=(RuntimeWarning, unbound_locals))
    async def test_jump_forward_over_async_listcomp(output):
        output.append(1)
        x = [i async for i in asynciter(range(10))]
        output.append(3)

    @async_jump_test(3, 1, [], warning=(RuntimeWarning, unbound_locals))
    async def test_jump_backward_over_async_listcomp(output):
        a = 1
        x = [i async for i in asynciter(range(10))]
        c = 3

    @async_jump_test(8, 2, [2, 7, 2], warning=(RuntimeWarning, unbound_locals))
    async def test_jump_backward_over_async_listcomp_v2(output):
        flag = False
        output.append(2)
        if flag:
            return
        x = [i async for i in asynciter(range(5))]
        flag = 6
        output.append(7)
        output.append(8)

    # checking for segfaults.
    @jump_test(3, 7, [], error=(ValueError, "stack"))
    def test_jump_with_null_on_stack_load_global(output):
        a = 1
        print(
            output.append(3)
        )
        output.append(5)
        (
            ( # 7
                a
                +
                10
            )
            +
            13
        )
        output.append(15)

    # checking for segfaults.
    @jump_test(4, 8, [], error=(ValueError, "stack"))
    def test_jump_with_null_on_stack_push_null(output):
        a = 1
        f = print
        f(
            output.append(4)
        )
        output.append(6)
        (
            ( # 8
                a
                +
                11
            )
            +
            14
        )
        output.append(16)

    # checking for segfaults.
    @jump_test(3, 7, [], error=(ValueError, "stack"))
    def test_jump_with_null_on_stack_load_attr(output):
        a = 1
        list.append(
            output, 3
        )
        output.append(5)
        (
            ( # 7
                a
                +
                10
            )
            +
            13
        )
        output.append(15)

    @jump_test(2, 3, [1, 3], warning=(RuntimeWarning, unbound_locals))
    def test_jump_extended_args_unpack_ex_simple(output):
        output.append(1)
        _, *_, _ = output.append(2) or "Spam"
        output.append(3)

    @jump_test(3, 4, [1, 4, 4, 5], warning=(RuntimeWarning, unbound_locals))
    def test_jump_extended_args_unpack_ex_tricky(output):
        output.append(1)
        (
            _, *_, _
        ) = output.append(4) or "Spam"
        output.append(5)

    @support.requires_resource('cpu')
    def test_jump_extended_args_for_iter(self):
        # In addition to failing when extended arg handling is broken, this can
        # also hang for a *very* long time:
        source = [
            "def f(output):",
            "    output.append(1)",
            "    for _ in spam:",
            *(f"        output.append({i})" for i in range(3, 100_000)),
            f"    output.append(100_000)",
        ]
        namespace = {}
        exec("\n".join(source), namespace)
        f = namespace["f"]
        self.run_test(f,  2, 100_000, [1, 100_000], warning=(RuntimeWarning, self.unbound_locals))

    @jump_test(2, 3, [1, 3], warning=(RuntimeWarning, unbound_locals))
    def test_jump_or_pop(output):
        output.append(1)
        _ = output.append(2) and "Spam"
        output.append(3)


class TestExtendedArgs(unittest.TestCase):

    def setUp(self):
        self.addCleanup(sys.settrace, sys.gettrace())
        sys.settrace(None)

    def count_traces(self, func):
        # warmup
        for _ in range(20):
            func()

        counts = {"call": 0, "line": 0, "return": 0}
        def trace(frame, event, arg):
            counts[event] += 1
            return trace

        sys.settrace(trace)
        func()
        sys.settrace(None)

        return counts

    def test_trace_unpack_long_sequence(self):
        ns = {}
        code = "def f():\n  (" + "y,\n   "*300 + ") = range(300)"
        exec(code, ns)
        counts = self.count_traces(ns["f"])
        self.assertEqual(counts, {'call': 1, 'line': 301, 'return': 1})

    def test_trace_lots_of_globals(self):

        count = min(1000, int(support.get_c_recursion_limit() * 0.8))

        code = """if 1:
            def f():
                return (
                    {}
                )
        """.format("\n+\n".join(f"var{i}\n" for i in range(count)))
        ns = {f"var{i}": i for i in range(count)}
        exec(code, ns)
        counts = self.count_traces(ns["f"])
        self.assertEqual(counts, {'call': 1, 'line': count * 2, 'return': 1})


class TestEdgeCases(unittest.TestCase):

    def setUp(self):
        self.addCleanup(sys.settrace, sys.gettrace())
        sys.settrace(None)

    def test_reentrancy(self):
        def foo(*args):
            ...

        def bar(*args):
            ...

        class A:
            def __call__(self, *args):
                pass

            def __del__(self):
                sys.settrace(bar)

        sys.settrace(A())
        sys.settrace(foo)
        self.assertEqual(sys.gettrace(), bar)


    def test_same_object(self):
        def foo(*args):
            ...

        sys.settrace(foo)
        del foo
        sys.settrace(sys.gettrace())


class TestLinesAfterTraceStarted(TraceTestCase):

    def test_events(self):
        tracer = Tracer()
        sys._getframe().f_trace = tracer.trace
        sys.settrace(tracer.trace)
        line = 4
        line = 5
        sys.settrace(None)
        self.compare_events(
            TestLinesAfterTraceStarted.test_events.__code__.co_firstlineno,
            tracer.events, [
                (4, 'line'),
                (5, 'line'),
                (6, 'line')])


class TestSetLocalTrace(TraceTestCase):

    def test_with_branches(self):

        def tracefunc(frame, event, arg):
            if frame.f_code.co_name == "func":
                frame.f_trace = tracefunc
                line = frame.f_lineno - frame.f_code.co_firstlineno
                events.append((line, event))
            return tracefunc

        def func(arg = 1):
            N = 1
            if arg >= 2:
                not_reached = 3
            else:
                reached = 5
            if arg >= 3:
                not_reached = 7
            else:
                reached = 9
            the_end = 10

        EXPECTED_EVENTS = [
            (0, 'call'),
            (1, 'line'),
            (2, 'line'),
            (5, 'line'),
            (6, 'line'),
            (9, 'line'),
            (10, 'line'),
            (10, 'return'),
        ]

        events = []
        sys.settrace(tracefunc)
        sys._getframe().f_trace = tracefunc
        func()
        self.assertEqual(events, EXPECTED_EVENTS)
        sys.settrace(None)


if __name__ == "__main__":
    unittest.main()