chromium/components/exo/wayland/compatibility_test/wayland_protocol_construction.py

# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import collections
import dataclasses
import functools
import io
import itertools
import sys
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple

import wayland_protocol_data_classes
import wayland_protocol_identifiers

# Short aliases for typing
Message = wayland_protocol_data_classes.Message
Interface = wayland_protocol_data_classes.Interface
Protocol = wayland_protocol_data_classes.Protocol
RequestType = wayland_protocol_data_classes.RequestType


@functools.lru_cache(maxsize=None)
def get_interface_for_name(protocols: Iterable[Protocol],
                           target_interface_name: str) -> Optional[Interface]:
    """Given a name string, gets the interface that has that name, or None."""
    for protocol in protocols:
        for interface in protocol.interfaces:
            if interface.name == target_interface_name:
                return interface
    return None


@functools.lru_cache(maxsize=None)
def get_constructor_for_interface(
        target_interface: Interface) -> Optional[Message]:
    """Gets the message to use to construct the target interface, or None."""

    # Note: We assume there is only one constructor for any interface, and
    # return the first found, but there could be a protocol that defines
    # more than one way.

    for interface in target_interface.protocol.interfaces:
        for request in interface.requests:
            for arg in request.args:
                if (arg.type == 'new_id'
                        and arg.interface == target_interface.name):
                    return request
        for event in interface.events:
            for arg in event.args:
                if (arg.type == 'new_id'
                        and arg.interface == target_interface.name):
                    return event
    return None


@dataclasses.dataclass(frozen=True)
class ConstructionStepCtor:
    """Message and related data for a ConstructionStep"""

    # The construction step for the interface needed for this step.
    interface_step: 'ConstructionStep'

    # The message on the interface used to perform the construction in this
    # step. Note that while this is normally a client request, it can
    # occasionally be a server event. One example of that is the wl_data_offer
    # in the core Wayland protocol.
    message: Message

    # For request constructors, gives the construction steps for any
    # additional objects that are needed for constructing the target.
    # There are entries only for the arguments that are objects. The rest
    # are None. For event constructors this will always be empty.
    object_args: Tuple[Optional['ConstructionStep'], ...]


@dataclasses.dataclass(frozen=True)
class ConstructionStep:
    """Represents a step in the construction path of a target interface."""

    # Wayland interface constructed by this step
    interface: Interface

    # A reasonable human-readable name that can be used to generate variable
    # and function names for this step. The names will be unique within a single
    # generated sequence of steps.
    instance_name: str

    # The details of how to construct this interface, based on other
    # construction steps. This will be None if the interface in this step
    # is a Wayland global interface.
    ctor: Optional[ConstructionStepCtor]

    # Set to the minimum version needed for this interface.
    minimum_version: int


@functools.lru_cache(maxsize=None)
def get_construction_steps(
        target_interface: Interface) -> Tuple[ConstructionStep, ...]:
    """Generates the ConstructionSteps to construct a target interface."""

    # For brevity later, get the list of protocols as a local
    protocols = target_interface.protocol.protocols.protocols

    # Helper map for constructing human readable instance names
    base_human_readable_name_map = (
        wayland_protocol_identifiers.get_base_human_readable_name_map(
            target_interface.protocol.protocols.protocols))

    # Globals that will be needed (not ordered)
    global_steps = {}

    # Non-global instances that will be needed (ordered)
    instance_steps = []

    # To help ensure unique instance names
    uniquifier = collections.Counter()

    def unique_instance_name(prefix: str, name: str) -> str:
        def dedupe_words(name: str) -> str:
            # Otherwise a generated name might be "parent_surface_surface"
            # for a wl_surface passed as a parent_surface argument.
            words = name.split("_")
            if len(words) == 1:
                return name
            words = [
                w for i, w in enumerate(words[:-1]) if w not in words[i + 1]
            ] + [words[-1]]
            return "_".join(words)

        name = prefix + base_human_readable_name_map.get(name, name)
        name = dedupe_words(name)
        suffix = str(uniquifier.get(name, ''))
        uniquifier[name] += 1
        return name + suffix + "_"

    def recursive_construction_steps(current_target: Interface, prefix: str,
                                     minimum_version: int):
        ctor_message = get_constructor_for_interface(current_target)
        ctor = None

        if ctor_message is not None:
            # If we have a message, we have to use another interface to
            # create the current target.
            ctor_interface = recursive_construction_steps(
                ctor_message.interface, prefix,
                max(minimum_version,
                    ctor_message.since if ctor_message.since else 1))
            ctor_object_args = []

            if not ctor_message.is_event:
                # We may also have to construct other interfaces as well to
                # pass as object arguments. Those interfaces can be part of
                # any protocol, though normally it is either in the same
                # protocol or the core Wayland protocol.
                for arg in ctor_message.args:
                    arg_step = None
                    if arg.type == 'object':
                        arg_interface = get_interface_for_name(
                            protocols, arg.interface)
                        arg_step = recursive_construction_steps(
                            arg_interface, f'{prefix}{arg.name}_', 1)
                    ctor_object_args.append(arg_step)

            ctor = ConstructionStepCtor(ctor_interface, ctor_message,
                                        tuple(ctor_object_args))

        # Construct the step
        step = ConstructionStep(interface=current_target,
                                instance_name=unique_instance_name(
                                    prefix if ctor is not None else '',
                                    current_target.name),
                                ctor=ctor,
                                minimum_version=minimum_version)

        if ctor is None:
            # For a global, interface, we only make/get one instance
            step = global_steps.setdefault(current_target.name, step)
        else:
            # Otherwise store each individual step
            instance_steps.append(step)

        return step

    recursive_construction_steps(target_interface, '', 1)

    return tuple([global_steps[name]
                  for name in sorted(global_steps)] + instance_steps)


@functools.lru_cache(maxsize=None)
def get_destructor(interface: Interface) -> Optional[Message]:
    """Gets the Message that acts as the interface destructor, if present."""
    for message in interface.requests:
        if message.request_type == RequestType.DESTRUCTOR.value:
            return message
    return None


def get_minimum_version_to_construct(target: Interface) -> int:
    """Gets the minimum version of the global needed to construct a target."""
    def recursive_minimum(interface: Interface, minimum_version: int) -> int:
        ctor_message = get_constructor_for_interface(interface)

        # If there is no explicit constructor for this target, it must be a
        # global
        if not ctor_message:
            # Return the global interface
            return minimum_version

        # If the constructor has a "since", it constrains the minimum version
        if ctor_message.since:
            minimum_version = max(minimum_version, ctor_message.since)

        return recursive_minimum(ctor_message.interface, minimum_version)

    # Until proven otherwise, assume the first version can create the target
    return recursive_minimum(target, 1)


def get_versions_to_test_for_event_delivery(
        interface: Interface) -> Tuple[int, ...]:
    # Get the minimum interface version
    min_version = get_minimum_version_to_construct(interface)

    # Include all versions where events are introduced
    versions = set(event.since for event in interface.events
                   if event.since and event.since > min_version)
    # Include all versions one less than where events are introduced
    versions = versions.union(event.since - 1 for event in interface.events
                              if event.since and event.since - 1 > min_version)
    # Include the minimum and maximum versions
    versions = versions.union((min_version, interface.version))

    return tuple(versions)


def is_global_interface(interface: Interface) -> bool:
    """Returns true if the interface is a global interface."""
    return get_constructor_for_interface(interface) is None