chromium/third_party/wpt_tools/wpt/tools/wptrunner/wptrunner/executors/executorwebdriver.py

# mypy: allow-untyped-defs

import asyncio
import json
import os
import socket
import threading
import time
import traceback
import uuid
from urllib.parse import urljoin

from .base import (AsyncCallbackHandler,
                   CallbackHandler,
                   CrashtestExecutor,
                   RefTestExecutor,
                   RefTestImplementation,
                   TestharnessExecutor,
                   TimedRunner,
                   strip_server)
from .protocol import (BaseProtocolPart,
                       TestharnessProtocolPart,
                       Protocol,
                       SelectorProtocolPart,
                       AccessibilityProtocolPart,
                       ClickProtocolPart,
                       CookiesProtocolPart,
                       SendKeysProtocolPart,
                       ActionSequenceProtocolPart,
                       TestDriverProtocolPart,
                       GenerateTestReportProtocolPart,
                       SetPermissionProtocolPart,
                       VirtualAuthenticatorProtocolPart,
                       WindowProtocolPart,
                       DebugProtocolPart,
                       SPCTransactionsProtocolPart,
                       RPHRegistrationsProtocolPart,
                       FedCMProtocolPart,
                       VirtualSensorProtocolPart,
                       BidiBrowsingContextProtocolPart,
                       BidiEventsProtocolPart,
                       BidiScriptProtocolPart,
                       DevicePostureProtocolPart,
                       StorageProtocolPart,
                       merge_dicts)

from typing import List, Optional, Tuple
from webdriver.client import Session
from webdriver import error as webdriver_error
from webdriver.bidi import error as webdriver_bidi_error
from webdriver.bidi.protocol import bidi_deserialize

# Directory containing this module; joined with file names below to locate
# the JavaScript helper files (e.g. "runner.js") that are read at setup time.
here = os.path.dirname(__file__)


class WebDriverCallbackHandler(CallbackHandler):
    """Callback handler parameterised with the classic WebDriver exception types."""

    # Exception groups consumed by the base handler (its logic is not visible
    # in this file).
    unimplemented_exc = (
        NotImplementedError,
        webdriver_error.UnknownCommandException,
    )
    expected_exc = (
        webdriver_error.WebDriverException,
    )


class WebDriverAsyncCallbackHandler(AsyncCallbackHandler):
    """Async callback handler parameterised with the WebDriver BiDi exception types."""

    # Exception groups consumed by the base handler (its logic is not visible
    # in this file).
    unimplemented_exc = (
        NotImplementedError,
        webdriver_bidi_error.UnknownCommandException,
    )
    expected_exc = (
        webdriver_bidi_error.BidiException,
    )


class WebDriverBaseProtocolPart(BaseProtocolPart):
    """Basic browser operations implemented on a classic WebDriver session."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def execute_script(self, script, asynchronous=False, args=None):
        """Run ``script`` through the session and return its result.

        :param script: Script source to execute.
        :param asynchronous: When true, use the session's async script command.
        :param args: Arguments forwarded to the script.
        """
        if asynchronous:
            run = self.webdriver.execute_async_script
        else:
            run = self.webdriver.execute_script
        return run(script, args=args)

    def set_timeout(self, timeout):
        """Set the session's script timeout to ``timeout``.

        If assigning the timeout raises a ``WebDriverException`` the value is
        sent again as a raw "timeouts" command, multiplied by 1000.
        """
        try:
            self.webdriver.timeouts.script = timeout
        except webdriver_error.WebDriverException:
            # workaround https://bugs.chromium.org/p/chromedriver/issues/detail?id=2057
            payload = {"type": "script", "ms": timeout * 1000}
            self.webdriver.send_session_command("POST", "timeouts", payload)

    @property
    def current_window(self):
        """Handle of the session's current window."""
        return self.webdriver.window_handle

    def set_window(self, handle):
        """Make ``handle`` the session's current window."""
        self.webdriver.window_handle = handle

    def window_handles(self):
        """Return the window handles reported by the session."""
        return self.webdriver.handles

    def load(self, url):
        """Navigate the session to ``url``."""
        self.webdriver.url = url

    def wait(self):
        """Block, repeatedly running async scripts, until the session goes away.

        Timeouts and JavaScript errors restart the loop; socket/OS errors,
        a missing window, an unknown error or any other exception end it.
        Always returns ``False``.
        """
        restart_listener = """let callback = arguments[arguments.length - 1];
addEventListener("__test_restart", e => {e.preventDefault(); callback(true)})"""
        while True:
            try:
                self.webdriver.execute_async_script(restart_listener)
                self.webdriver.execute_async_script("")
            except (webdriver_error.TimeoutException,
                    webdriver_error.ScriptTimeoutException,
                    webdriver_error.JavascriptErrorException):
                # A JavascriptErrorException will happen when we navigate;
                # ignoring it makes it possible to reload the test whilst
                # the harness remains paused.
                continue
            except (socket.timeout,
                    webdriver_error.NoSuchWindowException,
                    webdriver_error.UnknownErrorException,
                    OSError):
                return False
            except Exception:
                report = "Uncaught exception in WebDriverBaseProtocolPart.wait:\n"
                report += traceback.format_exc()
                self.logger.error(report)
                return False


class WebDriverBidiBrowsingContextProtocolPart(BidiBrowsingContextProtocolPart):
    """Browsing-context commands sent over the WebDriver BiDi session."""

    def __init__(self, parent):
        super().__init__(parent)
        # Populated in `setup`.
        self.webdriver = None

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    async def handle_user_prompt(self,
                                 context: str,
                                 accept: Optional[bool] = None,
                                 user_text: Optional[str] = None) -> None:
        """Forward a `handle_user_prompt` command to the BiDi browsing-context module.

        :param context: Browsing context id the prompt belongs to.
        :param accept: Forwarded unchanged as the command's `accept` argument.
        :param user_text: Forwarded unchanged as the command's `user_text` argument.
        """
        browsing_context = self.webdriver.bidi_session.browsing_context
        await browsing_context.handle_user_prompt(context=context,
                                                  accept=accept,
                                                  user_text=user_text)


