from dataclasses import dataclass, field
import itertools
import lexer
import parser
import re
from typing import Optional
@dataclass
class Properties:
    """Flags describing what a piece of instruction code does.

    The fields up to and including `pure` have no default and must always be
    supplied; the remaining fields are optional refinements.
    """

    # As produced by find_escaping_api_calls(): statement-start token ->
    # (call token, token following the statement's terminating ';').
    escaping_calls: dict[lexer.Token, tuple[lexer.Token, lexer.Token]]
    error_with_pop: bool
    error_without_pop: bool
    deopts: bool
    oparg: bool
    jumps: bool
    eval_breaker: bool
    needs_this: bool
    always_exits: bool
    stores_sp: bool
    uses_co_consts: bool
    uses_co_names: bool
    uses_locals: bool
    has_free: bool
    side_exit: bool
    pure: bool
    tier: int | None = None
    oparg_and_1: bool = False
    const_oparg: int = -1
    needs_prev: bool = False

    def dump(self, indent: str) -> None:
        """Print the escaping calls followed by every other field as `key: value`."""
        scalars = {
            key: value
            for key, value in self.__dict__.items()
            if key != "escaping_calls"
        }
        pieces = ["escaping_calls:\n"]
        pieces.extend(f"{indent} {tkns}\n" for tkns in self.escaping_calls.values())
        pieces.append(", ".join(f"{key}: {value}" for key, value in scalars.items()))
        print(indent, "".join(pieces), sep="")

    @staticmethod
    def from_list(properties: list["Properties"]) -> "Properties":
        """Combine several Properties into one.

        Escaping calls are merged (later entries win on a key clash), `pure`
        holds only if it holds for every element, and every other combined
        flag holds if it holds for at least one element.  `tier`,
        `oparg_and_1` and `const_oparg` are not combined and keep their
        defaults.
        """
        merged_calls: dict[lexer.Token, tuple[lexer.Token, lexer.Token]] = {
            start: call
            for p in properties
            for start, call in p.escaping_calls.items()
        }
        # Flags that are true for the whole as soon as one part sets them.
        or_flags = (
            "error_with_pop",
            "error_without_pop",
            "deopts",
            "oparg",
            "jumps",
            "eval_breaker",
            "needs_this",
            "always_exits",
            "stores_sp",
            "uses_co_consts",
            "uses_co_names",
            "uses_locals",
            "has_free",
            "side_exit",
            "needs_prev",
        )
        combined = {
            flag: any(getattr(p, flag) for p in properties) for flag in or_flags
        }
        return Properties(
            escaping_calls=merged_calls,
            pure=all(p.pure for p in properties),
            **combined,
        )

    @property
    def infallible(self) -> bool:
        """True when neither error flag is set."""
        return not (self.error_with_pop or self.error_without_pop)

    @property
    def escapes(self) -> bool:
        """True when at least one escaping call was recorded."""
        return len(self.escaping_calls) > 0
# Shared Properties instance returned by Skip.properties and Flush.properties:
# no escaping calls and every flag False except `pure`.
SKIP_PROPERTIES = Properties(
    escaping_calls={},
    error_with_pop=False,
    error_without_pop=False,
    deopts=False,
    oparg=False,
    jumps=False,
    eval_breaker=False,
    needs_this=False,
    always_exits=False,
    stores_sp=False,
    uses_co_consts=False,
    uses_co_names=False,
    uses_locals=False,
    has_free=False,
    side_exit=False,
    pure=True,
)
@dataclass
class Skip:
    """An unused cache entry occupying `size` units inside an instruction."""

    size: int

    @property
    def properties(self) -> Properties:
        """Always the shared SKIP_PROPERTIES instance."""
        return SKIP_PROPERTIES

    @property
    def name(self) -> str:
        """Display name of the form ``unused/<size>``."""
        return "unused/" + format(self.size)
class Flush:
    """Zero-sized instruction part created by add_macro() for the name "flush"."""

    @property
    def name(self) -> str:
        """Fixed display name."""
        return "flush"

    @property
    def size(self) -> int:
        """A flush contributes nothing to the instruction size."""
        return 0

    @property
    def properties(self) -> Properties:
        """Always the shared SKIP_PROPERTIES instance."""
        return SKIP_PROPERTIES
@dataclass
class StackItem:
name: str
type: str | None
condition: str | None
size: str
peek: bool = False
used: bool = False
def __str__(self) -> str:
cond = f" if ({self.condition})" if self.condition else ""
size = f"[{self.size}]" if self.size else ""
type = "" if self.type is None else f"{self.type} "
return f"{type}{self.name}{size}{cond} {self.peek}"
def is_array(self) -> bool:
return self.size != ""
def get_size(self) -> str:
return self.size if self.size else "1"
@dataclass
class StackEffect:
    """The stack items consumed (`inputs`) and produced (`outputs`)."""

    inputs: list[StackItem]
    outputs: list[StackItem]

    def __str__(self) -> str:
        """Render as ``(in1, in2 -- out1, out2)``."""
        consumed = ", ".join([str(item) for item in self.inputs])
        produced = ", ".join([str(item) for item in self.outputs])
        return f"({consumed} -- {produced})"
@dataclass
class CacheEntry:
    """A named inline-cache entry together with its size."""

    name: str
    size: int

    def __str__(self) -> str:
        """Render as ``name/size``."""
        return "{}/{}".format(self.name, self.size)
