import gdbremote_testcase
class GdbRemoteForkTestBase(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Reusable scenarios for gdb-remote tests covering fork-style events.

    Every scenario starts with start_fork_test(), which runs the inferior
    until a fork event of the requested variant is reported and returns the
    parent and child PID/TID values captured from that report.  The scenario
    methods then queue further expected packet exchanges on
    ``self.test_sequence`` and run them with expect_gdbremote_sequence().

    NOTE(review): the "read packet" entries look like the packets the test
    feeds to the debug monitor and the "send packet" entries like the replies
    expected back -- confirm against the test sequence implementation.
    """

    # The patterns below are str.format() templates: "{}" receives the fork
    # variant name, which is why the regex quantifier is written "{{2}}".
    # Groups 1-4 capture parent pid, parent tid, child pid and child tid.
    fork_regex = (
        "[$]T[0-9a-fA-F]{{2}}thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
        "{}:p([0-9a-f]+)[.]([0-9a-f]+).*"
    )
    # Same report, but in the "%Stop:" form expected in non-stop mode.
    fork_regex_nonstop = (
        "%Stop:T[0-9a-fA-F]{{2}}"
        "thread:p([0-9a-f]+)[.]([0-9a-f]+);.*"
        "{}:p([0-9a-f]+)[.]([0-9a-f]+).*"
    )
    # Maps regex group numbers of the fork patterns to result keys.
    fork_capture = {1: "parent_pid", 2: "parent_tid", 3: "child_pid", 4: "child_tid"}
    # Template for a signal stop of one thread; format with (pid, tid).
    # The pid/tid separator is matched literally with "[.]", consistent with
    # the fork patterns above (a bare "." would accept any character).
    stop_regex_base = "T[0-9a-fA-F]{{2}}thread:p{}[.]{};.*reason:signal.*"
    # Anchored "$"-prefixed variant of stop_regex_base.
    stop_regex = "^[$]" + stop_regex_base

    def start_fork_test(self, args, variant="fork", nonstop=False):
        """Build and launch the inferior, then run it up to the fork event.

        Args:
            args: inferior arguments, passed as ``inferior_args`` to
                prep_debug_monitor_and_inferior().
            variant: name of the fork-style event to request and expect
                (used for the "<variant>-events+" feature and in the regex).
            nonstop: when true, enable non-stop mode (QNonStop:1) first and
                expect the event as a "%Stop:" notification that is then
                acknowledged with vStopped.

        Returns:
            A tuple ``(parent_pid, parent_tid, child_pid, child_tid)`` with
            the values captured from the fork event report.
        """
        self.build()
        self.prep_debug_monitor_and_inferior(inferior_args=args)
        events_feature = "{}-events+".format(variant)
        self.add_qSupported_packets(["multiprocess+", events_feature])
        ret = self.expect_gdbremote_sequence()
        self.assertIn(events_feature, ret["qSupported_response"])
        self.reset_test_sequence()

        # continue and expect fork
        if nonstop:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $QNonStop:1#00",
                    "send packet: $OK#00",
                    "read packet: $c#00",
                    "send packet: $OK#00",
                    {
                        "direction": "send",
                        "regex": self.fork_regex_nonstop.format(variant),
                        "capture": self.fork_capture,
                    },
                    "read packet: $vStopped#00",
                    "send packet: $OK#00",
                ],
                True,
            )
        else:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $c#00",
                    {
                        "direction": "send",
                        "regex": self.fork_regex.format(variant),
                        "capture": self.fork_capture,
                    },
                ],
                True,
            )
        ret = self.expect_gdbremote_sequence()
        self.reset_test_sequence()

        return tuple(
            ret[x] for x in ("parent_pid", "parent_tid", "child_pid", "child_tid")
        )

    def fork_and_detach_test(self, variant, nonstop=False):
        """Fork, detach the child, and check that only the parent remains.

        Args:
            variant: fork-style event name; also passed as the single
                inferior argument.
            nonstop: forwarded to start_fork_test().

        Returns:
            A tuple ``(parent_pid, parent_tid)``.  The test sequence is reset
            before returning, so callers can queue further packets.
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            [variant], variant, nonstop=nonstop
        )

        # detach the forked child
        self.test_sequence.add_log_lines(
            [
                "read packet: $D;{}#00".format(child_pid),
                "send packet: $OK#00",
                # verify that the current process is correct
                "read packet: $qC#00",
                "send packet: $QCp{}.{}#00".format(parent_pid, parent_tid),
                # verify that the correct processes are detached/available
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $Eff#00",
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $OK#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()
        self.reset_test_sequence()
        return parent_pid, parent_tid

    def fork_and_follow_test(self, variant, nonstop=False):
        """Fork, switch to the child, detach the parent, and run the child.

        The child is expected to finish with a ``W00`` exit report, delivered
        as a "%Stop:" notification in non-stop mode.

        Args:
            variant: fork-style event name; also passed as the single
                inferior argument.
            nonstop: forwarded to start_fork_test() and selects the form of
                the expected exit report.
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            [variant], variant, nonstop=nonstop
        )

        # switch to the forked child
        self.test_sequence.add_log_lines(
            [
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                "read packet: $Hcp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                # detach the parent
                "read packet: $D;{}#00".format(parent_pid),
                "send packet: $OK#00",
                # verify that the correct processes are detached/available
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $Eff#00",
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                # then resume the child
                "read packet: $c#00",
            ],
            True,
        )

        if nonstop:
            self.test_sequence.add_log_lines(
                [
                    "send packet: $OK#00",
                    "send packet: %Stop:W00;process:{}#00".format(child_pid),
                    "read packet: $vStopped#00",
                    "send packet: $OK#00",
                ],
                True,
            )
        else:
            self.test_sequence.add_log_lines(
                [
                    "send packet: $W00;process:{}#00".format(child_pid),
                ],
                True,
            )
        self.expect_gdbremote_sequence()

    def detach_all_test(self, nonstop=False):
        """Fork, detach every process with a bare ``D``, and check both are gone.

        Args:
            nonstop: forwarded to start_fork_test().
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork"], nonstop=nonstop
        )

        self.test_sequence.add_log_lines(
            [
                # double-check our PIDs
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $OK#00",
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $OK#00",
                # detach all processes
                "read packet: $D#00",
                "send packet: $OK#00",
                # verify that both PIDs are invalid now
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: $Eff#00",
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: $Eff#00",
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    def vkill_test(self, kill_parent=False, kill_child=False, nonstop=False):
        """Fork, vKill the selected process(es), and check which ones remain.

        Args:
            kill_parent: kill the parent process.
            kill_child: kill the child process.
            nonstop: forwarded to start_fork_test().

        Raises:
            AssertionError: if neither kill_parent nor kill_child is set.
        """
        # Raised explicitly rather than with an ``assert`` statement so the
        # check is not stripped when Python runs with -O.
        if not (kill_parent or kill_child):
            raise AssertionError("kill_parent or kill_child must be set")

        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork"], nonstop=nonstop
        )

        if kill_parent:
            self.test_sequence.add_log_lines(
                [
                    # kill the process
                    "read packet: $vKill;{}#00".format(parent_pid),
                    "send packet: $OK#00",
                ],
                True,
            )
        if kill_child:
            self.test_sequence.add_log_lines(
                [
                    # kill the process
                    "read packet: $vKill;{}#00".format(child_pid),
                    "send packet: $OK#00",
                ],
                True,
            )
        # A killed process must be rejected (Eff); a surviving one accepted.
        self.test_sequence.add_log_lines(
            [
                # check child PID/TID
                "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                "send packet: ${}#00".format("Eff" if kill_child else "OK"),
                # check parent PID/TID
                "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                "send packet: ${}#00".format("Eff" if kill_parent else "OK"),
            ],
            True,
        )
        self.expect_gdbremote_sequence()

    def resume_one_test(self, run_order, use_vCont=False, nonstop=False):
        """Fork, then resume the parent and child one at a time.

        Each process has an ordered list of expected reports: first a signal
        stop, then a ``W00`` exit.  Every entry of ``run_order`` resumes the
        named process and consumes its next expected report.

        Args:
            run_order: iterable of "parent" / "child" strings giving the
                order in which the processes are resumed.
            use_vCont: resume with ``vCont;c:p<pid>.<tid>`` instead of
                selecting the thread with ``Hc`` and sending ``c``.
            nonstop: forwarded to start_fork_test() and selects the form of
                the expected reports ("%Stop:" notification vs "$" reply).

        Raises:
            AssertionError: if run_order contains anything other than
                "parent" or "child".
        """
        parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test(
            ["fork", "stop"], nonstop=nonstop
        )

        parent_expect = [
            self.stop_regex_base.format(parent_pid, parent_tid),
            "W00;process:{}#.*".format(parent_pid),
        ]
        child_expect = [
            self.stop_regex_base.format(child_pid, child_tid),
            "W00;process:{}#.*".format(child_pid),
        ]

        for x in run_order:
            if x == "parent":
                pidtid = (parent_pid, parent_tid)
                expect = parent_expect.pop(0)
            elif x == "child":
                pidtid = (child_pid, child_tid)
                expect = child_expect.pop(0)
            else:
                # Raised explicitly so the check survives ``python -O``;
                # a stripped ``assert`` would leave pidtid/expect unbound.
                raise AssertionError("unexpected x={}".format(x))

            if use_vCont:
                self.test_sequence.add_log_lines(
                    [
                        # continue the selected process
                        "read packet: $vCont;c:p{}.{}#00".format(*pidtid),
                    ],
                    True,
                )
            else:
                self.test_sequence.add_log_lines(
                    [
                        # continue the selected process
                        "read packet: $Hcp{}.{}#00".format(*pidtid),
                        "send packet: $OK#00",
                        "read packet: $c#00",
                    ],
                    True,
                )
            if nonstop:
                self.test_sequence.add_log_lines(
                    [
                        "send packet: $OK#00",
                        {"direction": "send", "regex": "%Stop:" + expect},
                        "read packet: $vStopped#00",
                        "send packet: $OK#00",
                    ],
                    True,
                )
            else:
                self.test_sequence.add_log_lines(
                    [
                        {"direction": "send", "regex": "[$]" + expect},
                    ],
                    True,
                )

        # if at least one process remained, check both PIDs
        # (a process whose expectation list is exhausted has already
        # reported its exit, so selecting it must fail with Eff)
        if parent_expect or child_expect:
            self.test_sequence.add_log_lines(
                [
                    "read packet: $Hgp{}.{}#00".format(parent_pid, parent_tid),
                    "send packet: ${}#00".format("OK" if parent_expect else "Eff"),
                    "read packet: $Hgp{}.{}#00".format(child_pid, child_tid),
                    "send packet: ${}#00".format("OK" if child_expect else "Eff"),
                ],
                True,
            )
        self.expect_gdbremote_sequence()