llvm/lldb/packages/Python/lldbsuite/test/tools/lldb-server/fork_testbase.py

import gdbremote_testcase


class GdbRemoteForkTestBase(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Reusable scenarios for lldb-server tests that follow fork-like events.

    Every scenario starts with start_fork_test(), which resumes the inferior
    until the server reports a fork-style stop and returns the parent and
    child process/thread ids captured from that stop reply. The remaining
    methods queue follow-up packet exchanges (detach, kill, resume, ...) and
    run them through expect_gdbremote_sequence().
    """

    # Stop reply announcing a fork-like event in all-stop mode. The single
    # "{}" placeholder is filled with the event name through str.format(),
    # which is why the "{2}" quantifier is written with doubled braces. The
    # four groups are the parent pid/tid followed by the child pid/tid.
    fork_regex = (
        "[$]T[0-9a-fA-F]{{2}}thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
        "{}:p([0-9a-f]+)[.]([0-9a-f]+).*"
    )
    # The same event delivered as a "%Stop:" notification (non-stop mode).
    fork_regex_nonstop = (
        "%Stop:T[0-9a-fA-F]{{2}}"
        "thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
        "{}:p([0-9a-f]+)[.]([0-9a-f]+).*"
    )
    # Maps the regex group numbers above to the keys start_fork_test() reads.
    fork_capture = {1: "parent_pid", 2: "parent_tid", 3: "child_pid", 4: "child_tid"}
    # Signal stop reply for one specific thread; format with (pid, tid). It
    # carries no packet prefix so callers can prepend "[$]" or "%Stop:".
    # The pid/tid separator is spelled "[.]" (as in fork_regex) so that it only
    # matches a literal dot rather than any character.
    stop_regex_base = "T[0-9a-fA-F]{{2}}thread:p{}[.]{};.*reason:signal.*"
    # All-stop flavour of stop_regex_base, anchored at the packet start.
    stop_regex = "^[$]" + stop_regex_base

    def start_fork_test(self, args, variant="fork", nonstop=False):
        """Build and launch the inferior, then run it up to the fork event.

        Args:
            args: inferior arguments, forwarded as ``inferior_args``.
            variant: name of the fork-like event; it selects the
                "<variant>-events+" qSupported feature and the key expected
                in the stop reply.
            nonstop: when true, send QNonStop:1 before continuing and expect
                the event as a %Stop notification acknowledged by vStopped.

        Returns:
            A ``(parent_pid, parent_tid, child_pid, child_tid)`` tuple holding
            the values captured from the stop reply via fork_capture.
        """
        self.build()
        self.prep_debug_monitor_and_inferior(inferior_args=args)
        feature = "{}-events+".format(variant)
        self.add_qSupported_packets(["multiprocess+", feature])
        ret = self.expect_gdbremote_sequence()
        # The server has to advertise the requested event type.
        self.assertIn(feature, ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        if nonstop:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $QNonStop:1#00",
                    "send packet: $OK#00",
                    "read packet: $c#00",
                    "send packet: $OK#00",
                    {
                        "direction": "send",
                        "regex": self.fork_regex_nonstop.format(variant),
                        "capture": self.fork_capture,
                    },
                    "read packet: $vStopped#00",
                    "send packet: $OK#00",
                ],
                True,
            )
        else:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $c#00",
                    {
                        "direction": "send",
                        "regex": self.fork_regex.format(variant),
                        "capture": self.fork_capture,
                    },
                ],
                True,
            )
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        return tuple(
            ret[key] for key in ("parent_pid", "parent_tid", "child_pid", "child_tid")
        )

    def fork_and_detach_test(self, variant, nonstop=False):
        """Detach the forked child and check that only the parent remains.

        Args:
            variant: fork-like event name; also passed to the inferior as its
                only argument.
            nonstop: forwarded to start_fork_test().

        Returns:
            ``(parent_pid, parent_tid)`` so callers can keep driving the parent.
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            [variant], variant, nonstop=nonstop
        )

        # detach the forked child
        self.test_sequence.add_log_lines(
            [
                "read packet: $D;{}#00".format(child_pid),
                "send packet: $OK#00",
                # verify that the current process is correct
                "read packet: $qC#00",
                "send packet: $QCp{}.{}#00".format(parent_pid, parent_tid),
                # verify that the correct processes are detached/available
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $Eff#00",
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()
        self.reset_test_sequence()
        return parent_pid, parent_tid

    def fork_and_follow_test(self, variant, nonstop=False):
        """Switch to the forked child, detach the parent and run the child.

        After the parent is detached the child is resumed and is expected to
        exit with status 0 (a W00 reply, or a %Stop:W00 notification in
        non-stop mode).

        Args:
            variant: fork-like event name; also passed to the inferior as its
                only argument.
            nonstop: forwarded to start_fork_test() and selects the expected
                form of the exit report.
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            [variant], variant, nonstop=nonstop
        )

        # switch to the forked child
        self.test_sequence.add_log_lines(
            [
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                # detach the parent
                "read packet: $D;{}#00".format(parent_pid),
                "send packet: $OK#00",
                # verify that the correct processes are detached/available
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $Eff#00",
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                # then resume the child
                "read packet: $c#00",
            ],
            True,
        )

        if nonstop:
            self.test_sequence.add_log_lines(
                [
                    "send packet: $OK#00",
                    "send packet: %Stop:W00;process:{}#00".format(child_pid),
                    "read packet: $vStopped#00",
                    "send packet: $OK#00",
                ],
                True,
            )
        else:
            self.test_sequence.add_log_lines(
                [
                    "send packet: $W00;process:{}#00".format(child_pid),
                ],
                True,
            )
        self.expect_gdbremote_sequence()

    def detach_all_test(self, nonstop=False):
        """Detach every process with a bare D packet and verify both are gone.

        Args:
            nonstop: forwarded to start_fork_test().
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork"], nonstop=nonstop
        )

        self.test_sequence.add_log_lines(
            [
                # double-check our PIDs
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $OK#00",
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                # detach all processes
                "read packet: $D#00",
                "send packet: $OK#00",
                # verify that both PIDs are invalid now
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $Eff#00",
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $Eff#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    def vkill_test(self, kill_parent=False, kill_child=False, nonstop=False):
        """Kill the parent and/or the child with vKill and check what is left.

        Args:
            kill_parent: send vKill for the parent process.
            kill_child: send vKill for the child process.
            nonstop: forwarded to start_fork_test().

        Raises:
            AssertionError: if neither kill_parent nor kill_child is set.
        """
        # Raised explicitly (not via ``assert``) so that the check is still
        # performed when Python runs with optimizations enabled.
        if not (kill_parent or kill_child):
            raise AssertionError(
                "vkill_test requires kill_parent and/or kill_child to be set"
            )
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork"], nonstop=nonstop
        )

        if kill_parent:
            self.test_sequence.add_log_lines(
                [
                    # kill the process
                    "read packet: $vKill;{}#00".format(parent_pid),
                    "send packet: $OK#00",
                ],
                True,
            )
        if kill_child:
            self.test_sequence.add_log_lines(
                [
                    # kill the process
                    "read packet: $vKill;{}#00".format(child_pid),
                    "send packet: $OK#00",
                ],
                True,
            )
        # A killed process must no longer be selectable (Eff); a surviving
        # one must still answer OK.
        self.test_sequence.add_log_lines(
            [
                # check child PID/TID
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: ${}#00".format("Eff" if kill_child else "OK"),
                # check parent PID/TID
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: ${}#00".format("Eff" if kill_parent else "OK"),
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    def resume_one_test(self, run_order, use_vCont=False, nonstop=False):
        """Resume parent and child one at a time in the order given.

        The inferior is started with the arguments ``["fork", "stop"]``. For
        each process the first resume is expected to end in a signal stop
        (stop_regex_base) and the second one in a W00 exit report. After each
        resume, as long as any event is still outstanding, both processes are
        probed with Hg: OK is expected for a process that still has events
        pending and Eff for one that has none left.

        Args:
            run_order: iterable of "parent" / "child" entries naming the
                process to resume at each step. Each process has only two
                expected events, so naming it a third time raises IndexError
                from the underlying list pop.
            use_vCont: resume with a single ``vCont;c:p<pid>.<tid>`` packet
                instead of ``Hc`` followed by ``c``.
            nonstop: forwarded to start_fork_test() and selects %Stop
                notifications instead of plain stop replies.

        Raises:
            AssertionError: if run_order contains anything other than
                "parent" or "child".
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork", "stop"], nonstop=nonstop
        )

        # Events still expected from each process, consumed front to back.
        parent_expect = [
            self.stop_regex_base.format(parent_pid, parent_tid),
            "W00;process:{}#.*".format(parent_pid),
        ]
        child_expect = [
            self.stop_regex_base.format(child_pid, child_tid),
            "W00;process:{}#.*".format(child_pid),
        ]

        for x in run_order:
            if x == "parent":
                pidtid = (parent_pid, parent_tid)
                expect = parent_expect.pop(0)
            elif x == "child":
                pidtid = (child_pid, child_tid)
                expect = child_expect.pop(0)
            else:
                # Raised explicitly: a stripped ``assert`` would let the loop
                # carry on with the previous iteration's pidtid/expect.
                raise AssertionError("unexpected x={}".format(x))

            if use_vCont:
                self.test_sequence.add_log_lines(
                    [
                        # continue the selected process
                        "read packet: $vCont;c:p{}.{}#00".format(*pidtid),
                    ],
                    True,
                )
            else:
                self.test_sequence.add_log_lines(
                    [
                        # continue the selected process
                        "read packet: $Hcp{}.{}#00".format(*pidtid),
                        "send packet: $OK#00",
                        "read packet: $c#00",
                    ],
                    True,
                )
            if nonstop:
                self.test_sequence.add_log_lines(
                    [
                        "send packet: $OK#00",
                        {"direction": "send", "regex": "%Stop:" + expect},
                        "read packet: $vStopped#00",
                        "send packet: $OK#00",
                    ],
                    True,
                )
            else:
                self.test_sequence.add_log_lines(
                    [
                        {"direction": "send", "regex": "[$]" + expect},
                    ],
                    True,
                )
            # if at least one process remained, check both PIDs
            if parent_expect or child_expect:
                self.test_sequence.add_log_lines(
                    [
                        "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                        "send packet: ${}#00".format("OK" if parent_expect else "Eff"),
                        "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                        "send packet: ${}#00".format("OK" if child_expect else "Eff"),
                    ],
                    True,
                )
        self.expect_gdbremote_sequence()