class WebDriverBidiEventsProtocolPart(BidiEventsProtocolPart):
    """WebDriver BiDi event subscription handling.

    Every successful subscription is recorded so that `unsubscribe_all` can undo it later.
    """

    # Class-level declaration kept for typing and backwards compatibility. Each instance rebinds its own list in
    # `__init__`, so subscriptions recorded by one instance are never shared with another one.
    _subscriptions: List[Tuple[List[str], Optional[List[str]]]] = []

    def __init__(self, parent):
        super().__init__(parent)
        self.webdriver = None
        # Per-instance record of `(events, top_level_contexts)` pairs. A mutable class attribute would be shared
        # by all instances and could carry stale subscriptions over to a later instance.
        self._subscriptions = []

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    async def _contexts_to_top_contexts(self, contexts: Optional[List[str]]) -> Optional[List[str]]:
        """Gathers the list of top-level contexts for the given list of contexts.

        Returns `None` for a global subscription (`contexts is None`). Contexts that cannot be found are skipped.
        """
        if contexts is None:
            # Global subscription.
            return None
        top_contexts = set()
        for context in contexts:
            maybe_top_context = await self._get_top_context(context)
            if maybe_top_context is not None:
                # The context is found. Add its top-level context to the result set.
                top_contexts.add(maybe_top_context)
        return list(top_contexts)

    async def _get_top_context(self, context: str) -> Optional[str]:
        """Returns the top context id for the given context id, or `None` if the context is not found."""
        # It is done in suboptimal way by calling `getTree` for each parent context until reaches the top context.
        # TODO: optimise. Construct the tree once and then traverse it.
        get_tree_result = await self.webdriver.bidi_session.browsing_context.get_tree(root=context)
        if not get_tree_result:
            # The context is not found. Nothing to do.
            return None
        assert len(get_tree_result) == 1, "The context should be unique."
        context_info = get_tree_result[0]
        if context_info["parent"] is None:
            # The context is top-level. Return its ID.
            return context
        return await self._get_top_context(context_info["parent"])

    async def subscribe(self, events, contexts):
        """Subscribe to `events` in `contexts` and record the subscription for `unsubscribe_all`.

        :param events: Event names passed to the BiDi `session.subscribe` command.
        :param contexts: Browsing context ids to subscribe in, or `None` for a global subscription.
        :returns: The result of the BiDi `session.subscribe` command.
        """
        self.logger.info("Subscribing to events %s in %s" % (events, contexts))
        # The BiDi subscriptions are done for top context even if the sub-context is provided. We need to get the
        # top-level contexts list to handle the scenario when subscription is done for a sub-context which is closed
        # afterwards. However, the subscription itself is done for the provided contexts in order to throw in case of
        # the sub-context is removed.
        top_contexts = await self._contexts_to_top_contexts(contexts)
        result = await self.webdriver.bidi_session.session.subscribe(events=events, contexts=contexts)
        # The `subscribe` method either raises an exception or adds subscription. The command is atomic, meaning in case
        # of exception no subscription is added.
        self._subscriptions.append((events, top_contexts))
        return result

    async def unsubscribe_all(self):
        """Unsubscribe from every recorded subscription, most recent first.

        A `NoSuchFrameException` is ignored; any other exception is logged and re-raised.
        """
        self.logger.info("Unsubscribing from all the events")
        while self._subscriptions:
            events, contexts = self._subscriptions.pop()
            self.logger.debug("Unsubscribing from events %s in %s" % (events, contexts))
            try:
                await self.webdriver.bidi_session.session.unsubscribe(events=events, contexts=contexts)
            except webdriver_bidi_error.NoSuchFrameException:
                # The browsing context is already removed. Nothing to do.
                pass
            except Exception as e:
                self.logger.error("Failed to unsubscribe from events %s in %s: %s" % (events, contexts, e))
                # Re-raise the exception to identify regressions.
                # TODO: consider to continue the loop in case of the exception.
                raise

    def add_event_listener(self, name, fn):
        """Register `fn` as a listener for event `name` on the BiDi session and return the session's result."""
        self.logger.info("adding event listener %s" % name)
        return self.webdriver.bidi_session.add_event_listener(name=name, fn=fn)


class WebDriverBidiScriptProtocolPart(BidiScriptProtocolPart):
    """Script commands sent over the WebDriver BiDi session."""

    def __init__(self, parent):
        super().__init__(parent)
        # Populated in `setup`.
        self.webdriver = None

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    async def call_function(self, function_declaration, target, arguments=None):
        """Call a function through the BiDi script module with ``await_promise=True``.

        :param function_declaration: Forwarded as the command's ``function_declaration``.
        :param target: Forwarded as the command's ``target``.
        :param arguments: Forwarded as the command's ``arguments``.
        :returns: The awaited result of the BiDi command.
        """
        script_module = self.webdriver.bidi_session.script
        outcome = await script_module.call_function(
            function_declaration=function_declaration,
            arguments=arguments,
            target=target,
            await_promise=True)
        return outcome


