chromium/third_party/wpt_tools/wpt/tools/wptrunner/wptrunner/executors/executorservodriver.py

# mypy: allow-untyped-defs

import json
import os
import socket
import traceback

from .base import (Protocol,
                   RefTestExecutor,
                   RefTestImplementation,
                   TestharnessExecutor,
                   TimedRunner,
                   strip_server)
from .protocol import BaseProtocolPart
from ..environment import wait_for_service

webdriver = None
ServoCommandExtensions = None

here = os.path.dirname(__file__)


def do_delayed_imports():
    """Import ``webdriver`` on demand and define ``ServoCommandExtensions``.

    Both names are published as module globals, replacing the ``None``
    placeholders declared at the top of the module. The extension class is
    defined here because its decorators need the imported ``webdriver``
    module.
    """
    global webdriver, ServoCommandExtensions

    import webdriver

    class ServoCommandExtensions:
        """Session extension exposing Servo's preference commands."""

        def __init__(self, session):
            self.session = session

        @webdriver.client.command
        def get_prefs(self, *prefs):
            """POST the given pref names to ``servo/prefs/get``."""
            return self.session.send_session_command(
                "POST", "servo/prefs/get", {"prefs": list(prefs)})

        @webdriver.client.command
        def set_prefs(self, prefs):
            """POST the given name -> value mapping to ``servo/prefs/set``."""
            return self.session.send_session_command(
                "POST", "servo/prefs/set", {"prefs": prefs})

        @webdriver.client.command
        def reset_prefs(self, *prefs):
            """POST the given pref names to ``servo/prefs/reset``."""
            return self.session.send_session_command(
                "POST", "servo/prefs/reset", {"prefs": list(prefs)})

        def change_prefs(self, old_prefs, new_prefs):
            """Reset the prefs named in old_prefs, then apply new_prefs.

            Values in new_prefs are converted with parse_pref_value() before
            being sent.
            """
            # Servo interprets reset with an empty list as reset everything
            if old_prefs:
                self.reset_prefs(*old_prefs.keys())
            converted = {}
            for pref_name, raw_value in new_prefs.items():
                converted[pref_name] = parse_pref_value(raw_value)
            self.set_prefs(converted)


# See parse_pref_from_command_line() in components/config/opts.rs
def parse_pref_value(value):
    """Convert a textual pref value into a bool, a float, or leave it as is.

    ``"true"`` and ``"false"`` become the matching booleans; anything that
    ``float()`` accepts becomes a float; every other value is returned
    unchanged.
    """
    if value in ("true", "false"):
        return value == "true"
    try:
        number = float(value)
    except ValueError:
        # Not numeric: hand the original value back untouched.
        return value
    else:
        return number


class ServoBaseProtocolPart(BaseProtocolPart):
    """Base protocol part for Servo whose operations are all stubs."""

    def execute_script(self, script, asynchronous=False):
        """Do nothing and return None."""

    def set_timeout(self, timeout):
        """Do nothing and return None."""

    def wait(self):
        """Return False without waiting."""
        return False

    def set_window(self, handle):
        """Do nothing and return None."""

    def window_handles(self):
        """Return an empty list of window handles."""
        return []

    def load(self, url):
        """Do nothing and return None."""