@dataclass
class Uop:
    """A micro-op: a named body of tokens plus everything derived from it."""

    name: str
    context: parser.Context | None
    annotations: list[str]
    stack: StackEffect
    caches: list[CacheEntry]
    deferred_refs: dict[lexer.Token, str | None]
    output_stores: list[lexer.Token]
    body: list[lexer.Token]
    properties: Properties
    # Lazily computed total cache size; negative means "not computed yet".
    _size: int = -1
    implicitly_created: bool = False
    # Unannotated, so this is a plain class attribute rather than a dataclass
    # field; make_uop() assigns it on the instance for replicated uops.
    replicated = 0
    replicates: "Uop | None" = None
    # Size of the instruction(s), only set for uops containing the INSTRUCTION_SIZE macro
    instruction_size: int | None = None

    def dump(self, indent: str) -> None:
        """Print the name, annotations, stack effect, caches and properties."""
        print(
            indent, self.name, ", ".join(self.annotations) if self.annotations else ""
        )
        print(indent, self.stack, ", ".join([str(c) for c in self.caches]))
        self.properties.dump(" " + indent)

    @property
    def size(self) -> int:
        """Sum of the sizes of all cache entries (computed once, then cached)."""
        if self._size < 0:
            self._size = sum(c.size for c in self.caches)
        return self._size

    def why_not_viable(self) -> str | None:
        """Return a human-readable reason why this uop is not viable.

        Returns None when no disqualifying condition applies.
        """
        if self.name == "_SAVE_RETURN_OFFSET":
            return None  # Adjusts next_instr, but only in tier 1 code
        if "INSTRUMENTED" in self.name:
            return "is instrumented"
        if "replaced" in self.annotations:
            return "is replaced"
        if self.name in ("INTERPRETER_EXIT", "JUMP_BACKWARD"):
            return "has tier 1 control flow"
        if self.properties.needs_this:
            return "uses the 'this_instr' variable"
        # Only entries that are actually used count against the limit of two,
        # so the reason reported is "too many", not "unused".
        used_cache_entries = sum(1 for c in self.caches if c.name != "unused")
        if used_cache_entries > 2:
            return "has too many cache entries"
        if self.properties.error_with_pop and self.properties.error_without_pop:
            return "has both popping and not-popping errors"
        return None

    def is_viable(self) -> bool:
        """True when why_not_viable() finds no objection."""
        return self.why_not_viable() is None

    def is_super(self) -> bool:
        """True when the body mentions the identifier `oparg1`."""
        return any(
            tkn.kind == "IDENTIFIER" and tkn.text == "oparg1" for tkn in self.body
        )
# Any element that may appear in an Instruction's `parts` list.
Part = Uop | Skip | Flush
@dataclass
class Instruction:
    """A named instruction assembled from an ordered list of parts."""

    where: lexer.Token
    name: str
    parts: list[Part]
    # Cache for the `properties` property; None until first requested.
    _properties: Properties | None
    is_target: bool = False
    family: Optional["Family"] = None
    opcode: int = -1

    @property
    def properties(self) -> Properties:
        """Combined properties of all parts, computed on first access."""
        cached = self._properties
        if cached is None:
            cached = self._properties = self._compute_properties()
        return cached

    def _compute_properties(self) -> Properties:
        """Merge the properties of every part via Properties.from_list()."""
        per_part = [part.properties for part in self.parts]
        return Properties.from_list(per_part)

    def dump(self, indent: str) -> None:
        """Print ``name = part, part, ...`` followed by the properties."""
        part_names = ", ".join([part.name for part in self.parts])
        print(indent, self.name, "=", part_names)
        self.properties.dump(" " + indent)

    @property
    def size(self) -> int:
        """One unit for the instruction itself plus the size of every part."""
        total = 1
        for part in self.parts:
            total += part.size
        return total

    def is_super(self) -> bool:
        """True only for a single-part instruction whose part is a super Uop."""
        if len(self.parts) == 1:
            only_part = self.parts[0]
            if isinstance(only_part, Uop):
                return only_part.is_super()
        return False
@dataclass
class PseudoInstruction:
    """A pseudo instruction that stands for one or more target instructions."""

    name: str
    stack: StackEffect
    targets: list[Instruction]
    as_sequence: bool
    flags: list[str]
    opcode: int = -1

    def dump(self, indent: str) -> None:
        """Print ``name -> target or target ...``."""
        alternatives = " or ".join([target.name for target in self.targets])
        print(indent, self.name, "->", alternatives)

    @property
    def properties(self) -> Properties:
        """Properties merged across all targets (recomputed on every access)."""
        target_properties = [target.properties for target in self.targets]
        return Properties.from_list(target_properties)
@dataclass
class Family:
    """A named group of member instructions."""

    name: str
    size: str
    members: list[Instruction]

    def dump(self, indent: str) -> None:
        """Print ``name =  member, member, ...``."""
        member_names = ", ".join([member.name for member in self.members])
        print(indent, self.name, "= ", member_names)
@dataclass
class Analysis:
    """Everything analyze_forest() produces, keyed by name."""

    instructions: dict[str, Instruction]
    uops: dict[str, Uop]
    families: dict[str, Family]
    pseudos: dict[str, PseudoInstruction]
    # Name -> opcode number, as built by assign_opcodes().
    opmap: dict[str, int]
    # Second value returned by assign_opcodes().
    have_arg: int
    # Third value returned by assign_opcodes().
    min_instrumented: int
def analysis_error(message: str, tkn: lexer.Token) -> SyntaxError:
    """Build (but do not raise) a SyntaxError for `message` located at `tkn`."""
    # To do -- support file and line output
    location = (tkn.filename, tkn.line, tkn.column)
    return lexer.make_syntax_error(message, *location, "")
def override_error(
    name: str,
    context: parser.Context | None,
    prev_context: parser.Context | None,
    token: lexer.Token,
) -> SyntaxError:
    """Build the error reported when `name` is defined a second time.

    The message names both the new and the previous definition context; the
    error location is taken from `token`.
    """
    message = (
        f"Duplicate definition of '{name}' @ {context} "
        f"previous definition @ {prev_context}"
    )
    return analysis_error(message, token)
