from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
import gdbremote_testcase
class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase):
@skipIfWindows # no SIGSEGV support
@add_test_categories(["llgs"])
def test_run(self):
self.build()
self.set_inferior_startup_launch()
thread_num = 3
procs = self.prep_debug_monitor_and_inferior(
inferior_args=["thread:segfault"] + thread_num * ["thread:new"]
)
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
],
True,
)
self.expect_gdbremote_sequence()
segv_signo = lldbutil.get_signal_number("SIGSEGV")
all_threads = set()
all_segv_threads = []
# we should get segfaults from all the threads
for segv_no in range(thread_num):
# first wait for the notification event
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
{
"direction": "send",
"regex": r"^%Stop:(T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);)",
"capture": {1: "packet", 2: "signo", 3: "thread_id"},
},
],
True,
)
m = self.expect_gdbremote_sequence()
del m["O_content"]
threads = [m]
# then we may get events for the remaining threads
# (but note that not all threads may have been started yet)
while True:
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $vStopped#00",
{
"direction": "send",
"regex": r"^\$(OK|T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);)",
"capture": {1: "packet", 2: "signo", 3: "thread_id"},
},
],
True,
)
m = self.expect_gdbremote_sequence()
if m["packet"] == "OK":
break
del m["O_content"]
threads.append(m)
segv_threads = []
other_threads = []
for t in threads:
signo = int(t["signo"], 16)
if signo == segv_signo:
segv_threads.append(t["thread_id"])
else:
self.assertEqual(signo, 0)
other_threads.append(t["thread_id"])
# verify that exactly one thread segfaulted
self.assertEqual(len(segv_threads), 1)
# we should get only one segv from every thread
self.assertNotIn(segv_threads[0], all_segv_threads)
all_segv_threads.extend(segv_threads)
# segv_threads + other_threads should always be a superset
# of all_threads, i.e. we should get states for all threads
# already started
self.assertFalse(all_threads.difference(other_threads + segv_threads))
all_threads.update(other_threads + segv_threads)
# verify that `?` returns the same result
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $?#00",
],
True,
)
threads_verify = []
while True:
self.test_sequence.add_log_lines(
[
{
"direction": "send",
"regex": r"^\$(OK|T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);)",
"capture": {1: "packet", 2: "signo", 3: "thread_id"},
},
],
True,
)
m = self.expect_gdbremote_sequence()
if m["packet"] == "OK":
break
del m["O_content"]
threads_verify.append(m)
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $vStopped#00",
],
True,
)
self.assertEqual(threads, threads_verify)
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $vCont;C{:02x}:{};c#00".format(
segv_signo, segv_threads[0]
),
"send packet: $OK#00",
],
True,
)
self.expect_gdbremote_sequence()
# finally, verify that all threads have started
self.assertEqual(len(all_threads), thread_num + 1)
@add_test_categories(["llgs"])
def test_vCtrlC(self):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior(inferior_args=["thread:new"])
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
"read packet: $vCtrlC#00",
"send packet: $OK#00",
{
"direction": "send",
"regex": r"^%Stop:T",
},
],
True,
)
self.expect_gdbremote_sequence()
@add_test_categories(["llgs"])
def test_exit(self):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior()
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
"send packet: %Stop:W00#00",
"read packet: $vStopped#00",
"send packet: $OK#00",
],
True,
)
self.expect_gdbremote_sequence()
@skipIfWindows # no clue, the result makes zero sense
@add_test_categories(["llgs"])
def test_exit_query(self):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior()
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
"send packet: %Stop:W00#00",
"read packet: $?#00",
"send packet: $W00#00",
"read packet: $vStopped#00",
"send packet: $OK#00",
],
True,
)
self.expect_gdbremote_sequence()
def multiple_resume_test(self, second_command):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:15"])
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
"read packet: ${}#00".format(second_command),
"send packet: $E37#00",
],
True,
)
self.expect_gdbremote_sequence()
@add_test_categories(["llgs"])
def test_multiple_C_continue_with_signal(self):
self.multiple_resume_test("C05")
@add_test_categories(["llgs"])
def test_multiple_c_continue_with_addr(self):
self.multiple_resume_test("c")
@add_test_categories(["llgs"])
def test_multiple_s_single_step_with_addr(self):
self.multiple_resume_test("s")
@skipIfWindows
@add_test_categories(["llgs"])
def test_multiple_vCont(self):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior(
inferior_args=["thread:new", "stop", "sleep:15"]
)
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
{
"direction": "send",
"regex": r"^%Stop:T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+);",
"capture": {1: "tid1"},
},
"read packet: $vStopped#63",
{
"direction": "send",
"regex": r"^[$]T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+);",
"capture": {1: "tid2"},
},
"read packet: $vStopped#63",
"send packet: $OK#00",
],
True,
)
ret = self.expect_gdbremote_sequence()
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $vCont;c:{}#00".format(ret["tid1"]),
"send packet: $OK#00",
"read packet: $vCont;c:{}#00".format(ret["tid2"]),
"send packet: $E37#00",
],
True,
)
self.expect_gdbremote_sequence()
@add_test_categories(["llgs"])
def test_vCont_then_stop(self):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:15"])
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
"read packet: $vCont;t#00",
"send packet: $OK#00",
],
True,
)
self.expect_gdbremote_sequence()
def vCont_then_partial_stop_test(self, run_both):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior(
inferior_args=["thread:new", "stop", "sleep:15"]
)
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
{
"direction": "send",
"regex": r"^%Stop:T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+);",
"capture": {1: "tid1"},
},
"read packet: $vStopped#63",
{
"direction": "send",
"regex": r"^[$]T[0-9a-fA-F]{2}thread:([0-9a-fA-F]+);",
"capture": {1: "tid2"},
},
"read packet: $vStopped#63",
"send packet: $OK#00",
],
True,
)
ret = self.expect_gdbremote_sequence()
self.reset_test_sequence()
if run_both:
self.test_sequence.add_log_lines(
[
"read packet: $vCont;c#00",
],
True,
)
else:
self.test_sequence.add_log_lines(
[
"read packet: $vCont;c:{}#00".format(ret["tid1"]),
],
True,
)
self.test_sequence.add_log_lines(
[
"send packet: $OK#00",
"read packet: $vCont;t:{}#00".format(ret["tid2"]),
"send packet: $E03#00",
],
True,
)
self.expect_gdbremote_sequence()
@skipIfWindows
@add_test_categories(["llgs"])
def test_vCont_then_partial_stop(self):
self.vCont_then_partial_stop_test(False)
@skipIfWindows
@add_test_categories(["llgs"])
def test_vCont_then_partial_stop_run_both(self):
self.vCont_then_partial_stop_test(True)
@skipIfWindows
@add_test_categories(["llgs"])
def test_stdio(self):
self.build()
self.set_inferior_startup_launch()
# Since we can't easily ensure that lldb will send output in two parts,
# just put a stop in the middle. Since we don't clear vStdio,
# the second message won't be delivered immediately.
self.prep_debug_monitor_and_inferior(
inferior_args=["message 1", "stop", "message 2"]
)
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
{"direction": "send", "regex": r"^%Stop:T.*"},
"read packet: $vStopped#00",
"send packet: $OK#00",
"read packet: $c#63",
"send packet: $OK#00",
"send packet: %Stop:W00#00",
],
True,
)
ret = self.expect_gdbremote_sequence()
# We know there will be at least two messages, but there may be more.
# Loop until we have everything. The first message waiting for us in the
# packet queue.
count = 1
output = self._server.get_raw_output_packet()
while not (b"message 2\r\n" in output):
self._server.send_packet(b"vStdio")
output += self._server.get_raw_output_packet()
count += 1
self.assertGreaterEqual(count, 2)
self.reset_test_sequence()
self.test_sequence.add_log_lines(
[
"read packet: $vStdio#00",
"send packet: $OK#00",
"read packet: $vStopped#00",
"send packet: $OK#00",
],
True,
)
self.expect_gdbremote_sequence()
@skipIfWindows
@add_test_categories(["llgs"])
def test_stop_reason_while_running(self):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior(
inferior_args=["thread:new", "thread:new", "stop", "sleep:15"]
)
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
# stop is used to synchronize starting threads
"read packet: $c#63",
"send packet: $OK#00",
{"direction": "send", "regex": "%Stop:T.*"},
"read packet: $c#63",
"send packet: $OK#00",
"read packet: $?#00",
"send packet: $OK#00",
],
True,
)
self.expect_gdbremote_sequence()
@skipIfWindows
@add_test_categories(["llgs"])
def test_leave_nonstop(self):
self.build()
self.set_inferior_startup_launch()
procs = self.prep_debug_monitor_and_inferior(
inferior_args=["thread:new", "thread:new", "stop", "sleep:15"]
)
self.test_sequence.add_log_lines(
[
"read packet: $QNonStop:1#00",
"send packet: $OK#00",
# stop is used to synchronize starting threads
"read packet: $c#63",
"send packet: $OK#00",
{"direction": "send", "regex": "%Stop:T.*"},
"read packet: $c#63",
"send packet: $OK#00",
# verify that the threads are running now
"read packet: $?#00",
"send packet: $OK#00",
"read packet: $QNonStop:0#00",
"send packet: $OK#00",
# we should issue some random request now to verify that the stub
# did not send stop reasons -- we may verify whether notification
# queue was cleared while at it
"read packet: $vStopped#00",
"send packet: $Eff#00",
"read packet: $?#00",
{"direction": "send", "regex": "[$]T.*"},
],
True,
)
self.expect_gdbremote_sequence()