# llvm/lldb/packages/Python/lldbsuite/test/tools/intelpt/intelpt_testcase.py

from lldbsuite.test.lldbtest import *
import os
import time
import json

# Regex fragment for a hexadecimal address as printed with a "0x" prefix.
# Note that "*" allows zero hex digits, so a bare "0x" also matches.
ADDRESS_REGEX = "0x[0-9a-fA-F]*"


# Decorator that runs a test with both modes of USE_SB_API.
# It assumes that no tests can be executed in parallel.
def testSBAPIAndCommands(func):
    def wrapper(*args, **kwargs):
        TraceIntelPTTestCaseBase.USE_SB_API = True
        func(*args, **kwargs)
        TraceIntelPTTestCaseBase.USE_SB_API = False
        func(*args, **kwargs)

    return wrapper


# Class that should be used by all python Intel PT tests.
#
# It has a handy check that skips the test if the intel-pt plugin is not enabled.
#
# It also contains many functions that can test both the SB API or the command line version
# of the most important tracing actions.
class TraceIntelPTTestCaseBase(TestBase):
    NO_DEBUG_INFO_TESTCASE = True

    # If True, the trace test methods will use the SB API, otherwise they'll use raw commands.
    USE_SB_API = False

    def setUp(self):
        """Run the base setUp and skip the test if the intel-pt plugin is disabled."""
        TestBase.setUp(self)
        if "intel-pt" not in configuration.enabled_plugins:
            self.skipTest("The intel-pt test plugin is not enabled")

    def skipIfPerCpuTracingIsNotSupported(self):
        """Skip the current test unless perf_event_paranoid is 0 or lower.

        The value is read from /proc/sys/kernel/perf_event_paranoid. If the
        file can't be read or doesn't start with an integer, per cpu tracing
        is treated as not supported.
        """

        def is_supported():
            try:
                with open("/proc/sys/kernel/perf_event_paranoid", "r") as permissions:
                    # Only the first line is relevant. An empty file yields ""
                    # and int("") raises ValueError, which is handled below.
                    return int(permissions.readline()) <= 0
            except (OSError, ValueError):
                # Missing or unreadable file, or unexpected contents.
                return False

        if not is_supported():
            self.skipTest(
                "Per cpu tracing is not supported. You need "
                "/proc/sys/kernel/perf_event_paranoid to be 0 or -1. "
                "You can use `sudo sysctl -w kernel.perf_event_paranoid=-1` for that."
            )

    def getTraceOrCreate(self):
        """Return the trace of the current target, creating it if there is none.

        The SBError filled in by CreateTrace is not inspected here; whatever
        GetTrace() returns afterwards is handed back to the caller as is.
        """
        if not self.target().GetTrace().IsValid():
            error = lldb.SBError()
            self.target().CreateTrace(error)
        return self.target().GetTrace()

    def assertSBError(self, sberror, error=False):
        """Assert that `sberror` failed if `error` is truthy, or succeeded otherwise."""
        if error:
            self.assertTrue(
                sberror.Fail(), "expected the SB API call to fail, but it succeeded"
            )
        else:
            self.assertSuccess(sberror)

    def createConfiguration(
        self,
        iptTraceSize=None,
        processBufferSizeLimit=None,
        enableTsc=False,
        psbPeriod=None,
        perCpuTracing=False,
    ):
        """Build an SBStructuredData with the given tracing parameters.

        iptTraceSize, processBufferSizeLimit and psbPeriod are only included
        when they are not None. enableTsc and perCpuTracing are always
        included.
        """
        obj = {}
        if processBufferSizeLimit is not None:
            obj["processBufferSizeLimit"] = processBufferSizeLimit
        if iptTraceSize is not None:
            obj["iptTraceSize"] = iptTraceSize
        if psbPeriod is not None:
            obj["psbPeriod"] = psbPeriod
        obj["enableTsc"] = enableTsc
        obj["perCpuTracing"] = perCpuTracing

        # Named `config` to avoid shadowing the module-level `configuration`
        # object that setUp reads.
        config = lldb.SBStructuredData()
        config.SetFromJSON(json.dumps(obj))
        return config

    def traceStartThread(
        self,
        thread=None,
        error=False,
        substrs=None,
        iptTraceSize=None,
        enableTsc=False,
        psbPeriod=None,
    ):
        """Start tracing a thread using the SB API or `thread trace start`.

        In SB API mode, `thread` defaults to self.thread() and only the
        success or failure of the call is checked against `error`; `substrs`
        is not used. In command mode, the thread index id is appended to the
        command only if `thread` is given, and `error` and `substrs` are
        forwarded to self.expect.
        """
        if self.USE_SB_API:
            trace = self.getTraceOrCreate()
            thread = thread if thread is not None else self.thread()
            config = self.createConfiguration(
                iptTraceSize=iptTraceSize, enableTsc=enableTsc, psbPeriod=psbPeriod
            )
            self.assertSBError(trace.Start(thread, config), error)
        else:
            command = "thread trace start"
            if thread is not None:
                command += " " + str(thread.GetIndexID())
            if iptTraceSize is not None:
                command += " -s " + str(iptTraceSize)
            if enableTsc:
                command += " --tsc"
            if psbPeriod is not None:
                command += " --psb-period " + str(psbPeriod)
            self.expect(command, error=error, substrs=substrs)

    def traceStartProcess(
        self,
        processBufferSizeLimit=None,
        error=False,
        substrs=None,
        enableTsc=False,
        psbPeriod=None,
        perCpuTracing=False,
    ):
        """Start tracing the process using the SB API or `process trace start`.

        In SB API mode only the success or failure of the call is checked
        against `error`; `substrs` is not used. In command mode, `error` and
        `substrs` are forwarded to self.expect.
        """
        if self.USE_SB_API:
            trace = self.getTraceOrCreate()
            config = self.createConfiguration(
                processBufferSizeLimit=processBufferSizeLimit,
                enableTsc=enableTsc,
                psbPeriod=psbPeriod,
                perCpuTracing=perCpuTracing,
            )
            self.assertSBError(trace.Start(config), error=error)
        else:
            command = "process trace start"
            if processBufferSizeLimit is not None:
                command += " -l " + str(processBufferSizeLimit)
            if enableTsc:
                command += " --tsc"
            if psbPeriod is not None:
                command += " --psb-period " + str(psbPeriod)
            if perCpuTracing:
                command += " --per-cpu-tracing"
            self.expect(command, error=error, substrs=substrs)

    def traceStopProcess(self):
        """Stop tracing the process and assert that the operation succeeds."""
        if self.USE_SB_API:
            self.assertSuccess(self.target().GetTrace().Stop())
        else:
            self.expect("process trace stop")

    def traceStopThread(self, thread=None, error=False, substrs=None):
        """Stop tracing a thread using the SB API or `thread trace stop`.

        In SB API mode, `thread` defaults to self.thread() and `substrs` is
        not used. In command mode, the thread index id is appended to the
        command only if `thread` is given.
        """
        if self.USE_SB_API:
            thread = thread if thread is not None else self.thread()
            self.assertSBError(self.target().GetTrace().Stop(thread), error)

        else:
            command = "thread trace stop"
            if thread is not None:
                command += " " + str(thread.GetIndexID())
            self.expect(command, error=error, substrs=substrs)

    def traceLoad(self, traceDescriptionFilePath, error=False, substrs=None):
        """Load a trace from a description file using the SB API or `trace load -v`.

        `substrs` is only checked in command mode.
        """
        if self.USE_SB_API:
            traceDescriptionFile = lldb.SBFileSpec(traceDescriptionFilePath, True)
            loadTraceError = lldb.SBError()
            self.dbg.LoadTraceFromFile(loadTraceError, traceDescriptionFile)
            self.assertSBError(loadTraceError, error)
        else:
            command = f"trace load -v {traceDescriptionFilePath}"
            self.expect(command, error=error, substrs=substrs)

    def traceSave(self, traceBundleDir, compact=False, error=False, substrs=None):
        """Save the current trace to `traceBundleDir` using the SB API or `trace save`.

        `compact` is passed to SaveToDisk in SB API mode and adds the `-c`
        flag in command mode. `substrs` is only checked in command mode.
        """
        if self.USE_SB_API:
            save_error = lldb.SBError()
            self.target().GetTrace().SaveToDisk(
                save_error, lldb.SBFileSpec(traceBundleDir), compact
            )
            self.assertSBError(save_error, error)
        else:
            command = f"trace save {traceBundleDir}"
            if compact:
                command += " -c"
            self.expect(command, error=error, substrs=substrs)