# llvm/lldb/test/API/functionalities/interactive_scripted_process/interactive_scripted_process.py

# Usage:
# ./bin/lldb $LLVM/lldb/test/API/functionalities/interactive_scripted_process/main \
#   -o "br set -p 'Break here'" \
#   -o "command script import $LLVM/lldb/test/API/functionalities/interactive_scripted_process/interactive_scripted_process.py" \
#   -o "create_mux" \
#   -o "create_sub" \
#   -o "br set -p 'also break here'" -o 'continue'

import os, json, struct, signal, tempfile

from threading import Thread
from typing import Any, Dict

import lldb
from lldb.plugins.scripted_process import PassthroughScriptedProcess
from lldb.plugins.scripted_process import PassthroughScriptedThread


class MultiplexedScriptedProcess(PassthroughScriptedProcess):
    """Child scripted process that delegates to a multiplexer process.

    Once a multiplexer has been attached (see the `multiplexer` attribute),
    resume, thread listing and breakpoint creation are forwarded to it,
    tagged with this process' ID so the multiplexer can tell children apart.
    """

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Read the `parity` launch argument and retag the existing threads.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Launch dictionary; its "parity" key is read here.
        """
        super().__init__(exe_ctx, args)
        # Scripted implementation of the multiplexer; assigned externally
        # after launch. None means "not multiplexed yet".
        self.multiplexer = None
        if isinstance(self.driving_process, lldb.SBProcess) and self.driving_process:
            parity = args.GetValueForKey("parity")
            # TODO: Change to Walrus operator (:=) with oneline if assignment
            # Requires python 3.8
            val = parity.GetUnsignedIntegerValue()
            if val is not None:
                self.parity = val

            # Turn PassthroughScriptedThread into MultiplexedScriptedThread
            for thread in self.threads.values():
                thread.__class__ = MultiplexedScriptedThread

    def get_process_id(self) -> int:
        """Return the process ID, derived from the parity (`parity + 420`)."""
        return self.parity + 420

    def launch(self, should_stop: bool = True) -> lldb.SBError:
        """Record that the next `resume` is the first one; always succeeds."""
        self.first_launch = True
        return lldb.SBError()

    def resume(self, should_stop: bool) -> lldb.SBError:
        """Resume through the base class the first time, then via the multiplexer.

        Returns:
            The error object from whichever resume was performed, or an
            error stating that no multiplexer is attached.
        """
        if self.first_launch:
            self.first_launch = False
            return super().resume()

        if not self.multiplexer:
            return lldb.SBError("Multiplexer is not set.")
        return self.multiplexer.resume(should_stop)

    def get_threads_info(self) -> Dict[int, Any]:
        """Return this process' threads.

        Without a multiplexer the base class answer is returned. Otherwise
        the multiplexer is asked for the threads matching this process' ID.
        """
        if not self.multiplexer:
            return super().get_threads_info()
        filtered_threads = self.multiplexer.get_threads_info(pid=self.get_process_id())
        # Update the filtered thread class from PassthroughScriptedThread to MultiplexedScriptedThread
        # NOTE(review): elsewhere in this file PassthroughScriptedThread is
        # constructed with (process, args); confirm that building a
        # MultiplexedScriptedThread from a single thread object is supported.
        return {
            thread_id: MultiplexedScriptedThread(thread)
            for thread_id, thread in filtered_threads.items()
        }

    def create_breakpoint(self, addr, error, pid=None):
        """Ask the multiplexer to create a breakpoint on behalf of this process.

        Args:
            addr: Breakpoint address, forwarded unchanged.
            error: SBError that receives the failure description.
            pid: Ignored; this process' own ID is always forwarded.

        Returns:
            False when no multiplexer is attached, otherwise whatever the
            multiplexer's `create_breakpoint` returns.
        """
        if not self.multiplexer:
            error.SetErrorString("Multiplexer is not set.")
            # Bail out here: falling through would dereference None.
            return False
        return self.multiplexer.create_breakpoint(addr, error, self.get_process_id())

    def get_scripted_thread_plugin(self) -> str:
        """Return the fully qualified name of the thread class to instantiate."""
        return f"{MultiplexedScriptedThread.__module__}.{MultiplexedScriptedThread.__name__}"


class MultiplexedScriptedThread(PassthroughScriptedThread):
    """Passthrough thread whose name advertises its owning process' parity."""

    def get_name(self) -> str:
        """Return "<Odd|Even>MultiplexedScriptedThread.thread-<idx>"."""
        if self.scripted_process.parity % 2:
            prefix = "Odd"
        else:
            prefix = "Even"
        class_name = MultiplexedScriptedThread.__name__
        return f"{prefix}{class_name}.thread-{self.idx}"