class WebDriverTestharnessProtocolPart(TestharnessProtocolPart):
    """Runner-window and test-window management for testharness tests."""

    def setup(self):
        """Cache the WebDriver session and read the helper scripts from disk."""
        self.webdriver = self.parent.webdriver
        self.runner_handle = None
        with open(os.path.join(here, "runner.js")) as f:
            self.runner_script = f.read()
        with open(os.path.join(here, "window-loaded.js")) as f:
            self.window_loaded_script = f.read()

    def load_runner(self, url_protocol):
        """Load the testharness runner page and remember its window handle.

        :param url_protocol: Protocol name passed to the executor's
                             ``server_url`` to build the runner URL.
        """
        if self.runner_handle:
            # Reuse the existing runner window rather than whichever window
            # happens to be current.
            self.webdriver.window_handle = self.runner_handle
        url = urljoin(self.parent.executor.server_url(url_protocol),
                      "/testharness_runner.html")
        self.logger.debug("Loading %s" % url)

        self.webdriver.url = url
        self.runner_handle = self.webdriver.window_handle
        # The thread name is substituted into the runner script; single quotes
        # are replaced with double quotes before substitution.
        format_map = {"title": threading.current_thread().name.replace("'", '"')}
        self.parent.base.execute_script(self.runner_script % format_map)

    def close_old_windows(self):
        """Close every window except the runner and return the runner handle."""
        self.webdriver.actions.release()
        handles = [item for item in self.webdriver.handles if item != self.runner_handle]
        for handle in handles:
            self._close_window(handle)
        self.webdriver.window_handle = self.runner_handle
        return self.runner_handle

    def _close_window(self, window_handle):
        """Switch to ``window_handle`` and close it, ignoring an already-gone window."""
        try:
            self.webdriver.window_handle = window_handle
            self.webdriver.window.close()
        except webdriver_error.NoSuchWindowException:
            pass

    def open_test_window(self, window_id):
        """Open an ``about:blank`` window named ``window_id`` with ``noopener``."""
        self.webdriver.execute_script(
            "window.open('about:blank', '%s', 'noopener')" % window_id)

    def get_test_window(self, window_id, parent, timeout=5):
        """Find the test window amongst all the open windows.
        This is assumed to be either the named window or the one after the parent in the list of
        window handles

        :param window_id: The DOM name of the Window
        :param parent: The handle of the runner window
        :param timeout: The time in seconds to wait for the window to appear. This is because in
                        some implementations there's a race between calling window.open and the
                        window being added to the list of WebDriver accessible windows.
        :raises Exception: If no test window is found before the timeout expires."""
        test_window = None
        end_time = time.time() + timeout
        while time.time() < end_time:
            try:
                # Try using the JSON serialization of the WindowProxy object,
                # it's in Level 1 but nothing supports it yet
                win_s = self.webdriver.execute_script("return window['%s'];" % window_id)
                win_obj = json.loads(win_s)
                test_window = win_obj["window-fcc6-11e5-b4f8-330a88ab9d7f"]
            except Exception:
                # Deliberately best-effort: fall back to polling the handles.
                pass

            if test_window is None:
                test_window = self._poll_handles_for_test_window(parent)

            if test_window is not None:
                assert test_window != parent
                return test_window

            time.sleep(0.1)

        raise Exception("unable to find test window")

    def _poll_handles_for_test_window(self, parent):
        """Guess the test window from the session's current window handles.

        :param parent: The handle of the runner window.
        :returns: The guessed handle, or ``None`` when no guess can be made
                  (including when the handle list is empty).
        """
        handles = self.webdriver.handles
        if len(handles) == 2:
            return next(iter(set(handles) - {parent}))
        # The length is checked before indexing so that an empty handle list
        # yields ``None`` (and a retry in the caller) instead of an IndexError.
        if len(handles) > 2 and handles[0] == parent:
            # Hope the first one here is the test window
            return handles[1]
        return None

    def test_window_loaded(self):
        """Wait until the page in the new window has been loaded.

        Hereby ignore Javascript exceptions that are thrown when
        the document has been unloaded due to a process change.
        """
        # NOTE(review): `asynchronous` is passed directly to the session's
        # `execute_script`; confirm the WebDriver client accepts that keyword.
        while True:
            try:
                self.webdriver.execute_script(self.window_loaded_script, asynchronous=True)
                break
            except webdriver_error.JavascriptErrorException:
                pass


class WebDriverSelectorProtocolPart(SelectorProtocolPart):
    """Element lookup by CSS selector through the WebDriver session."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def elements_by_selector(self, selector):
        """Return the result of the session's CSS ``find`` for ``selector``."""
        finder = self.webdriver.find
        return finder.css(selector)


class WebDriverAccessibilityProtocolPart(AccessibilityProtocolPart):
    """Computed accessibility properties, read from the element objects."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def get_computed_label(self, element):
        """Return ``element.get_computed_label()``."""
        label = element.get_computed_label()
        return label

    def get_computed_role(self, element):
        """Return ``element.get_computed_role()``."""
        role = element.get_computed_role()
        return role


class WebDriverClickProtocolPart(ClickProtocolPart):
    """Element clicking."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def element(self, element):
        """Log the click and return the result of ``element.click()``."""
        description = repr(element)
        self.logger.info("click " + description)
        return element.click()


class WebDriverCookiesProtocolPart(CookiesProtocolPart):
    """Cookie commands sent on the session's ``cookie`` endpoint."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def delete_all_cookies(self):
        """Send DELETE on ``cookie`` and return the command result."""
        self.logger.info("Deleting all cookies")
        send = self.webdriver.send_session_command
        return send("DELETE", "cookie")

    def get_all_cookies(self):
        """Send GET on ``cookie`` and return the command result."""
        self.logger.info("Getting all cookies")
        send = self.webdriver.send_session_command
        return send("GET", "cookie")

    def get_named_cookie(self, name):
        """Send GET on ``cookie/<name>``.

        :returns: The command result, or ``None`` when the command raises
                  ``NoSuchCookieException``.
        """
        self.logger.info("Getting cookie named %s" % name)
        try:
            cookie = self.webdriver.send_session_command("GET", "cookie/%s" % name)
        except webdriver_error.NoSuchCookieException:
            cookie = None
        return cookie


class WebDriverWindowProtocolPart(WindowProtocolPart):
    """Window state and geometry operations on the session's window object."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def minimize(self):
        """Minimize the window and return the result of the minimize call."""
        self.logger.info("Minimizing")
        window = self.webdriver.window
        return window.minimize()

    def set_rect(self, rect):
        """Assign ``rect`` to the window's ``rect`` property."""
        self.logger.info("Restoring")
        window = self.webdriver.window
        window.rect = rect

    def get_rect(self):
        """Return the window's ``rect`` property."""
        self.logger.info("Getting rect")
        window = self.webdriver.window
        return window.rect

class WebDriverSendKeysProtocolPart(SendKeysProtocolPart):
    """Sending key input to elements."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def send_keys(self, element, keys):
        """Send ``keys`` to ``element`` and return the command result.

        When the command fails with an ``UnknownErrorException`` whose HTTP
        status is 500 and status code is "unknown error", the keys are sent
        again as a raw "value" element command carrying a list of characters.
        Any other ``UnknownErrorException`` is re-raised.
        """
        try:
            return element.send_keys(keys)
        except webdriver_error.UnknownErrorException as e:
            # workaround https://bugs.chromium.org/p/chromedriver/issues/detail?id=1999
            matches_workaround = (e.http_status == 500 and
                                  e.status_code == "unknown error")
            if not matches_workaround:
                raise
            payload = {"value": list(keys)}
            return element.send_element_command("POST", "value", payload)


class WebDriverActionSequenceProtocolPart(ActionSequenceProtocolPart):
    """Action sequences performed through the session's ``actions`` object."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def send_actions(self, actions):
        """Perform the sequence stored under the ``'actions'`` key of ``actions``."""
        sequence = actions['actions']
        self.webdriver.actions.perform(sequence)

    def release(self):
        """Call ``release`` on the session's ``actions`` object."""
        session_actions = self.webdriver.actions
        session_actions.release()


