#!/usr/bin/env python
# ----------------------------------------------------------------------
# Be sure to add the python path that points to the LLDB shared library.
# On MacOSX csh, tcsh:
# setenv PYTHONPATH /Applications/Xcode.app/Contents/SharedFrameworks/LLDB.framework/Resources/Python
# On MacOSX sh, bash:
# export PYTHONPATH=/Applications/Xcode.app/Contents/SharedFrameworks/LLDB.framework/Resources/Python
# ----------------------------------------------------------------------
import optparse
import os
import platform
import re
import resource
import sys
import subprocess
import time
import types
# ----------------------------------------------------------------------
# Code that auto imports LLDB
# ----------------------------------------------------------------------
try:
# Just try for LLDB in case PYTHONPATH is already correctly setup
import lldb
except ImportError:
lldb_python_dirs = list()
# lldb is not in the PYTHONPATH, try some defaults for the current platform
platform_system = platform.system()
if platform_system == "Darwin":
# On Darwin, try the currently selected Xcode directory
xcode_dir = subprocess.check_output("xcode-select --print-path", shell=True)
if xcode_dir:
lldb_python_dirs.append(
os.path.realpath(
xcode_dir + "/../SharedFrameworks/LLDB.framework/Resources/Python"
)
)
lldb_python_dirs.append(
xcode_dir + "/Library/PrivateFrameworks/LLDB.framework/Resources/Python"
)
lldb_python_dirs.append(
"/System/Library/PrivateFrameworks/LLDB.framework/Resources/Python"
)
success = False
for lldb_python_dir in lldb_python_dirs:
if os.path.exists(lldb_python_dir):
if not (sys.path.__contains__(lldb_python_dir)):
sys.path.append(lldb_python_dir)
try:
import lldb
except ImportError:
pass
else:
print('imported lldb from: "%s"' % (lldb_python_dir))
success = True
break
if not success:
print(
"error: couldn't locate the 'lldb' module, please set PYTHONPATH correctly"
)
sys.exit(1)
class Timer:
def __enter__(self):
self.start = time.clock()
return self
def __exit__(self, *args):
self.end = time.clock()
self.interval = self.end - self.start
class Action(object):
"""Class that encapsulates actions to take when a thread stops for a reason."""
def __init__(self, callback=None, callback_owner=None):
self.callback = callback
self.callback_owner = callback_owner
def ThreadStopped(self, thread):
assert (
False
), "performance.Action.ThreadStopped(self, thread) must be overridden in a subclass"
class PlanCompleteAction(Action):
def __init__(self, callback=None, callback_owner=None):
Action.__init__(self, callback, callback_owner)
def ThreadStopped(self, thread):
if thread.GetStopReason() == lldb.eStopReasonPlanComplete:
if self.callback:
if self.callback_owner:
self.callback(self.callback_owner, thread)
else:
self.callback(thread)
return True
return False
class BreakpointAction(Action):
def __init__(
self,
callback=None,
callback_owner=None,
name=None,
module=None,
file=None,
line=None,
breakpoint=None,
):
Action.__init__(self, callback, callback_owner)
self.modules = lldb.SBFileSpecList()
self.files = lldb.SBFileSpecList()
self.breakpoints = list()
# "module" can be a list or a string
if breakpoint:
self.breakpoints.append(breakpoint)
else:
if module:
if isinstance(module, types.ListType):
for module_path in module:
self.modules.Append(lldb.SBFileSpec(module_path, False))
elif isinstance(module, types.StringTypes):
self.modules.Append(lldb.SBFileSpec(module, False))
if name:
# "file" can be a list or a string
if file:
if isinstance(file, types.ListType):
self.files = lldb.SBFileSpecList()
for f in file:
self.files.Append(lldb.SBFileSpec(f, False))
elif isinstance(file, types.StringTypes):
self.files.Append(lldb.SBFileSpec(file, False))
self.breakpoints.append(
self.target.BreakpointCreateByName(name, self.modules, self.files)
)
elif file and line:
self.breakpoints.append(
self.target.BreakpointCreateByLocation(file, line)
)
def ThreadStopped(self, thread):
if thread.GetStopReason() == lldb.eStopReasonBreakpoint:
for bp in self.breakpoints:
if bp.GetID() == thread.GetStopReasonDataAtIndex(0):
if self.callback:
if self.callback_owner:
self.callback(self.callback_owner, thread)
else:
self.callback(thread)
return True
return False
class TestCase:
"""Class that aids in running performance tests."""
def __init__(self):
self.verbose = False
self.debugger = lldb.SBDebugger.Create()
self.target = None
self.process = None
self.thread = None
self.launch_info = None
self.done = False
self.listener = self.debugger.GetListener()
self.user_actions = list()
self.builtin_actions = list()
self.bp_id_to_dict = dict()
def Setup(self, args):
self.launch_info = lldb.SBLaunchInfo(args)
def Run(self, args):
assert False, "performance.TestCase.Run(self, args) must be subclassed"
def Launch(self):
if self.target:
error = lldb.SBError()
self.process = self.target.Launch(self.launch_info, error)
if not error.Success():
print("error: %s" % error.GetCString())
if self.process:
self.process.GetBroadcaster().AddListener(
self.listener,
lldb.SBProcess.eBroadcastBitStateChanged
| lldb.SBProcess.eBroadcastBitInterrupt,
)
return True
return False
def WaitForNextProcessEvent(self):
event = None
if self.process:
while event is None:
process_event = lldb.SBEvent()
if self.listener.WaitForEvent(lldb.UINT32_MAX, process_event):
state = lldb.SBProcess.GetStateFromEvent(process_event)
if self.verbose:
print("event = %s" % (lldb.SBDebugger.StateAsCString(state)))
if lldb.SBProcess.GetRestartedFromEvent(process_event):
continue
if (
state == lldb.eStateInvalid
or state == lldb.eStateDetached
or state == lldb.eStateCrashed
or state == lldb.eStateUnloaded
or state == lldb.eStateExited
):
event = process_event
self.done = True
elif (
state == lldb.eStateConnected
or state == lldb.eStateAttaching
or state == lldb.eStateLaunching
or state == lldb.eStateRunning
or state == lldb.eStateStepping
or state == lldb.eStateSuspended
):
continue
elif state == lldb.eStateStopped:
event = process_event
call_test_step = True
fatal = False
selected_thread = False
for thread in self.process:
frame = thread.GetFrameAtIndex(0)
select_thread = False
stop_reason = thread.GetStopReason()
if self.verbose:
print(
"tid = %#x pc = %#x "
% (thread.GetThreadID(), frame.GetPC()),
end=" ",
)
if stop_reason == lldb.eStopReasonNone:
if self.verbose:
print("none")
elif stop_reason == lldb.eStopReasonTrace:
select_thread = True
if self.verbose:
print("trace")
elif stop_reason == lldb.eStopReasonPlanComplete:
select_thread = True
if self.verbose:
print("plan complete")
elif stop_reason == lldb.eStopReasonThreadExiting:
if self.verbose:
print("thread exiting")
elif stop_reason == lldb.eStopReasonExec:
if self.verbose:
print("exec")
elif stop_reason == lldb.eStopReasonInvalid:
if self.verbose:
print("invalid")
elif stop_reason == lldb.eStopReasonException:
select_thread = True
if self.verbose:
print("exception")
fatal = True
elif stop_reason == lldb.eStopReasonBreakpoint:
select_thread = True
bp_id = thread.GetStopReasonDataAtIndex(0)
bp_loc_id = thread.GetStopReasonDataAtIndex(1)
if self.verbose:
print("breakpoint id = %d.%d" % (bp_id, bp_loc_id))
elif stop_reason == lldb.eStopReasonWatchpoint:
select_thread = True
if self.verbose:
print(
"watchpoint id = %d"
% (thread.GetStopReasonDataAtIndex(0))
)
elif stop_reason == lldb.eStopReasonSignal:
select_thread = True
if self.verbose:
print(
"signal %d"
% (thread.GetStopReasonDataAtIndex(0))
)
elif stop_reason == lldb.eStopReasonFork:
if self.verbose:
print(
"fork pid = %d"
% (thread.GetStopReasonDataAtIndex(0))
)
elif stop_reason == lldb.eStopReasonVFork:
if self.verbose:
print(
"vfork pid = %d"
% (thread.GetStopReasonDataAtIndex(0))
)
elif stop_reason == lldb.eStopReasonVForkDone:
if self.verbose:
print("vfork done")
if select_thread and not selected_thread:
self.thread = thread
selected_thread = self.process.SetSelectedThread(thread)
for action in self.user_actions:
action.ThreadStopped(thread)
if fatal:
# if self.verbose:
# Xcode.RunCommand(self.debugger,"bt all",true)
sys.exit(1)
return event
class Measurement:
"""A class that encapsulates a measurement"""
def __init__(self):
object.__init__(self)
def Measure(self):
assert False, "performance.Measurement.Measure() must be subclassed"
class MemoryMeasurement(Measurement):
"""A class that can measure memory statistics for a process."""
def __init__(self, pid):
Measurement.__init__(self)
self.pid = pid
self.stats = [
"rprvt",
"rshrd",
"rsize",
"vsize",
"vprvt",
"kprvt",
"kshrd",
"faults",
"cow",
"pageins",
]
self.command = "top -l 1 -pid %u -stats %s" % (self.pid, ",".join(self.stats))
self.value = dict()
def Measure(self):
output = subprocess.getoutput(self.command).split("\n")[-1]
values = re.split("[-+\s]+", output)
for idx, stat in enumerate(values):
multiplier = 1
if stat:
if stat[-1] == "K":
multiplier = 1024
stat = stat[:-1]
elif stat[-1] == "M":
multiplier = 1024 * 1024
stat = stat[:-1]
elif stat[-1] == "G":
multiplier = 1024 * 1024 * 1024
elif stat[-1] == "T":
multiplier = 1024 * 1024 * 1024 * 1024
stat = stat[:-1]
self.value[self.stats[idx]] = int(stat) * multiplier
def __str__(self):
"""Dump the MemoryMeasurement current value"""
s = ""
for key in self.value.keys():
if s:
s += "\n"
s += "%8s = %s" % (key, self.value[key])
return s
class TesterTestCase(TestCase):
def __init__(self):
TestCase.__init__(self)
self.verbose = True
self.num_steps = 5
def BreakpointHit(self, thread):
bp_id = thread.GetStopReasonDataAtIndex(0)
loc_id = thread.GetStopReasonDataAtIndex(1)
print(
"Breakpoint %i.%i hit: %s"
% (bp_id, loc_id, thread.process.target.FindBreakpointByID(bp_id))
)
thread.StepOver()
def PlanComplete(self, thread):
if self.num_steps > 0:
thread.StepOver()
self.num_steps = self.num_steps - 1
else:
thread.process.Kill()
def Run(self, args):
self.Setup(args)
with Timer() as total_time:
self.target = self.debugger.CreateTarget(args[0])
if self.target:
with Timer() as breakpoint_timer:
bp = self.target.BreakpointCreateByName("main")
print("Breakpoint time = %.03f sec." % breakpoint_timer.interval)
self.user_actions.append(
BreakpointAction(
breakpoint=bp,
callback=TesterTestCase.BreakpointHit,
callback_owner=self,
)
)
self.user_actions.append(
PlanCompleteAction(
callback=TesterTestCase.PlanComplete, callback_owner=self
)
)
if self.Launch():
while not self.done:
self.WaitForNextProcessEvent()
else:
print("error: failed to launch process")
else:
print("error: failed to create target with '%s'" % (args[0]))
print("Total time = %.03f sec." % total_time.interval)
if __name__ == "__main__":
lldb.SBDebugger.Initialize()
test = TesterTestCase()
test.Run(sys.argv[1:])
mem = MemoryMeasurement(os.getpid())
mem.Measure()
print(str(mem))
lldb.SBDebugger.Terminate()
# print "sleeeping for 100 seconds"
# time.sleep(100)