class MultiplexerScriptedProcess(PassthroughScriptedProcess):
    """Scripted process that fronts a real "driving" process.

    It launches the driving target's process itself, listens for that
    process' state-change events on a background thread, mirrors its threads
    as `PassthroughScriptedThread` objects, and forwards resume and
    breakpoint requests (its own, or those relayed by multiplexed child
    processes) to the driving target.
    """

    # Listener receiving the driving process' events; created in __init__
    # when a valid driving target is available.
    listener = None
    # Maps a child process ID to its multiplexed process object; created in
    # __init__ and filled in from outside the class.
    multiplexed_processes = None

    def wait_for_driving_process_to_stop(self):
        """Background loop: refresh state every time the driving process stops.

        Runs forever (it is started as a daemon thread). Each stop event
        rebuilds `self.threads` from the driving process' threads and forces
        a running -> stopped transition on this process and every child.
        """
        # Local import: only needed to throttle the polling loop below.
        import time

        def handle_process_state_event():
            # Update multiplexer process
            log("Updating interactive scripted process threads")
            dbg = self.driving_target.GetDebugger()
            new_driving_thread_ids = set()
            for driving_thread in self.driving_process:
                new_driving_thread_ids.add(driving_thread.id)
                log(f"{len(self.threads)} New thread {hex(driving_thread.id)}")
                structured_data = lldb.SBStructuredData()
                structured_data.SetFromJSON(
                    json.dumps(
                        {
                            "driving_target_idx": dbg.GetIndexOfTarget(
                                self.driving_target
                            ),
                            "thread_idx": driving_thread.GetIndexID(),
                        }
                    )
                )

                self.threads[driving_thread.id] = PassthroughScriptedThread(
                    self, structured_data
                )

            # Iterate over a snapshot of the keys: deleting from a dict while
            # iterating it directly raises RuntimeError.
            for thread_id in list(self.threads):
                if thread_id not in new_driving_thread_ids:
                    log(f"Removing old thread {hex(thread_id)}")
                    del self.threads[thread_id]

            print(f"New thread count: {len(self.threads)}")

            # Bounce the state so the stop is re-evaluated with the
            # refreshed thread list.
            mux_process = self.target.GetProcess()
            mux_process.ForceScriptedState(lldb.eStateRunning)
            mux_process.ForceScriptedState(lldb.eStateStopped)

            for child_process in self.multiplexed_processes.values():
                child_process.ForceScriptedState(lldb.eStateRunning)
                child_process.ForceScriptedState(lldb.eStateStopped)

        event = lldb.SBEvent()
        while True:
            if not self.driving_process:
                # Nothing to listen to until `launch` has stored the driving
                # process; sleep briefly instead of spinning on the CPU.
                time.sleep(0.1)
                continue
            # Wait up to one second for the next event.
            if not self.listener.WaitForEvent(1, event):
                continue
            if not event.GetType() & lldb.SBProcess.eBroadcastBitStateChanged:
                continue
            state = lldb.SBProcess.GetStateFromEvent(event)
            log(f"Received public process state event: {state}")
            if state == lldb.eStateStopped:
                # On a stop event, resynchronize the thread list and the
                # state of this process and its children.
                handle_process_state_event()

    def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
        """Mirror breakpoints with the driving target and start the listener.

        Args:
            exe_ctx: Execution context forwarded to the base class.
            args: Launch dictionary forwarded to the base class.
        """
        super().__init__(exe_ctx, args, launched_driving_process=False)
        if isinstance(self.driving_target, lldb.SBTarget) and self.driving_target:
            self.listener = lldb.SBListener(
                "lldb.listener.multiplexer-scripted-process"
            )
            self.multiplexed_processes = {}

            # Copy breakpoints from real target to passthrough
            with tempfile.NamedTemporaryFile() as tf:
                bkpt_file = lldb.SBFileSpec(tf.name)
                error = self.driving_target.BreakpointsWriteToFile(bkpt_file)
                if error.Fail():
                    log(
                        "Failed to save breakpoints from driving target (%s)"
                        % error.GetCString()
                    )
                bkpts_list = lldb.SBBreakpointList(self.target)
                error = self.target.BreakpointsCreateFromFile(bkpt_file, bkpts_list)
                if error.Fail():
                    log(
                        "Failed create breakpoints from driving target "
                        "(bkpt file: %s)" % tf.name
                    )

            # Copy breakpoint from passthrough to real target
            # (`error` is the outcome of BreakpointsCreateFromFile above.)
            if error.Success():
                self.driving_target.DeleteAllBreakpoints()
                for bkpt in self.target.breakpoints:
                    if bkpt.IsValid():
                        for bl in bkpt:
                            real_bpkt = self.driving_target.BreakpointCreateBySBAddress(
                                bl.GetAddress()
                            )
                            if not real_bpkt.IsValid():
                                log(
                                    "Failed to set breakpoint at address %s in "
                                    "driving target" % hex(bl.GetLoadAddress())
                                )

            self.listener_thread = Thread(
                target=self.wait_for_driving_process_to_stop, daemon=True
            )
            self.listener_thread.start()

    def launch(self, should_stop: bool = True) -> lldb.SBError:
        """Launch the driving process and record its loaded images.

        Returns:
            A failed SBError when there is no driving target, when a driving
            process already exists, or when the launch itself fails;
            otherwise the (successful) launch error object.
        """
        if not self.driving_target:
            return lldb.SBError(
                f"{self.__class__.__name__}.launch: Invalid driving target."
            )

        if self.driving_process:
            # A driving process is already attached to this multiplexer.
            return lldb.SBError(
                f"{self.__class__.__name__}.launch: Driving process already launched."
            )

        error = lldb.SBError()
        launch_info = lldb.SBLaunchInfo(None)
        # Route the driving process' events to our background listener.
        launch_info.SetListener(self.listener)
        driving_process = self.driving_target.Launch(launch_info, error)

        if not driving_process or error.Fail():
            return error

        self.driving_process = driving_process

        for module in self.driving_target.modules:
            path = module.file.fullpath
            load_addr = module.GetObjectFileHeaderAddress().GetLoadAddress(
                self.driving_target
            )
            self.loaded_images.append({"path": path, "load_addr": load_addr})

        self.first_resume = True
        return error

    def resume(self, should_stop: bool = True) -> lldb.SBError:
        """Continue the driving process, except on the very first resume.

        Returns:
            An empty SBError for the first resume, an error when there is no
            driving process, otherwise the result of continuing it.
        """
        if self.first_resume:
            # When we resume the multiplexer process for the first time,
            # we shouldn't do anything because lldb's execution machinery
            # will resume the driving process by itself.

            # Also, no need to update the multiplexer scripted process state
            # here, since it's listening for the real process stop events.
            # Once it receives the stop event from the driving process,
            # `wait_for_driving_process_to_stop` will update the multiplexer
            # state for us.

            self.first_resume = False
            return lldb.SBError()

        if not self.driving_process:
            return lldb.SBError(
                f"{self.__class__.__name__}.resume: Invalid driving process."
            )

        return self.driving_process.Continue()

    def get_threads_info(self, pid: int = None) -> Dict[int, Any]:
        """Return all threads, or only those whose ID parity matches `pid`.

        Args:
            pid: ID of the requesting child process. When falsy, the base
                class answer is returned unfiltered.
        """
        if not pid:
            return super().get_threads_info()
        parity = pid % 2
        return {
            thread_id: thread
            for thread_id, thread in self.threads.items()
            if thread_id % 2 == parity
        }

    def create_breakpoint(self, addr, error, pid=None):
        """Create a named breakpoint at load address `addr`.

        Args:
            addr: Load address of the breakpoint.
            error: SBError that receives the failure description.
            pid: ID of the multiplexed child process making the request, or
                None when the request is for the multiplexer itself.

        Returns:
            True when the breakpoint exists in the driving target (and, for a
            child request, in the multiplexer target too); False otherwise.
        """
        if not self.driving_target:
            error.SetErrorString("%s has no driving target." % self.__class__.__name__)
            return False

        def create_breakpoint_with_name(target, load_addr, name, error):
            sb_addr = lldb.SBAddress(load_addr, target)
            if not sb_addr.IsValid():
                error.SetErrorString("Invalid breakpoint address %s" % hex(load_addr))
                return False
            bkpt = target.BreakpointCreateBySBAddress(sb_addr)
            if not bkpt.IsValid():
                # Report the address we were given; SBAddress.GetLoadAddress
                # cannot be called without a target argument.
                error.SetErrorString(
                    "Failed to create breakpoint at address %s" % hex(load_addr)
                )
                return False
            # Use a separate name so the failure is copied into the caller's
            # error object instead of rebinding the local parameter.
            name_error = bkpt.AddNameWithErrorHandling(name)
            if name_error.Fail():
                error.SetErrorString(
                    name_error.GetCString()
                    or "Failed to add breakpoint name %s" % name
                )
                return False
            return True

        if pid is None:
            name = "multiplexer_scripted_process"
        else:
            name = f"multiplexed_scripted_process_{pid}"
            # This means that this method has been called from one of the
            # multiplexed scripted process. That also means that the multiplexer
            # target doesn't have this breakpoint created.
            mux_error = lldb.SBError()
            if not create_breakpoint_with_name(self.target, addr, name, mux_error):
                error.SetErrorString(
                    "Failed to create breakpoint in multiplexer target: %s"
                    % mux_error.GetCString()
                )
                return False
        return create_breakpoint_with_name(self.driving_target, addr, name, error)