class WebDriverTestDriverProtocolPart(TestDriverProtocolPart):
    """testdriver message delivery and frame switching."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def send_message(self, cmd_id, message_type, status, message=None):
        """Post a testdriver message to the current window via ``window.postMessage``.

        :param cmd_id: Stored under ``"cmd_id"`` in the posted object.
        :param message_type: Stringified and prefixed with ``"testdriver-"``.
        :param status: Stringified and stored under ``"status"``.
        :param message: Optional; stringified and included only when truthy.
        """
        payload = {
            "cmd_id": cmd_id,
            "type": "testdriver-" + str(message_type),
            "status": str(status)
        }
        if message:
            payload["message"] = str(message)
        serialized = json.dumps(payload)
        self.webdriver.execute_script("window.postMessage(%s, '*')" % serialized)

    def _switch_to_frame(self, index_or_elem):
        """Switch to the given frame.

        :raises ValueError: When the switch raises a stale-element or
                            no-such-frame error (chained from that error).
        """
        frame_errors = (webdriver_error.StaleElementReferenceException,
                        webdriver_error.NoSuchFrameException)
        try:
            self.webdriver.switch_frame(index_or_elem)
        except frame_errors as e:
            raise ValueError from e

    def _switch_to_parent_frame(self):
        """Switch to the parent frame."""
        self.webdriver.switch_frame("parent")


class WebDriverGenerateTestReportProtocolPart(GenerateTestReportProtocolPart):
    """Test report generation through a session command."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def generate_test_report(self, message):
        """POST ``{"message": message}`` to ``reporting/generate_test_report``."""
        send = self.webdriver.send_session_command
        send("POST", "reporting/generate_test_report", {"message": message})


class WebDriverSetPermissionProtocolPart(SetPermissionProtocolPart):
    """Permission changes through a session command."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def set_permission(self, descriptor, state):
        """POST the ``descriptor``/``state`` pair to the ``permissions`` endpoint."""
        payload = {"descriptor": descriptor, "state": state}
        send = self.webdriver.send_session_command
        send("POST", "permissions", payload)


class WebDriverVirtualAuthenticatorProtocolPart(VirtualAuthenticatorProtocolPart):
    """Virtual authenticator commands on the ``webauthn/authenticator`` endpoints.

    Every method returns the result of the underlying session command.
    """

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def add_virtual_authenticator(self, config):
        """POST ``config`` to ``webauthn/authenticator``."""
        endpoint = "webauthn/authenticator"
        return self.webdriver.send_session_command("POST", endpoint, config)

    def remove_virtual_authenticator(self, authenticator_id):
        """DELETE ``webauthn/authenticator/<authenticator_id>``."""
        endpoint = "webauthn/authenticator/%s" % authenticator_id
        return self.webdriver.send_session_command("DELETE", endpoint)

    def add_credential(self, authenticator_id, credential):
        """POST ``credential`` to the authenticator's ``credential`` endpoint."""
        endpoint = "webauthn/authenticator/%s/credential" % authenticator_id
        return self.webdriver.send_session_command("POST", endpoint, credential)

    def get_credentials(self, authenticator_id):
        """GET the authenticator's ``credentials`` endpoint."""
        endpoint = "webauthn/authenticator/%s/credentials" % authenticator_id
        return self.webdriver.send_session_command("GET", endpoint)

    def remove_credential(self, authenticator_id, credential_id):
        """DELETE ``credentials/<credential_id>`` on the authenticator."""
        endpoint = f"webauthn/authenticator/{authenticator_id}/credentials/{credential_id}"
        return self.webdriver.send_session_command("DELETE", endpoint)

    def remove_all_credentials(self, authenticator_id):
        """DELETE the authenticator's ``credentials`` endpoint."""
        endpoint = "webauthn/authenticator/%s/credentials" % authenticator_id
        return self.webdriver.send_session_command("DELETE", endpoint)

    def set_user_verified(self, authenticator_id, uv):
        """POST ``uv`` to the authenticator's ``uv`` endpoint."""
        endpoint = "webauthn/authenticator/%s/uv" % authenticator_id
        return self.webdriver.send_session_command("POST", endpoint, uv)


class WebDriverSPCTransactionsProtocolPart(SPCTransactionsProtocolPart):
    """Secure-payment-confirmation mode selection through a session command."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def set_spc_transaction_mode(self, mode):
        """POST ``{"mode": mode}`` to ``secure-payment-confirmation/set-mode``."""
        send = self.webdriver.send_session_command
        return send("POST", "secure-payment-confirmation/set-mode", {"mode": mode})

class WebDriverRPHRegistrationsProtocolPart(RPHRegistrationsProtocolPart):
    """Custom-handlers registration mode selection through a session command."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def set_rph_registration_mode(self, mode):
        """POST ``{"mode": mode}`` to ``custom-handlers/set-mode``."""
        send = self.webdriver.send_session_command
        return send("POST", "custom-handlers/set-mode", {"mode": mode})

class WebDriverFedCMProtocolPart(FedCMProtocolPart):
    """FedCM commands on the session's ``fedcm/*`` endpoints.

    Every method returns the result of the underlying session command.
    """

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def cancel_fedcm_dialog(self):
        """POST to ``fedcm/canceldialog``."""
        send = self.webdriver.send_session_command
        return send("POST", "fedcm/canceldialog")

    def click_fedcm_dialog_button(self, dialog_button):
        """POST ``{"dialogButton": dialog_button}`` to ``fedcm/clickdialogbutton``."""
        send = self.webdriver.send_session_command
        return send("POST", "fedcm/clickdialogbutton", {"dialogButton": dialog_button})

    def select_fedcm_account(self, account_index):
        """POST ``{"accountIndex": account_index}`` to ``fedcm/selectaccount``."""
        send = self.webdriver.send_session_command
        return send("POST", "fedcm/selectaccount", {"accountIndex": account_index})

    def get_fedcm_account_list(self):
        """GET ``fedcm/accountlist``."""
        send = self.webdriver.send_session_command
        return send("GET", "fedcm/accountlist")

    def get_fedcm_dialog_title(self):
        """GET ``fedcm/gettitle``."""
        send = self.webdriver.send_session_command
        return send("GET", "fedcm/gettitle")

    def get_fedcm_dialog_type(self):
        """GET ``fedcm/getdialogtype``."""
        send = self.webdriver.send_session_command
        return send("GET", "fedcm/getdialogtype")

    def set_fedcm_delay_enabled(self, enabled):
        """POST ``{"enabled": enabled}`` to ``fedcm/setdelayenabled``."""
        send = self.webdriver.send_session_command
        return send("POST", "fedcm/setdelayenabled", {"enabled": enabled})

    def reset_fedcm_cooldown(self):
        """POST to ``fedcm/resetcooldown``."""
        send = self.webdriver.send_session_command
        return send("POST", "fedcm/resetcooldown")


class WebDriverDebugProtocolPart(DebugProtocolPart):
    """Debug protocol part; devtools loading is not implemented here."""

    def load_devtools(self):
        """Always raises ``NotImplementedError``."""
        raise NotImplementedError()


