cpython/Doc/tools/extensions/audit_events.py

"""Support for documenting audit events."""

from __future__ import annotations

import re
from typing import TYPE_CHECKING

from docutils import nodes
from sphinx.errors import NoUri
from sphinx.locale import _ as sphinx_gettext
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import logging
from sphinx.util.docutils import SphinxDirective

if TYPE_CHECKING:
    from collections.abc import Iterator

    from sphinx.application import Sphinx
    from sphinx.builders import Builder
    from sphinx.environment import BuildEnvironment

logger = logging.getLogger(__name__)

# This list of sets are allowable synonyms for event argument names.
# If two names are in the same set, they are treated as equal for the
# purposes of warning. This won't help if the number of arguments is
# different!
_SYNONYMS = [
    frozenset({"file", "path", "fd"}),
]


class AuditEvents:
    def __init__(self) -> None:
        self.events: dict[str, list[str]] = {}
        self.sources: dict[str, list[tuple[str, str]]] = {}

    def __iter__(self) -> Iterator[tuple[str, list[str], tuple[str, str]]]:
        for name, args in self.events.items():
            for source in self.sources[name]:
                yield name, args, source

    def add_event(
        self, name, args: list[str], source: tuple[str, str]
    ) -> None:
        if name in self.events:
            self._check_args_match(name, args)
        else:
            self.events[name] = args
        self.sources.setdefault(name, []).append(source)

    def _check_args_match(self, name: str, args: list[str]) -> None:
        current_args = self.events[name]
        msg = (
            f"Mismatched arguments for audit-event {name}: "
            f"{current_args!r} != {args!r}"
        )
        if current_args == args:
            return
        if len(current_args) != len(args):
            logger.warning(msg)
            return
        for a1, a2 in zip(current_args, args, strict=False):
            if a1 == a2:
                continue
            if any(a1 in s and a2 in s for s in _SYNONYMS):
                continue
            logger.warning(msg)
            return

    def id_for(self, name) -> str:
        source_count = len(self.sources.get(name, ()))
        name_clean = re.sub(r"\W", "_", name)
        return f"audit_event_{name_clean}_{source_count}"

    def rows(self) -> Iterator[tuple[str, list[str], list[tuple[str, str]]]]:
        for name in sorted(self.events.keys()):
            yield name, self.events[name], self.sources[name]


def initialise_audit_events(app: Sphinx) -> None:
    """Initialise the audit_events attribute on the environment."""
    if not hasattr(app.env, "audit_events"):
        app.env.audit_events = AuditEvents()


def audit_events_purge(
    app: Sphinx, env: BuildEnvironment, docname: str
) -> None:
    """This is to remove traces of removed documents from env.audit_events."""
    fresh_audit_events = AuditEvents()
    for name, args, (doc, target) in env.audit_events:
        if doc != docname:
            fresh_audit_events.add_event(name, args, (doc, target))


def audit_events_merge(
    app: Sphinx,
    env: BuildEnvironment,
    docnames: list[str],
    other: BuildEnvironment,
) -> None:
    """In Sphinx parallel builds, this merges audit_events from subprocesses."""
    for name, args, source in other.audit_events:
        env.audit_events.add_event(name, args, source)