def multiplex(mux_process, muxed_process):
    """Link a multiplexed child process with its multiplexer, both ways.

    The child's scripted implementation gets the multiplexer's
    implementation as its `multiplexer`, and the multiplexer registers the
    child process under the child's process ID.
    """
    mux_impl = mux_process.GetScriptedImplementation()
    muxed_process.GetScriptedImplementation().multiplexer = mux_impl

    registry = mux_process.GetScriptedImplementation().multiplexed_processes
    registry[muxed_process.GetProcessID()] = muxed_process


def launch_scripted_process(target, class_name, dictionary):
    """Launch a scripted process of class `class_name` in `target`.

    Args:
        target: Target in which the process is launched.
        class_name: Fully qualified name of the scripted process class.
        dictionary: JSON-serializable arguments handed to that class.

    Returns:
        Whatever `target.Launch` returns; the launch error is not reported.
    """
    process_args = lldb.SBStructuredData()
    process_args.SetFromJSON(json.dumps(dictionary))

    info = lldb.SBLaunchInfo(None)
    info.SetProcessPluginName("ScriptedProcess")
    info.SetScriptedProcessClassName(class_name)
    info.SetScriptedProcessDictionary(process_args)

    launch_error = lldb.SBError()
    process = target.Launch(info, launch_error)
    return process