class ServoWebDriverProtocol(Protocol):
    """Protocol that talks to the browser through a WebDriver session.

    The session is created by connect() against ``browser.webdriver_host`` and
    ``browser.webdriver_port`` and uses ``ServoCommandExtensions`` as its
    extension object.
    """
    implements = [ServoBaseProtocolPart]

    def __init__(self, executor, browser, capabilities, **kwargs):
        """Record the connection settings; no session is opened yet.

        :param executor: executor owning this protocol (passed to Protocol).
        :param browser: object providing ``webdriver_host``,
                        ``webdriver_port`` and ``init_timeout``.
        :param capabilities: stored on the instance as ``self.capabilities``.
        """
        # Makes the module-level ``webdriver`` and ``ServoCommandExtensions``
        # names usable before any method below needs them.
        do_delayed_imports()
        Protocol.__init__(self, executor, browser)
        self.capabilities = capabilities
        self.host = browser.webdriver_host
        self.port = browser.webdriver_port
        self.init_timeout = browser.init_timeout
        self.session = None

    def connect(self):
        """Connect to browser via WebDriver."""
        wait_for_service(self.logger, self.host, self.port, timeout=self.init_timeout)

        self.session = webdriver.Session(self.host, self.port, extension=ServoCommandExtensions)
        self.session.start()

    def after_connect(self):
        """Nothing to do once the session has been started."""
        pass

    def teardown(self):
        """End the WebDriver session, ignoring (but logging) any failure."""
        self.logger.debug("Hanging up on WebDriver session")
        if self.session is None:
            # connect() never created a session, so there is nothing to end.
            return
        try:
            self.session.end()
        except Exception:
            # Teardown is best effort: the browser may already be gone. Keep
            # a trace of the failure instead of discarding it silently.
            self.logger.debug("Failed to end WebDriver session:\n%s" %
                              traceback.format_exc())

    def is_alive(self):
        """Return True if a simple property can be read over the session."""
        try:
            # Get a simple property over the connection
            self.session.window_handle
        # TODO what exception?
        except Exception:
            return False
        return True

    def wait(self):
        """Wait until the page fires a ``__test_restart`` event.

        The async script is re-issued each time it times out. Returns the
        script's result once the event listener invokes the callback, or
        False if the connection fails or an unexpected error occurs.
        """
        while True:
            try:
                return self.session.execute_async_script("""let callback = arguments[arguments.length - 1];
addEventListener("__test_restart", e => {e.preventDefault(); callback(true)})""")
            except webdriver.TimeoutException:
                # Script timeout expired without a restart request; keep waiting.
                pass
            except (socket.timeout, OSError):
                break
            except Exception:
                self.logger.error(traceback.format_exc())
                break
        return False


class ServoWebDriverRun(TimedRunner):
    """Timed runner that calls the test function with the WebDriver session.

    ``self.result`` is set to ``(True, value)`` when the function returns, or
    to ``(False, (status, message))`` when it raises.
    """

    def set_timeout(self):
        """Nothing to configure here."""
        pass

    def run_func(self):
        """Run ``self.func`` and translate any exception into a result tuple.

        ``self.result_flag`` is always set afterwards, whether the call
        succeeded or failed.
        """
        try:
            self.result = True, self.func(self.protocol.session, self.url, self.timeout)
        except webdriver.TimeoutException:
            self.result = False, ("EXTERNAL-TIMEOUT", None)
        except (socket.timeout, OSError):
            self.result = False, ("CRASH", None)
        except Exception as e:
            message = getattr(e, "message", "")
            if message:
                message += "\n"
            message += traceback.format_exc()
            # Report the formatted message (with traceback), not the raw
            # exception object, so the text built above is actually used.
            self.result = False, ("INTERNAL-ERROR", message)
        finally:
            self.result_flag.set()


class ServoWebDriverTestharnessExecutor(TestharnessExecutor):
    """Testharness executor that runs tests in Servo over WebDriver."""
    supports_testdriver = True

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 close_after_done=True, capabilities=None, debug_info=None,
                 **kwargs):
        """Create the protocol and load the harness script template.

        ``timeout_multiplier`` and ``debug_info`` are forwarded to the base
        executor; ``capabilities`` is handed to the protocol.
        ``close_after_done`` and extra keyword arguments are accepted but
        not used here.
        """
        # Forward the caller's values instead of hard-coded defaults, so the
        # configured timeout multiplier and debug info take effect.
        TestharnessExecutor.__init__(self, logger, browser, server_config,
                                     timeout_multiplier=timeout_multiplier,
                                     debug_info=debug_info)
        self.protocol = ServoWebDriverProtocol(self, browser, capabilities=capabilities)
        with open(os.path.join(here, "testharness_servodriver.js")) as f:
            self.script = f.read()
        # Script timeout last applied to the session; None until first set.
        self.timeout = None

    def on_protocol_change(self, new_protocol):
        """Nothing to do when the protocol changes."""
        pass

    def is_alive(self):
        """Return whether the underlying protocol reports a live session."""
        return self.protocol.is_alive()

    def do_test(self, test):
        """Run one testharness test and return ``(result, subtest_results)``."""
        url = self.test_url(test)

        timeout = test.timeout * self.timeout_multiplier + self.extra_timeout

        # Only touch the session's script timeout when it actually changes.
        if timeout != self.timeout:
            try:
                self.protocol.session.timeouts.script = timeout
                self.timeout = timeout
            except OSError:
                msg = "Lost WebDriver connection"
                self.logger.error(msg)
                # Same (result, subtests) shape as the other return paths.
                return (test.make_result("INTERNAL-ERROR", msg), [])

        success, data = ServoWebDriverRun(self.logger,
                                          self.do_testharness,
                                          self.protocol,
                                          url,
                                          timeout,
                                          self.extra_timeout).run()

        if success:
            return self.convert_result(test, data)

        return (test.make_result(*data), [])

    def do_testharness(self, session, url, timeout):
        """Load url, run the harness script and return its decoded JSON result.

        ``timeout`` is multiplied by 1000 before being substituted into the
        script template.
        """
        session.url = url
        result = json.loads(
            session.execute_async_script(
                self.script % {"abs_url": url,
                               "url": strip_server(url),
                               "timeout_multiplier": self.timeout_multiplier,
                               "timeout": timeout * 1000}))
        # Prevent leaking every page in history until Servo develops a more sane
        # page cache
        session.back()
        return result

    def on_environment_change(self, new_environment):
        """Swap the previous environment's prefs for the new environment's."""
        self.protocol.session.extension.change_prefs(
            self.last_environment.get("prefs", {}),
            new_environment.get("prefs", {})
        )