def convert_stack_item(
    item: parser.StackEffect, replace_op_arg_1: str | None
) -> StackItem:
    """Convert a parsed stack effect into a StackItem.

    When `replace_op_arg_1` is non-empty and the item's condition matches
    OPARG_AND_1, the condition is replaced by `replace_op_arg_1`; otherwise
    the original condition is kept.
    """
    substitute = bool(replace_op_arg_1) and OPARG_AND_1.match(item.cond) is not None
    condition = replace_op_arg_1 if substitute else item.cond
    return StackItem(item.name, item.type, condition, item.size)
def analyze_stack(
    op: parser.InstDef | parser.Pseudo, replace_op_arg_1: str | None = None
) -> StackEffect:
    """Compute the StackEffect of `op`, marking peeked and used items.

    Raises an analysis error when an output reuses the name of an input at
    a different stack position.
    """
    inputs: list[StackItem] = []
    for effect in op.inputs:
        if isinstance(effect, parser.StackEffect):
            inputs.append(convert_stack_item(effect, replace_op_arg_1))
    outputs: list[StackItem] = [
        convert_stack_item(effect, replace_op_arg_1) for effect in op.outputs
    ]

    # First token of every named input, used to locate error messages.
    input_names: dict[str, lexer.Token] = {}
    for effect in op.inputs:
        if effect.name != "unused":
            input_names[effect.name] = effect.first_token

    def reject_relocated(item: StackItem) -> None:
        if item.name in input_names:
            raise analysis_error(
                f"Reuse of variable '{item.name}' at different stack location",
                input_names[item.name],
            )

    # Mark variables with matching names at the base of the stack as "peek"
    modified = False
    for inp, out in itertools.zip_longest(inputs, outputs):
        if out is None:
            continue
        if inp is None:
            reject_relocated(out)
        elif inp.name != out.name:
            modified = True
            reject_relocated(out)
        elif not modified:
            inp.peek = out.peek = True

    if isinstance(op, parser.InstDef):
        output_names = [out.name for out in outputs]
        for inp in inputs:
            if (
                variable_used(op, inp.name)
                or variable_used(op, "DECREF_INPUTS")
                or (inp.name in output_names and not inp.peek)
            ):
                inp.used = True
        for out in outputs:
            if variable_used(op, out.name):
                out.used = True
    return StackEffect(inputs, outputs)
def analyze_caches(inputs: list[parser.InputEffect]) -> list[CacheEntry]:
    """Turn the cache effects among `inputs` into CacheEntry objects.

    Raises an analysis error if any cache effect is named "unused".
    """
    cache_effects: list[parser.CacheEffect] = [
        effect for effect in inputs if isinstance(effect, parser.CacheEffect)
    ]
    first_unused = next(
        (effect for effect in cache_effects if effect.name == "unused"), None
    )
    if first_unused is not None:
        raise analysis_error(
            "Unused cache entry in op. Move to enclosing macro.",
            first_unused.tokens[0],
        )
    return [CacheEntry(effect.name, int(effect.size)) for effect in cache_effects]
def find_assignment_target(node: parser.InstDef, idx: int) -> list[lexer.Token]:
    """Return the tokens forming the left-hand side of the assignment at `idx`.

    Walks backwards from the token before `idx` to the nearest ';', '{' or
    '}' and returns everything in between; returns [] if none is found.
    """
    preceding = node.block.tokens[:idx]
    for offset, tkn in enumerate(reversed(preceding)):
        if tkn.kind in {"SEMI", "LBRACE", "RBRACE"}:
            return node.block.tokens[idx - offset : idx]
    return []
def find_stores_outputs(node: parser.InstDef) -> list[lexer.Token]:
    """Collect tokens naming outputs that are assigned or follow an `AND` token.

    Raises an analysis error when a plain identifier assignment targets an
    input variable.
    """
    tokens = node.block.tokens
    output_names = {effect.name for effect in node.outputs}
    input_names = {effect.name for effect in node.inputs}
    stores: list[lexer.Token] = []
    for idx, tkn in enumerate(tokens):
        if tkn.kind == "AND":
            operand = tokens[idx + 1]
            if operand.text in output_names:
                stores.append(operand)
        elif tkn.kind == "EQUALS":
            lhs = find_assignment_target(node, idx)
            assert lhs
            # Leading comments are not part of the assignment target.
            lhs = list(itertools.dropwhile(lambda t: t.kind == "COMMENT", lhs))
            if len(lhs) != 1 or lhs[0].kind != "IDENTIFIER":
                continue
            target = lhs[0]
            if target.text in input_names:
                raise analysis_error(
                    f"Cannot assign to input variable '{target.text}'", target
                )
            if target.text in output_names:
                stores.append(target)
    return stores
def analyze_deferred_refs(node: parser.InstDef) -> dict[lexer.Token, str | None]:
    """Map every PyStackRef_FromPyObjectNew token to the variable it is stored in.

    The value is None when the call appears inside _PyFrame_PushUnchecked()
    or is stored through a pointer, field or subscript.  Raises an analysis
    error for any other use that is not a plain assignment to an input or
    output variable.
    """
    tokens = node.block.tokens

    def in_frame_push(idx: int) -> bool:
        # Scan backwards within the current statement.
        for tkn in reversed(tokens[: idx - 1]):
            if tkn.kind in {"SEMI", "LBRACE", "RBRACE"}:
                return False
            if tkn.kind == "IDENTIFIER" and tkn.text == "_PyFrame_PushUnchecked":
                return True
        return False

    refs: dict[lexer.Token, str | None] = {}
    for idx, tkn in enumerate(tokens):
        if not (tkn.kind == "IDENTIFIER" and tkn.text == "PyStackRef_FromPyObjectNew"):
            continue

        directly_assigned = idx != 0 and tokens[idx - 1].kind == "EQUALS"
        if not directly_assigned:
            if not in_frame_push(idx):
                raise analysis_error(
                    "Expected '=' before PyStackRef_FromPyObjectNew", tkn
                )
            # PyStackRef_FromPyObjectNew() is called in _PyFrame_PushUnchecked()
            refs[tkn] = None
            continue

        lhs = find_assignment_target(node, idx - 1)
        if not lhs:
            raise analysis_error(
                "PyStackRef_FromPyObjectNew() must be assigned to an output", tkn
            )

        stored_indirectly = lhs[0].kind == "TIMES" or any(
            t.kind in ("ARROW", "LBRACKET") for t in lhs[1:]
        )
        if stored_indirectly:
            # Don't handle: *ptr = ..., ptr->field = ..., or ptr[field] = ...
            # Assume that they are visible to the GC.
            refs[tkn] = None
            continue

        if len(lhs) != 1 or lhs[0].kind != "IDENTIFIER":
            raise analysis_error(
                "PyStackRef_FromPyObjectNew() must be assigned to an output", tkn
            )

        name = lhs[0].text
        known_variable = any(
            var.name == name for var in itertools.chain(node.inputs, node.outputs)
        )
        if not known_variable:
            raise analysis_error(
                f"PyStackRef_FromPyObjectNew() must be assigned to an input or output, not '{name}'",
                tkn,
            )
        refs[tkn] = name
    return refs
