import lldb
import threading
local_data = None
class BarrierContainer(threading.local):
def __init__(self, before_interrupt_barrier, after_interrupt_barrier, event):
self.event = event
self.before_interrupt_barrier = before_interrupt_barrier
self.after_interrupt_barrier = after_interrupt_barrier
class WelcomeCommand(object):
def __init__(self, debugger, session_dict):
return
def get_short_help(self):
return "A command that waits for an interrupt before returning."
def check_was_interrupted(self, debugger, use_interpreter):
if use_interpreter:
self.was_interrupted = debugger.GetCommandInterpreter().WasInterrupted()
else:
self.was_interrupted = debugger.InterruptRequested()
if local_data.event:
self.was_canceled = local_data.event.is_set()
def __call__(self, debugger, args, exe_ctx, result):
"""Command arguments:
{interp/debugger} - Whether to use SBCommandInterpreter::WasInterrupted
of SBDebugger::InterruptRequested().
check - Don't do the rendevous, just check if an interrupt was requested.
If check is not provided, we'll do the lock and then check.
poll - Should we poll once after the rendevous or spin waiting for the
interruption to happen.
For the interrupt cases, the command waits serially on the barriers
passed to it in local data, giving the test runner a chance to set the
interrupt. Once the barriers are passed, it waits for the interrupt
or the event.
If it finds an interrupt, it returns "Command was interrupted". If it gets an
event before seeing the interrupt it returns "Command was not interrupted."
For the "poll" case, it waits on the rendevous, then checks once.
For the "check" case, it doesn't wait, but just returns whether there was
an interrupt in force or not."""
if local_data is None:
result.SetError("local data was not set.")
result.SetStatus(lldb.eReturnStatusFailed)
return
use_interpreter = "interp" in args
if not use_interpreter:
if not "debugger" in args:
result.SetError("Must pass either 'interp' or 'debugger'")
result.SetStatus(lldb.eReturnStatusFailed)
return
self.was_interrupted = False
self.was_canceled = False
if "check" in args:
self.check_was_interrupted(debugger, use_interpreter)
if self.was_interrupted:
result.Print("Command was interrupted")
else:
result.Print("Command was not interrupted")
else:
# Wait here to rendevous in the test before it sets the interrupt.
local_data.before_interrupt_barrier.wait()
# Now the test will set the interrupt, and we can continue:
local_data.after_interrupt_barrier.wait()
if "poll" in args:
self.check_was_interrupted(debugger, use_interpreter)
else:
while not self.was_interrupted and not self.was_canceled:
self.check_was_interrupted(debugger, use_interpreter)
if self.was_interrupted:
result.Print("Command was interrupted")
else:
result.Print("Command was not interrupted")
if self.was_canceled:
result.Print("Command was canceled")
result.SetStatus(lldb.eReturnStatusSuccessFinishResult)
return True