llvm/lldb/test/API/tools/lldb-server/TestGdbRemoteForkNonStop.py

from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *

from fork_testbase import GdbRemoteForkTestBase


class TestGdbRemoteForkNonStop(GdbRemoteForkTestBase):
    def setUp(self):
        GdbRemoteForkTestBase.setUp(self)
        if self.getPlatform() == "linux" and self.getArchitecture() in [
            "arm",
            "aarch64",
        ]:
            self.skipTest("Unsupported for Arm/AArch64 Linux")

    @add_test_categories(["fork"])
    def test_vfork_nonstop(self):
        parent_pid, parent_tid = self.fork_and_detach_test("vfork", nonstop=True)

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $OK#00",
                {
                    "direction": "send",
                    "regex": r"%Stop:T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format(
                        parent_pid, parent_tid
                    ),
                },
                "read packet: $vStopped#00",
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(parent_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_nonstop(self):
        parent_pid, _ = self.fork_and_detach_test("fork", nonstop=True)

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(parent_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow_nonstop(self):
        self.fork_and_follow_test("fork", nonstop=True)

    @add_test_categories(["fork"])
    def test_vfork_follow_nonstop(self):
        self.fork_and_follow_test("vfork", nonstop=True)

    @add_test_categories(["fork"])
    def test_detach_all_nonstop(self):
        self.detach_all_test(nonstop=True)

    @add_test_categories(["fork"])
    def test_kill_all_nonstop(self):
        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"], nonstop=True)

        exit_regex = "X09;process:([0-9a-f]+)"
        # Depending on a potential race, the second kill may make it into
        # the async queue before we issue vStopped or after.  In the former
        # case, we should expect the exit status in reply to vStopped.
        # In the latter, we should expect an OK response (queue empty),
        # followed by another async notification.
        vstop_regex = "[$](OK|{})#.*".format(exit_regex)
        self.test_sequence.add_log_lines(
            [
                # kill all processes
                "read packet: $k#00",
                "send packet: $OK#00",
                {
                    "direction": "send",
                    "regex": "%Stop:{}#.*".format(exit_regex),
                    "capture": {1: "pid1"},
                },
                "read packet: $vStopped#00",
                {
                    "direction": "send",
                    "regex": vstop_regex,
                    "capture": {1: "vstop_reply", 2: "pid2"},
                },
            ],
            True,
        )
        ret = self.expect_gdbremote_sequence()
        pid1 = ret["pid1"]
        if ret["vstop_reply"] == "OK":
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    {
                        "direction": "send",
                        "regex": "%Stop:{}#.*".format(exit_regex),
                        "capture": {1: "pid2"},
                    },
                ],
                True,
            )
            ret = self.expect_gdbremote_sequence()
        pid2 = ret["pid2"]
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()
        self.assertEqual(set([pid1, pid2]), set([parent_pid, child_pid]))

    @add_test_categories(["fork"])
    def test_vkill_both_nonstop(self):
        self.vkill_test(kill_parent=True, kill_child=True, nonstop=True)

    @add_test_categories(["fork"])
    def test_c_interspersed_nonstop(self):
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"], nonstop=True
        )

    @add_test_categories(["fork"])
    def test_vCont_interspersed_nonstop(self):
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"],
            use_vCont=True,
            nonstop=True,
        )

    def get_all_output_via_vStdio(self, output_test):
        # The output may be split into an arbitrary number of messages.
        # Loop until we have everything. The first message is waiting for us
        # in the packet queue.
        output = self._server.get_raw_output_packet()
        while not output_test(output):
            self._server.send_packet(b"vStdio")
            output += self._server.get_raw_output_packet()
        return output

    @add_test_categories(["fork"])
    def test_c_both_nonstop(self):
        lock1 = self.getBuildArtifact("lock1")
        lock2 = self.getBuildArtifact("lock2")
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            [
                "fork",
                "process:sync:" + lock1,
                "print-pid",
                "process:sync:" + lock2,
                "stop",
            ],
            nonstop=True,
        )

        self.test_sequence.add_log_lines(
            [
                "read packet: $Hcp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                {"direction": "send", "regex": "%Stop:T.*"},
            ],
            True,
        )
        self.expect_gdbremote_sequence()

        output = self.get_all_output_via_vStdio(
            lambda output: output.count(b"PID: ") >= 2
        )
        self.assertEqual(output.count(b"PID: "), 2)
        self.assertIn("PID: {}".format(int(parent_pid, 16)).encode(), output)
        self.assertIn("PID: {}".format(int(child_pid, 16)).encode(), output)

    @add_test_categories(["fork"])
    def test_vCont_both_nonstop(self):
        lock1 = self.getBuildArtifact("lock1")
        lock2 = self.getBuildArtifact("lock2")
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            [
                "fork",
                "process:sync:" + lock1,
                "print-pid",
                "process:sync:" + lock2,
                "stop",
            ],
            nonstop=True,
        )

        self.test_sequence.add_log_lines(
            [
                "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
                    parent_pid, parent_tid, child_pid, child_tid
                ),
                "send packet: $OK#00",
                {"direction": "send", "regex": "%Stop:T.*"},
            ],
            True,
        )
        self.expect_gdbremote_sequence()

        output = self.get_all_output_via_vStdio(
            lambda output: output.count(b"PID: ") >= 2
        )
        self.assertEqual(output.count(b"PID: "), 2)
        self.assertIn("PID: {}".format(int(parent_pid, 16)).encode(), output)
        self.assertIn("PID: {}".format(int(child_pid, 16)).encode(), output)

    def vCont_both_nonstop_test(self, vCont_packet):
        lock1 = self.getBuildArtifact("lock1")
        lock2 = self.getBuildArtifact("lock2")
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            [
                "fork",
                "process:sync:" + lock1,
                "print-pid",
                "process:sync:" + lock2,
                "stop",
            ],
            nonstop=True,
        )

        self.test_sequence.add_log_lines(
            [
                "read packet: ${}#00".format(vCont_packet),
                "send packet: $OK#00",
                {"direction": "send", "regex": "%Stop:T.*"},
            ],
            True,
        )
        self.expect_gdbremote_sequence()

        output = self.get_all_output_via_vStdio(
            lambda output: output.count(b"PID: ") >= 2
        )
        self.assertEqual(output.count(b"PID: "), 2)
        self.assertIn("PID: {}".format(int(parent_pid, 16)).encode(), output)
        self.assertIn("PID: {}".format(int(child_pid, 16)).encode(), output)

    @add_test_categories(["fork"])
    def test_vCont_both_implicit_nonstop(self):
        self.vCont_both_nonstop_test("vCont;c")

    @add_test_categories(["fork"])
    def test_vCont_both_minus_one_nonstop(self):
        self.vCont_both_nonstop_test("vCont;c:p-1.-1")