from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from fork_testbase import GdbRemoteForkTestBase
class TestGdbRemoteForkNonStop(GdbRemoteForkTestBase):
    """Non-stop mode variants of the gdb-remote fork/vfork tests.

    Every test drives a scenario with ``nonstop=True``; most of them delegate
    to helpers inherited from ``GdbRemoteForkTestBase`` and only add the
    packet exchange that is specific to the test.
    """

    def setUp(self):
        """Run the base-class setup and skip the test on Arm/AArch64 Linux."""
        GdbRemoteForkTestBase.setUp(self)
        if self.getPlatform() == "linux" and self.getArchitecture() in [
            "arm",
            "aarch64",
        ]:
            self.skipTest("Unsupported for Arm/AArch64 Linux")

    @add_test_categories(["fork"])
    def test_vfork_nonstop(self):
        """Run fork_and_detach_test for vfork, then resume the parent.

        The expected exchange is a stop notification mentioning ``vforkdone``
        for the parent thread, followed by a ``W00`` exit notification for the
        parent process; each notification is acknowledged with ``vStopped``.
        """
        parent_pid, parent_tid = self.fork_and_detach_test("vfork", nonstop=True)

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $OK#00",
                {
                    "direction": "send",
                    "regex": r"%Stop:T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format(
                        parent_pid, parent_tid
                    ),
                },
                "read packet: $vStopped#00",
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(parent_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_nonstop(self):
        """Run fork_and_detach_test for fork, then resume the parent.

        The expected exchange is a ``W00`` exit notification for the parent
        process, acknowledged with ``vStopped``.
        """
        parent_pid, _ = self.fork_and_detach_test("fork", nonstop=True)

        # resume the parent
        self.test_sequence.add_log_lines(
            [
                "read packet: $c#00",
                "send packet: $OK#00",
                "send packet: %Stop:W00;process:{}#00".format(parent_pid),
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    @add_test_categories(["fork"])
    def test_fork_follow_nonstop(self):
        """Run the inherited fork_and_follow_test for fork with nonstop=True."""
        self.fork_and_follow_test("fork", nonstop=True)

    @add_test_categories(["fork"])
    def test_vfork_follow_nonstop(self):
        """Run the inherited fork_and_follow_test for vfork with nonstop=True."""
        self.fork_and_follow_test("vfork", nonstop=True)

    @add_test_categories(["fork"])
    def test_detach_all_nonstop(self):
        """Run the inherited detach_all_test with nonstop=True."""
        self.detach_all_test(nonstop=True)

    @add_test_categories(["fork"])
    def test_kill_all_nonstop(self):
        """Send ``k`` and check that an exit is reported for both processes.

        The two ``X09`` exit reports may arrive in either order, so the pids
        are captured and compared as a set against the parent and child pids.
        """
        parent_pid, _, child_pid, _ = self.start_fork_test(["fork"], nonstop=True)

        exit_regex = "X09;process:([0-9a-f]+)"
        # Depending on a potential race, the second kill may make it into
        # the async queue before we issue vStopped or after. In the former
        # case, we should expect the exit status in reply to vStopped.
        # In the latter, we should expect an OK response (queue empty),
        # followed by another async notification.
        vstop_regex = "[$](OK|{})#.*".format(exit_regex)
        self.test_sequence.add_log_lines(
            [
                # kill all processes
                "read packet: $k#00",
                "send packet: $OK#00",
                {
                    "direction": "send",
                    "regex": "%Stop:{}#.*".format(exit_regex),
                    "capture": {1: "pid1"},
                },
                "read packet: $vStopped#00",
                {
                    "direction": "send",
                    "regex": vstop_regex,
                    "capture": {1: "vstop_reply", 2: "pid2"},
                },
            ],
            True,
        )
        ret = self.expect_gdbremote_sequence()
        pid1 = ret["pid1"]

        if ret["vstop_reply"] == "OK":
            # The queue was empty when vStopped was answered, so the second
            # exit has to show up as a fresh async notification.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    {
                        "direction": "send",
                        "regex": "%Stop:{}#.*".format(exit_regex),
                        "capture": {1: "pid2"},
                    },
                ],
                True,
            )
            ret = self.expect_gdbremote_sequence()
        pid2 = ret["pid2"]

        # Final vStopped must report that nothing else is pending.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: $vStopped#00",
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()
        self.assertEqual({pid1, pid2}, {parent_pid, child_pid})

    @add_test_categories(["fork"])
    def test_vkill_both_nonstop(self):
        """Run the inherited vkill_test killing parent and child, nonstop=True."""
        self.vkill_test(kill_parent=True, kill_child=True, nonstop=True)

    @add_test_categories(["fork"])
    def test_c_interspersed_nonstop(self):
        """Run resume_one_test alternating parent and child with nonstop=True."""
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"], nonstop=True
        )

    @add_test_categories(["fork"])
    def test_vCont_interspersed_nonstop(self):
        """Same as test_c_interspersed_nonstop, but with use_vCont=True."""
        self.resume_one_test(
            run_order=["parent", "child", "parent", "child"],
            use_vCont=True,
            nonstop=True,
        )

    def get_all_output_via_vStdio(self, output_test):
        """Collect inferior output until ``output_test`` accepts it.

        Args:
            output_test: callable taking the bytes gathered so far and
                returning a truthy value once enough output has arrived.

        Returns:
            The concatenation of every raw output packet that was read.
        """
        # The output may be split into an arbitrary number of messages.
        # Loop until we have everything. The first message is waiting for us
        # in the packet queue.
        output = self._server.get_raw_output_packet()
        while not output_test(output):
            self._server.send_packet(b"vStdio")
            output += self._server.get_raw_output_packet()
        return output

    def _start_print_pid_fork_test(self):
        """Start the fork inferior used by the "resume both" tests.

        Returns:
            Whatever ``start_fork_test`` returns; callers unpack it as
            ``(parent_pid, parent_tid, child_pid, child_tid)``.
        """
        lock1 = self.getBuildArtifact("lock1")
        lock2 = self.getBuildArtifact("lock2")
        # NOTE(review): the "process:sync:" arguments presumably make the two
        # processes rendezvous around "print-pid" -- confirm in the inferior.
        return self.start_fork_test(
            [
                "fork",
                "process:sync:" + lock1,
                "print-pid",
                "process:sync:" + lock2,
                "stop",
            ],
            nonstop=True,
        )

    def _resume_both_and_check_pids(self, resume_lines, parent_pid, child_pid):
        """Play ``resume_lines``, await a stop and verify both PIDs were printed.

        Args:
            resume_lines: log lines for the packets that resume the processes;
                a ``%Stop:T`` notification expectation is appended to them.
            parent_pid: parent process id as a hexadecimal string.
            child_pid: child process id as a hexadecimal string.
        """
        self.test_sequence.add_log_lines(
            resume_lines + [{"direction": "send", "regex": "%Stop:T.*"}],
            True,
        )
        self.expect_gdbremote_sequence()

        output = self.get_all_output_via_vStdio(
            lambda data: data.count(b"PID: ") >= 2
        )
        # Exactly one "PID: " line per process; pids are hex in packets but
        # are matched in decimal in the collected output.
        self.assertEqual(output.count(b"PID: "), 2)
        self.assertIn("PID: {}".format(int(parent_pid, 16)).encode(), output)
        self.assertIn("PID: {}".format(int(child_pid, 16)).encode(), output)

    @add_test_categories(["fork"])
    def test_c_both_nonstop(self):
        """Resume parent and child with separate Hc + c packets."""
        (
            parent_pid,
            parent_tid,
            child_pid,
            child_tid,
        ) = self._start_print_pid_fork_test()
        self._resume_both_and_check_pids(
            [
                "read packet: $Hcp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
                "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                "read packet: $c#00",
                "send packet: $OK#00",
            ],
            parent_pid,
            child_pid,
        )

    @add_test_categories(["fork"])
    def test_vCont_both_nonstop(self):
        """Resume parent and child with one vCont naming both threads."""
        (
            parent_pid,
            parent_tid,
            child_pid,
            child_tid,
        ) = self._start_print_pid_fork_test()
        self._resume_both_and_check_pids(
            [
                "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format(
                    parent_pid, parent_tid, child_pid, child_tid
                ),
                "send packet: $OK#00",
            ],
            parent_pid,
            child_pid,
        )

    def vCont_both_nonstop_test(self, vCont_packet):
        """Resume both processes with ``vCont_packet`` and check the output.

        Args:
            vCont_packet: packet payload (without ``$`` and checksum) that is
                expected to resume both the parent and the child.
        """
        parent_pid, _, child_pid, _ = self._start_print_pid_fork_test()
        self._resume_both_and_check_pids(
            [
                "read packet: ${}#00".format(vCont_packet),
                "send packet: $OK#00",
            ],
            parent_pid,
            child_pid,
        )

    @add_test_categories(["fork"])
    def test_vCont_both_implicit_nonstop(self):
        """Resume both processes with a bare ``vCont;c``."""
        self.vCont_both_nonstop_test("vCont;c")

    @add_test_categories(["fork"])
    def test_vCont_both_minus_one_nonstop(self):
        """Resume both processes with ``vCont;c:p-1.-1``."""
        self.vCont_both_nonstop_test("vCont;c:p-1.-1")