class WebDriverVirtualSensorPart(VirtualSensorProtocolPart):
    """Virtual sensor commands on the session's ``sensor`` endpoints.

    Every method returns the result of the underlying session command.
    """

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def create_virtual_sensor(self, sensor_type, sensor_params):
        """POST the sensor type merged with ``sensor_params`` to ``sensor``.

        Entries of ``sensor_params`` are applied after ``"type"`` is set.
        """
        payload = {"type": sensor_type}
        payload.update(sensor_params)
        send = self.webdriver.send_session_command
        return send("POST", "sensor", payload)

    def update_virtual_sensor(self, sensor_type, reading):
        """POST ``{"reading": reading}`` to ``sensor/<sensor_type>``."""
        payload = {"reading": reading}
        endpoint = "sensor/%s" % sensor_type
        return self.webdriver.send_session_command("POST", endpoint, payload)

    def remove_virtual_sensor(self, sensor_type):
        """DELETE ``sensor/<sensor_type>``."""
        endpoint = "sensor/%s" % sensor_type
        return self.webdriver.send_session_command("DELETE", endpoint)

    def get_virtual_sensor_information(self, sensor_type):
        """GET ``sensor/<sensor_type>``."""
        endpoint = "sensor/%s" % sensor_type
        return self.webdriver.send_session_command("GET", endpoint)

class WebDriverDevicePostureProtocolPart(DevicePostureProtocolPart):
    """Device posture commands on the session's ``deviceposture`` endpoint."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def set_device_posture(self, posture):
        """POST ``{"posture": posture}`` to ``deviceposture`` and return the result."""
        send = self.webdriver.send_session_command
        return send("POST", "deviceposture", {"posture": posture})

    def clear_device_posture(self):
        """DELETE ``deviceposture`` and return the result."""
        send = self.webdriver.send_session_command
        return send("DELETE", "deviceposture")

class WebDriverStorageProtocolPart(StorageProtocolPart):
    """Storage-related commands sent through the session."""

    def setup(self):
        """Cache the parent protocol's WebDriver session on this part."""
        self.webdriver = self.parent.webdriver

    def run_bounce_tracking_mitigations(self):
        """DELETE ``storage/run_bounce_tracking_mitigations`` and return the result."""
        send = self.webdriver.send_session_command
        return send("DELETE", "storage/run_bounce_tracking_mitigations")

class WebDriverProtocol(Protocol):
    """Protocol that drives the browser through a classic WebDriver session."""

    # Passed to `Session` as `enable_bidi` when connecting.
    enable_bidi = False
    implements = [
        WebDriverBaseProtocolPart,
        WebDriverTestharnessProtocolPart,
        WebDriverSelectorProtocolPart,
        WebDriverAccessibilityProtocolPart,
        WebDriverClickProtocolPart,
        WebDriverCookiesProtocolPart,
        WebDriverSendKeysProtocolPart,
        WebDriverWindowProtocolPart,
        WebDriverActionSequenceProtocolPart,
        WebDriverTestDriverProtocolPart,
        WebDriverGenerateTestReportProtocolPart,
        WebDriverSetPermissionProtocolPart,
        WebDriverVirtualAuthenticatorProtocolPart,
        WebDriverSPCTransactionsProtocolPart,
        WebDriverRPHRegistrationsProtocolPart,
        WebDriverFedCMProtocolPart,
        WebDriverDebugProtocolPart,
        WebDriverVirtualSensorPart,
        WebDriverDevicePostureProtocolPart,
        WebDriverStorageProtocolPart,
    ]

    def __init__(self, executor, browser, capabilities, **kwargs):
        """Combine the capabilities and record the WebDriver URL.

        :param executor: Executor owning this protocol; its ``server_url`` is
                         used to build the PAC URL.
        :param browser: Browser object providing ``pac``, ``webdriver_url``
                        and, optionally, ``capabilities``.
        :param capabilities: Capabilities dict, or ``None``.
        """
        super().__init__(executor, browser)
        self.capabilities = capabilities
        if hasattr(browser, "capabilities"):
            if self.capabilities is not None:
                merge_dicts(self.capabilities, browser.capabilities)
            else:
                self.capabilities = browser.capabilities

        pac = browser.pac
        if pac is not None:
            if self.capabilities is None:
                self.capabilities = {}
            proxy_settings = {
                "proxyType": "pac",
                "proxyAutoconfigUrl": urljoin(executor.server_url("http"), pac)
            }
            merge_dicts(self.capabilities, {"proxy": proxy_settings})

        self.url = browser.webdriver_url
        # Created in `connect`, reset to None in `teardown`.
        self.webdriver = None

    def connect(self):
        """Connect to browser via WebDriver and create a WebDriver session."""
        self.logger.debug("Connecting to WebDriver on URL: %s" % self.url)

        # Host is the second ":"-separated piece of the URL and port the last
        # one, both with surrounding slashes removed.
        pieces = self.url.split(":")
        host = pieces[1].strip("/")
        port = pieces[-1].strip("/")

        self.webdriver = Session(host,
                                 port,
                                 capabilities={"alwaysMatch": self.capabilities},
                                 enable_bidi=self.enable_bidi)
        self.webdriver.start()

    def teardown(self):
        """End the WebDriver session; failures are only logged at debug level."""
        self.logger.debug("Hanging up on WebDriver session")
        try:
            self.webdriver.end()
        except Exception as e:
            detail = str(getattr(e, "message", ""))
            trace = traceback.format_exc()
            self.logger.debug(detail + "\n" + trace if detail else trace)
        self.webdriver = None

    def is_alive(self) -> bool:
        """Return whether the session still answers a simple command."""
        if not self.webdriver:
            return False
        try:
            # Get a simple property over the connection with a timeout of 2
            # seconds. That should be more than enough to check whether the
            # WebDriver is still alive, and it allows the check to complete
            # within the testrunner's 5 seconds of extra_timeout, which is the
            # maximum we have to end the test before the external timeout
            # from testrunner triggers.
            self.webdriver.send_session_command("GET", "window", timeout=2)
        except (OSError, webdriver_error.WebDriverException, socket.timeout,
                webdriver_error.UnknownErrorException,
                webdriver_error.InvalidSessionIdException):
            return False
        else:
            return True

    def after_connect(self):
        """Load the testharness runner using the executor's last environment protocol."""
        url_protocol = self.executor.last_environment["protocol"]
        self.testharness.load_runner(url_protocol)


