llvm/cross-project-tests/debuginfo-tests/dexter/dex/debugger/DebuggerControllers/ConditionalController.py

# DExTer : Debugging Experience Tester
# ~~~~~~   ~         ~~         ~   ~~
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""Conditional Controller Class for DExTer.-"""


import os
import time
from collections import defaultdict
from itertools import chain

from dex.debugger.DebuggerControllers.ControllerHelpers import (
    in_source_file,
    update_step_watches,
)
from dex.debugger.DebuggerControllers.DebuggerControllerBase import (
    DebuggerControllerBase,
)
from dex.debugger.DebuggerBase import DebuggerBase
from dex.utils.Exceptions import DebuggerException
from dex.utils.Timeout import Timeout


class BreakpointRange:
    """A range of breakpoints and a set of conditions.

    The leading breakpoint (on line `range_from`) is always active.

    When the leading breakpoint is hit the trailing range should be activated
    when `expression` evaluates to any value in `values`. If there are no
    conditions (`expression` is None) then the trailing breakpoint range should
    always be activated upon hitting the leading breakpoint.

    Args:
       expression: None for no conditions, or a str expression to compare
       against `values`.

       hit_count: None for no limit, or int to set the number of times the
                  leading breakpoint is triggered before it is removed.
    """

    def __init__(
        self,
        expression: str,
        path: str,
        range_from: int,
        range_to: int,
        values: list,
        hit_count: int,
        finish_on_remove: bool,
    ):
        self.expression = expression
        self.path = path
        self.range_from = range_from
        self.range_to = range_to
        self.conditional_values = values
        self.max_hit_count = hit_count
        self.current_hit_count = 0
        self.finish_on_remove = finish_on_remove

    def has_conditions(self):
        return self.expression != None

    def get_conditional_expression_list(self):
        conditional_list = []
        for value in self.conditional_values:
            # (<expression>) == (<value>)
            conditional_expression = "({}) == ({})".format(self.expression, value)
            conditional_list.append(conditional_expression)
        return conditional_list

    def add_hit(self):
        self.current_hit_count += 1

    def should_be_removed(self):
        if self.max_hit_count == None:
            return False
        return self.current_hit_count >= self.max_hit_count


class ConditionalController(DebuggerControllerBase):
    def __init__(self, context, step_collection):
        self._bp_ranges = None
        self._watches = set()
        self._step_index = 0
        self._pause_between_steps = context.options.pause_between_steps
        self._max_steps = context.options.max_steps
        # Map {id: BreakpointRange}
        self._leading_bp_handles = {}
        super(ConditionalController, self).__init__(context, step_collection)
        self._build_bp_ranges()

    def _build_bp_ranges(self):
        commands = self.step_collection.commands
        self._bp_ranges = []
        try:
            limit_commands = commands["DexLimitSteps"]
            for lc in limit_commands:
                bpr = BreakpointRange(
                    lc.expression,
                    lc.path,
                    lc.from_line,
                    lc.to_line,
                    lc.values,
                    lc.hit_count,
                    False,
                )
                self._bp_ranges.append(bpr)
        except KeyError:
            raise DebuggerException(
                "Missing DexLimitSteps commands, cannot conditionally step."
            )
        if "DexFinishTest" in commands:
            finish_commands = commands["DexFinishTest"]
            for ic in finish_commands:
                bpr = BreakpointRange(
                    ic.expression,
                    ic.path,
                    ic.on_line,
                    ic.on_line,
                    ic.values,
                    ic.hit_count + 1,
                    True,
                )
                self._bp_ranges.append(bpr)

    def _set_leading_bps(self):
        # Set a leading breakpoint for each BreakpointRange, building a
        # map of {leading bp id: BreakpointRange}.
        for bpr in self._bp_ranges:
            if bpr.has_conditions():
                # Add a conditional breakpoint for each condition.
                for cond_expr in bpr.get_conditional_expression_list():
                    id = self.debugger.add_conditional_breakpoint(
                        bpr.path, bpr.range_from, cond_expr
                    )
                    self._leading_bp_handles[id] = bpr
            else:
                # Add an unconditional breakpoint.
                id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
                self._leading_bp_handles[id] = bpr

    def _run_debugger_custom(self, cmdline):
        # TODO: Add conditional and unconditional breakpoint support to dbgeng.
        if self.debugger.get_name() == "dbgeng":
            raise DebuggerException(
                "DexLimitSteps commands are not supported by dbgeng"
            )

        self.step_collection.clear_steps()
        self._set_leading_bps()

        for command_obj in chain.from_iterable(self.step_collection.commands.values()):
            self._watches.update(command_obj.get_watches())

        self.debugger.launch(cmdline)
        time.sleep(self._pause_between_steps)

        exit_desired = False
        timed_out = False
        total_timeout = Timeout(self.context.options.timeout_total)

        while not self.debugger.is_finished:
            breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
            while self.debugger.is_running and not timed_out:
                # Check to see whether we've timed out while we're waiting.
                if total_timeout.timed_out():
                    self.context.logger.error(
                        "Debugger session has been "
                        f"running for {total_timeout.elapsed}s, timeout reached!"
                    )
                    timed_out = True
                if breakpoint_timeout.timed_out():
                    self.context.logger.error(
                        f"Debugger session has not "
                        f"hit a breakpoint for {breakpoint_timeout.elapsed}s, timeout "
                        "reached!"
                    )
                    timed_out = True

            if timed_out:
                break

            step_info = self.debugger.get_step_info(self._watches, self._step_index)
            if step_info.current_frame:
                self._step_index += 1
                update_step_watches(
                    step_info, self._watches, self.step_collection.commands
                )
                self.step_collection.new_step(self.context, step_info)

            bp_to_delete = []
            for bp_id in self.debugger.get_triggered_breakpoint_ids():
                try:
                    # See if this is one of our leading breakpoints.
                    bpr = self._leading_bp_handles[bp_id]
                except KeyError:
                    # This is a trailing bp. Mark it for removal.
                    bp_to_delete.append(bp_id)
                    continue

                bpr.add_hit()
                if bpr.should_be_removed():
                    if bpr.finish_on_remove:
                        exit_desired = True
                    bp_to_delete.append(bp_id)
                    del self._leading_bp_handles[bp_id]
                # Add a range of trailing breakpoints covering the lines
                # requested in the DexLimitSteps command. Ignore first line as
                # that's covered by the leading bp we just hit and include the
                # final line.
                for line in range(bpr.range_from + 1, bpr.range_to + 1):
                    self.debugger.add_breakpoint(bpr.path, line)

            # Remove any trailing or expired leading breakpoints we just hit.
            self.debugger.delete_breakpoints(bp_to_delete)

            if exit_desired:
                break
            self.debugger.go()
            time.sleep(self._pause_between_steps)