chromium/third_party/wpt_tools/wpt/tools/manifest/typedata.py

from typing import (Any, Dict, Iterator, List, Optional, MutableMapping, Set, Text, Tuple,
                    Type, TYPE_CHECKING, Union)

from .item import ManifestItem

if TYPE_CHECKING:
    # avoid actually importing these, they're only used by type comments
    from .manifest import Manifest

TypeDataType = MutableMapping[Tuple[str, ...], Set[ManifestItem]]
PathHashType = MutableMapping[Tuple[str, ...], str]


class TypeData(TypeDataType):
    def __init__(self, m: "Manifest", type_cls: Type[ManifestItem]) -> None:
        """Dict-like object containing the TestItems for each test type.

        Loading an actual Item class for each test is unnecessarily
        slow, so this class allows lazy-loading of the test
        items. When the manifest is loaded we store the raw json
        corresponding to the test type, and only create an Item
        subclass when the test is accessed. In order to remain
        API-compatible with consumers that depend on getting an Item
        from iteration, we do egerly load all items when iterating
        over the class."""
        self._manifest = m
        self._type_cls: Type[ManifestItem] = type_cls
        self._json_data: Dict[Text, Any] = {}
        self._data: Dict[Text, Any] = {}
        self._hashes: Dict[Tuple[Text, ...], Text] = {}
        self.hashes = PathHash(self)

    def _delete_node(self, data: Dict[Text, Any], key: Tuple[Text, ...]) -> None:
        """delete a path from a Dict data with a given key"""
        path = []
        node = data
        for pathseg in key[:-1]:
            path.append((node, pathseg))
            node = node[pathseg]
            if not isinstance(node, dict):
                raise KeyError(key)

        del node[key[-1]]
        while path:
            node, pathseg = path.pop()
            if len(node[pathseg]) == 0:
                del node[pathseg]
            else:
                break

    def __getitem__(self, key: Tuple[Text, ...]) -> Set[ManifestItem]:
        node: Union[Dict[Text, Any], Set[ManifestItem], List[Any]] = self._data
        for pathseg in key:
            if isinstance(node, dict) and pathseg in node:
                node = node[pathseg]
            else:
                break
        else:
            if isinstance(node, set):
                return node
            else:
                raise KeyError(key)

        node = self._json_data
        found = False
        for pathseg in key:
            if isinstance(node, dict) and pathseg in node:
                node = node[pathseg]
            else:
                break
        else:
            found = True

        if not found:
            raise KeyError(key)

        if not isinstance(node, list):
            raise KeyError(key)

        self._hashes[key] = node[0]

        data = set()
        path = "/".join(key)
        for test in node[1:]:
            manifest_item = self._type_cls.from_json(self._manifest, path, test)
            data.add(manifest_item)

        node = self._data
        assert isinstance(node, dict)
        for pathseg in key[:-1]:
            node = node.setdefault(pathseg, {})
            assert isinstance(node, dict)
        assert key[-1] not in node
        node[key[-1]] = data

        self._delete_node(self._json_data, key)

        return data

    def __setitem__(self, key: Tuple[Text, ...], value: Set[ManifestItem]) -> None:
        try:
            self._delete_node(self._json_data, key)
        except KeyError:
            pass

        node = self._data
        for i, pathseg in enumerate(key[:-1]):
            node = node.setdefault(pathseg, {})
            if not isinstance(node, dict):
                raise KeyError(f"{key!r} is a child of a test ({key[:i+1]!r})")
        node[key[-1]] = value

    def __delitem__(self, key: Tuple[Text, ...]) -> None:
        try:
            self._delete_node(self._data, key)
        except KeyError:
            self._delete_node(self._json_data, key)
        else:
            try:
                del self._hashes[key]
            except KeyError:
                pass

    def __iter__(self) -> Iterator[Tuple[Text, ...]]:
        """Iterator over keys in the TypeData in codepoint order"""
        data_node: Optional[Union[Dict[Text, Any], Set[ManifestItem]]] = self._data
        json_node: Optional[Union[Dict[Text, Any], List[Any]]] = self._json_data
        path: Tuple[Text, ...] = tuple()
        stack = [(data_node, json_node, path)]
        while stack:
            data_node, json_node, path = stack.pop()
            if isinstance(data_node, set) or isinstance(json_node, list):
                assert data_node is None or json_node is None
                yield path
            else:
                assert data_node is None or isinstance(data_node, dict)
                assert json_node is None or isinstance(json_node, dict)

                keys: Set[Text] = set()
                if data_node is not None:
                    keys |= set(iter(data_node))
                if json_node is not None:
                    keys |= set(iter(json_node))

                for key in sorted(keys, reverse=True):
                    stack.append((data_node.get(key) if data_node is not None else None,
                                  json_node.get(key) if json_node is not None else None,
                                  path + (key,)))

    def __len__(self) -> int:
        count = 0

        stack: List[Union[Dict[Text, Any], Set[ManifestItem]]] = [self._data]
        while stack:
            v = stack.pop()
            if isinstance(v, set):
                count += 1
            else:
                stack.extend(v.values())

        json_stack: List[Union[Dict[Text, Any], List[Any]]] = [self._json_data]
        while json_stack:
            json_v = json_stack.pop()
            if isinstance(json_v, list):
                count += 1
            else:
                json_stack.extend(json_v.values())

        return count

    def __nonzero__(self) -> bool:
        return bool(self._data) or bool(self._json_data)

    __bool__ = __nonzero__

    def __contains__(self, key: Any) -> bool:
        # we provide our own impl of this to avoid calling __getitem__ and generating items for
        # those in self._json_data
        node = self._data
        for pathseg in key:
            if pathseg in node:
                node = node[pathseg]
            else:
                break
        else:
            return bool(isinstance(node, set))

        node = self._json_data
        for pathseg in key:
            if pathseg in node:
                node = node[pathseg]
            else:
                break
        else:
            return bool(isinstance(node, list))

        return False

    def clear(self) -> None:
        # much, much simpler/quicker than that defined in MutableMapping
        self._json_data.clear()
        self._data.clear()
        self._hashes.clear()

    def set_json(self, json_data: Dict[Text, Any]) -> None:
        """Provide the object with a raw JSON blob

        Note that this object graph is assumed to be owned by the TypeData
        object after the call, so the caller must not mutate any part of the
        graph.
        """
        if self._json_data:
            raise ValueError("set_json call when JSON data is not empty")

        self._json_data = json_data

    def to_json(self) -> Dict[Text, Any]:
        """Convert the current data to JSON

        Note that the returned object may contain references to the internal
        data structures, and is only guaranteed to be valid until the next
        __getitem__, __setitem__, __delitem__ call, so the caller must not
        mutate any part of the returned object graph.

        """
        json_rv = self._json_data.copy()

        def safe_sorter(element: Tuple[str,str]) -> Tuple[str,str]:
            """key function to sort lists with None values."""
            if element and not element[0]:
                return ("", element[1])
            else:
                return element

        stack: List[Tuple[Dict[Text, Any], Dict[Text, Any], Tuple[Text, ...]]] = [(self._data, json_rv, tuple())]
        while stack:
            data_node, json_node, par_full_key = stack.pop()
            for k, v in data_node.items():
                full_key = par_full_key + (k,)
                if isinstance(v, set):
                    assert k not in json_node
                    json_node[k] = [self._hashes.get(
                        full_key)] + [t for t in sorted((test.to_json() for test in v), key=safe_sorter)]
                else:
                    json_node[k] = json_node.get(k, {}).copy()
                    stack.append((v, json_node[k], full_key))

        return json_rv


class PathHash(PathHashType):
    def __init__(self, data: TypeData) -> None:
        self._data = data

    def __getitem__(self, k: Tuple[Text, ...]) -> Text:
        if k not in self._data:
            raise KeyError

        if k in self._data._hashes:
            return self._data._hashes[k]

        node = self._data._json_data
        for pathseg in k:
            if pathseg in node:
                node = node[pathseg]
            else:
                break
        else:
            return node[0]  # type: ignore

        assert False, "unreachable"
        raise KeyError

    def __setitem__(self, k: Tuple[Text, ...], v: Text) -> None:
        if k not in self._data:
            raise KeyError

        if k in self._data._hashes:
            self._data._hashes[k] = v

        node = self._data._json_data
        for pathseg in k:
            if pathseg in node:
                node = node[pathseg]
            else:
                break
        else:
            node[0] = v  # type: ignore
            return

        self._data._hashes[k] = v

    def __delitem__(self, k: Tuple[Text, ...]) -> None:
        raise ValueError("keys here must match underlying data")

    def __iter__(self) -> Iterator[Tuple[Text, ...]]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)