def variable_used(node: parser.InstDef, name: str) -> bool:
    """Return True if the block of `node` contains the identifier `name`."""
    for token in node.block.tokens:
        if token.kind == "IDENTIFIER" and token.text == name:
            return True
    return False
def oparg_used(node: parser.InstDef) -> bool:
    """Return True if the identifier `oparg` occurs anywhere in `node.tokens`."""
    for token in node.tokens:
        if token.kind == "IDENTIFIER" and token.text == "oparg":
            return True
    return False
def tier_variable(node: parser.InstDef) -> int | None:
    """Return the tier selected by the annotations of `node`, if any.

    The first decisive annotation wins: "specializing" yields 1 and
    "tier<digit>" yields that digit.  Returns None when there is none.
    """
    for token in node.tokens:
        if token.kind != "ANNOTATION":
            continue
        text = token.text
        if text == "specializing":
            return 1
        if re.fullmatch(r"tier\d", text):
            return int(text[-1])
    return None
def has_error_with_pop(op: parser.InstDef) -> bool:
    """Return True if `op` uses any identifier that marks a popping error."""
    markers = ("ERROR_IF", "pop_1_error", "exception_unwind", "resume_with_error")
    return any(variable_used(op, marker) for marker in markers)
def has_error_without_pop(op: parser.InstDef) -> bool:
    """Return True if `op` uses any identifier that marks a non-popping error."""
    markers = (
        "ERROR_NO_POP",
        "pop_1_error",
        "exception_unwind",
        "resume_with_error",
    )
    return any(variable_used(op, marker) for marker in markers)
# Names that find_escaping_api_calls() never records as escaping calls,
# in addition to the prefix/suffix rules it applies itself.
NON_ESCAPING_FUNCTIONS = (
    "PyCFunction_GET_FLAGS",
    "PyCFunction_GET_FUNCTION",
    "PyCFunction_GET_SELF",
    "PyCell_GetRef",
    "PyCell_New",
    "PyCell_SwapTakeRef",
    "PyExceptionInstance_Class",
    "PyException_GetCause",
    "PyException_GetContext",
    "PyException_GetTraceback",
    "PyFloat_AS_DOUBLE",
    "PyFloat_FromDouble",
    "PyFunction_GET_CODE",
    "PyFunction_GET_GLOBALS",
    "PyList_GET_ITEM",
    "PyList_GET_SIZE",
    "PyList_SET_ITEM",
    "PyLong_AsLong",
    "PyLong_FromLong",
    "PyLong_FromSsize_t",
    "PySlice_New",
    "PyStackRef_AsPyObjectBorrow",
    "PyStackRef_AsPyObjectNew",
    "PyStackRef_AsPyObjectSteal",
    "PyStackRef_CLEAR",
    "PyStackRef_CLOSE",
    "PyStackRef_CLOSE_SPECIALIZED",
    "PyStackRef_DUP",
    "PyStackRef_False",
    "PyStackRef_FromPyObjectImmortal",
    "PyStackRef_FromPyObjectNew",
    "PyStackRef_FromPyObjectSteal",
    "PyStackRef_IsExactly",
    "PyStackRef_IsNone",
    "PyStackRef_IsTrue",
    "PyStackRef_IsFalse",
    "PyStackRef_IsNull",
    "PyStackRef_None",
    "PyStackRef_TYPE",
    "PyStackRef_True",
    "PyTuple_GET_ITEM",
    "PyTuple_GET_SIZE",
    "PyType_HasFeature",
    "PyUnicode_Append",
    "PyUnicode_Concat",
    "PyUnicode_GET_LENGTH",
    "PyUnicode_READ_CHAR",
    "Py_ARRAY_LENGTH",
    "Py_CLEAR",
    "Py_DECREF",
    "Py_FatalError",
    "Py_INCREF",
    "Py_IS_TYPE",
    "Py_NewRef",
    "Py_REFCNT",
    "Py_SIZE",
    "Py_TYPE",
    "Py_UNREACHABLE",
    "Py_Unicode_GET_LENGTH",
    "Py_XDECREF",
    "_PyCode_CODE",
    "_PyDictValues_AddToInsertionOrder",
    "_PyErr_Occurred",
    "_PyEval_FrameClearAndPop",
    "_PyFloat_FromDouble_ConsumeInputs",
    "_PyFrame_GetCode",
    "_PyFrame_IsIncomplete",
    "_PyFrame_PushUnchecked",
    "_PyFrame_SetStackPointer",
    "_PyFrame_StackPush",
    "_PyFunction_SetVersion",
    "_PyGen_GetGeneratorFromFrame",
    "_PyInterpreterState_GET",
    "_PyList_AppendTakeRef",
    "_PyList_FromStackRefSteal",
    "_PyList_ITEMS",
    "_PyLong_Add",
    "_PyLong_CompactValue",
    "_PyLong_DigitCount",
    "_PyLong_IsCompact",
    "_PyLong_IsNonNegativeCompact",
    "_PyLong_IsZero",
    "_PyLong_Multiply",
    "_PyLong_Subtract",
    "_PyManagedDictPointer_IsValues",
    "_PyObject_GC_IS_TRACKED",
    "_PyObject_GC_MAY_BE_TRACKED",
    "_PyObject_GC_TRACK",
    "_PyObject_GetManagedDict",
    "_PyObject_InlineValues",
    "_PyObject_ManagedDictPointer",
    "_PyThreadState_HasStackSpace",
    "_PyTuple_FromArraySteal",
    "_PyTuple_FromStackRefSteal",
    "_PyTuple_ITEMS",
    "_PyType_HasFeature",
    "_PyType_NewManagedObject",
    "_PyUnicode_Equal",
    "_PyUnicode_JoinArray",
    "_Py_CHECK_EMSCRIPTEN_SIGNALS_PERIODICALLY",
    "_Py_DECREF_NO_DEALLOC",
    "_Py_DECREF_SPECIALIZED",
    "_Py_EnterRecursiveCallTstateUnchecked",
    "_Py_ID",
    "_Py_IsImmortal",
    "_Py_LeaveRecursiveCallPy",
    "_Py_LeaveRecursiveCallTstate",
    "_Py_NewRef",
    "_Py_SINGLETON",
    "_Py_STR",
    "_Py_TryIncrefCompare",
    "_Py_TryIncrefCompareStackRef",
    "_Py_atomic_load_ptr_acquire",
    "_Py_atomic_load_uintptr_relaxed",
    "_Py_set_eval_breaker_bit",
    "advance_backoff_counter",
    "assert",
    "backoff_counter_triggers",
    "initial_temperature_backoff_counter",
    "maybe_lltrace_resume_frame",
    "restart_backoff_counter",
)
def find_stmt_start(node: parser.InstDef, idx: int) -> lexer.Token:
    """Return the first non-comment token of the statement containing `idx`.

    Walks backwards until the preceding token is ';', '{', '}' or a C macro
    token, then skips forward over any comments.
    """
    tokens = node.block.tokens
    assert idx < len(tokens)
    while tokens[idx - 1].kind not in {"SEMI", "LBRACE", "RBRACE", "CMACRO"}:
        idx -= 1
        assert idx > 0
    while tokens[idx].kind == "COMMENT":
        idx += 1
    return tokens[idx]