def duplicate_target(driving_target):
    """Create a new target for the same executable and triple.

    Args:
        driving_target: Target whose executable path and triple are reused.

    Returns:
        The target created by the driving target's debugger.
    """
    exe = driving_target.executable.fullpath
    triple = driving_target.triple
    debugger = driving_target.GetDebugger()
    return debugger.CreateTargetWithFileAndTargetTriple(exe, triple)


def create_mux_process(debugger, command, exe_ctx, result, dict):
    """Command handler: launch the multiplexer scripted process.

    Duplicates the selected (driving) target and launches a
    `MultiplexerScriptedProcess` in the copy. Failures are reported through
    `result.SetError`.
    """
    if debugger.GetNumTargets() <= 0:
        return result.SetError(
            "Interactive scripted processes requires one non scripted process."
        )

    debugger.SetAsync(True)

    driving_target = debugger.GetSelectedTarget()
    if not driving_target:
        return result.SetError("Driving target is invalid")

    # The multiplexer scripted process lives in its own, second target.
    mux_target = duplicate_target(driving_target)
    if not mux_target:
        return result.SetError(
            "Couldn't duplicate driving target to launch multiplexer scripted process"
        )

    mux_class = f"{__name__}.{MultiplexerScriptedProcess.__name__}"
    launch_args = {"driving_target_idx": debugger.GetIndexOfTarget(driving_target)}
    if launch_scripted_process(mux_target, mux_class, launch_args):
        return None
    return result.SetError("Couldn't launch multiplexer scripted process")


