from pathlib import Path
from typing import TextIO
from analyzer import (
Instruction,
Uop,
Properties,
StackItem,
analysis_error,
)
from cwriter import CWriter
from typing import Callable, Mapping, TextIO, Iterator
from lexer import Token
from stack import Stack
# The directory two levels above the directory that contains this file.
# NOTE(review): presumably the repository root -- confirm against the layout.
ROOT = Path(__file__).parent.parent.parent
# Absolute, POSIX-style path of the default input file located under ROOT.
DEFAULT_INPUT = ROOT.joinpath("Python/bytecodes.c").absolute().as_posix()
def root_relative_path(filename: str) -> str:
    """Return *filename* as a POSIX-style path relative to ROOT.

    The path is made absolute first. If it does not lie under ROOT, the
    original *filename* string is returned unchanged.
    """
    try:
        relative = Path(filename).absolute().relative_to(ROOT)
    except ValueError:
        # Not relative to root, just return original path.
        return filename
    else:
        return relative.as_posix()
def type_and_null(var: StackItem) -> tuple[str, str]:
    """Return ``(c_type, null_value)`` strings for the stack item *var*.

    An explicit ``var.type`` wins and is paired with ``"NULL"``. Otherwise
    arrays map to ``"_PyStackRef *"`` with ``"NULL"``, and everything else
    maps to ``"_PyStackRef"`` with ``"PyStackRef_NULL"``.
    """
    if var.type:
        return var.type, "NULL"
    return (
        ("_PyStackRef *", "NULL")
        if var.is_array()
        else ("_PyStackRef", "PyStackRef_NULL")
    )
def write_header(
    generator: str, sources: list[str], outfile: TextIO, comment: str = "//"
) -> None:
    """Write a "generated file, do not edit" banner to *outfile*.

    Every banner line starts with *comment* (``//`` by default). The banner
    names *generator* and the comma-separated *sources*, each of them passed
    through ``root_relative_path`` first.
    """
    outfile.write(
        f"""{comment} This file is generated by {root_relative_path(generator)}
{comment} from:
{comment} {", ".join(root_relative_path(src) for src in sources)}
{comment} Do not edit!
"""
    )
def emit_to(out: CWriter, tkn_iter: Iterator[Token], end: str) -> None:
    """Emit tokens from *tkn_iter* until a top-level token of kind *end*.

    Parenthesis depth is tracked from the point of the call, and a token of
    kind *end* only terminates the scan at depth zero. That terminating
    token is consumed from the iterator but not emitted. If it is never
    seen, the iterator is drained and every token is emitted.
    """
    depth = 0
    for tkn in tkn_iter:
        kind = tkn.kind
        if depth == 0 and kind == end:
            return
        if kind == "LPAREN":
            depth += 1
        elif kind == "RPAREN":
            depth -= 1
        out.emit(tkn)