def find_stmt_end(node: parser.InstDef, idx: int) -> lexer.Token:
    """Return the token following the first ';' found after position `idx`."""
    tokens = node.block.tokens
    assert idx < len(tokens)
    pos = idx + 1
    while tokens[pos].kind != "SEMI":
        pos += 1
    return tokens[pos + 1]
def check_escaping_calls(instr: parser.InstDef, escapes: dict[lexer.Token, tuple[lexer.Token, lexer.Token]]) -> None:
    """Reject escaping calls that occur inside a condition.

    A condition is the parenthesised expression following an `if` token or
    a DEOPT_IF / ERROR_IF identifier.  `escapes` is the mapping built by
    find_escaping_api_calls(); only the call token of each value is used.

    Raises an analysis error located at the offending call token.
    """
    calls = {call for call, _ in escapes.values()}
    # 0 outside a condition; otherwise 1 + the current parenthesis depth.
    in_if = 0
    tkn_iter = iter(instr.block.tokens)
    for tkn in tkn_iter:
        if tkn.kind == "IF":
            # Skip the token after the keyword (the opening parenthesis);
            # the default avoids leaking StopIteration on truncated input.
            next(tkn_iter, None)
            in_if = 1
        if tkn.kind == "IDENTIFIER" and tkn.text in ("DEOPT_IF", "ERROR_IF"):
            next(tkn_iter, None)
            in_if = 1
        elif tkn.kind == "LPAREN" and in_if:
            in_if += 1
        elif tkn.kind == "RPAREN":
            if in_if:
                in_if -= 1
        elif tkn in calls and in_if:
            raise analysis_error(f"Escaping call '{tkn.text}' in condition", tkn)
def find_escaping_api_calls(instr: parser.InstDef) -> dict[lexer.Token, tuple[lexer.Token, lexer.Token]]:
    """Find every call in the block of `instr` that is treated as escaping.

    Returns a mapping from the first token of the enclosing statement to
    ``(call token, token after the statement's ';')``.  Raises an analysis
    error for `switch` statements and, via check_escaping_calls(), for
    escaping calls inside conditions.
    """

    def is_known_safe(text: str) -> bool:
        return (
            text.upper() == text  # simple macro
            or text.startswith(("sym_", "optimize_"))  # Optimize functions
            or text.endswith("Check")
            or text.startswith("Py_Is")
            or text.endswith("CheckExact")
            or text in NON_ESCAPING_FUNCTIONS
        )

    def looks_like_cast(text: str) -> bool:
        return text.endswith("_t") or text == "*" or text == "int"

    tokens = instr.block.tokens
    result: dict[lexer.Token, tuple[lexer.Token, lexer.Token]] = {}
    # Pair each token with its successor; the final token has none and is skipped.
    for idx, (tkn, next_tkn) in enumerate(zip(tokens, tokens[1:])):
        if tkn.kind == "SWITCH":
            raise analysis_error(
                "switch statements are not supported due to their complex flow control. Sorry.",
                tkn,
            )
        if next_tkn.kind != lexer.LPAREN:
            continue
        if tkn.kind == lexer.IDENTIFIER:
            if is_known_safe(tkn.text):
                continue
        elif tkn.kind == "RPAREN":
            if looks_like_cast(tokens[idx - 1].text):
                continue
        elif tkn.kind != "RBRACKET":
            continue
        start = find_stmt_start(instr, idx)
        end = find_stmt_end(instr, idx)
        result[start] = (tkn, end)
    check_escaping_calls(instr, result)
    return result
