import os, json, struct, signal
from typing import Any, Dict
import lldb
from lldb.plugins.scripted_process import ScriptedProcess
from lldb.plugins.scripted_process import ScriptedThread
class StackCoreScriptedProcess(ScriptedProcess):
def get_module_with_name(self, target, name):
for module in target.modules:
if name in module.GetFileSpec().GetFilename():
return module
return None
def __init__(self, exe_ctx: lldb.SBExecutionContext, args: lldb.SBStructuredData):
super().__init__(exe_ctx, args)
self.corefile_target = None
self.corefile_process = None
self.backing_target_idx = args.GetValueForKey("backing_target_idx")
if self.backing_target_idx and self.backing_target_idx.IsValid():
if self.backing_target_idx.GetType() == lldb.eStructuredDataTypeInteger:
idx = self.backing_target_idx.GetIntegerValue(42)
if self.backing_target_idx.GetType() == lldb.eStructuredDataTypeString:
idx = int(self.backing_target_idx.GetStringValue(100))
self.corefile_target = self.target.GetDebugger().GetTargetAtIndex(idx)
self.corefile_process = self.corefile_target.GetProcess()
for corefile_thread in self.corefile_process:
structured_data = lldb.SBStructuredData()
structured_data.SetFromJSON(
json.dumps(
{
"backing_target_idx": idx,
"thread_idx": corefile_thread.GetIndexID(),
}
)
)
self.threads[corefile_thread.GetThreadID()] = StackCoreScriptedThread(
self, structured_data
)
if len(self.threads) == 2:
self.threads[len(self.threads) - 1].is_stopped = True
corefile_module = self.get_module_with_name(
self.corefile_target, "libbaz.dylib"
)
if not corefile_module or not corefile_module.IsValid():
return
module_path = os.path.join(
corefile_module.GetFileSpec().GetDirectory(),
corefile_module.GetFileSpec().GetFilename(),
)
if not os.path.exists(module_path):
return
module_load_addr = corefile_module.GetObjectFileHeaderAddress().GetLoadAddress(
self.corefile_target
)
self.loaded_images.append({"path": module_path, "load_addr": module_load_addr})
def get_memory_region_containing_address(
self, addr: int
) -> lldb.SBMemoryRegionInfo:
mem_region = lldb.SBMemoryRegionInfo()
error = self.corefile_process.GetMemoryRegionInfo(addr, mem_region)
if error.Fail():
return None
return mem_region
def read_memory_at_address(
self, addr: int, size: int, error: lldb.SBError
) -> lldb.SBData:
data = lldb.SBData()
bytes_read = self.corefile_process.ReadMemory(addr, size, error)
if error.Fail():
return data
data.SetDataWithOwnership(
error,
bytes_read,
self.corefile_target.GetByteOrder(),
self.corefile_target.GetAddressByteSize(),
)
return data
def get_loaded_images(self):
return self.loaded_images
def get_process_id(self) -> int:
return 42
def should_stop(self) -> bool:
return True
def is_alive(self) -> bool:
return True
def get_scripted_thread_plugin(self):
return (
StackCoreScriptedThread.__module__ + "." + StackCoreScriptedThread.__name__
)
class StackCoreScriptedThread(ScriptedThread):
def __init__(self, process, args):
super().__init__(process, args)
backing_target_idx = args.GetValueForKey("backing_target_idx")
thread_idx = args.GetValueForKey("thread_idx")
self.is_stopped = False
def extract_value_from_structured_data(data, default_val):
if data and data.IsValid():
if data.GetType() == lldb.eStructuredDataTypeInteger:
return data.GetIntegerValue(default_val)
if data.GetType() == lldb.eStructuredDataTypeString:
return int(data.GetStringValue(100))
return None
# TODO: Change to Walrus operator (:=) with oneline if assignment
# Requires python 3.8
val = extract_value_from_structured_data(thread_idx, 0)
if val is not None:
self.idx = val
self.corefile_target = None
self.corefile_process = None
self.corefile_thread = None
# TODO: Change to Walrus operator (:=) with oneline if assignment
# Requires python 3.8
val = extract_value_from_structured_data(backing_target_idx, 42)
if val is not None:
self.corefile_target = self.target.GetDebugger().GetTargetAtIndex(val)
self.corefile_process = self.corefile_target.GetProcess()
self.corefile_thread = self.corefile_process.GetThreadByIndexID(self.idx)
if self.corefile_thread:
self.id = self.corefile_thread.GetThreadID()
def get_thread_id(self) -> int:
return self.id
def get_name(self) -> str:
return StackCoreScriptedThread.__name__ + ".thread-" + str(self.id)
def get_stop_reason(self) -> Dict[str, Any]:
stop_reason = {"type": lldb.eStopReasonInvalid, "data": {}}
if (
self.corefile_thread
and self.corefile_thread.IsValid()
and self.get_thread_id() == self.corefile_thread.GetThreadID()
):
stop_reason["type"] = lldb.eStopReasonNone
if self.is_stopped:
if "arm64" in self.scripted_process.arch:
stop_reason["type"] = lldb.eStopReasonException
stop_reason["data"][
"desc"
] = self.corefile_thread.GetStopDescription(100)
elif self.scripted_process.arch == "x86_64":
stop_reason["type"] = lldb.eStopReasonSignal
stop_reason["data"]["signal"] = signal.SIGTRAP
else:
stop_reason["type"] = self.corefile_thread.GetStopReason()
return stop_reason
def get_register_context(self) -> str:
if not self.corefile_thread or self.corefile_thread.GetNumFrames() == 0:
return None
frame = self.corefile_thread.GetFrameAtIndex(0)
GPRs = None
registerSet = frame.registers # Returns an SBValueList.
for regs in registerSet:
if "general purpose" in regs.name.lower():
GPRs = regs
break
if not GPRs:
return None
for reg in GPRs:
self.register_ctx[reg.name] = int(reg.value, base=16)
return struct.pack(
"{}Q".format(len(self.register_ctx)), *self.register_ctx.values()
)
def __lldb_init_module(debugger, dict):
if not "SKIP_SCRIPTED_PROCESS_LAUNCH" in os.environ:
debugger.HandleCommand(
"process launch -C %s.%s" % (__name__, StackCoreScriptedProcess.__name__)
)
else:
print(
"Name of the class that will manage the scripted process: '%s.%s'"
% (__name__, StackCoreScriptedProcess.__name__)
)