class TimeoutError(Exception):
    """Module-local timeout error; shadows the builtin of the same name.

    It derives directly from ``Exception`` rather than ``OSError``.
    """


class ServoWebDriverRefTestExecutor(RefTestExecutor):
    """Servo WebDriver-based executor for reftests."""

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 screenshot_cache=None, capabilities=None, debug_info=None,
                 **kwargs):
        """Create the protocol, reftest implementation and wait script.

        ``screenshot_cache``, ``timeout_multiplier`` and ``debug_info`` are
        forwarded to the base executor; ``capabilities`` is handed to the
        protocol. Extra keyword arguments are accepted but not used here.
        """
        RefTestExecutor.__init__(self,
                                 logger,
                                 browser,
                                 server_config,
                                 screenshot_cache=screenshot_cache,
                                 timeout_multiplier=timeout_multiplier,
                                 debug_info=debug_info)
        self.protocol = ServoWebDriverProtocol(self, browser,
                                               capabilities=capabilities)
        self.implementation = RefTestImplementation(self)
        # Script timeout last applied to the session; None until first set.
        self.timeout = None
        with open(os.path.join(here, "test-wait.js")) as f:
            self.wait_script = f.read() % {"classname": "reftest-wait"}

    def reset(self):
        """Reset the reftest implementation's state."""
        self.implementation.reset()

    def is_alive(self):
        """Return whether the underlying protocol reports a live session."""
        return self.protocol.is_alive()

    def do_test(self, test):
        """Run one reftest and return ``(result, subtest_results)``.

        OSError maps to CRASH, this module's TimeoutError to TIMEOUT and any
        other exception to INTERNAL-ERROR with the traceback as message.
        """
        try:
            result = self.implementation.run_test(test)
            return self.convert_result(test, result)
        except OSError:
            return test.make_result("CRASH", None), []
        except TimeoutError:
            return test.make_result("TIMEOUT", None), []
        except Exception as e:
            message = getattr(e, "message", "")
            if message:
                message += "\n"
            message += traceback.format_exc()
            return test.make_result("INTERNAL-ERROR", message), []

    def screenshot(self, test, viewport_size, dpi, page_ranges):
        """Take a screenshot of the test page.

        Returns a ``(success, data)`` pair: the outcome of the timed
        screenshot run, or ``(False, ("INTERNAL-ERROR", msg))`` when the
        script timeout cannot be applied because the connection is lost.
        """
        # https://github.com/web-platform-tests/wpt/issues/7135
        assert viewport_size is None
        assert dpi is None

        # No timeout at all when debug info is set.
        timeout = (test.timeout * self.timeout_multiplier + self.extra_timeout
                   if self.debug_info is None else None)

        # Only touch the session's script timeout when it actually changes.
        if self.timeout != timeout:
            try:
                self.protocol.session.timeouts.script = timeout
                self.timeout = timeout
            except OSError:
                msg = "Lost webdriver connection"
                self.logger.error(msg)
                # Match the (success, data) shape produced by
                # ServoWebDriverRun so the failure is not read as a success.
                return False, ("INTERNAL-ERROR", msg)

        return ServoWebDriverRun(self.logger,
                                 self._screenshot,
                                 self.protocol,
                                 self.test_url(test),
                                 timeout,
                                 self.extra_timeout).run()

    def _screenshot(self, session, url, timeout):
        """Load url, run the wait script, then return the session screenshot."""
        session.url = url
        session.execute_async_script(self.wait_script)
        return session.screenshot()

    def on_environment_change(self, new_environment):
        """Swap the previous environment's prefs for the new environment's."""
        self.protocol.session.extension.change_prefs(
            self.last_environment.get("prefs", {}),
            new_environment.get("prefs", {})
        )