# Keyword/identifier texts that make always_exits() report True when they
# appear outside nested braces.
EXITS = {
    "DISPATCH",
    "GO_TO_INSTRUCTION",
    "Py_UNREACHABLE",
    "DISPATCH_INLINED",
    "DISPATCH_GOTO",
}
def always_exits(op: parser.InstDef) -> bool:
    """Return True if `op` contains an unconditional exit at brace depth <= 1.

    An exit is a `goto`/`return`, any name in EXITS, or a DEOPT_IF/ERROR_IF
    whose first argument token is `true` or `1`.
    """
    depth = 0
    tkn_iter = iter(op.tokens)
    for tkn in tkn_iter:
        kind = tkn.kind
        if kind == "LBRACE":
            depth += 1
            continue
        if kind == "RBRACE":
            depth -= 1
            continue
        if depth > 1:
            # Inside a nested block: ignored.
            continue
        if kind in ("GOTO", "RETURN"):
            return True
        if kind == "KEYWORD":
            if tkn.text in EXITS:
                return True
            continue
        if kind != "IDENTIFIER":
            continue
        if tkn.text in EXITS:
            return True
        if tkn.text in ("DEOPT_IF", "ERROR_IF"):
            next(tkn_iter)  # '('
            condition = next(tkn_iter)
            if condition.text in ("true", "1"):
                return True
    return False
def stack_effect_only_peeks(instr: parser.InstDef) -> bool:
    """Return True if the outputs repeat the stack inputs exactly.

    That requires a non-empty, equal-length list of unconditional effects
    whose name, type and size agree position by position.
    """
    stack_inputs = [
        effect for effect in instr.inputs if not isinstance(effect, parser.CacheEffect)
    ]
    if not stack_inputs or len(stack_inputs) != len(instr.outputs):
        return False
    for before, after in zip(stack_inputs, instr.outputs):
        if before.cond or after.cond:
            return False
        if (before.name, before.type, before.size) != (
            after.name,
            after.type,
            after.size,
        ):
            return False
    return True
# Matches a condition that starts with `oparg & 1`, optionally preceded by
# opening parentheses and with optional spaces around the `&`.
OPARG_AND_1 = re.compile("\\(*oparg *& *1")
def effect_depends_on_oparg_1(op: parser.InstDef) -> bool:
    """Return True if any stack effect of `op` is conditional on `oparg & 1`."""
    stack_inputs = (
        effect for effect in op.inputs if not isinstance(effect, parser.CacheEffect)
    )
    for effect in itertools.chain(stack_inputs, op.outputs):
        if effect.cond and OPARG_AND_1.match(effect.cond):
            return True
    return False
def compute_properties(op: parser.InstDef) -> Properties:
    """Derive the Properties of `op` from its tokens, name and annotations.

    Raises a syntax error when the op uses both EXIT_IF and DEOPT_IF.
    """
    escaping_calls = find_escaping_api_calls(op)

    def uses(identifier: str) -> bool:
        return variable_used(op, identifier)

    cell_functions = (
        "PyCell_New",
        "PyCell_GetRef",
        "PyCell_SetTakeRef",
        "PyCell_SwapTakeRef",
    )
    has_free = any(uses(function) for function in cell_functions)

    deopts_if = uses("DEOPT_IF")
    exits_if = uses("EXIT_IF")
    if deopts_if and exits_if:
        first = op.tokens[0]
        raise lexer.make_syntax_error(
            "Op cannot contain both EXIT_IF and DEOPT_IF",
            first.filename,
            first.line,
            first.column,
            op.name,
        )

    error_with_pop = has_error_with_pop(op)
    error_without_pop = has_error_without_pop(op)
    return Properties(
        escaping_calls=escaping_calls,
        error_with_pop=error_with_pop,
        error_without_pop=error_without_pop,
        deopts=deopts_if,
        side_exit=exits_if,
        oparg=oparg_used(op),
        jumps=uses("JUMPBY"),
        eval_breaker="CHECK_PERIODIC" in op.name,
        needs_this=uses("this_instr"),
        always_exits=always_exits(op),
        stores_sp=uses("SYNC_SP"),
        uses_co_consts=uses("FRAME_CO_CONSTS"),
        uses_co_names=uses("FRAME_CO_NAMES"),
        # Local access is only reported when no cell function is used.
        uses_locals=(uses("GETLOCAL") or uses("SETLOCAL")) and not has_free,
        has_free=has_free,
        pure="pure" in op.annotations,
        tier=tier_variable(op),
        needs_prev=uses("prev_instr"),
    )