class WebDriverBidiProtocol(WebDriverProtocol):
    """WebDriver protocol with the BiDi session enabled and BiDi parts added."""

    enable_bidi = True
    # BiDi-specific parts first, followed by every classic WebDriver part.
    implements = [
        WebDriverBidiBrowsingContextProtocolPart,
        WebDriverBidiEventsProtocolPart,
        WebDriverBidiScriptProtocolPart,
        *WebDriverProtocol.implements,
    ]

    def __init__(self, executor, browser, capabilities, **kwargs):
        super().__init__(executor, browser, capabilities, **kwargs)
        # Event loop used to run the BiDi session's coroutines.
        self.loop = asyncio.new_event_loop()

    def connect(self):
        """Create the WebDriver session, then start its BiDi session on ``self.loop``."""
        super().connect()
        start_bidi = self.webdriver.bidi_session.start(self.loop)
        self.loop.run_until_complete(start_bidi)

    def teardown(self):
        """End the BiDi session, stop the loop and tear down the WebDriver session.

        Failures while ending the BiDi session are only logged at debug level.
        """
        try:
            self.loop.run_until_complete(self.webdriver.bidi_session.end())
        except Exception as e:
            detail = str(getattr(e, "message", ""))
            trace = traceback.format_exc()
            self.logger.debug(detail + "\n" + trace if detail else trace)
        self.loop.stop()
        super().teardown()


class WebDriverRun(TimedRunner):
    """Timed runner that maps WebDriver failures to test result statuses."""

    def set_timeout(self):
        """Set the protocol timeout to ``timeout + extra_timeout``.

        :returns: ``None`` on success, or an ``("INTERNAL-ERROR", msg)`` tuple
                  when setting the timeout raises ``UnknownErrorException``.
        """
        total = self.timeout + self.extra_timeout
        try:
            self.protocol.base.set_timeout(total)
        except webdriver_error.UnknownErrorException:
            msg = "Lost WebDriver connection"
            self.logger.error(msg)
            return ("INTERNAL-ERROR", msg)

    def run_func(self):
        """Call ``self.func`` and store ``(success, data)`` in ``self.result``.

        ``self.result_flag`` is set once the result has been stored, whatever
        the outcome.
        """
        timeout_errors = (webdriver_error.TimeoutException,
                          webdriver_error.ScriptTimeoutException)
        try:
            outcome = self.func(self.protocol, self.url, self.timeout)
            self.result = True, outcome
        except timeout_errors:
            self.result = False, ("EXTERNAL-TIMEOUT", None)
        except socket.timeout:
            # Checking if the browser is alive below is likely to hang, so mark
            # this case as a CRASH unconditionally.
            self.result = False, ("CRASH", None)
        except Exception as e:
            # workaround for https://bugs.chromium.org/p/chromedriver/issues/detail?id=2001
            async_script_timeout = (
                isinstance(e, webdriver_error.WebDriverException) and
                e.http_status == 408 and
                e.status_code == "asynchronous script timeout")
            if async_script_timeout:
                self.result = False, ("EXTERNAL-TIMEOUT", None)
            else:
                if self.protocol.is_alive():
                    status = "INTERNAL-ERROR"
                else:
                    status = "CRASH"
                detail = str(getattr(e, "message", ""))
                trace = traceback.format_exc()
                message = detail + "\n" + trace if detail else trace
                self.result = False, (status, message)
        finally:
            self.result_flag.set()


