# llvm/lldb/test/API/functionalities/gdb_remote_client/TestGDBRemoteClient.py

import lldb
import binascii
import os.path
from lldbsuite.test.lldbtest import *
from lldbsuite.test.decorators import *
from lldbsuite.test.gdbclientutils import *
from lldbsuite.test.lldbgdbclient import GDBRemoteTestBase


class TestGDBRemoteClient(GDBRemoteTestBase):
    class gPacketResponder(MockGDBServerResponder):
        """Mock responder describing eight 64-bit registers that all read as zero."""

        # Replies to qRegisterInfo, indexed by register number. Every entry is
        # 64 bits wide and the offsets advance in 8-byte steps.
        registers = [
            "name:rax;bitsize:64;offset:0;encoding:uint;format:hex;set:General Purpose Registers;ehframe:0;dwarf:0;",
            "name:rbx;bitsize:64;offset:8;encoding:uint;format:hex;set:General Purpose Registers;ehframe:3;dwarf:3;",
            "name:rcx;bitsize:64;offset:16;encoding:uint;format:hex;set:General Purpose Registers;ehframe:2;dwarf:2;generic:arg4;",
            "name:rdx;bitsize:64;offset:24;encoding:uint;format:hex;set:General Purpose Registers;ehframe:1;dwarf:1;generic:arg3;",
            "name:rdi;bitsize:64;offset:32;encoding:uint;format:hex;set:General Purpose Registers;ehframe:5;dwarf:5;generic:arg1;",
            "name:rsi;bitsize:64;offset:40;encoding:uint;format:hex;set:General Purpose Registers;ehframe:4;dwarf:4;generic:arg2;",
            "name:rbp;bitsize:64;offset:48;encoding:uint;format:hex;set:General Purpose Registers;ehframe:6;dwarf:6;generic:fp;",
            "name:rsp;bitsize:64;offset:56;encoding:uint;format:hex;set:General Purpose Registers;ehframe:7;dwarf:7;generic:sp;",
        ]

        def qRegisterInfo(self, num):
            """Return the description at index `num`, or "E45" on IndexError."""
            try:
                description = self.registers[num]
            except IndexError:
                description = "E45"
            return description

        def readRegisters(self):
            """Return 16 hex zeros for every entry in `registers`."""
            hex_digits_per_register = 16
            return "0" * (hex_digits_per_register * len(self.registers))

        def readRegister(self, register):
            """Return 16 hex zeros regardless of which register is requested."""
            return "0000000000000000"

    def test_connect(self):
        """Connect to the mock gdb server and check the expected queries were logged."""
        target = self.createTarget("a.yaml")
        self.connect(target)
        expected_packets = ["qProcessInfo", "qfThreadInfo"]
        self.assertPacketLogContains(expected_packets)

    def test_attach_fail(self):
        """Check that the message carried by a failed vAttach reply ends up in the SBError."""
        error_msg = "mock-error-msg"
        # The reply carries the message as hex-encoded text after the error code.
        encoded_msg = binascii.hexlify(error_msg.encode()).decode()

        class MyResponder(MockGDBServerResponder):
            # No process exists while the client runs its initial queries.
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "OK"  # No threads.

            # The attach request is rejected with an error code plus message.
            def vAttach(self, pid):
                return "E42;" + encoded_msg

        self.server.responder = MyResponder()

        target = self.dbg.CreateTarget("")
        process = self.connect(target)
        expected_states = [lldb.eStateConnected]
        lldbutil.expect_state_changes(
            self, self.dbg.GetListener(), process, expected_states
        )

        attach_error = lldb.SBError()
        target.AttachToProcessWithID(lldb.SBListener(), 47, attach_error)
        self.assertEqual(error_msg, attach_error.GetCString())

    def test_launch_fail(self):
        """Check the error text reported when the server rejects the 'A' packet."""

        class MyResponder(MockGDBServerResponder):
            # No process exists while the client runs its initial queries.
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "OK"  # No threads.

            # The launch request is refused; 0x47 == 71, the number expected
            # in the error message asserted below.
            def A(self, packet):
                return "E47"

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        process = self.connect(target)
        expected_states = [lldb.eStateConnected]
        lldbutil.expect_state_changes(
            self, self.dbg.GetListener(), process, expected_states
        )

        launch_error = lldb.SBError()
        target.Launch(
            lldb.SBListener(),
            None,  # argv
            None,  # envp
            None,  # stdin_path
            None,  # stdout_path
            None,  # stderr_path
            None,  # working_directory
            0,  # launch_flags
            True,  # stop_at_entry
            launch_error,
        )
        self.assertRegex(launch_error.GetCString(), "Cannot launch '.*a': Error 71")

    def test_launch_rich_error(self):
        """Check that a textual message in a failed vRun reply is surfaced by Launch.

        The mock server answers vRun with an error code followed by a
        hex-encoded message; the launch error is expected to contain the
        decoded message rather than only a numeric code.
        """
        # Hex-encode with binascii, as test_attach_fail does, instead of
        # relying on a `seven` name that is only reachable via a wildcard import.
        encoded_msg = binascii.hexlify("I'm a teapot".encode()).decode()

        class MyResponder(MockGDBServerResponder):
            # No process exists while the client runs its initial queries.
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "OK"  # No threads.

            # Then, when we are asked to launch, error out with a message.
            def vRun(self, packet):
                return "Eff;" + encoded_msg

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        process = self.connect(target)
        lldbutil.expect_state_changes(
            self, self.dbg.GetListener(), process, [lldb.eStateConnected]
        )

        error = lldb.SBError()
        target.Launch(
            lldb.SBListener(), None, None, None, None, None, None, 0, True, error
        )
        self.assertRegex(error.GetCString(), "Cannot launch '.*a': I'm a teapot")

    def test_read_registers_using_g_packets(self):
        """Test reading registers using 'g' packets.

        The use-g-packet-for-reading setting is switched on explicitly and
        set back to false by a teardown hook.
        """
        self.dbg.HandleCommand(
            "settings set plugin.process.gdb-remote.use-g-packet-for-reading true"
        )

        def restore_setting():
            self.runCmd(
                "settings set plugin.process.gdb-remote.use-g-packet-for-reading false"
            )

        self.addTearDownHook(restore_setting)

        self.server.responder = self.gPacketResponder()
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        # Connecting alone is expected to have produced exactly one 'g' packet.
        g_packet_count = self.server.responder.packetLog.count("g")
        self.assertEqual(1, g_packet_count)

        # Start from an empty log so only the reads below are inspected.
        self.server.responder.packetLog = []
        self.read_registers(process)

        # Reading registers should not cause any 'p' packets to be exchanged.
        p_packet_count = sum(
            packet.startswith("p") for packet in self.server.responder.packetLog
        )
        self.assertEqual(0, p_packet_count)

    def test_read_registers_using_p_packets(self):
        """Test reading registers using 'p' packets (g-packet reading switched off)."""
        self.dbg.HandleCommand(
            "settings set plugin.process.gdb-remote.use-g-packet-for-reading false"
        )
        self.server.responder = self.gPacketResponder()
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.read_registers(process)

        packet_log = self.server.responder.packetLog
        # No whole-register-file read may have been issued ...
        self.assertNotIn("g", packet_log)
        # ... while at least one single-register read must have been.
        p_packet_count = sum(packet.startswith("p") for packet in packet_log)
        self.assertGreater(p_packet_count, 0)

    def test_write_registers_using_P_packets(self):
        """Test writing registers using 'P' packets (no settings are changed here)."""
        self.server.responder = self.gPacketResponder()
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.write_registers(process)

        packet_log = self.server.responder.packetLog
        # No 'G' (write all registers) packet is expected ...
        g_write_count = sum(packet.startswith("G") for packet in packet_log)
        self.assertEqual(0, g_write_count)
        # ... but at least one 'P' (write one register) packet is.
        p_write_count = sum(packet.startswith("P") for packet in packet_log)
        self.assertGreater(p_write_count, 0)

    def test_write_registers_using_G_packets(self):
        """Test writing registers using 'G' packets.

        The responder reports single-register reads as unsupported; writes
        are then expected to use 'G' packets and never 'P' packets.
        """

        class MyResponder(self.gPacketResponder):
            def readRegister(self, register):
                # An empty reply means the packet is unsupported.
                return ""

        self.server.responder = MyResponder()
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        self.write_registers(process)

        packet_log = self.server.responder.packetLog
        p_write_count = sum(packet.startswith("P") for packet in packet_log)
        self.assertEqual(0, p_write_count)
        g_write_count = sum(packet.startswith("G") for packet in packet_log)
        self.assertGreater(g_write_count, 0)

    def read_registers(self, process):
        """Read every register visited by for_each_gpr and assert it reads as zero."""

        def check_is_zero(reg):
            self.assertEqual("0x0000000000000000", reg.GetValue())

        self.for_each_gpr(process, check_is_zero)

    def write_registers(self, process):
        """Write zero to every register visited by for_each_gpr.

        The result of each write is not checked; callers inspect the packet
        log instead.
        """

        def write_zero(reg):
            reg.SetValueFromCString("0x0000000000000000")

        self.for_each_gpr(process, write_zero)

    def for_each_gpr(self, process, operation):
        """Call `operation` on each register of the first register set.

        Registers are taken from frame 0 of thread 0 of `process`. The test
        fails if there is no register set or the first set is empty.
        """
        frame = process.GetThreadAtIndex(0).GetFrameAtIndex(0)
        register_sets = frame.GetRegisters()
        self.assertGreater(register_sets.GetSize(), 0)

        first_set = register_sets[0]
        register_count = first_set.GetNumChildren()
        self.assertGreater(register_count, 0)

        for index in range(register_count):
            operation(first_set.GetChildAtIndex(index))

    def test_launch_A(self):
        """Launch through the 'A' packet and check its argument encoding."""

        class MyResponder(MockGDBServerResponder):
            def __init__(self, *args, **kwargs):
                # Flipped to True once the 'A' packet has been received.
                self.started = False
                super().__init__(*args, **kwargs)

            def qC(self):
                # pid/tid 0x10 once launched, an error before that.
                return "QCp10.10" if self.started else "E42"

            def qfThreadInfo(self):
                return "mp10.10" if self.started else "E42"

            def qsThreadInfo(self):
                return "l"

            def A(self, packet):
                self.started = True
                return "OK"

            def qLaunchSuccess(self):
                return "OK" if self.started else "E42"

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        # NB: apparently GDB packets are using "/" on Windows too
        exe_path = self.getBuildArtifact("a").replace(os.path.sep, "/")
        exe_hex = binascii.b2a_hex(exe_path.encode()).decode()
        process = self.connect(target)
        expected_states = [lldb.eStateConnected]
        lldbutil.expect_state_changes(
            self, self.dbg.GetListener(), process, expected_states
        )

        argv = ["arg1", "arg2", "arg3"]
        envp = []
        target.Launch(
            lldb.SBListener(),
            argv,
            envp,
            None,  # stdin_path
            None,  # stdout_path
            None,  # stderr_path
            None,  # working_directory
            0,  # launch_flags
            True,  # stop_at_entry
            lldb.SBError(),  # error
        )
        self.assertTrue(process, PROCESS_IS_VALID)
        # 0x10 from the "p10.10" replies above.
        self.assertEqual(process.GetProcessID(), 16)

        # 61726731..33 are the hex encodings of "arg1".."arg3".
        expected_packet = "A%d,0,%s,8,1,61726731,8,2,61726732,8,3,61726733" % (
            len(exe_hex),
            exe_hex,
        )
        self.assertPacketLogContains([expected_packet])

    def test_launch_vRun(self):
        """Launch through the 'vRun' packet and check its argument encoding."""

        class MyResponder(MockGDBServerResponder):
            def __init__(self, *args, **kwargs):
                # Flipped to True once the 'vRun' packet has been received.
                self.started = False
                super().__init__(*args, **kwargs)

            def qC(self):
                # pid/tid 0x10 once launched, an error before that.
                return "QCp10.10" if self.started else "E42"

            def qfThreadInfo(self):
                return "mp10.10" if self.started else "E42"

            def qsThreadInfo(self):
                return "l"

            def vRun(self, packet):
                self.started = True
                return "T13"

            # The 'A' packet always fails in this responder.
            def A(self, packet):
                return "E28"

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        # NB: apparently GDB packets are using "/" on Windows too
        exe_path = self.getBuildArtifact("a").replace(os.path.sep, "/")
        exe_hex = binascii.b2a_hex(exe_path.encode()).decode()
        process = self.connect(target)
        expected_states = [lldb.eStateConnected]
        lldbutil.expect_state_changes(
            self, self.dbg.GetListener(), process, expected_states
        )

        argv = ["arg1", "arg2", "arg3"]
        envp = []
        process = target.Launch(
            lldb.SBListener(),
            argv,
            envp,
            None,  # stdin_path
            None,  # stdout_path
            None,  # stderr_path
            None,  # working_directory
            0,  # launch_flags
            True,  # stop_at_entry
            lldb.SBError(),  # error
        )
        self.assertTrue(process, PROCESS_IS_VALID)
        # 0x10 from the "p10.10" replies above.
        self.assertEqual(process.GetProcessID(), 16)

        # 61726731..33 are the hex encodings of "arg1".."arg3".
        expected_packet = "vRun;%s;61726731;61726732;61726733" % (exe_hex,)
        self.assertPacketLogContains([expected_packet])

    def test_launch_QEnvironment(self):
        """Check which environment entries are sent plain and which hex-encoded.

        The launch itself is refused by the responder; only the packet log
        is inspected.
        """

        class MyResponder(MockGDBServerResponder):
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "E42"

            def vRun(self, packet):
                self.started = True
                return "E28"

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        process = self.connect(target)
        expected_states = [lldb.eStateConnected]
        lldbutil.expect_state_changes(
            self, self.dbg.GetListener(), process, expected_states
        )

        argv = []
        envp = [
            "PLAIN=foo",
            "NEEDSENC=frob$",
            "NEEDSENC2=fr*ob",
            "NEEDSENC3=fro}b",
            "NEEDSENC4=f#rob",
            "EQUALS=foo=bar",
        ]
        target.Launch(
            lldb.SBListener(),
            argv,
            envp,
            None,  # stdin_path
            None,  # stdout_path
            None,  # stderr_path
            None,  # working_directory
            0,  # launch_flags
            True,  # stop_at_entry
            lldb.SBError(),  # error
        )

        # The four NEEDSENC* entries (containing '$', '*', '}' and '#') are
        # expected hex-encoded; PLAIN and EQUALS are expected as plain text.
        expected_packets = [
            "QEnvironment:EQUALS=foo=bar",
            "QEnvironmentHexEncoded:4e45454453454e433d66726f6224",
            "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62",
            "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62",
            "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62",
            "QEnvironment:PLAIN=foo",
        ]
        self.assertPacketLogContains(expected_packets)

    def test_launch_QEnvironmentHexEncoded_only(self):
        """Check that every environment entry is hex-encoded when QEnvironment gets an empty reply.

        The launch itself is refused by the responder; only the packet log
        is inspected.
        """

        class MyResponder(MockGDBServerResponder):
            def qC(self):
                return "E42"

            def qfThreadInfo(self):
                return "E42"

            def vRun(self, packet):
                self.started = True
                return "E28"

            # Answer the plain-text variant with an empty reply.
            def QEnvironment(self, packet):
                return ""

        self.server.responder = MyResponder()

        target = self.createTarget("a.yaml")
        process = self.connect(target)
        expected_states = [lldb.eStateConnected]
        lldbutil.expect_state_changes(
            self, self.dbg.GetListener(), process, expected_states
        )

        argv = []
        envp = [
            "PLAIN=foo",
            "NEEDSENC=frob$",
            "NEEDSENC2=fr*ob",
            "NEEDSENC3=fro}b",
            "NEEDSENC4=f#rob",
            "EQUALS=foo=bar",
        ]
        target.Launch(
            lldb.SBListener(),
            argv,
            envp,
            None,  # stdin_path
            None,  # stdout_path
            None,  # stderr_path
            None,  # working_directory
            0,  # launch_flags
            True,  # stop_at_entry
            lldb.SBError(),  # error
        )

        # The first entry is hex for "EQUALS=foo=bar" and the last for
        # "PLAIN=foo"; the NEEDSENC* entries sit in between.
        expected_packets = [
            "QEnvironmentHexEncoded:455155414c533d666f6f3d626172",
            "QEnvironmentHexEncoded:4e45454453454e433d66726f6224",
            "QEnvironmentHexEncoded:4e45454453454e43323d66722a6f62",
            "QEnvironmentHexEncoded:4e45454453454e43333d66726f7d62",
            "QEnvironmentHexEncoded:4e45454453454e43343d6623726f62",
            "QEnvironmentHexEncoded:504c41494e3d666f6f",
        ]
        self.assertPacketLogContains(expected_packets)

    def test_detach_no_multiprocess(self):
        """Check that detaching sends a bare "D" packet when thread ids carry no pid."""

        class MyResponder(MockGDBServerResponder):
            def __init__(self):
                super().__init__()
                # Holds the most recent detach packet, or None if none was seen.
                self.detached = None

            def qfThreadInfo(self):
                return "10200"

            def D(self, packet):
                self.detached = packet
                return "OK"

        responder = MyResponder()
        self.server.responder = responder

        target = self.dbg.CreateTarget("")
        process = self.connect(target)
        process.Detach()

        self.assertEqual(self.server.responder.detached, "D")

    def test_detach_pid(self):
        """Check that detaching names the pid when the server advertises multiprocess+."""

        class MyResponder(MockGDBServerResponder):
            def __init__(self, test_case):
                super().__init__()
                # Used to run assertions from inside the responder.
                self.test_case = test_case
                # Holds the most recent detach packet, or None if none was seen.
                self.detached = None

            def qSupported(self, client_supported):
                # The client must offer multiprocess support itself.
                self.test_case.assertIn("multiprocess+", client_supported)
                base_features = super().qSupported(client_supported)
                return "multiprocess+;" + base_features

            def qfThreadInfo(self):
                return "mp400.10200"

            def D(self, packet):
                self.detached = packet
                return "OK"

        responder = MyResponder(self)
        self.server.responder = responder

        target = self.dbg.CreateTarget("")
        process = self.connect(target)
        process.Detach()

        # Leading zeros in front of the pid are tolerated.
        self.assertRegex(self.server.responder.detached, r"D;0*400")

    def test_signal_gdb(self):
        """Check that stop signal 0x0a is described as SIGBUS for this responder.

        The responder overrides neither qHostInfo nor QThreadSuffixSupported
        and does not advertise native-signals+.
        """

        class MyResponder(MockGDBServerResponder):
            def qSupported(self, client_supported):
                return "PacketSize=3fff;QStartNoAckMode+"

            def haltReason(self):
                return "S0a"

            def cont(self):
                return self.haltReason()

        self.server.responder = MyResponder()

        self.runCmd("platform select remote-linux")
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        first_thread = process.threads[0]
        self.assertEqual(first_thread.GetStopReason(), lldb.eStopReasonSignal)
        self.assertEqual(first_thread.GetStopDescription(100), "signal SIGBUS")

    def test_signal_lldb_old(self):
        """Check that stop signal 0x0a is described as SIGUSR1 for this responder.

        The responder reports a Linux host triple and accepts
        QThreadSuffixSupported, but does not advertise native-signals+.
        """

        class MyResponder(MockGDBServerResponder):
            def qSupported(self, client_supported):
                return "PacketSize=3fff;QStartNoAckMode+"

            def qHostInfo(self):
                # Hex encoding of "armv7-unknown-linux-gnu".
                return "triple:61726d76372d756e6b6e6f776e2d6c696e75782d676e75;"

            def QThreadSuffixSupported(self):
                return "OK"

            def haltReason(self):
                return "S0a"

            def cont(self):
                return self.haltReason()

        self.server.responder = MyResponder()

        self.runCmd("platform select remote-linux")
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        first_thread = process.threads[0]
        self.assertEqual(first_thread.GetStopReason(), lldb.eStopReasonSignal)
        self.assertEqual(first_thread.GetStopDescription(100), "signal SIGUSR1")

    def test_signal_lldb(self):
        """Check that stop signal 0x0a is described as SIGUSR1 when native-signals+ is advertised.

        The responder also reports a Linux host triple.
        """

        class MyResponder(MockGDBServerResponder):
            def qSupported(self, client_supported):
                return "PacketSize=3fff;QStartNoAckMode+;native-signals+"

            def qHostInfo(self):
                # Hex encoding of "armv7-unknown-linux-gnu".
                return "triple:61726d76372d756e6b6e6f776e2d6c696e75782d676e75;"

            def haltReason(self):
                return "S0a"

            def cont(self):
                return self.haltReason()

        self.server.responder = MyResponder()

        self.runCmd("platform select remote-linux")
        target = self.createTarget("a.yaml")
        process = self.connect(target)

        first_thread = process.threads[0]
        self.assertEqual(first_thread.GetStopReason(), lldb.eStopReasonSignal)
        self.assertEqual(first_thread.GetStopDescription(100), "signal SIGUSR1")

    def do_siginfo_test(self, platform, target_yaml, raw_data, expected):
        """Serve `raw_data` as the siginfo object and verify the decoded fields.

        Args:
            platform: name passed to "platform select".
            target_yaml: YAML file the target is created from.
            raw_data: payload returned for qXfer reads of the "siginfo" object.
            expected: mapping from an expression path (without the leading
                ".") to the unsigned value that member must have.
        """

        class MyResponder(MockGDBServerResponder):
            def qSupported(self, client_supported):
                return "PacketSize=3fff;QStartNoAckMode+;qXfer:siginfo:read+"

            def qXferRead(self, obj, annex, offset, length):
                # Only the siginfo object carries data; anything else gets None.
                payload = raw_data if obj == "siginfo" else None
                return payload, False

            def haltReason(self):
                return "T02"

            def cont(self):
                return self.haltReason()

        self.server.responder = MyResponder()

        self.runCmd("platform select " + platform)
        target = self.createTarget(target_yaml)
        process = self.connect(target)

        siginfo = process.threads[0].GetSiginfo()
        self.assertSuccess(siginfo.GetError())

        for member_path, expected_value in expected.items():
            member = siginfo.GetValueForExpressionPath("." + member_path)
            self.assertEqual(member.GetValueAsUnsigned(), expected_value)

    def test_siginfo_linux_amd64(self):
        """Decode a SIGCHLD siginfo blob with the remote-linux platform and an amd64 target."""
        expected_fields = {
            "si_signo": 17,  # SIGCHLD
            "si_errno": 0,
            "si_code": 1,  # CLD_EXITED
            "_sifields._sigchld.si_pid": 784319,
            "_sifields._sigchld.si_uid": 1000,
            "_sifields._sigchld.si_status": 12,
            "_sifields._sigchld.si_utime": 0,
            "_sifields._sigchld.si_stime": 0,
        }
        # Little-endian fields followed by zero padding.
        raw_siginfo = (
            # si_signo         si_errno        si_code
            "\x11\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00"
            # __pad0           si_pid          si_uid
            "\x00\x00\x00\x00\xbf\xf7\x0b\x00\xe8\x03\x00\x00"
            # si_status
            "\x0c\x00\x00\x00"
            + "\x00" * 100
        )
        self.do_siginfo_test(
            "remote-linux", "basic_eh_frame.yaml", raw_siginfo, expected_fields
        )

    def test_siginfo_linux_i386(self):
        """Decode a SIGCHLD siginfo blob with the remote-linux platform and an i386 target."""
        expected_fields = {
            "si_signo": 17,  # SIGCHLD
            "si_errno": 0,
            "si_code": 1,  # CLD_EXITED
            "_sifields._sigchld.si_pid": 475977,
            "_sifields._sigchld.si_uid": 1000,
            "_sifields._sigchld.si_status": 12,
            "_sifields._sigchld.si_utime": 0,
            "_sifields._sigchld.si_stime": 0,
        }
        # Little-endian fields followed by zero padding; unlike the amd64
        # blob there is no pad word between si_code and si_pid.
        raw_siginfo = (
            # si_signo         si_errno        si_code
            "\x11\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00"
            # si_pid           si_uid          si_status
            "\x49\x43\x07\x00\xe8\x03\x00\x00\x0c\x00\x00\x00"
            + "\x00" * 104
        )
        self.do_siginfo_test(
            "remote-linux", "basic_eh_frame-i386.yaml", raw_siginfo, expected_fields
        )

    def test_siginfo_freebsd_amd64(self):
        """Decode a SIGSEGV siginfo blob with the remote-freebsd platform and an amd64 target."""
        expected_fields = {
            "si_signo": 11,  # SIGSEGV
            "si_errno": 0,
            "si_code": 1,  # SEGV_MAPERR
            "si_addr": 0xFEDCBA9876,
            "_reason._fault._trapno": 12,
        }
        # Little-endian fields followed by zero padding.
        raw_siginfo = (
            # si_signo         si_errno        si_code
            "\x0b\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00"
            # si_pid           si_uid          si_status
            "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
            # si_addr
            "\x76\x98\xba\xdc\xfe\x00\x00\x00"
            # si_value                         _reason._fault._trapno
            "\x00\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x00\x00"
            + "\x00" * 36
        )
        self.do_siginfo_test(
            "remote-freebsd", "basic_eh_frame.yaml", raw_siginfo, expected_fields
        )