def make_uop(
    name: str,
    op: parser.InstDef,
    inputs: list[parser.InputEffect],
    uops: dict[str, Uop],
) -> Uop:
    """Build the Uop for `op` and register any derived variants in `uops`.

    Variants are added for "split" ops whose stack effect depends on
    `oparg & 1` (suffixes `_0` and `_1`) and for ops carrying a
    `replicate(N)` annotation (suffixes `_0` .. `_N-1`).  The returned Uop
    itself is not inserted into `uops`.
    """

    def build(
        uop_name: str,
        stack_bit: str | None = None,
        properties: Properties | None = None,
    ) -> Uop:
        # Fresh properties are computed last when none are supplied.
        return Uop(
            name=uop_name,
            context=op.context,
            annotations=op.annotations,
            stack=analyze_stack(op, stack_bit),
            caches=analyze_caches(inputs),
            deferred_refs=analyze_deferred_refs(op),
            output_stores=find_stores_outputs(op),
            body=op.block.tokens,
            properties=compute_properties(op) if properties is None else properties,
        )

    result = build(name)

    def register_variant(
        uop_name: str, stack_bit: str | None, properties: Properties
    ) -> None:
        variant = build(uop_name, stack_bit, properties)
        variant.replicates = result
        uops[uop_name] = variant

    if effect_depends_on_oparg_1(op) and "split" in op.annotations:
        result.properties.oparg_and_1 = True
        for bit in ("0", "1"):
            properties = compute_properties(op)
            if properties.oparg:
                # May not need oparg anymore
                properties.oparg = any(
                    token.text == "oparg" for token in op.block.tokens
                )
            register_variant(name + "_" + bit, bit, properties)

    replicate_anno = next(
        (anno for anno in op.annotations if anno.startswith("replicate")), None
    )
    if replicate_anno is None:
        return result

    # The count sits between "replicate(" and the closing ")".
    result.replicated = int(replicate_anno[10:-1])
    for oparg in range(result.replicated):
        properties = compute_properties(op)
        properties.oparg = False
        properties.const_oparg = oparg
        register_variant(name + "_" + str(oparg), None, properties)
    return result
def add_op(op: parser.InstDef, uops: dict[str, Uop]) -> None:
    """Register the uop defined by `op` under its own name.

    Redefining an existing name is an error unless `op` carries the
    "override" annotation.
    """
    assert op.kind == "op"
    is_duplicate = op.name in uops
    if is_duplicate and "override" not in op.annotations:
        previous = uops[op.name]
        raise override_error(op.name, op.context, previous.context, op.tokens[0])
    uops[op.name] = make_uop(op.name, op, op.inputs, uops)
def add_instruction(
    where: lexer.Token,
    name: str,
    parts: list[Part],
    instructions: dict[str, Instruction],
) -> None:
    """Create an Instruction from `parts` and store it under `name`.

    Properties are left unset so that they are computed on first use.
    """
    instruction = Instruction(where=where, name=name, parts=parts, _properties=None)
    instructions[name] = instruction
def desugar_inst(
    inst: parser.InstDef, instructions: dict[str, Instruction], uops: dict[str, Uop]
) -> None:
    """Expand an `inst` definition into an implicit uop plus an Instruction.

    The uop is named "_" + inst.name but stored in `uops` under inst.name.
    """
    assert inst.kind == "inst"
    name = inst.name
    op_inputs: list[parser.InputEffect] = []
    parts: list[Part] = []
    uop_slot: int | None = None
    # Move unused cache entries to the Instruction, removing them from the Uop.
    for effect in inst.inputs:
        if isinstance(effect, parser.CacheEffect) and effect.name == "unused":
            parts.append(Skip(effect.size))
            continue
        op_inputs.append(effect)
        if uop_slot is None:
            # Place holder for the uop.
            uop_slot = len(parts)
            parts.append(Skip(0))

    uop = make_uop("_" + inst.name, inst, op_inputs, uops)
    uop.implicitly_created = True
    uops[inst.name] = uop

    if uop_slot is None:
        parts.append(uop)
    else:
        parts[uop_slot] = uop
    add_instruction(inst.first_token, name, parts, instructions)
def add_macro(
    macro: parser.Macro, instructions: dict[str, Instruction], uops: dict[str, Uop]
) -> None:
    """Assemble the Instruction described by `macro` from known uops.

    Raises an analysis error when the macro names a uop that is not in `uops`.
    """
    parts: list[Part] = []
    for part in macro.uops:
        if isinstance(part, parser.OpName):
            if part.name == "flush":
                parts.append(Flush())
                continue
            if part.name not in uops:
                raise analysis_error(f"No Uop named {part.name}", macro.tokens[0])
            parts.append(uops[part.name])
        elif isinstance(part, parser.CacheEffect):
            parts.append(Skip(part.size))
        else:
            assert False
    assert parts
    add_instruction(macro.first_token, macro.name, parts, instructions)
def add_family(
    pfamily: parser.Family,
    instructions: dict[str, Instruction],
    families: dict[str, Family],
) -> None:
    """Create a Family from `pfamily` and link each member back to it."""
    members = [instructions[member_name] for member_name in pfamily.members]
    family = Family(pfamily.name, pfamily.size, members)
    for member in members:
        member.family = family
    # The head of the family is an implicit jump target for DEOPTs
    head = instructions[family.name]
    head.is_target = True
    families[family.name] = family
def add_pseudo(
    pseudo: parser.Pseudo,
    instructions: dict[str, Instruction],
    pseudos: dict[str, PseudoInstruction],
) -> None:
    """Create the PseudoInstruction for `pseudo` and store it by name."""
    name = pseudo.name
    stack = analyze_stack(pseudo)
    targets = [instructions[target] for target in pseudo.targets]
    pseudos[name] = PseudoInstruction(
        name, stack, targets, pseudo.as_sequence, pseudo.flags
    )
