llvm/lldb/test/API/tools/lldb-server/attach-wait/TestGdbRemoteAttachWait.py

import os
from time import sleep

import gdbremote_testcase
import lldbgdbserverutils
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbsuite.test import lldbutil


class TestGdbRemoteAttachWait(gdbremote_testcase.GdbRemoteTestCaseBase):
    def _set_up_inferior(self):
        self._exe_to_attach = "%s_%d" % (self.testMethodName, os.getpid())
        self.build(dictionary={"EXE": self._exe_to_attach, "CXX_SOURCES": "main.cpp"})

        if self.getPlatform() != "windows":
            # Use a shim to ensure that the process is ready to be attached from
            # the get-go.
            self._exe_to_run = "shim"
            self._exe_to_attach = lldbutil.install_to_target(
                self, self.getBuildArtifact(self._exe_to_attach)
            )
            self._run_args = [self._exe_to_attach]
            self.build(dictionary={"EXE": self._exe_to_run, "CXX_SOURCES": "shim.cpp"})
        else:
            self._exe_to_run = self._exe_to_attach
            self._run_args = []

    def _launch_inferior(self, args):
        inferior = self.spawnSubprocess(self.getBuildArtifact(self._exe_to_run), args)
        self.assertIsNotNone(inferior)
        self.assertGreater(inferior.pid, 0)
        self.assertTrue(lldbgdbserverutils.process_is_running(inferior.pid, True))
        return inferior

    def _launch_and_wait_for_init(self):
        sync_file_path = lldbutil.append_to_process_working_directory(
            self, "process_ready"
        )
        inferior = self._launch_inferior(self._run_args + [sync_file_path])
        lldbutil.wait_for_file_on_target(self, sync_file_path)
        return inferior

    def _attach_packet(self, packet_type):
        return "read packet: ${};{}#00".format(
            packet_type,
            lldbgdbserverutils.gdbremote_hex_encode_string(self._exe_to_attach),
        )

    @skipIfWindows  # This test is flaky on Windows
    def test_attach_with_vAttachWait(self):
        self._set_up_inferior()

        self.set_inferior_startup_attach_manually()
        server = self.connect_to_debug_monitor()
        self.do_handshake()

        # Launch the first inferior (we shouldn't attach to this one).
        self._launch_and_wait_for_init()

        self.test_sequence.add_log_lines([self._attach_packet("vAttachWait")], True)
        # Run the stream until attachWait.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Sleep so we're sure that the inferior is launched after we ask for the attach.
        sleep(1)

        # Launch the second inferior (we SHOULD attach to this one).
        inferior_to_attach = self._launch_inferior(self._run_args)

        # Make sure the attach succeeded.
        self.test_sequence.add_log_lines(
            [
                {
                    "direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
                    "capture": {1: "stop_signal_hex"},
                },
            ],
            True,
        )
        self.add_process_info_collection_packets()

        # Run the stream sending the response..
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather process info response.
        process_info = self.parse_process_info_response(context)
        self.assertIsNotNone(process_info)

        # Ensure the process id matches what we expected.
        pid_text = process_info.get("pid", None)
        self.assertIsNotNone(pid_text)
        reported_pid = int(pid_text, base=16)
        self.assertEqual(reported_pid, inferior_to_attach.pid)

    @skipIfWindows  # This test is flaky on Windows
    def test_launch_before_attach_with_vAttachOrWait(self):
        self._set_up_inferior()

        self.set_inferior_startup_attach_manually()
        server = self.connect_to_debug_monitor()
        self.do_handshake()

        inferior = self._launch_and_wait_for_init()

        # Add attach packets.
        self.test_sequence.add_log_lines(
            [
                # Do the attach.
                self._attach_packet("vAttachOrWait"),
                # Expect a stop notification from the attach.
                {
                    "direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
                    "capture": {1: "stop_signal_hex"},
                },
            ],
            True,
        )
        self.add_process_info_collection_packets()

        # Run the stream
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather process info response
        process_info = self.parse_process_info_response(context)
        self.assertIsNotNone(process_info)

        # Ensure the process id matches what we expected.
        pid_text = process_info.get("pid", None)
        self.assertIsNotNone(pid_text)
        reported_pid = int(pid_text, base=16)
        self.assertEqual(reported_pid, inferior.pid)

    @skipIfWindows  # This test is flaky on Windows
    def test_launch_after_attach_with_vAttachOrWait(self):
        self._set_up_inferior()

        self.set_inferior_startup_attach_manually()
        server = self.connect_to_debug_monitor()
        self.do_handshake()

        self.test_sequence.add_log_lines([self._attach_packet("vAttachOrWait")], True)
        # Run the stream until attachWait.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Sleep so we're sure that the inferior is launched after we ask for the attach.
        sleep(1)

        # Launch the inferior.
        inferior = self._launch_inferior(self._run_args)

        # Make sure the attach succeeded.
        self.test_sequence.add_log_lines(
            [
                {
                    "direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})[^#]*#[0-9a-fA-F]{2}$",
                    "capture": {1: "stop_signal_hex"},
                },
            ],
            True,
        )
        self.add_process_info_collection_packets()

        # Run the stream sending the response..
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Gather process info response.
        process_info = self.parse_process_info_response(context)
        self.assertIsNotNone(process_info)

        # Ensure the process id matches what we expected.
        pid_text = process_info.get("pid", None)
        self.assertIsNotNone(pid_text)
        reported_pid = int(pid_text, base=16)
        self.assertEqual(reported_pid, inferior.pid)