# Signature of a macro replacement handler as stored in Emitter._replacers:
# (macro token, iterator over the tokens that follow it, uop, stack,
# instruction or None) -> None.
ReplacementFunctionType = Callable[
    [Token, Iterator[Token], Uop, Stack, Instruction | None], None
]
class Emitter:
    """Copy a uop's body tokens to a CWriter, expanding selected macros.

    Identifier tokens whose text is a key of ``_replacers`` are routed to
    the matching handler method, which consumes the macro's remaining
    tokens from the shared iterator and emits replacement text. All other
    tokens are emitted unchanged.
    """

    out: CWriter
    _replacers: dict[str, ReplacementFunctionType]

    def __init__(self, out: CWriter):
        self.out = out
        self._replacers = {
            "EXIT_IF": self.exit_if,
            "DEOPT_IF": self.deopt_if,
            "ERROR_IF": self.error_if,
            "ERROR_NO_POP": self.error_no_pop,
            "DECREF_INPUTS": self.decref_inputs,
            "SYNC_SP": self.sync_sp,
            "PyStackRef_FromPyObjectNew": self.py_stack_ref_from_py_object_new,
        }

    def deopt_if(
        self,
        tkn: Token,
        tkn_iter: Iterator[Token],
        uop: Uop,
        unused: Stack,
        inst: Instruction | None,
    ) -> None:
        """Emit ``DEOPT_IF(<cond>, <family name>);`` for the macro at *tkn*.

        The condition tokens are copied through; the name of the
        instruction's family is appended as a second argument. *inst* and
        its family must not be None.
        """
        out = self.out
        out.emit_at("DEOPT_IF", tkn)
        out.emit(next(tkn_iter))  # Token right after the macro name
        emit_to(out, tkn_iter, "RPAREN")
        next(tkn_iter)  # Semi colon
        out.emit(", ")
        assert inst is not None
        assert inst.family is not None
        out.emit(inst.family.name)
        out.emit(");\n")

    # EXIT_IF shares the DEOPT_IF handler (and so is emitted as "DEOPT_IF").
    exit_if = deopt_if

    def error_if(
        self,
        tkn: Token,
        tkn_iter: Iterator[Token],
        uop: Uop,
        stack: Stack,
        inst: Instruction | None,
    ) -> None:
        """Emit ``if (<cond>) goto ...;`` for an ``ERROR_IF(cond, label)``.

        The jump target depends on ``stack.peek_offset()``: a positive
        negated offset ``n`` jumps to ``pop_<n>_<label>``, zero jumps
        straight to ``<label>``, and anything else (negative, or not
        convertible to int) flushes the stack locally inside a braced block
        before jumping to ``<label>``.
        """
        out = self.out
        out.emit_at("if ", tkn)
        out.emit(next(tkn_iter))  # Token right after the macro name
        emit_to(out, tkn_iter, "COMMA")
        label = next(tkn_iter).text
        next(tkn_iter)  # RPAREN
        next(tkn_iter)  # Semi colon
        out.emit(") ")
        c_offset = stack.peek_offset()
        try:
            offset = -int(c_offset)
        except ValueError:
            # Offset is not a plain integer; treat it like a negative one.
            offset = -1
        if offset < 0:
            out.emit("{\n")
            stack.flush_locally(out)
            out.emit("goto ")
            out.emit(label)
            out.emit(";\n")
            out.emit("}\n")
            return
        out.emit(f"goto pop_{offset}_" if offset > 0 else "goto ")
        out.emit(label)
        out.emit(";\n")

    def error_no_pop(
        self,
        tkn: Token,
        tkn_iter: Iterator[Token],
        uop: Uop,
        stack: Stack,
        inst: Instruction | None,
    ) -> None:
        """Replace ``ERROR_NO_POP();`` with ``goto error;``."""
        next(tkn_iter)  # LPAREN
        next(tkn_iter)  # RPAREN
        next(tkn_iter)  # Semi colon
        self.out.emit_at("goto error;", tkn)

    def decref_inputs(
        self,
        tkn: Token,
        tkn_iter: Iterator[Token],
        uop: Uop,
        stack: Stack,
        inst: Instruction | None,
    ) -> None:
        """Emit a close call for each input of *uop* that needs one.

        Inputs named ``unused`` or ``null``, and inputs marked ``peek``, are
        skipped. Sized inputs are closed element by element in a loop.
        Inputs with a condition other than ``"1"`` or ``"0"`` use
        ``PyStackRef_XCLOSE``; a ``"0"`` condition emits nothing.
        """
        # Skip the three tokens that follow the macro name
        # (presumably LPAREN, RPAREN and the semi colon).
        next(tkn_iter)
        next(tkn_iter)
        next(tkn_iter)
        out = self.out
        out.emit_at("", tkn)
        for var in uop.stack.inputs:
            if var.name in ("unused", "null") or var.peek:
                continue
            if var.size:
                out.emit(f"for (int _i = {var.size}; --_i >= 0;) {{\n")
                out.emit(f"PyStackRef_CLOSE({var.name}[_i]);\n")
                out.emit("}\n")
            elif not var.condition or var.condition == "1":
                # Unconditional input, or a condition that is always true.
                out.emit(f"PyStackRef_CLOSE({var.name});\n")
            elif var.condition != "0":
                out.emit(f"PyStackRef_XCLOSE({var.name});\n")

    def sync_sp(
        self,
        tkn: Token,
        tkn_iter: Iterator[Token],
        uop: Uop,
        stack: Stack,
        inst: Instruction | None,
    ) -> None:
        """Replace ``SYNC_SP();`` with whatever ``stack.flush`` emits."""
        # Skip the three tokens that follow the macro name
        # (presumably LPAREN, RPAREN and the semi colon).
        next(tkn_iter)
        next(tkn_iter)
        next(tkn_iter)
        stack.flush(self.out)

    def py_stack_ref_from_py_object_new(
        self,
        tkn: Token,
        tkn_iter: Iterator[Token],
        uop: Uop,
        stack: Stack,
        inst: Instruction | None,
    ) -> None:
        """Copy the statement through, then flush its deferred target.

        The target variable is looked up in ``uop.deferred_refs`` under
        *tkn*; when it is None nothing further is emitted.
        """
        out = self.out
        out.emit(tkn)
        emit_to(out, tkn_iter, "SEMI")
        out.emit(";\n")
        target = uop.deferred_refs[tkn]
        if target is not None:
            # Flush the assignment to the stack. Note that we don't flush the
            # stack pointer here, and instead are currently relying on
            # initializing unused portions of the stack to NULL.
            stack.flush_single_var(out, target, uop.stack.outputs)
        # Otherwise: an assignment we don't handle, such as to a pointer
        # or array.

    def emit_tokens(
        self,
        uop: Uop,
        stack: Stack,
        inst: Instruction | None,
    ) -> None:
        """Emit the body of *uop*, minus its first and last tokens.

        Nothing is emitted (not even a line start) when the remaining body
        is empty. Identifier tokens registered in ``_replacers`` are handed
        to their handler together with the live token iterator.
        """
        tkns = uop.body[1:-1]
        if not tkns:
            return
        tkn_iter = iter(tkns)
        self.out.start_line()
        for tkn in tkn_iter:
            handler = (
                self._replacers.get(tkn.text)
                if tkn.kind == "IDENTIFIER"
                else None
            )
            if handler is None:
                self.out.emit(tkn)
            else:
                handler(tkn, tkn_iter, uop, stack, inst)

    def emit(self, txt: str | Token) -> None:
        """Forward *txt* to the underlying writer."""
        self.out.emit(txt)