class AuditEvent(SphinxDirective):
    has_content = True
    required_arguments = 1
    optional_arguments = 2
    final_argument_whitespace = True

    _label = [
        sphinx_gettext(
            "Raises an :ref:`auditing event <auditing>` "
            "{name} with no arguments."
        ),
        sphinx_gettext(
            "Raises an :ref:`auditing event <auditing>` "
            "{name} with argument {args}."
        ),
        sphinx_gettext(
            "Raises an :ref:`auditing event <auditing>` "
            "{name} with arguments {args}."
        ),
    ]

    def run(self) -> list[nodes.paragraph]:
        name = self.arguments[0]
        if len(self.arguments) >= 2 and self.arguments[1]:
            args = [
                arg
                for argument in self.arguments[1].strip("'\"").split(",")
                if (arg := argument.strip())
            ]
        else:
            args = []
        ids = []
        try:
            target = self.arguments[2].strip("\"'")
        except (IndexError, TypeError):
            target = None
        if not target:
            target = self.env.audit_events.id_for(name)
            ids.append(target)
        self.env.audit_events.add_event(name, args, (self.env.docname, target))

        node = nodes.paragraph("", classes=["audit-hook"], ids=ids)
        self.set_source_info(node)
        if self.content:
            node.rawsource = '\n'.join(self.content)  # for gettext
            self.state.nested_parse(self.content, self.content_offset, node)
        else:
            num_args = min(2, len(args))
            text = self._label[num_args].format(
                name=f"``{name}``",
                args=", ".join(f"``{a}``" for a in args),
            )
            node.rawsource = text  # for gettext
            parsed, messages = self.state.inline_text(text, self.lineno)
            node += parsed
            node += messages
        return [node]


class audit_event_list(nodes.General, nodes.Element):  # noqa: N801
    pass


class AuditEventListDirective(SphinxDirective):
    def run(self) -> list[audit_event_list]:
        return [audit_event_list()]


class AuditEventListTransform(SphinxPostTransform):
    default_priority = 500

    def run(self) -> None:
        if self.document.next_node(audit_event_list) is None:
            return

        table = self._make_table(self.app.builder, self.env.docname)
        for node in self.document.findall(audit_event_list):
            node.replace_self(table)

    def _make_table(self, builder: Builder, docname: str) -> nodes.table:
        table = nodes.table(cols=3)
        group = nodes.tgroup(
            "",
            nodes.colspec(colwidth=30),
            nodes.colspec(colwidth=55),
            nodes.colspec(colwidth=15),
            cols=3,
        )
        head = nodes.thead()
        body = nodes.tbody()

        table += group
        group += head
        group += body

        head += nodes.row(
            "",
            nodes.entry("", nodes.paragraph("", "Audit event")),
            nodes.entry("", nodes.paragraph("", "Arguments")),
            nodes.entry("", nodes.paragraph("", "References")),
        )

        for name, args, sources in builder.env.audit_events.rows():
            body += self._make_row(builder, docname, name, args, sources)

        return table

    @staticmethod
    def _make_row(
        builder: Builder,
        docname: str,
        name: str,
        args: list[str],
        sources: list[tuple[str, str]],
    ) -> nodes.row:
        row = nodes.row()
        name_node = nodes.paragraph("", nodes.Text(name))
        row += nodes.entry("", name_node)

        args_node = nodes.paragraph()
        for arg in args:
            args_node += nodes.literal(arg, arg)
            args_node += nodes.Text(", ")
        if len(args_node.children) > 0:
            args_node.children.pop()  # remove trailing comma
        row += nodes.entry("", args_node)

        backlinks_node = nodes.paragraph()
        backlinks = enumerate(sorted(set(sources)), start=1)
        for i, (doc, label) in backlinks:
            if isinstance(label, str):
                ref = nodes.reference("", f"[{i}]", internal=True)
                try:
                    target = (
                        f"{builder.get_relative_uri(docname, doc)}#{label}"
                    )
                except NoUri:
                    continue
                else:
                    ref["refuri"] = target
                    backlinks_node += ref
        row += nodes.entry("", backlinks_node)
        return row


def setup(app: Sphinx):
    app.add_directive("audit-event", AuditEvent)
    app.add_directive("audit-event-table", AuditEventListDirective)
    app.add_post_transform(AuditEventListTransform)
    app.connect("builder-inited", initialise_audit_events)
    app.connect("env-purge-doc", audit_events_purge)
    app.connect("env-merge-info", audit_events_merge)
    return {
        "version": "1.0",
        "parallel_read_safe": True,
        "parallel_write_safe": True,
    }