class WebDriverTestharnessExecutor(TestharnessExecutor):
    """Executor that runs testharness.js tests through a WebDriver protocol.

    Messages from testdriver are fetched with a classic async script, or via
    WebDriver BiDi when the protocol exposes a ``bidi_script`` part.
    """
    supports_testdriver = True
    protocol_cls = WebDriverProtocol
    # Bound in ``__init__`` to either the BiDi or the classic message getter.
    _get_next_message = None

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 close_after_done=True, capabilities=None, debug_info=None,
                 cleanup_after_test=True, **kwargs):
        """WebDriver-based executor for testharness.js tests"""
        TestharnessExecutor.__init__(self, logger, browser, server_config,
                                     timeout_multiplier=timeout_multiplier,
                                     debug_info=debug_info)
        self.protocol = self.protocol_cls(self, browser, capabilities)
        with open(os.path.join(here, "testharness_webdriver_resume.js")) as f:
            self.script_resume = f.read()
        with open(os.path.join(here, "window-loaded.js")) as f:
            self.window_loaded_script = f.read()

        if hasattr(self.protocol, 'bidi_script'):
            # If `bidi_script` is available, the messages can be handled via BiDi.
            self._get_next_message = self._get_next_message_bidi
        else:
            self._get_next_message = self._get_next_message_classic

        self.close_after_done = close_after_done
        # Identifier used to open and later locate this executor's test window.
        self.window_id = str(uuid.uuid4())
        self.cleanup_after_test = cleanup_after_test

    def is_alive(self):
        """Return the protocol's own liveness check."""
        return self.protocol.is_alive()

    def on_environment_change(self, new_environment):
        """Reload the testharness runner when the environment's protocol changed."""
        if new_environment["protocol"] != self.last_environment["protocol"]:
            self.protocol.testharness.load_runner(new_environment["protocol"])

    def do_test(self, test):
        """Run ``test`` under a ``WebDriverRun`` and convert the outcome to a result.

        :returns: ``self.convert_result(...)`` when the run succeeded, otherwise
                  ``(test.make_result(*data), [])`` built from the failure data.
        """
        url = self.test_url(test)

        success, data = WebDriverRun(self.logger,
                                     self.do_testharness,
                                     self.protocol,
                                     url,
                                     test.timeout * self.timeout_multiplier,
                                     self.extra_timeout).run()

        if success:
            data, extra = data
            return self.convert_result(test, data, extra=extra)

        return (test.make_result(*data), [])

    @staticmethod
    def _unsubscribe_all_bidi_events(protocol):
        """Remove every BiDi event subscription if the protocol implements ``bidi_events``."""
        if hasattr(protocol, 'bidi_events'):
            # Use protocol loop to run the async cleanup.
            protocol.loop.run_until_complete(protocol.bidi_events.unsubscribe_all())

    def do_testharness(self, protocol, url, timeout):
        """Drive a single testharness test until its callback handler reports completion.

        Cleans up leftovers from the previous test, opens a fresh test window,
        points the session at ``url`` and then relays testdriver messages to a
        callback handler in a loop.

        :param protocol: Protocol used to talk to the browser.
        :param url: URL of the test to run.
        :param timeout: Not used by this method.
        :returns: ``(rv, extra)``: the handler's final value and a dict that
                  carries ``"leak_counters"`` when a leak check reported any.
        :raises Exception: when the browser is found dead after an unexpected
                  message, or the first exception recorded by the BiDi event
                  listener.
        """
        # The previous test may not have closed its old windows (if something
        # went wrong or if cleanup_after_test was False), so clean up here.
        parent_window = protocol.testharness.close_old_windows()

        # Likewise drop any BiDi subscriptions left over from the previous test.
        self._unsubscribe_all_bidi_events(protocol)

        # Now start the test harness
        protocol.testharness.open_test_window(self.window_id)
        test_window = protocol.testharness.get_test_window(self.window_id,
                                                           parent_window,
                                                           timeout=5*self.timeout_multiplier)
        protocol.base.set_window(test_window)

        # Wait until about:blank has been loaded
        protocol.base.execute_script(self.window_loaded_script, asynchronous=True)

        # Exceptions occurred outside the main loop.
        unexpected_exceptions = []

        if hasattr(protocol, 'bidi_events'):
            # If protocol implements `bidi_events`, forward all the events to test_driver.
            async def process_bidi_event(method, params):
                try:
                    self.logger.debug(f"Received bidi event: {method}, {params}")
                    if hasattr(protocol, 'bidi_browsing_context') and method == "browsingContext.userPromptOpened" and \
                            params["context"] == test_window:
                        # User prompts of the test window are handled separately. In classic implementation, this user
                        # prompt always causes an exception when `_get_next_message` is called. In BiDi it's not a case,
                        # as the BiDi protocol allows sending commands even with the user prompt opened. However, the
                        # user prompt can block the testdriver JS execution and cause the dead loop. To overcome this
                        # issue, the user prompt of the test window is always dismissed and the test is failing.
                        try:
                            await protocol.bidi_browsing_context.handle_user_prompt(params["context"])
                        except Exception as e:
                            # "no such alert" means the prompt was already dismissed by the WebDriver BiDi
                            # server, which is fine. Anything else is unexpected and is re-raised so that it
                            # gets recorded for the main loop.
                            if "no such alert" not in str(e):
                                raise
                        raise Exception("Unexpected user prompt in test window: %s" % (params,))
                    else:
                        protocol.testdriver.send_message(-1, "event", method, json.dumps({
                            "params": params,
                            "method": method}))
                except Exception as e:
                    # As the event listener is async, the exceptions should be added to the list to be processed in the
                    # main loop.
                    self.logger.error("BiDi event processing failed: %s" % e)
                    unexpected_exceptions.append(e)

            # NOTE(review): the value returned by `add_event_listener` is discarded and nothing in this method
            # removes the listener again; confirm that it does not outlive this test.
            protocol.bidi_events.add_event_listener(None, process_bidi_event)
            protocol.loop.run_until_complete(protocol.bidi_events.subscribe(['browsingContext.userPromptOpened'], None))

        # If possible, support async actions.
        if hasattr(protocol, 'loop'):
            handler = WebDriverAsyncCallbackHandler(self.logger, protocol, test_window, protocol.loop)
        else:
            handler = WebDriverCallbackHandler(self.logger, protocol, test_window)

        # Point the session at the test itself.
        protocol.webdriver.url = url

        while True:
            if unexpected_exceptions:
                # TODO: what to do if there is more than 1 unexpected exception?
                raise unexpected_exceptions[0]

            test_driver_message = self._get_next_message(protocol, url, test_window)
            self.logger.debug("Receive message from testdriver: %s" % (test_driver_message,))

            # As of 2019-03-29, WebDriver does not define expected behavior for
            # cases where the browser crashes during script execution:
            #
            # https://github.com/w3c/webdriver/issues/1308
            if not isinstance(test_driver_message, list) or len(test_driver_message) != 3:
                try:
                    is_alive = self.is_alive()
                except webdriver_error.WebDriverException:
                    is_alive = False
                if not is_alive:
                    raise Exception("Browser crashed during script execution.")

            # In case of WebDriver Classic, a user prompt created after starting execution of the resume script will
            # resolve the script with `null` [1, 2]. In that case, cycle this event loop and handle the prompt the next
            # time the resume script executes.
            #
            # [1]: Step 5.3 of https://www.w3.org/TR/webdriver/#execute-async-script
            # [2]: https://www.w3.org/TR/webdriver/#dfn-execute-a-function-body
            if test_driver_message is None:
                continue

            done, rv = handler(test_driver_message)
            if done:
                break

        # The test is over: remove the subscriptions made while it ran.
        self._unsubscribe_all_bidi_events(protocol)

        extra = {}
        if (leak_part := getattr(protocol, "leak", None)) and (counters := leak_part.check()):
            extra["leak_counters"] = counters

        # Attempt to clean up any leftover windows, if allowed. This is
        # preferable as it will blame the correct test if something goes wrong
        # closing windows, but if the user wants to see the test results we
        # have to leave the window(s) open.
        if self.cleanup_after_test:
            protocol.testharness.close_old_windows()

        if unexpected_exceptions:
            # TODO: what to do if there is more than 1 unexpected exception?
            raise unexpected_exceptions[0]

        return rv, extra

    def _get_next_message_classic(self, protocol, url, _):
        """
        Get the next message from the test_driver using the classic WebDriver async script execution. This will block
        the event loop until the test_driver sends a message.

        :param protocol: Protocol whose ``base`` part executes the resume script.
        :param url: Test URL; it is passed to the script with the server part stripped.
        :param _: Ignored. Present so both message getters share one signature.
        """
        return protocol.base.execute_script(self.script_resume, asynchronous=True, args=[strip_server(url)])

    def _get_next_message_bidi(self, protocol, url, test_window):
        """
        Get the next message from the test_driver using async call. This will not block the event loop, which allows for
        processing the events from the test_runner to test_driver while waiting for the next test_driver commands.

        :param protocol: Protocol providing ``loop`` and the ``bidi_script`` part.
        :param url: Test URL; it is passed to the script with the server part stripped.
        :param test_window: Browsing context in which the script is called.
        """
        # As long as we want to be able to use scripts both in bidi and in classic mode, the script should
        # be wrapped to some harness to emulate the WebDriver Classic async script execution. The script
        # will be provided with the `resolve` delegate, which finishes the execution. After that the
        # coroutine is finished as well.
        wrapped_script = """async function(...args){
                        return new Promise((resolve, reject) => {
                            args.push(resolve);
                            (async function(){
                                %s
                            }).apply(null, args);
                        })
                    }""" % self.script_resume

        bidi_url_argument = {
            "type": "string",
            "value": strip_server(url)
        }

        # `run_until_complete` allows processing BiDi events in the same loop while waiting for the next message.
        message = protocol.loop.run_until_complete(protocol.bidi_script.call_function(
            wrapped_script, target={
                "context": test_window
            },
            arguments=[bidi_url_argument]))
        # The message is in WebDriver BiDi format. Deserialize it.
        return bidi_deserialize(message)