def cflags(p: Properties) -> str:
    """Return the C flag expression describing the properties *p*.

    The names of all flags that apply are joined with ``" | "`` in a fixed
    order; ``"0"`` is returned when none apply. ``HAS_ERROR_FLAG`` is the
    only inverted entry: it is set when ``p.infallible`` is false.
    """
    # (applies?, flag name) pairs, listed in output order.
    candidates = (
        (p.oparg, "HAS_ARG_FLAG"),
        (p.uses_co_consts, "HAS_CONST_FLAG"),
        (p.uses_co_names, "HAS_NAME_FLAG"),
        (p.jumps, "HAS_JUMP_FLAG"),
        (p.has_free, "HAS_FREE_FLAG"),
        (p.uses_locals, "HAS_LOCAL_FLAG"),
        (p.eval_breaker, "HAS_EVAL_BREAK_FLAG"),
        (p.deopts, "HAS_DEOPT_FLAG"),
        (p.side_exit, "HAS_EXIT_FLAG"),
        (not p.infallible, "HAS_ERROR_FLAG"),
        (p.error_without_pop, "HAS_ERROR_NO_POP_FLAG"),
        (p.escapes, "HAS_ESCAPES_FLAG"),
        (p.pure, "HAS_PURE_FLAG"),
        (p.oparg_and_1, "HAS_OPARG_AND_1_FLAG"),
    )
    flags: list[str] = [name for applies, name in candidates if applies]
    return " | ".join(flags) if flags else "0"