llvm/lldb/test/API/macosx/thread_suspend/TestInternalThreadSuspension.py

"""
Make sure that if threads are suspended outside of lldb, debugserver
won't make them run, even if we call an expression on the thread.
"""

import lldb
from lldbsuite.test.decorators import *
import lldbsuite.test.lldbutil as lldbutil
from lldbsuite.test.lldbtest import *


class TestSuspendedThreadHandling(TestBase):
    NO_DEBUG_INFO_TESTCASE = True

    @skipUnlessDarwin
    def test_suspended_threads(self):
        """Test that debugserver doesn't disturb the suspend count of a thread
        that has been suspended from within a program, when navigating breakpoints
        on other threads, or calling functions both on the suspended thread and
        on other threads."""
        self.build()
        self.main_source_file = lldb.SBFileSpec("main.c")
        self.suspended_thread_test()

    def setUp(self):
        # Call super's setUp().
        TestBase.setUp(self)
        # Set up your test case here. If your test doesn't need any set up then
        # remove this method from your TestCase class.

    def try_an_expression(self, thread, correct_value, test_bp):
        frame = thread.frames[0]

        value = frame.EvaluateExpression("function_to_call()")
        self.assertSuccess(value.GetError(), "Successfully called the function")
        self.assertEqual(
            value.GetValueAsSigned(), correct_value, "Got expected value for expression"
        )

        # Again, make sure we didn't let the suspend thread breakpoint run:
        self.assertEqual(
            test_bp.GetHitCount(),
            0,
            "First expression allowed the suspend thread to run",
        )

    def make_bkpt(self, pattern):
        bp = self.target.BreakpointCreateBySourceRegex(pattern, self.main_source_file)
        self.assertEqual(bp.GetNumLocations(), 1, "Locations for %s" % (pattern))
        return bp

    def suspended_thread_test(self):
        (self.target, process, thread, bkpt) = lldbutil.run_to_source_breakpoint(
            self, "Stop here to get things going", self.main_source_file
        )

        # Make in the running thread, so the we will have to stop a number of times
        # while handling breakpoints.  The first couple of times we hit it we will
        # run expressions as well.  Make sure we don't let the suspended thread run
        # during those operations.
        rt_bp = self.make_bkpt("Break here to show we can handle breakpoints")

        # Make a breakpoint that we will hit when the running thread exits:
        rt_exit_bp = self.make_bkpt("Break here after thread_join")

        # Make a breakpoint in the suspended thread.  We should not hit this till we
        # resume it after joining the running thread.
        st_bp = self.make_bkpt("We allowed the suspend thread to run")

        # Make a breakpoint after pthread_join of the suspend thread to ensure
        # that we didn't keep the thread from exiting normally
        st_exit_bp = self.make_bkpt(
            " Break here to make sure the thread exited normally"
        )

        threads = lldbutil.continue_to_breakpoint(process, rt_bp)
        self.assertEqual(len(threads), 1, "Hit the running_func breakpoint")

        # Make sure we didn't hit the suspend thread breakpoint:
        self.assertEqual(
            st_bp.GetHitCount(), 0, "Continue allowed the suspend thread to run"
        )

        # Now try an expression on the running thread:
        self.try_an_expression(threads[0], 0, st_bp)

        # Continue, and check the same things:
        threads = lldbutil.continue_to_breakpoint(process, rt_bp)
        self.assertEqual(len(threads), 1, "We didn't hit running breakpoint")

        # Try an expression on the suspended thread:
        thread = lldb.SBThread()
        for thread in process.threads:
            th_name = thread.GetName()
            if th_name is None:
                continue
            if "Look for me" in th_name:
                break
        self.assertTrue(thread.IsValid(), "We found the suspend thread.")
        self.try_an_expression(thread, 1, st_bp)

        # Now set the running thread breakpoint to auto-continue and let it
        # run a bit to make sure we still don't let the suspend thread run.
        rt_bp.SetAutoContinue(True)
        threads = lldbutil.continue_to_breakpoint(process, rt_exit_bp)
        self.assertEqual(len(threads), 1)
        self.assertEqual(
            st_bp.GetHitCount(), 0, "Continue again let suspended thread run"
        )

        # Now continue and we SHOULD hit the suspend_func breakpoint:
        threads = lldbutil.continue_to_breakpoint(process, st_bp)
        self.assertEqual(len(threads), 1, "The thread resumed successfully")

        # Finally, continue again and we should get out of the last pthread_join
        # and the process should be about to exit
        threads = lldbutil.continue_to_breakpoint(process, st_exit_bp)
        self.assertEqual(len(threads), 1, "pthread_join exited successfully")