from pathlib import Path
from typing import TextIO
from analyzer import (
Instruction,
Uop,
Properties,
StackItem,
analysis_error,
)
from cwriter import CWriter
from typing import Callable, TextIO, Iterator, Iterable
from lexer import Token
from stack import Storage, StackError
# Set this to true for voluminous output showing state of stack and locals
PRINT_STACKS = False
class TokenIterator:
look_ahead: Token | None
iterator: Iterator[Token]
def __init__(self, tkns: Iterable[Token]):
self.iterator = iter(tkns)
self.look_ahead = None
def __iter__(self) -> "TokenIterator":
return self
def __next__(self) -> Token:
if self.look_ahead is None:
return next(self.iterator)
else:
res = self.look_ahead
self.look_ahead = None
return res
def peek(self) -> Token | None:
if self.look_ahead is None:
for tkn in self.iterator:
self.look_ahead = tkn
break
return self.look_ahead
ROOT = Path(__file__).parent.parent.parent.resolve()
DEFAULT_INPUT = (ROOT / "Python/bytecodes.c").as_posix()
def root_relative_path(filename: str) -> str:
try:
return Path(filename).resolve().relative_to(ROOT).as_posix()
except ValueError:
# Not relative to root, just return original path.
return filename
def type_and_null(var: StackItem) -> tuple[str, str]:
if var.type:
return var.type, "NULL"
elif var.is_array():
return "_PyStackRef *", "NULL"
else:
return "_PyStackRef", "PyStackRef_NULL"
def write_header(
generator: str, sources: list[str], outfile: TextIO, comment: str = "//"
) -> None:
outfile.write(
f"""{comment} This file is generated by {root_relative_path(generator)}
{comment} from:
{comment} {", ".join(root_relative_path(src) for src in sources)}
{comment} Do not edit!
"""
)
def emit_to(out: CWriter, tkn_iter: TokenIterator, end: str) -> Token:
parens = 0
for tkn in tkn_iter:
if tkn.kind == end and parens == 0:
return tkn
if tkn.kind == "LPAREN":
parens += 1
if tkn.kind == "RPAREN":
parens -= 1
out.emit(tkn)
raise analysis_error(f"Expecting {end}. Reached end of file", tkn)
ReplacementFunctionType = Callable[
[Token, TokenIterator, Uop, Storage, Instruction | None], bool
]
def always_true(tkn: Token | None) -> bool:
if tkn is None:
return False
return tkn.text in {"true", "1"}
class Emitter:
out: CWriter
_replacers: dict[str, ReplacementFunctionType]
def __init__(self, out: CWriter):
self._replacers = {
"EXIT_IF": self.exit_if,
"DEOPT_IF": self.deopt_if,
"ERROR_IF": self.error_if,
"ERROR_NO_POP": self.error_no_pop,
"DECREF_INPUTS": self.decref_inputs,
"DEAD": self.kill,
"INPUTS_DEAD": self.kill_inputs,
"SYNC_SP": self.sync_sp,
"SAVE_STACK": self.save_stack,
"RELOAD_STACK": self.reload_stack,
"PyStackRef_CLOSE": self.stackref_close,
"PyStackRef_CLOSE_SPECIALIZED": self.stackref_close,
"PyStackRef_AsPyObjectSteal": self.stackref_steal,
"DISPATCH": self.dispatch,
"INSTRUCTION_SIZE": self.instruction_size,
}
self.out = out
def dispatch(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
self.emit(tkn)
return False
def deopt_if(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
self.out.emit_at("DEOPT_IF", tkn)
lparen = next(tkn_iter)
self.emit(lparen)
assert lparen.kind == "LPAREN"
first_tkn = tkn_iter.peek()
emit_to(self.out, tkn_iter, "RPAREN")
next(tkn_iter) # Semi colon
self.out.emit(", ")
assert inst is not None
assert inst.family is not None
self.out.emit(inst.family.name)
self.out.emit(");\n")
return not always_true(first_tkn)
exit_if = deopt_if
def error_if(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
lparen = next(tkn_iter)
assert lparen.kind == "LPAREN"
first_tkn = tkn_iter.peek()
unconditional = always_true(first_tkn)
if unconditional:
next(tkn_iter)
comma = next(tkn_iter)
if comma.kind != "COMMA":
raise analysis_error(f"Expected comma, got '{comma.text}'", comma)
self.out.start_line()
else:
self.out.emit_at("if ", tkn)
self.emit(lparen)
emit_to(self.out, tkn_iter, "COMMA")
self.out.emit(") ")
label = next(tkn_iter).text
next(tkn_iter) # RPAREN
next(tkn_iter) # Semi colon
storage.clear_inputs("at ERROR_IF")
c_offset = storage.stack.peek_offset()
try:
offset = -int(c_offset)
except ValueError:
offset = -1
if offset > 0:
self.out.emit(f"goto pop_{offset}_")
self.out.emit(label)
self.out.emit(";\n")
elif offset == 0:
self.out.emit("goto ")
self.out.emit(label)
self.out.emit(";\n")
else:
self.out.emit("{\n")
storage.copy().flush(self.out)
self.out.emit("goto ")
self.out.emit(label)
self.out.emit(";\n")
self.out.emit("}\n")
return not unconditional
def error_no_pop(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
next(tkn_iter) # LPAREN
next(tkn_iter) # RPAREN
next(tkn_iter) # Semi colon
self.out.emit_at("goto error;", tkn)
return False
def decref_inputs(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
next(tkn_iter)
next(tkn_iter)
next(tkn_iter)
self.out.emit_at("", tkn)
for var in uop.stack.inputs:
if var.name == "unused" or var.name == "null" or var.peek:
continue
if var.size:
if var.size == "1":
self.out.emit(f"PyStackRef_CLOSE({var.name}[0]);\n")
else:
self.out.emit(f"for (int _i = {var.size}; --_i >= 0;) {{\n")
self.out.emit(f"PyStackRef_CLOSE({var.name}[_i]);\n")
self.out.emit("}\n")
elif var.condition:
if var.condition == "1":
self.out.emit(f"PyStackRef_CLOSE({var.name});\n")
elif var.condition != "0":
self.out.emit(f"PyStackRef_XCLOSE({var.name});\n")
else:
self.out.emit(f"PyStackRef_CLOSE({var.name});\n")
for input in storage.inputs:
input.defined = False
return True
def kill_inputs(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
next(tkn_iter)
next(tkn_iter)
next(tkn_iter)
for var in storage.inputs:
var.defined = False
return True
def kill(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
next(tkn_iter)
name_tkn = next(tkn_iter)
name = name_tkn.text
next(tkn_iter)
next(tkn_iter)
for var in storage.inputs:
if var.name == name:
var.defined = False
break
else:
raise analysis_error(f"'{name}' is not a live input-only variable", name_tkn)
return True
def stackref_close(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
self.out.emit(tkn)
tkn = next(tkn_iter)
assert tkn.kind == "LPAREN"
self.out.emit(tkn)
name = next(tkn_iter)
self.out.emit(name)
if name.kind == "IDENTIFIER":
for var in storage.inputs:
if var.name == name.text:
var.defined = False
rparen = emit_to(self.out, tkn_iter, "RPAREN")
self.emit(rparen)
return True
stackref_steal = stackref_close
def sync_sp(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
next(tkn_iter)
next(tkn_iter)
next(tkn_iter)
storage.clear_inputs("when syncing stack")
storage.flush(self.out)
self._print_storage(storage)
return True
def emit_save(self, storage: Storage) -> None:
storage.save(self.out)
self._print_storage(storage)
def save_stack(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
next(tkn_iter)
next(tkn_iter)
next(tkn_iter)
self.emit_save(storage)
return True
def emit_reload(self, storage: Storage) -> None:
storage.reload(self.out)
self._print_storage(storage)
def reload_stack(
self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
next(tkn_iter)
next(tkn_iter)
next(tkn_iter)
self.emit_reload(storage)
return True
def instruction_size(self,
tkn: Token,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> bool:
"""Replace the INSTRUCTION_SIZE macro with the size of the current instruction."""
if uop.instruction_size is None:
raise analysis_error("The INSTRUCTION_SIZE macro requires uop.instruction_size to be set", tkn)
self.out.emit(f" {uop.instruction_size} ")
return True
def _print_storage(self, storage: Storage) -> None:
if PRINT_STACKS:
self.out.start_line()
self.emit(storage.as_comment())
self.out.start_line()
def _emit_if(
self,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> tuple[bool, Token, Storage]:
"""Returns (reachable?, closing '}', stack)."""
tkn = next(tkn_iter)
assert tkn.kind == "LPAREN"
self.out.emit(tkn)
rparen = emit_to(self.out, tkn_iter, "RPAREN")
self.emit(rparen)
if_storage = storage.copy()
reachable, rbrace, if_storage = self._emit_block(tkn_iter, uop, if_storage, inst, True)
try:
maybe_else = tkn_iter.peek()
if maybe_else and maybe_else.kind == "ELSE":
self._print_storage(storage)
self.emit(rbrace)
self.emit(next(tkn_iter))
maybe_if = tkn_iter.peek()
if maybe_if and maybe_if.kind == "IF":
#Emit extra braces around the if to get scoping right
self.emit(" {\n")
self.emit(next(tkn_iter))
else_reachable, rbrace, else_storage = self._emit_if(tkn_iter, uop, storage, inst)
self.out.start_line()
self.emit("}\n")
else:
else_reachable, rbrace, else_storage = self._emit_block(tkn_iter, uop, storage, inst, True)
if not reachable:
# Discard the if storage
reachable = else_reachable
storage = else_storage
elif not else_reachable:
# Discard the else storage
storage = if_storage
reachable = True
else:
if PRINT_STACKS:
self.emit("/* Merge */\n")
else_storage.merge(if_storage, self.out)
storage = else_storage
self._print_storage(storage)
else:
if reachable:
if PRINT_STACKS:
self.emit("/* Merge */\n")
if_storage.merge(storage, self.out)
storage = if_storage
self._print_storage(storage)
else:
# Discard the if storage
reachable = True
except StackError as ex:
self._print_storage(if_storage)
raise analysis_error(ex.args[0], rbrace) # from None
return reachable, rbrace, storage
def _emit_block(
self,
tkn_iter: TokenIterator,
uop: Uop,
storage: Storage,
inst: Instruction | None,
emit_first_brace: bool
) -> tuple[bool, Token, Storage]:
""" Returns (reachable?, closing '}', stack)."""
braces = 1
out_stores = set(uop.output_stores)
tkn = next(tkn_iter)
reload: Token | None = None
try:
reachable = True
line : int = -1
if tkn.kind != "LBRACE":
raise analysis_error(f"PEP 7: expected '{{', found: {tkn.text}", tkn)
escaping_calls = uop.properties.escaping_calls
if emit_first_brace:
self.emit(tkn)
self._print_storage(storage)
for tkn in tkn_iter:
if PRINT_STACKS and tkn.line != line:
self.out.start_line()
self.emit(storage.as_comment())
self.out.start_line()
line = tkn.line
if tkn in escaping_calls:
if tkn != reload:
self.emit_save(storage)
_, reload = escaping_calls[tkn]
elif tkn == reload:
self.emit_reload(storage)
if tkn.kind == "LBRACE":
self.out.emit(tkn)
braces += 1
elif tkn.kind == "RBRACE":
self._print_storage(storage)
braces -= 1
if braces == 0:
return reachable, tkn, storage
self.out.emit(tkn)
elif tkn.kind == "GOTO":
reachable = False;
self.out.emit(tkn)
elif tkn.kind == "IDENTIFIER":
if tkn.text in self._replacers:
if not self._replacers[tkn.text](tkn, tkn_iter, uop, storage, inst):
reachable = False
else:
if tkn in out_stores:
for out in storage.outputs:
if out.name == tkn.text:
out.defined = True
out.in_memory = False
break
if tkn.text.startswith("DISPATCH"):
self._print_storage(storage)
reachable = False
self.out.emit(tkn)
elif tkn.kind == "IF":
self.out.emit(tkn)
if_reachable, rbrace, storage = self._emit_if(tkn_iter, uop, storage, inst)
if reachable:
reachable = if_reachable
self.out.emit(rbrace)
else:
self.out.emit(tkn)
except StackError as ex:
raise analysis_error(ex.args[0], tkn) from None
raise analysis_error("Expecting closing brace. Reached end of file", tkn)
def emit_tokens(
self,
uop: Uop,
storage: Storage,
inst: Instruction | None,
) -> Storage:
tkn_iter = TokenIterator(uop.body)
self.out.start_line()
_, rbrace, storage = self._emit_block(tkn_iter, uop, storage, inst, False)
try:
self._print_storage(storage)
storage.push_outputs()
self._print_storage(storage)
except StackError as ex:
raise analysis_error(ex.args[0], rbrace)
return storage
def emit(self, txt: str | Token) -> None:
self.out.emit(txt)
def cflags(p: Properties) -> str:
flags: list[str] = []
if p.oparg:
flags.append("HAS_ARG_FLAG")
if p.uses_co_consts:
flags.append("HAS_CONST_FLAG")
if p.uses_co_names:
flags.append("HAS_NAME_FLAG")
if p.jumps:
flags.append("HAS_JUMP_FLAG")
if p.has_free:
flags.append("HAS_FREE_FLAG")
if p.uses_locals:
flags.append("HAS_LOCAL_FLAG")
if p.eval_breaker:
flags.append("HAS_EVAL_BREAK_FLAG")
if p.deopts:
flags.append("HAS_DEOPT_FLAG")
if p.side_exit:
flags.append("HAS_EXIT_FLAG")
if not p.infallible:
flags.append("HAS_ERROR_FLAG")
if p.error_without_pop:
flags.append("HAS_ERROR_NO_POP_FLAG")
if p.escapes:
flags.append("HAS_ESCAPES_FLAG")
if p.pure:
flags.append("HAS_PURE_FLAG")
if p.oparg_and_1:
flags.append("HAS_OPARG_AND_1_FLAG")
if flags:
return " | ".join(flags)
else:
return "0"