def assign_opcodes(
    instructions: dict[str, Instruction],
    families: dict[str, Family],
    pseudos: dict[str, PseudoInstruction],
) -> tuple[dict[str, int], int, int]:
    """Number every instruction and pseudo instruction.

    Sets `opcode` on each Instruction and PseudoInstruction and returns
    ``(opmap, number of ordinary instructions without oparg,
    min_instrumented)``.
    """
    instmap: dict[str, int] = {
        # 0 is reserved for cache entries. This helps debugging.
        "CACHE": 0,
        # 17 is reserved as it is the initial value for the specializing counter.
        # This helps catch cases where we attempt to execute a cache.
        "RESERVED": 17,
        # 149 is RESUME - it is hard coded as such in Tools/build/deepfreeze.py
        "RESUME": 149,
        # This is an historical oddity.
        "BINARY_OP_INPLACE_ADD_UNICODE": 3,
        "INSTRUMENTED_LINE": 254,
        "ENTER_EXECUTOR": 255,
    }

    instrumented = [name for name in instructions if name.startswith("INSTRUMENTED")]

    specialized: set[str] = set()
    for family in families.values():
        specialized.update(inst.name for inst in family.members)

    no_arg: list[str] = []
    has_arg: list[str] = []
    for inst in instructions.values():
        name = inst.name
        if name in specialized or name in instrumented:
            continue
        bucket = has_arg if inst.properties.oparg else no_arg
        bucket.append(name)

    # Specialized ops appear in their own section
    # Instrumented opcodes are at the end of the valid range
    min_internal = 150
    min_instrumented = 254 - (len(instrumented) - 1)
    assert min_internal + len(specialized) < min_instrumented

    def allocate(names: list[str], start: int) -> int:
        """Give each not-yet-numbered name the next free opcode >= start."""
        candidate = start
        for name in names:
            if name in instmap:
                continue  # Pre-defined name
            while candidate in instmap.values():
                candidate += 1
            instmap[name] = candidate
            candidate += 1
        return candidate

    after_no_arg = allocate(sorted(no_arg), 1)
    allocate(sorted(has_arg), after_no_arg)
    # For compatibility
    allocate(sorted(specialized), min_internal)
    allocate(instrumented, min_instrumented)

    for name, instruction in instructions.items():
        instruction.opcode = instmap[name]

    for op, name in enumerate(sorted(pseudos), 256):
        instmap[name] = op
        pseudos[name].opcode = op

    return instmap, len(no_arg), min_instrumented
def get_instruction_size_for_uop(instructions: dict[str, Instruction], uop: Uop) -> int | None:
    """Return the size of the instruction that contains `uop`.

    Returns None if the body of `uop` does not contain the
    `INSTRUCTION_SIZE` macro.  Otherwise every instruction containing the
    uop must have the same size, and at least one such instruction must
    exist; an analysis error is raised if either condition fails.
    """
    marker = next((tkn for tkn in uop.body if tkn.text == "INSTRUCTION_SIZE"), None)
    if marker is None:
        return None

    size = None
    for inst in instructions.values():
        if uop not in inst.parts:
            continue
        if size is None:
            size = inst.size
        elif size != inst.size:
            raise analysis_error(
                "All instructions containing a uop with the `INSTRUCTION_SIZE` macro "
                f"must have the same size: {size} != {inst.size}",
                marker
            )
    if size is None:
        raise analysis_error(
            f"No instruction containing the uop '{uop.name}' was found", marker
        )
    return size
def analyze_forest(forest: list[parser.AstNode]) -> Analysis:
    """Run the full analysis over a parsed forest and return the result.

    Definitions are processed in three passes (ops and insts, then macros,
    then families and pseudos) so that each pass can refer to the results
    of the previous one.
    """
    instructions: dict[str, Instruction] = {}
    uops: dict[str, Uop] = {}
    families: dict[str, Family] = {}
    pseudos: dict[str, PseudoInstruction] = {}

    # Pass 1: ops and insts; anything else must be a known node type.
    for node in forest:
        if isinstance(node, parser.InstDef):
            if node.kind == "inst":
                desugar_inst(node, instructions, uops)
            else:
                assert node.kind == "op"
                add_op(node, uops)
        elif not isinstance(node, (parser.Macro, parser.Family, parser.Pseudo)):
            assert False

    # Pass 2: macros.
    for node in forest:
        if isinstance(node, parser.Macro):
            add_macro(node, instructions, uops)

    # Pass 3: families and pseudo instructions.
    for node in forest:
        if isinstance(node, parser.Family):
            add_family(node, instructions, families)
        elif isinstance(node, parser.Pseudo):
            add_pseudo(node, instructions, pseudos)

    # Every instruction named in GO_TO_INSTRUCTION(...) is a jump target.
    for uop in uops.values():
        body_iter = iter(uop.body)
        for tkn in body_iter:
            if tkn.kind != "IDENTIFIER" or tkn.text != "GO_TO_INSTRUCTION":
                continue
            if next(body_iter).kind != "LPAREN":
                continue
            target = next(body_iter)
            if target.kind == "IDENTIFIER" and target.text in instructions:
                instructions[target.text].is_target = True

    for uop in uops.values():
        uop.instruction_size = get_instruction_size_for_uop(instructions, uop)

    # Special case BINARY_OP_INPLACE_ADD_UNICODE
    # BINARY_OP_INPLACE_ADD_UNICODE is not a normal family member,
    # as it is the wrong size, but we need it to maintain an
    # historical optimization.
    if "BINARY_OP_INPLACE_ADD_UNICODE" in instructions:
        inst = instructions["BINARY_OP_INPLACE_ADD_UNICODE"]
        binary_op_family = families["BINARY_OP"]
        inst.family = binary_op_family
        binary_op_family.members.append(inst)

    opmap, first_arg, min_instrumented = assign_opcodes(instructions, families, pseudos)
    return Analysis(
        instructions, uops, families, pseudos, opmap, first_arg, min_instrumented
    )
def analyze_files(filenames: list[str]) -> Analysis:
    """Parse the given files and analyze the resulting forest."""
    forest = parser.parse_files(filenames)
    return analyze_forest(forest)
def dump_analysis(analysis: Analysis) -> None:
    """Print every uop, instruction, family and pseudo in `analysis`."""
    sections = (
        ("Uops:", analysis.uops),
        ("Instructions:", analysis.instructions),
        ("Families:", analysis.families),
        ("Pseudos:", analysis.pseudos),
    )
    for heading, table in sections:
        print(heading)
        for entry in table.values():
            entry.dump(" ")
if __name__ == "__main__":
    import sys

    # Every command-line argument is an input file to analyze and dump.
    filenames = sys.argv[1:]
    if not filenames:
        print("No input")
    else:
        dump_analysis(analyze_files(filenames))