llvm/cross-project-tests/debuginfo-tests/dexter/dex/debugger/lldb/LLDB.py

# DExTer : Debugging Experience Tester
# ~~~~~~   ~         ~~         ~   ~~
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""Interface for communicating with the LLDB debugger via its python interface.
"""

import imp
import os
import shlex
from subprocess import CalledProcessError, check_output, STDOUT
import sys

from dex.debugger.DebuggerBase import DebuggerBase, watch_is_active
from dex.dextIR import FrameIR, LocIR, StepIR, StopReason, ValueIR
from dex.dextIR import StackFrame, SourceLocation, ProgramState
from dex.utils.Exceptions import DebuggerException, LoadDebuggerException
from dex.utils.ReturnCode import ReturnCode


class LLDB(DebuggerBase):
    def __init__(self, context, *args):
        self.lldb_executable = context.options.lldb_executable
        self._debugger = None
        self._target = None
        self._process = None
        self._thread = None
        # Map {id (int): condition (str)} for breakpoints which have a
        # condition. See get_triggered_breakpoint_ids usage for more info.
        self._breakpoint_conditions = {}
        super(LLDB, self).__init__(context, *args)

    def _custom_init(self):
        self._debugger = self._interface.SBDebugger.Create()
        self._debugger.SetAsync(False)
        self._target = self._debugger.CreateTargetWithFileAndArch(
            self.context.options.executable, self.context.options.arch
        )
        if not self._target:
            raise LoadDebuggerException(
                'could not create target for executable "{}" with arch:{}'.format(
                    self.context.options.executable, self.context.options.arch
                )
            )

    def _custom_exit(self):
        if getattr(self, "_process", None):
            self._process.Kill()
        if getattr(self, "_debugger", None) and getattr(self, "_target", None):
            self._debugger.DeleteTarget(self._target)

    def _translate_stop_reason(self, reason):
        if reason == self._interface.eStopReasonNone:
            return None
        if reason == self._interface.eStopReasonBreakpoint:
            return StopReason.BREAKPOINT
        if reason == self._interface.eStopReasonPlanComplete:
            return StopReason.STEP
        if reason == self._interface.eStopReasonThreadExiting:
            return StopReason.PROGRAM_EXIT
        if reason == self._interface.eStopReasonException:
            return StopReason.ERROR
        return StopReason.OTHER

    def _load_interface(self):
        try:
            args = [self.lldb_executable, "-P"]
            pythonpath = check_output(args, stderr=STDOUT).rstrip().decode("utf-8")
        except CalledProcessError as e:
            raise LoadDebuggerException(str(e), sys.exc_info())
        except OSError as e:
            raise LoadDebuggerException(
                '{} ["{}"]'.format(e.strerror, self.lldb_executable), sys.exc_info()
            )

        if not os.path.isdir(pythonpath):
            raise LoadDebuggerException(
                'path "{}" does not exist [result of {}]'.format(pythonpath, args),
                sys.exc_info(),
            )

        try:
            module_info = imp.find_module("lldb", [pythonpath])
            return imp.load_module("lldb", *module_info)
        except ImportError as e:
            msg = str(e)
            if msg.endswith("not a valid Win32 application."):
                msg = "{} [Are you mixing 32-bit and 64-bit binaries?]".format(msg)
            raise LoadDebuggerException(msg, sys.exc_info())

    @classmethod
    def get_name(cls):
        return "lldb"

    @classmethod
    def get_option_name(cls):
        return "lldb"

    @property
    def version(self):
        try:
            return self._interface.SBDebugger_GetVersionString()
        except AttributeError:
            return None

    def clear_breakpoints(self):
        self._target.DeleteAllBreakpoints()

    def _add_breakpoint(self, file_, line):
        return self._add_conditional_breakpoint(file_, line, None)

    def _add_conditional_breakpoint(self, file_, line, condition):
        bp = self._target.BreakpointCreateByLocation(file_, line)
        if not bp:
            raise DebuggerException(
                "could not add breakpoint [{}:{}]".format(file_, line)
            )
        id = bp.GetID()
        if condition:
            bp.SetCondition(condition)
            assert id not in self._breakpoint_conditions
            self._breakpoint_conditions[id] = condition
        return id

    def _evaulate_breakpoint_condition(self, id):
        """Evaluate the breakpoint condition and return the result.

        Returns True if a conditional breakpoint with the specified id cannot
        be found (i.e. assume it is an unconditional breakpoint).
        """
        try:
            condition = self._breakpoint_conditions[id]
        except KeyError:
            # This must be an unconditional breakpoint.
            return True
        valueIR = self.evaluate_expression(condition)
        return valueIR.type_name == "bool" and valueIR.value == "true"

    def get_triggered_breakpoint_ids(self):
        # Breakpoints can only have been triggered if we've hit one.
        stop_reason = self._translate_stop_reason(self._thread.GetStopReason())
        if stop_reason != StopReason.BREAKPOINT:
            return []
        breakpoint_ids = set()
        # When the stop reason is eStopReasonBreakpoint, GetStopReasonDataCount
        # counts all breakpoints associated with the location that lldb has
        # stopped at, regardless of their condition. I.e. Even if we have two
        # breakpoints at the same source location that have mutually exclusive
        # conditions, both will be counted by GetStopReasonDataCount when
        # either condition is true. Check each breakpoint condition manually to
        # filter the list down to breakpoints that have caused this stop.
        #
        # Breakpoints have two data parts: Breakpoint ID, Location ID. We're
        # only interested in the Breakpoint ID so we skip every other item.
        for i in range(0, self._thread.GetStopReasonDataCount(), 2):
            id = self._thread.GetStopReasonDataAtIndex(i)
            if self._evaulate_breakpoint_condition(id):
                breakpoint_ids.add(id)
        return breakpoint_ids

    def delete_breakpoints(self, ids):
        for id in ids:
            bp = self._target.FindBreakpointByID(id)
            if not bp:
                # The ID is not valid.
                raise KeyError
            try:
                del self._breakpoint_conditions[id]
            except KeyError:
                # This must be an unconditional breakpoint.
                pass
            self._target.BreakpointDelete(id)

    def launch(self, cmdline):
        num_resolved_breakpoints = 0
        for b in self._target.breakpoint_iter():
            num_resolved_breakpoints += b.GetNumLocations() > 0
        assert num_resolved_breakpoints > 0

        if self.context.options.target_run_args:
            cmdline += shlex.split(self.context.options.target_run_args)
        launch_info = self._target.GetLaunchInfo()
        launch_info.SetWorkingDirectory(os.getcwd())
        launch_info.SetArguments(cmdline, True)
        error = self._interface.SBError()
        self._process = self._target.Launch(launch_info, error)
        
        if error.Fail():
            raise DebuggerException(error.GetCString())
        if not os.path.exists(self._target.executable.fullpath):
            raise DebuggerException("exe does not exist")
        if not self._process or self._process.GetNumThreads() == 0:
            raise DebuggerException("could not launch process")
        if self._process.GetNumThreads() != 1:
            raise DebuggerException("multiple threads not supported")
        self._thread = self._process.GetThreadAtIndex(0)
        
        num_stopped_threads = 0
        for thread in self._process:
            if thread.GetStopReason() == self._interface.eStopReasonBreakpoint:
                num_stopped_threads += 1
        assert num_stopped_threads > 0
        assert self._thread, (self._process, self._thread)

    def step(self):
        self._thread.StepInto()
        stop_reason = self._thread.GetStopReason()
        # If we (1) completed a step and (2) are sitting at a breakpoint,
        # but (3) the breakpoint is not reported as the stop reason, then
        # we'll need to step once more to hit the breakpoint.
        #
        # dexter sets breakpoints on every source line, then steps
        # each source line. Older lldb's would overwrite the stop
        # reason with "breakpoint hit" when we stopped at a breakpoint,
        # even if the breakpoint hadn't been exectued yet.  One
        # step per source line, hitting a breakpoint each time.
        #
        # But a more accurate behavior is that the step completes
        # with step-completed stop reason, then when we step again,
        # we execute the breakpoint and stop (with the pc the same) and
        # a breakpoint-hit stop reason.  So we need to step twice per line.
        if stop_reason == self._interface.eStopReasonPlanComplete:
            stepped_to_breakpoint = False
            pc = self._thread.GetFrameAtIndex(0).GetPC()
            for bp in self._target.breakpoints:
                for bploc in bp.locations:
                    if (
                        bploc.IsEnabled()
                        and bploc.GetAddress().GetLoadAddress(self._target) == pc
                    ):
                        stepped_to_breakpoint = True
            if stepped_to_breakpoint:
                self._thread.StepInto()

    def go(self) -> ReturnCode:
        self._process.Continue()
        return ReturnCode.OK

    def _get_step_info(self, watches, step_index):
        frames = []
        state_frames = []

        for i in range(0, self._thread.GetNumFrames()):
            sb_frame = self._thread.GetFrameAtIndex(i)
            sb_line = sb_frame.GetLineEntry()
            sb_filespec = sb_line.GetFileSpec()

            try:
                path = os.path.join(
                    sb_filespec.GetDirectory(), sb_filespec.GetFilename()
                )
            except (AttributeError, TypeError):
                path = None

            function = self._sanitize_function_name(sb_frame.GetFunctionName())

            loc_dict = {
                "path": path,
                "lineno": sb_line.GetLine(),
                "column": sb_line.GetColumn(),
            }
            loc = LocIR(**loc_dict)
            valid_loc_for_watch = loc.path and os.path.exists(loc.path)

            frame = FrameIR(function=function, is_inlined=sb_frame.IsInlined(), loc=loc)

            if any(
                name in (frame.function or "")  # pylint: disable=no-member
                for name in self.frames_below_main
            ):
                break

            frames.append(frame)

            state_frame = StackFrame(
                function=frame.function,
                is_inlined=frame.is_inlined,
                location=SourceLocation(**loc_dict),
                watches={},
            )
            if valid_loc_for_watch:
                for expr in map(
                    # Filter out watches that are not active in the current frame,
                    # and then evaluate all the active watches.
                    lambda watch_info, idx=i: self.evaluate_expression(
                        watch_info.expression, idx
                    ),
                    filter(
                        lambda watch_info, idx=i, line_no=loc.lineno, loc_path=loc.path: watch_is_active(
                            watch_info, loc_path, idx, line_no
                        ),
                        watches,
                    ),
                ):
                    state_frame.watches[expr.expression] = expr
            state_frames.append(state_frame)

        if len(frames) == 1 and frames[0].function is None:
            frames = []
            state_frames = []

        reason = self._translate_stop_reason(self._thread.GetStopReason())

        return StepIR(
            step_index=step_index,
            frames=frames,
            stop_reason=reason,
            program_state=ProgramState(state_frames),
        )

    @property
    def is_running(self):
        # We're not running in async mode so this is always False.
        return False

    @property
    def is_finished(self):
        return not self._thread.GetFrameAtIndex(0)

    @property
    def frames_below_main(self):
        return ["__scrt_common_main_seh", "__libc_start_main", "__libc_start_call_main"]

    def evaluate_expression(self, expression, frame_idx=0) -> ValueIR:
        result = self._thread.GetFrameAtIndex(frame_idx).EvaluateExpression(expression)
        error_string = str(result.error)

        value = result.value
        could_evaluate = not any(
            s in error_string
            for s in [
                "Can't run the expression locally",
                "use of undeclared identifier",
                "no member named",
                "Couldn't lookup symbols",
                "Couldn't look up symbols",
                "reference to local variable",
                "invalid use of 'this' outside of a non-static member function",
            ]
        )

        is_optimized_away = any(
            s in error_string
            for s in [
                "value may have been optimized out",
            ]
        )

        is_irretrievable = any(
            s in error_string
            for s in [
                "couldn't get the value of variable",
                "couldn't read its memory",
                "couldn't read from memory",
                "Cannot access memory at address",
                "invalid address (fault address:",
            ]
        )

        if could_evaluate and not is_irretrievable and not is_optimized_away:
            assert error_string == "success", (error_string, expression, value)
            # assert result.value is not None, (result.value, expression)

        if error_string == "success":
            error_string = None

        # attempt to find expression as a variable, if found, take the variable
        # obj's type information as it's 'usually' more accurate.
        var_result = self._thread.GetFrameAtIndex(frame_idx).FindVariable(expression)
        if str(var_result.error) == "success":
            type_name = var_result.type.GetDisplayTypeName()
        else:
            type_name = result.type.GetDisplayTypeName()

        return ValueIR(
            expression=expression,
            value=value,
            type_name=type_name,
            error_string=error_string,
            could_evaluate=could_evaluate,
            is_optimized_away=is_optimized_away,
            is_irretrievable=is_irretrievable,
        )