def create_child_processes(debugger, command, exe_ctx, result, dict):
    """Command handler: launch the even and odd multiplexed child processes.

    The selected target must hold the multiplexer scripted process. For each
    parity (even = 0, odd = 1) the driving target is duplicated, a
    `MultiplexedScriptedProcess` is launched in the copy and linked to the
    multiplexer. Failures are reported through `result.SetError`.
    """
    if not debugger.GetNumTargets() >= 2:
        return result.SetError("Scripted Multiplexer process not setup")

    debugger.SetAsync(True)

    # The selected target is expected to hold the multiplexer scripted process.
    mux_target = debugger.GetSelectedTarget()
    if not mux_target:
        return result.SetError("Couldn't get multiplexer scripted process target")
    mux_process = mux_target.GetProcess()
    if not mux_process:
        return result.SetError("Couldn't get multiplexer scripted process")

    # Use getattr so that a selected process without a scripted
    # implementation (or without a driving target) is reported as an error
    # instead of raising AttributeError.
    mux_impl = mux_process.GetScriptedImplementation()
    driving_target = getattr(mux_impl, "driving_target", None)
    if not driving_target:
        return result.SetError("Driving target is invalid")

    class_name = f"{__name__}.{MultiplexedScriptedProcess.__name__}"
    mux_target_idx = debugger.GetIndexOfTarget(mux_target)

    for label, parity in (("even", 0), ("odd", 1)):
        # Each child process gets its own copy of the driving target.
        child_target = duplicate_target(driving_target)
        if not child_target:
            return result.SetError(
                f"Couldn't duplicate driving target to launch multiplexed {label} scripted process"
            )

        dictionary = {"driving_target_idx": mux_target_idx, "parity": parity}
        child_process = launch_scripted_process(child_target, class_name, dictionary)
        if not child_process:
            return result.SetError(
                f"Couldn't launch multiplexed {label} scripted process"
            )
        multiplex(mux_process, child_process)


def log(message):
    """Print `message` when logging is enabled (currently always disabled)."""
    # FIXME: For now, we discard the log message until we can pass it to an lldb
    # logging channel.
    should_log = False
    if not should_log:
        return
    print(message)


def __lldb_init_module(dbg, dict):
    """Register the `create_mux` and `create_sub` commands with `dbg`."""
    registrations = (
        ("create_mux_process", "create_mux"),
        ("create_child_processes", "create_sub"),
    )
    for function_name, command_name in registrations:
        dbg.HandleCommand(
            f"command script add -o -f interactive_scripted_process.{function_name} {command_name}"
        )