class WebDriverRefTestExecutor(RefTestExecutor):
    """Reftest executor that loads tests and takes screenshots through WebDriver."""
    protocol_cls = WebDriverProtocol

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 screenshot_cache=None, close_after_done=True,
                 debug_info=None, capabilities=None, debug_test=False,
                 reftest_screenshot="unexpected", **kwargs):
        """WebDriver-based executor for reftests"""
        RefTestExecutor.__init__(self,
                                 logger,
                                 browser,
                                 server_config,
                                 screenshot_cache=screenshot_cache,
                                 timeout_multiplier=timeout_multiplier,
                                 debug_info=debug_info,
                                 reftest_screenshot=reftest_screenshot)
        self.protocol = self.protocol_cls(self,
                                          browser,
                                          capabilities=capabilities)
        self.implementation = RefTestImplementation(self)
        self.close_after_done = close_after_done
        self.has_window = False
        self.debug_test = debug_test

        # The wait script is a template; fill in the class name it watches for.
        with open(os.path.join(here, "test-wait.js")) as f:
            self.wait_script = f.read() % {"classname": "reftest-wait"}

    def reset(self):
        """Reset the reftest implementation's state."""
        self.implementation.reset()

    def is_alive(self):
        """Return the protocol's own liveness check."""
        return self.protocol.is_alive()

    def do_test(self, test):
        """Position and size the window, run the reftest and convert its result.

        The window is sized to 800x600 plus the measured difference between the
        outer window and its viewport.
        """
        width_offset, height_offset = self.protocol.webdriver.execute_script(
            """return [window.outerWidth - window.innerWidth,
                       window.outerHeight - window.innerHeight];"""
        )
        # width_offset and height_offset should never be negative
        width_offset = max(width_offset, 0)
        height_offset = max(height_offset, 0)
        try:
            self.protocol.webdriver.window.position = (0, 0)
        except webdriver_error.InvalidArgumentException:
            # Safari 12 throws with 0 or 1, treating them as bools; fixed in STP
            self.protocol.webdriver.window.position = (2, 2)
        self.protocol.webdriver.window.size = (800 + width_offset, 600 + height_offset)

        result = self.implementation.run_test(test)

        if (leak_part := getattr(self.protocol, "leak", None)) and (counters := leak_part.check()):
            result.setdefault("extra", {})["leak_counters"] = counters

        if self.debug_test and result["status"] in ["PASS", "FAIL", "ERROR"] and "extra" in result:
            self.protocol.debug.load_reftest_analyzer(test, result)

        return self.convert_result(test, result)

    def screenshot(self, test, viewport_size, dpi, page_ranges):
        """Take a screenshot of ``test`` under a ``WebDriverRun``.

        :param test: Test whose URL is loaded and captured.
        :param viewport_size: Must be ``None``.
        :param dpi: Must be ``None``.
        :param page_ranges: Not used by this method.
        :returns: Whatever ``WebDriverRun.run()`` returns.
        """
        # https://github.com/web-platform-tests/wpt/issues/7135
        assert viewport_size is None
        assert dpi is None

        # Scale the deadline by the configured multiplier, as the testharness
        # and crashtest executors in this file do.
        return WebDriverRun(self.logger,
                            self._screenshot,
                            self.protocol,
                            self.test_url(test),
                            test.timeout * self.timeout_multiplier,
                            self.extra_timeout).run()

    def _screenshot(self, protocol, url, timeout):
        """Load ``url``, run the wait script, and return the screenshot data.

        :param protocol: Protocol used to load the page and take the screenshot.
        :param url: URL to load.
        :param timeout: Not used by this method.
        :returns: The screenshot string with any ``data:image/png;base64,``
                  prefix removed.
        :raises ValueError: if WebDriver returns no screenshot.
        """
        protocol.base.load(url)
        protocol.base.execute_script(self.wait_script, asynchronous=True)

        screenshot = protocol.webdriver.screenshot()
        if screenshot is None:
            raise ValueError('screenshot is None')

        # Strip off the "data:image/png;base64," prefix if the screenshot came
        # back as a data URL.
        if screenshot.startswith("data:image/png;base64,"):
            screenshot = screenshot.split(",", 1)[1]

        return screenshot


class WebDriverCrashtestExecutor(CrashtestExecutor):
    """Crashtest executor that loads each test through WebDriver."""
    protocol_cls = WebDriverProtocol

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 screenshot_cache=None, close_after_done=True,
                 debug_info=None, capabilities=None, **kwargs):
        """WebDriver-based executor for crashtests"""
        CrashtestExecutor.__init__(self, logger, browser, server_config,
                                   screenshot_cache=screenshot_cache,
                                   timeout_multiplier=timeout_multiplier,
                                   debug_info=debug_info)
        self.protocol = self.protocol_cls(self, browser, capabilities=capabilities)

        # The wait script is a template; fill in the class name it watches for.
        script_path = os.path.join(here, "test-wait.js")
        with open(script_path) as script_file:
            template = script_file.read()
        self.wait_script = template % {"classname": "test-wait"}

    def do_test(self, test):
        """Run ``test`` under a ``WebDriverRun`` and turn the outcome into a result.

        No timeout is passed to the runner when ``debug_info`` is set.
        """
        if self.debug_info is None:
            timeout = test.timeout * self.timeout_multiplier
        else:
            timeout = None

        runner = WebDriverRun(self.logger,
                              self.do_crashtest,
                              self.protocol,
                              self.test_url(test),
                              timeout,
                              self.extra_timeout)
        success, data = runner.run()

        if not success:
            return (test.make_result(*data), [])

        return self.convert_result(test, data)

    def do_crashtest(self, protocol, url, timeout):
        """Load ``url``, run the wait script, and report PASS.

        :returns: A result dict with status ``"PASS"``; it gains an ``"extra"``
                  entry with ``"leak_counters"`` when the protocol has a leak
                  part whose check reports counters.
        """
        protocol.base.load(url)
        protocol.base.execute_script(self.wait_script, asynchronous=True)

        outcome = {"status": "PASS", "message": None}
        leak_part = getattr(protocol, "leak", None)
        if leak_part:
            counters = leak_part.check()
            if counters:
                outcome["extra"] = {"leak_counters": counters}
        return outcome