chromium/third_party/wpt_tools/wpt/tools/wptrunner/wptrunner/executors/executormarionette.py

# mypy: allow-untyped-defs

import json
import os
import threading
import time
import traceback
import uuid

from urllib.parse import urljoin

errors = None
marionette = None
pytestrunner = None

here = os.path.dirname(__file__)

from .base import (CallbackHandler,
                   CrashtestExecutor,
                   RefTestExecutor,
                   RefTestImplementation,
                   TestharnessExecutor,
                   TimedRunner,
                   WdspecExecutor,
                   get_pages,
                   strip_server)
from .protocol import (AccessibilityProtocolPart,
                       ActionSequenceProtocolPart,
                       AssertsProtocolPart,
                       BaseProtocolPart,
                       TestharnessProtocolPart,
                       PrefsProtocolPart,
                       Protocol,
                       StorageProtocolPart,
                       SelectorProtocolPart,
                       ClickProtocolPart,
                       CookiesProtocolPart,
                       SendKeysProtocolPart,
                       TestDriverProtocolPart,
                       CoverageProtocolPart,
                       GenerateTestReportProtocolPart,
                       VirtualAuthenticatorProtocolPart,
                       WindowProtocolPart,
                       SetPermissionProtocolPart,
                       PrintProtocolPart,
                       DebugProtocolPart,
                       VirtualSensorProtocolPart,
                       DevicePostureProtocolPart,
                       merge_dicts)


def do_delayed_imports():
    global errors, marionette, Addons, WebAuthn

    from marionette_driver import marionette, errors
    from marionette_driver.addons import Addons
    from marionette_driver.webauthn import WebAuthn


def _switch_to_window(marionette, handle):
    """Switch to the specified window; subsequent commands will be
    directed at the new window.

    This is a workaround for issue 24924[0]; marionettedriver 3.1.0 dropped the
    'name' parameter from its switch_to_window command, but it is still needed
    for at least Firefox 79.

    [0]: https://github.com/web-platform-tests/wpt/issues/24924

    :param marionette: The Marionette instance
    :param handle: The id of the window to switch to.
    """
    marionette._send_message("WebDriver:SwitchToWindow",
                             {"handle": handle, "name": handle, "focus": True})
    marionette.window = handle


class MarionetteCallbackHandler(CallbackHandler):
    def __init__(self, logger, protocol, test_window):
        MarionetteCallbackHandler.expected_exc = (errors.MarionetteException,)
        super().__init__(logger, protocol, test_window)


class MarionetteBaseProtocolPart(BaseProtocolPart):
    def __init__(self, parent):
        super().__init__(parent)
        self.timeout = None

    def setup(self):
        self.marionette = self.parent.marionette

    def execute_script(self, script, asynchronous=False, args=None):
        method = self.marionette.execute_async_script if asynchronous else self.marionette.execute_script
        script_args = args if args is not None else []
        return method(script, script_args=script_args, new_sandbox=False, sandbox=None)

    def set_timeout(self, timeout):
        """Set the Marionette script timeout.

        :param timeout: Script timeout in seconds

        """
        if timeout != self.timeout:
            self.marionette.timeout.script = timeout
            self.timeout = timeout

    @property
    def current_window(self):
        return self.marionette.current_window_handle

    def set_window(self, handle):
        _switch_to_window(self.marionette, handle)

    def window_handles(self):
        return self.marionette.window_handles

    def load(self, url):
        self.marionette.navigate(url)

    def wait(self):
        try:
            socket_timeout = self.marionette.client.socket_timeout
        except AttributeError:
            # This can happen if there was a crash
            return
        if socket_timeout:
            try:
                self.marionette.timeout.script = socket_timeout / 2
            except OSError:
                self.logger.debug("Socket closed")
                return

        while True:
            try:
                rv = self.marionette.execute_async_script("""let callback = arguments[arguments.length - 1];
addEventListener("__test_restart", e => {e.preventDefault(); callback(true)})""")
                # None can be returned if we try to run the script again before we've completed a navigation.
                # In that case, keep retrying
                if rv is not None:
                    return rv
            except errors.NoSuchWindowException:
                # The window closed
                break
            except errors.ScriptTimeoutException:
                self.logger.debug("Script timed out")
                pass
            except errors.JavascriptException as e:
                # This can happen if we navigate, but just keep going
                self.logger.debug(e)
                pass
            except OSError:
                self.logger.debug("Socket closed")
                break
            except Exception:
                self.logger.warning(traceback.format_exc())
                break
        return False


class MarionetteTestharnessProtocolPart(TestharnessProtocolPart):
    def __init__(self, parent):
        super().__init__(parent)
        self.runner_handle = None
        with open(os.path.join(here, "runner.js")) as f:
            self.runner_script = f.read()
        with open(os.path.join(here, "window-loaded.js")) as f:
            self.window_loaded_script = f.read()

    def setup(self):
        self.marionette = self.parent.marionette

    def load_runner(self, url_protocol):
        # Check if we previously had a test window open, and if we did make sure it's closed
        if self.runner_handle:
            self._close_windows()
        url = urljoin(self.parent.executor.server_url(url_protocol), "/testharness_runner.html")
        self.logger.debug("Loading %s" % url)
        try:
            self.dismiss_alert(lambda: self.marionette.navigate(url))
        except Exception:
            self.logger.critical(
                "Loading initial page %s failed. Ensure that the "
                "there are no other programs bound to this port and "
                "that your firewall rules or network setup does not "
                "prevent access.\n%s" % (url, traceback.format_exc()))
            raise
        self.runner_handle = self.marionette.current_window_handle
        format_map = {"title": threading.current_thread().name.replace("'", '"')}
        self.parent.base.execute_script(self.runner_script % format_map)

    def _close_windows(self):
        handles = self.marionette.window_handles
        runner_handle = None
        try:
            handles.remove(self.runner_handle)
            runner_handle = self.runner_handle
        except ValueError:
            # The runner window probably changed id but we can restore it
            # This isn't supposed to happen, but marionette ids are not yet stable
            # We assume that the first handle returned corresponds to the runner,
            # but it hopefully doesn't matter too much if that assumption is
            # wrong since we reload the runner in that tab anyway.
            runner_handle = handles.pop(0)
            self.logger.info("Changing harness_window to %s" % runner_handle)

        for handle in handles:
            try:
                self.logger.info("Closing window %s" % handle)
                _switch_to_window(self.marionette, handle)
                self.dismiss_alert(lambda: self.marionette.close())
            except errors.NoSuchWindowException:
                # We might have raced with the previous test to close this
                # window, skip it.
                pass
        _switch_to_window(self.marionette, runner_handle)
        return runner_handle

    def close_old_windows(self, url_protocol):
        runner_handle = self._close_windows()
        if runner_handle != self.runner_handle:
            self.load_runner(url_protocol)
        return self.runner_handle

    def dismiss_alert(self, f):
        while True:
            try:
                f()
            except errors.UnexpectedAlertOpen:
                alert = self.marionette.switch_to_alert()
                try:
                    alert.dismiss()
                except errors.NoAlertPresentException:
                    pass
            else:
                break

    def get_test_window(self, window_id, parent, timeout=5):
        """Find the test window amongst all the open windows.
        This is assumed to be either the named window or the one after the parent in the list of
        window handles

        :param window_id: The DOM name of the Window
        :param parent: The handle of the runner window
        :param timeout: The time in seconds to wait for the window to appear. This is because in
                        some implementations there's a race between calling window.open and the
                        window being added to the list of WebDriver accessible windows."""
        test_window = None
        end_time = time.time() + timeout
        while time.time() < end_time:
            if window_id:
                try:
                    # Try this, it's in Level 1 but nothing supports it yet
                    win_s = self.parent.base.execute_script("return window['%s'];" % self.window_id)
                    win_obj = json.loads(win_s)
                    test_window = win_obj["window-fcc6-11e5-b4f8-330a88ab9d7f"]
                except Exception:
                    pass

            if test_window is None:
                handles = self.marionette.window_handles
                if len(handles) == 2:
                    test_window = next(iter(set(handles) - {parent}))
                elif len(handles) > 2 and handles[0] == parent:
                    # Hope the first one here is the test window
                    test_window = handles[1]

            if test_window is not None:
                assert test_window != parent
                return test_window

            time.sleep(0.1)

        raise Exception("unable to find test window")

    def test_window_loaded(self):
        """Wait until the page in the new window has been loaded.

        Hereby ignore Javascript exceptions that are thrown when
        the document has been unloaded due to a process change.
        """
        while True:
            try:
                self.parent.base.execute_script(self.window_loaded_script, asynchronous=True)
                break
            except errors.JavascriptException as e:
                if e.message.startswith("Script evaluation aborted: Actor"):
                    # Special-case JavaScript errors for a JSWindowActor destroy
                    # until a decision is made on bug 1673478.
                    pass


class MarionettePrefsProtocolPart(PrefsProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def set(self, name, value):
        if not isinstance(value, str):
            value = str(value)

        if value.lower() not in ("true", "false"):
            try:
                int(value)
            except ValueError:
                value = f"'{value}'"
        else:
            value = value.lower()

        self.logger.info(f"Setting pref {name} to {value}")

        script = """
            let prefInterface = Components.classes["@mozilla.org/preferences-service;1"]
                                          .getService(Components.interfaces.nsIPrefBranch);
            let pref = '%s';
            let type = prefInterface.getPrefType(pref);
            let value = %s;
            switch(type) {
                case prefInterface.PREF_STRING:
                    prefInterface.setCharPref(pref, value);
                    break;
                case prefInterface.PREF_BOOL:
                    prefInterface.setBoolPref(pref, value);
                    break;
                case prefInterface.PREF_INT:
                    prefInterface.setIntPref(pref, value);
                    break;
                case prefInterface.PREF_INVALID:
                    // Pref doesn't seem to be defined already; guess at the
                    // right way to set it based on the type of value we have.
                    switch (typeof value) {
                        case "boolean":
                            prefInterface.setBoolPref(pref, value);
                            break;
                        case "string":
                            prefInterface.setCharPref(pref, value);
                            break;
                        case "number":
                            prefInterface.setIntPref(pref, value);
                            break;
                        default:
                            throw new Error("Unknown pref value type: " + (typeof value));
                    }
                    break;
                default:
                    throw new Error("Unknown pref type " + type);
            }
            """ % (name, value)
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            self.marionette.execute_script(script)

    def clear(self, name):
        self.logger.info(f"Clearing pref {name}")
        script = """
            let prefInterface = Components.classes["@mozilla.org/preferences-service;1"]
                                          .getService(Components.interfaces.nsIPrefBranch);
            let pref = '%s';
            prefInterface.clearUserPref(pref);
            """ % name
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            self.marionette.execute_script(script)

    def get(self, name):
        script = """
            let prefInterface = Components.classes["@mozilla.org/preferences-service;1"]
                                          .getService(Components.interfaces.nsIPrefBranch);
            let pref = '%s';
            let type = prefInterface.getPrefType(pref);
            switch(type) {
                case prefInterface.PREF_STRING:
                    return prefInterface.getCharPref(pref);
                case prefInterface.PREF_BOOL:
                    return prefInterface.getBoolPref(pref);
                case prefInterface.PREF_INT:
                    return prefInterface.getIntPref(pref);
                case prefInterface.PREF_INVALID:
                    return null;
            }
            """ % name
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            rv = self.marionette.execute_script(script)
        self.logger.debug(f"Got pref {name} with value {rv}")
        return rv


class MarionetteStorageProtocolPart(StorageProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def clear_origin(self, url):
        self.logger.info("Clearing origin %s" % (url))
        script = """
            let url = '%s';
            let uri = Components.classes["@mozilla.org/network/io-service;1"]
                                .getService(Ci.nsIIOService)
                                .newURI(url);
            let ssm = Components.classes["@mozilla.org/scriptsecuritymanager;1"]
                                .getService(Ci.nsIScriptSecurityManager);
            let principal = ssm.createContentPrincipal(uri, {});
            let qms = Components.classes["@mozilla.org/dom/quota-manager-service;1"]
                                .getService(Components.interfaces.nsIQuotaManagerService);
            qms.clearStoragesForOriginPrefix(principal, "default");
            """ % url
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            self.marionette.execute_script(script)


class MarionetteAssertsProtocolPart(AssertsProtocolPart):
    def setup(self):
        self.assert_count = {"chrome": 0, "content": 0}
        self.chrome_assert_count = 0
        self.marionette = self.parent.marionette

    def get(self):
        script = """
        debug = Cc["@mozilla.org/xpcom/debug;1"].getService(Ci.nsIDebug2);
        if (debug.isDebugBuild) {
          return debug.assertionCount;
        }
        return 0;
        """

        def get_count(context, **kwargs):
            try:
                context_count = self.marionette.execute_script(script, **kwargs)
                if context_count:
                    self.parent.logger.info("Got %s assert count %s" % (context, context_count))
                    test_count = context_count - self.assert_count[context]
                    self.assert_count[context] = context_count
                    return test_count
            except errors.NoSuchWindowException:
                # If the window was already closed
                self.parent.logger.warning("Failed to get assertion count; window was closed")
            except (errors.MarionetteException, OSError):
                # This usually happens if the process crashed
                pass

        counts = []
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            counts.append(get_count("chrome"))
        if self.parent.e10s:
            counts.append(get_count("content", sandbox="system"))

        counts = [item for item in counts if item is not None]

        if not counts:
            return None

        return sum(counts)


class MarionetteSelectorProtocolPart(SelectorProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def elements_by_selector(self, selector):
        return self.marionette.find_elements("css selector", selector)


class MarionetteClickProtocolPart(ClickProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def element(self, element):
        return element.click()


class MarionetteCookiesProtocolPart(CookiesProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def delete_all_cookies(self):
        self.logger.info("Deleting all cookies")
        return self.marionette.delete_all_cookies()

    def get_all_cookies(self):
        self.logger.info("Getting all cookies")
        return self.marionette.get_cookies()

    def get_named_cookie(self, name):
        self.logger.info("Getting cookie named %s" % name)
        try:
            return self.marionette.get_cookie(name)
        # When errors.NoSuchCookieException is supported,
        # that should be used here instead.
        except Exception:
            return None


class MarionetteSendKeysProtocolPart(SendKeysProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def send_keys(self, element, keys):
        return element.send_keys(keys)

class MarionetteWindowProtocolPart(WindowProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def minimize(self):
        return self.marionette.minimize_window()

    def set_rect(self, rect):
        self.marionette.set_window_rect(rect["x"], rect["y"], rect["height"], rect["width"])

    def get_rect(self):
        return self.marionette.window_rect

class MarionetteActionSequenceProtocolPart(ActionSequenceProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def send_actions(self, actions):
        actions = self.marionette._to_json(actions)
        self.logger.info(actions)
        self.marionette._send_message("WebDriver:PerformActions", actions)

    def release(self):
        self.marionette._send_message("WebDriver:ReleaseActions", {})


class MarionetteTestDriverProtocolPart(TestDriverProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def send_message(self, cmd_id, message_type, status, message=None):
        obj = {
            "cmd_id": cmd_id,
            "type": "testdriver-%s" % str(message_type),
            "status": str(status)
        }
        if message:
            obj["message"] = str(message)
        self.parent.base.execute_script("window.postMessage(%s, '*')" % json.dumps(obj))

    def _switch_to_frame(self, index_or_elem):
        try:
            self.marionette.switch_to_frame(index_or_elem)
        except (errors.NoSuchFrameException,
                errors.StaleElementException) as e:
            raise ValueError from e

    def _switch_to_parent_frame(self):
        self.marionette.switch_to_parent_frame()


class MarionetteCoverageProtocolPart(CoverageProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

        if not self.parent.ccov:
            self.is_enabled = False
            return

        script = """
            const {PerTestCoverageUtils} = ChromeUtils.importESModule(
                "chrome://remote/content/marionette/PerTestCoverageUtils.sys.mjs"
            );
            return PerTestCoverageUtils.enabled;
            """
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            self.is_enabled = self.marionette.execute_script(script)

    def reset(self):
        script = """
            var callback = arguments[arguments.length - 1];

            const {PerTestCoverageUtils} = ChromeUtils.importESModule(
                "chrome://remote/content/marionette/PerTestCoverageUtils.sys.mjs"
            );
            PerTestCoverageUtils.beforeTest().then(callback, callback);
            """
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            try:
                error = self.marionette.execute_async_script(script)
                if error is not None:
                    raise Exception('Failure while resetting counters: %s' % json.dumps(error))
            except (errors.MarionetteException, OSError):
                # This usually happens if the process crashed
                pass

    def dump(self):
        if len(self.marionette.window_handles):
            handle = self.marionette.window_handles[0]
            _switch_to_window(self.marionette, handle)

        script = """
            var callback = arguments[arguments.length - 1];

            const {PerTestCoverageUtils} = ChromeUtils.importESModule(
                "chrome://remote/content/marionette/PerTestCoverageUtils.sys.mjs"
            );
            PerTestCoverageUtils.afterTest().then(callback, callback);
            """
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            try:
                error = self.marionette.execute_async_script(script)
                if error is not None:
                    raise Exception('Failure while dumping counters: %s' % json.dumps(error))
            except (errors.MarionetteException, OSError):
                # This usually happens if the process crashed
                pass

class MarionetteGenerateTestReportProtocolPart(GenerateTestReportProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def generate_test_report(self, config):
        raise NotImplementedError("generate_test_report not yet implemented")

class MarionetteVirtualAuthenticatorProtocolPart(VirtualAuthenticatorProtocolPart):
    def setup(self):
        self.webauthn = WebAuthn(self.parent.marionette)

    def add_virtual_authenticator(self, config):
        return self.webauthn.add_virtual_authenticator(config)

    def remove_virtual_authenticator(self, authenticator_id):
        self.webauthn.remove_virtual_authenticator(authenticator_id)

    def add_credential(self, authenticator_id, credential):
        self.webauthn.add_credential(authenticator_id, credential)

    def get_credentials(self, authenticator_id):
        return self.webauthn.get_credentials(authenticator_id)

    def remove_credential(self, authenticator_id, credential_id):
        self.webauthn.remove_credential(authenticator_id, credential_id)

    def remove_all_credentials(self, authenticator_id):
        self.webauthn.remove_all_credentials(authenticator_id)

    def set_user_verified(self, authenticator_id, uv):
        self.webauthn.set_user_verified(authenticator_id, uv)


class MarionetteSetPermissionProtocolPart(SetPermissionProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def set_permission(self, descriptor, state):
        body = {
            "descriptor": descriptor,
            "state": state,
        }
        try:
            self.marionette._send_message("WebDriver:SetPermission", body)
        except errors.UnsupportedOperationException as e:
            raise NotImplementedError("set_permission not yet implemented") from e


class MarionettePrintProtocolPart(PrintProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette
        self.runner_handle = None

    def load_runner(self):
        url = urljoin(self.parent.executor.server_url("http"), "/print_pdf_runner.html")
        self.logger.debug("Loading %s" % url)
        try:
            self.marionette.navigate(url)
        except Exception as e:
            self.logger.critical(
                "Loading initial page %s failed. Ensure that the "
                "there are no other programs bound to this port and "
                "that your firewall rules or network setup does not "
                "prevent access.\n%s" % (url, traceback.format_exc(e)))
            raise
        self.runner_handle = self.marionette.current_window_handle

    def render_as_pdf(self, width, height):
        margin = 0.5 * 2.54
        body = {
            "page": {
                "width": width,
                "height": height
            },
            "margin": {
                "left": margin,
                "right": margin,
                "top": margin,
                "bottom": margin,
            },
            "shrinkToFit": False,
            "background": True,
        }
        return self.marionette._send_message("WebDriver:Print", body, key="value")

    def pdf_to_png(self, pdf_base64, page_ranges):
        handle = self.marionette.current_window_handle
        _switch_to_window(self.marionette, self.runner_handle)
        try:
            rv = self.marionette.execute_async_script("""
let callback = arguments[arguments.length - 1];
render('%s').then(result => callback(result))""" % pdf_base64, new_sandbox=False, sandbox=None)
            page_numbers = get_pages(page_ranges, len(rv))
            rv = [item for i, item in enumerate(rv) if i + 1 in page_numbers]
            return rv
        finally:
            _switch_to_window(self.marionette, handle)


class MarionetteDebugProtocolPart(DebugProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def load_devtools(self):
        with self.marionette.using_context(self.marionette.CONTEXT_CHROME):
            self.parent.base.execute_script("""
const { DevToolsShim } = ChromeUtils.importESModule(
  "chrome://devtools-startup/content/DevToolsShim.sys.mjs"
);

const callback = arguments[arguments.length - 1];

async function loadDevTools() {
    const tab = window.gBrowser.selectedTab;
    await DevToolsShim.showToolboxForTab(tab, {
          toolId: "webconsole",
          hostType: "window"
    });
}

loadDevTools().catch((e) => console.error("Devtools failed to load", e))
              .then(callback);
""", asynchronous=True)


class MarionetteAccessibilityProtocolPart(AccessibilityProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def get_computed_label(self, element):
        return element.computed_label

    def get_computed_role(self, element):
        return element.computed_role


class MarionetteVirtualSensorProtocolPart(VirtualSensorProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def create_virtual_sensor(self, sensor_type, sensor_params):
        raise NotImplementedError("create_virtual_sensor not yet implemented")

    def update_virtual_sensor(self, sensor_type, reading):
        raise NotImplementedError("update_virtual_sensor not yet implemented")

    def remove_virtual_sensor(self, remove_parameters):
        raise NotImplementedError("remove_virtual_sensor not yet implemented")

    def get_virtual_sensor_information(self, information_parameters):
        raise NotImplementedError("get_virtual_sensor_information not yet implemented")


class MarionetteDevicePostureProtocolPart(DevicePostureProtocolPart):
    def setup(self):
        self.marionette = self.parent.marionette

    def set_device_posture(self, posture):
        raise NotImplementedError("set_device_posture not yet implemented")

    def clear_device_posture(self):
        raise NotImplementedError("clear_device_posture not yet implemented")


class MarionetteProtocol(Protocol):
    implements = [MarionetteBaseProtocolPart,
                  MarionetteTestharnessProtocolPart,
                  MarionettePrefsProtocolPart,
                  MarionetteStorageProtocolPart,
                  MarionetteSelectorProtocolPart,
                  MarionetteClickProtocolPart,
                  MarionetteCookiesProtocolPart,
                  MarionetteSendKeysProtocolPart,
                  MarionetteWindowProtocolPart,
                  MarionetteActionSequenceProtocolPart,
                  MarionetteTestDriverProtocolPart,
                  MarionetteAssertsProtocolPart,
                  MarionetteCoverageProtocolPart,
                  MarionetteGenerateTestReportProtocolPart,
                  MarionetteVirtualAuthenticatorProtocolPart,
                  MarionetteSetPermissionProtocolPart,
                  MarionettePrintProtocolPart,
                  MarionetteDebugProtocolPart,
                  MarionetteAccessibilityProtocolPart,
                  MarionetteVirtualSensorProtocolPart,
                  MarionetteDevicePostureProtocolPart]

    def __init__(self, executor, browser, capabilities=None, timeout_multiplier=1, e10s=True, ccov=False):
        do_delayed_imports()

        super().__init__(executor, browser)
        self.marionette = None
        self.marionette_port = browser.marionette_port
        self.capabilities = capabilities
        if hasattr(browser, "capabilities"):
            if self.capabilities is None:
                self.capabilities = browser.capabilities
            else:
                merge_dicts(self.capabilities, browser.capabilities)
        self.timeout_multiplier = timeout_multiplier
        self.runner_handle = None
        self.e10s = e10s
        self.ccov = ccov

    def connect(self):
        self.logger.debug("Connecting to Marionette on port %i" % self.marionette_port)
        startup_timeout = marionette.Marionette.DEFAULT_STARTUP_TIMEOUT * self.timeout_multiplier
        self.marionette = marionette.Marionette(host='127.0.0.1',
                                                port=self.marionette_port,
                                                socket_timeout=None,
                                                startup_timeout=startup_timeout)

        self.logger.debug("Waiting for Marionette connection")
        while True:
            try:
                self.marionette.raise_for_port()
                break
            except OSError:
                # When running in a debugger wait indefinitely for Firefox to start
                if self.executor.debug_info is None:
                    raise

        self.logger.debug("Starting Marionette session")
        self.marionette.start_session(self.capabilities)
        self.logger.debug("Marionette session started")

    def after_connect(self):
        pass

    def teardown(self):
        if self.marionette and self.marionette.session_id:
            try:
                self.marionette._request_in_app_shutdown()
                self.marionette.delete_session(send_request=False)
                self.marionette.cleanup()
            except Exception:
                # This is typically because the session never started
                pass
        if self.marionette is not None:
            self.marionette = None
        super().teardown()

    def is_alive(self):
        try:
            self.marionette.current_window_handle
        except Exception:
            return False
        return True

    def on_environment_change(self, old_environment, new_environment):
        #Unset all the old prefs
        for name in old_environment.get("prefs", {}).keys():
            value = self.executor.original_pref_values[name]
            if value is None:
                self.prefs.clear(name)
            else:
                self.prefs.set(name, value)

        for name, value in new_environment.get("prefs", {}).items():
            self.executor.original_pref_values[name] = self.prefs.get(name)
            self.prefs.set(name, value)

        pac = new_environment.get("pac", None)

        if pac != old_environment.get("pac", None):
            if pac is None:
                self.prefs.clear("network.proxy.type")
                self.prefs.clear("network.proxy.autoconfig_url")
            else:
                self.prefs.set("network.proxy.type", 2)
                self.prefs.set("network.proxy.autoconfig_url",
                               urljoin(self.executor.server_url("http"), pac))

class ExecuteAsyncScriptRun(TimedRunner):
    def set_timeout(self):
        timeout = self.timeout

        try:
            if timeout is not None:
                self.protocol.base.set_timeout(timeout + self.extra_timeout)
            else:
                # We just want it to never time out, really, but marionette doesn't
                # make that possible. It also seems to time out immediately if the
                # timeout is set too high. This works at least.
                self.protocol.base.set_timeout(2**28 - 1)
        except OSError:
            msg = "Lost marionette connection before starting test"
            self.logger.error(msg)
            return ("INTERNAL-ERROR", msg)

    def before_run(self):
        index = self.url.rfind("/storage/")
        if index != -1:
            # Clear storage
            self.protocol.storage.clear_origin(self.url)

    def run_func(self):
        try:
            self.result = True, self.func(self.protocol, self.url, self.timeout)
        except errors.ScriptTimeoutException:
            self.logger.debug("Got a marionette timeout")
            self.result = False, ("EXTERNAL-TIMEOUT", None)
        except OSError:
            # This can happen on a crash
            # Also, should check after the test if the firefox process is still running
            # and otherwise ignore any other result and set it to crash
            self.logger.info("IOError on command, setting status to CRASH")
            self.result = False, ("CRASH", None)
        except errors.NoSuchWindowException:
            self.logger.info("NoSuchWindowException on command, setting status to CRASH")
            self.result = False, ("CRASH", None)
        except Exception as e:
            if isinstance(e, errors.JavascriptException) and str(e).startswith("Document was unloaded"):
                message = "Document unloaded; maybe test navigated the top-level-browsing context?"
            else:
                message = getattr(e, "message", "")
                if message:
                    message += "\n"
                message += traceback.format_exc()
                self.logger.warning(traceback.format_exc())
            self.result = False, ("INTERNAL-ERROR", message)
        finally:
            self.result_flag.set()


class MarionetteTestharnessExecutor(TestharnessExecutor):
    supports_testdriver = True

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 close_after_done=True, debug_info=None, capabilities=None,
                 debug=False, ccov=False, debug_test=False, **kwargs):
        """Marionette-based executor for testharness.js tests"""
        TestharnessExecutor.__init__(self, logger, browser, server_config,
                                     timeout_multiplier=timeout_multiplier,
                                     debug_info=debug_info)
        self.protocol = MarionetteProtocol(self,
                                           browser,
                                           capabilities,
                                           timeout_multiplier,
                                           kwargs["e10s"],
                                           ccov)
        with open(os.path.join(here, "testharness_webdriver_resume.js")) as f:
            self.script_resume = f.read()
        self.close_after_done = close_after_done
        self.window_id = str(uuid.uuid4())
        self.debug = debug
        self.debug_test = debug_test

        self.install_extensions = browser.extensions

        self.original_pref_values = {}

        if marionette is None:
            do_delayed_imports()

    def setup(self, runner, protocol=None):
        super().setup(runner, protocol)
        for extension_path in self.install_extensions:
            self.logger.info("Installing extension from %s" % extension_path)
            addons = Addons(self.protocol.marionette)
            addons.install(extension_path)

        self.protocol.testharness.load_runner(self.last_environment["protocol"])

    def is_alive(self):
        return self.protocol.is_alive()

    def on_environment_change(self, new_environment):
        self.protocol.on_environment_change(self.last_environment, new_environment)

        if new_environment["protocol"] != self.last_environment["protocol"]:
            self.protocol.testharness.load_runner(new_environment["protocol"])

    def do_test(self, test):
        timeout = (test.timeout * self.timeout_multiplier if self.debug_info is None
                   else None)

        success, data = ExecuteAsyncScriptRun(self.logger,
                                              self.do_testharness,
                                              self.protocol,
                                              self.test_url(test),
                                              timeout,
                                              self.extra_timeout).run()
        # The format of data depends on whether the test ran to completion or not
        # For asserts we only care about the fact that if it didn't complete, the
        # status is in the first field.
        status = None
        if not success:
            status = data[0]

        extra = None
        if self.debug and (success or status not in ("CRASH", "INTERNAL-ERROR")):
            assertion_count = self.protocol.asserts.get()
            if assertion_count is not None:
                extra = {"assertion_count": assertion_count}

        if success:
            return self.convert_result(test, data, extra=extra)

        return (test.make_result(extra=extra, *data), [])

    def do_testharness(self, protocol, url, timeout):
        parent_window = protocol.testharness.close_old_windows(self.last_environment["protocol"])

        if self.protocol.coverage.is_enabled:
            self.protocol.coverage.reset()

        protocol.base.execute_script("window.open('about:blank', '%s', 'noopener')" % self.window_id)
        test_window = protocol.testharness.get_test_window(self.window_id, parent_window,
                                                           timeout=10 * self.timeout_multiplier)
        self.protocol.base.set_window(test_window)
        protocol.testharness.test_window_loaded()

        if self.debug_test and self.browser.supports_devtools:
            self.protocol.debug.load_devtools()

        handler = MarionetteCallbackHandler(self.logger, protocol, test_window)
        protocol.marionette.navigate(url)
        while True:
            result = protocol.base.execute_script(
                self.script_resume, args=[strip_server(url)], asynchronous=True)
            if result is None:
                # This can happen if we get an content process crash
                return None
            done, rv = handler(result)
            if done:
                break

        if self.protocol.coverage.is_enabled:
            self.protocol.coverage.dump()

        return rv


class MarionetteRefTestExecutor(RefTestExecutor):
    is_print = False

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 screenshot_cache=None, close_after_done=True,
                 debug_info=None, reftest_internal=False,
                 reftest_screenshot="unexpected", ccov=False,
                 group_metadata=None, capabilities=None, debug=False,
                 browser_version=None, debug_test=False, **kwargs):
        """Marionette-based executor for reftests"""
        RefTestExecutor.__init__(self,
                                 logger,
                                 browser,
                                 server_config,
                                 screenshot_cache=screenshot_cache,
                                 timeout_multiplier=timeout_multiplier,
                                 debug_info=debug_info)
        self.protocol = MarionetteProtocol(self, browser, capabilities,
                                           timeout_multiplier, kwargs["e10s"],
                                           ccov)
        self.implementation = self.get_implementation(reftest_internal)
        self.implementation_kwargs = {}
        if reftest_internal:
            self.implementation_kwargs["screenshot"] = reftest_screenshot
            self.implementation_kwargs["chrome_scope"] = False
            # Older versions of Gecko require switching to chrome scope to run refests
            if browser_version is not None:
                try:
                    major_version = int(browser_version.split(".")[0])
                    self.implementation_kwargs["chrome_scope"] = major_version < 82
                except ValueError:
                    pass
        self.close_after_done = close_after_done
        self.has_window = False
        self.original_pref_values = {}
        self.group_metadata = group_metadata
        self.debug = debug
        self.debug_test = debug_test

        self.install_extensions = browser.extensions

        with open(os.path.join(here, "reftest.js")) as f:
            self.script = f.read()
        with open(os.path.join(here, "test-wait.js")) as f:
            self.wait_script = f.read() % {"classname": "reftest-wait"}

    def get_implementation(self, reftest_internal):
        return (InternalRefTestImplementation if reftest_internal
                else RefTestImplementation)(self)

    def setup(self, runner, protocol=None):
        super().setup(runner, protocol)
        for extension_path in self.install_extensions:
            self.logger.info("Installing extension from %s" % extension_path)
            addons = Addons(self.protocol.marionette)
            addons.install(extension_path)

        self.implementation.setup(**self.implementation_kwargs)

    def teardown(self):
        try:
            self.implementation.teardown()
            if self.protocol.marionette and self.protocol.marionette.session_id:
                handles = self.protocol.marionette.window_handles
                if handles:
                    _switch_to_window(self.protocol.marionette, handles[0])
            super().teardown()
        except Exception:
            # Ignore errors during teardown
            self.logger.warning("Exception during reftest teardown:\n%s" %
                                traceback.format_exc())

    def reset(self):
        self.implementation.reset(**self.implementation_kwargs)

    def is_alive(self):
        return self.protocol.is_alive()

    def on_environment_change(self, new_environment):
        self.protocol.on_environment_change(self.last_environment, new_environment)

    def do_test(self, test):
        if not isinstance(self.implementation, InternalRefTestImplementation):
            if self.close_after_done and self.has_window:
                self.protocol.marionette.close()
                _switch_to_window(self.protocol.marionette,
                                 self.protocol.marionette.window_handles[-1])
                self.has_window = False

            if not self.has_window:
                self.protocol.base.execute_script(self.script)
                self.protocol.base.set_window(self.protocol.marionette.window_handles[-1])
                self.has_window = True
                self.protocol.testharness.test_window_loaded()

        if self.protocol.coverage.is_enabled:
            self.protocol.coverage.reset()

        result = self.implementation.run_test(test)

        if self.protocol.coverage.is_enabled:
            self.protocol.coverage.dump()

        if self.debug:
            assertion_count = self.protocol.asserts.get()
            if "extra" not in result:
                result["extra"] = {}
            if assertion_count is not None:
                result["extra"]["assertion_count"] = assertion_count

        if self.debug_test and result["status"] in ["PASS", "FAIL", "ERROR"] and "extra" in result:
            self.protocol.base.set_window(self.protocol.base.window_handles()[0])
            self.protocol.debug.load_reftest_analyzer(test, result)

        return self.convert_result(test, result)

    def screenshot(self, test, viewport_size, dpi, page_ranges):
        # https://github.com/web-platform-tests/wpt/issues/7135
        assert viewport_size is None
        assert dpi is None

        timeout = self.timeout_multiplier * test.timeout if self.debug_info is None else None

        test_url = self.test_url(test)

        return ExecuteAsyncScriptRun(self.logger,
                                     self._screenshot,
                                     self.protocol,
                                     test_url,
                                     timeout,
                                     self.extra_timeout).run()

    def _screenshot(self, protocol, url, timeout):
        protocol.marionette.navigate(url)

        protocol.base.execute_script(self.wait_script, asynchronous=True)

        screenshot = protocol.marionette.screenshot(full=False)
        # strip off the data:img/png, part of the url
        if screenshot.startswith("data:image/png;base64,"):
            screenshot = screenshot.split(",", 1)[1]

        return screenshot


class InternalRefTestImplementation(RefTestImplementation):
    def __init__(self, executor):
        self.timeout_multiplier = executor.timeout_multiplier
        self.executor = executor
        self.chrome_scope = False

    @property
    def logger(self):
        return self.executor.logger

    def setup(self, screenshot="unexpected", chrome_scope=False):
        data = {"screenshot": screenshot, "isPrint": self.executor.is_print}
        if self.executor.group_metadata is not None:
            data["urlCount"] = {urljoin(self.executor.server_url(key[0]), key[1]):value
                                for key, value in self.executor.group_metadata.get("url_count", {}).items()
                                if value > 1}
        self.chrome_scope = chrome_scope
        if chrome_scope:
            self.logger.debug("Using marionette Chrome scope for reftests")
            self.executor.protocol.marionette.set_context(self.executor.protocol.marionette.CONTEXT_CHROME)
        self.executor.protocol.marionette._send_message("reftest:setup", data)

    def reset(self, **kwargs):
        # this is obvious wrong; it shouldn't be a no-op
        # see https://github.com/web-platform-tests/wpt/issues/15604
        pass

    def run_test(self, test):
        references = self.get_references(test, test)
        timeout = (test.timeout * 1000) * self.timeout_multiplier
        rv = self.executor.protocol.marionette._send_message("reftest:run",
                                                             {"test": self.executor.test_url(test),
                                                              "references": references,
                                                              "expected": test.expected(),
                                                              "timeout": timeout,
                                                              "width": 800,
                                                              "height": 600,
                                                              "pageRanges": test.page_ranges})["value"]
        return rv

    def get_references(self, root_test, node):
        rv = []
        for item, relation in node.references:
            rv.append([self.executor.test_url(item), self.get_references(root_test, item), relation,
                       {"fuzzy": self.get_fuzzy(root_test, [node, item], relation)}])
        return rv

    def teardown(self):
        try:
            if self.executor.protocol.marionette and self.executor.protocol.marionette.session_id:
                self.executor.protocol.marionette._send_message("reftest:teardown", {})
                if self.chrome_scope:
                    self.executor.protocol.marionette.set_context(
                        self.executor.protocol.marionette.CONTEXT_CONTENT)
                # the reftest runner opens/closes a window with focus, so as
                # with after closing a window we need to give a new window
                # focus
                handles = self.executor.protocol.marionette.window_handles
                if handles:
                    _switch_to_window(self.executor.protocol.marionette, handles[0])
        except Exception:
            # Ignore errors during teardown
            self.logger.warning(traceback.format_exc())


class MarionetteCrashtestExecutor(CrashtestExecutor):
    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 debug_info=None, capabilities=None, debug=False,
                 ccov=False, **kwargs):
        """Marionette-based executor for testharness.js tests"""
        CrashtestExecutor.__init__(self, logger, browser, server_config,
                                   timeout_multiplier=timeout_multiplier,
                                   debug_info=debug_info)
        self.protocol = MarionetteProtocol(self,
                                           browser,
                                           capabilities,
                                           timeout_multiplier,
                                           kwargs["e10s"],
                                           ccov)

        self.original_pref_values = {}
        self.debug = debug

        with open(os.path.join(here, "test-wait.js")) as f:
            self.wait_script = f.read() % {"classname": "test-wait"}

        if marionette is None:
            do_delayed_imports()

    def is_alive(self):
        return self.protocol.is_alive()

    def on_environment_change(self, new_environment):
        self.protocol.on_environment_change(self.last_environment, new_environment)

    def do_test(self, test):
        timeout = (test.timeout * self.timeout_multiplier if self.debug_info is None
                   else None)

        success, data = ExecuteAsyncScriptRun(self.logger,
                                              self.do_crashtest,
                                              self.protocol,
                                              self.test_url(test),
                                              timeout,
                                              self.extra_timeout).run()
        status = None
        if not success:
            status = data[0]

        extra = None
        if self.debug and (success or status not in ("CRASH", "INTERNAL-ERROR")):
            assertion_count = self.protocol.asserts.get()
            if assertion_count is not None:
                extra = {"assertion_count": assertion_count}

        if success:
            return self.convert_result(test, data)

        return (test.make_result(extra=extra, *data), [])

    def do_crashtest(self, protocol, url, timeout):
        if self.protocol.coverage.is_enabled:
            self.protocol.coverage.reset()

        protocol.base.load(url)
        protocol.base.execute_script(self.wait_script, asynchronous=True)

        if self.protocol.coverage.is_enabled:
            self.protocol.coverage.dump()

        return {"status": "PASS",
                "message": None}


class MarionettePrintRefTestExecutor(MarionetteRefTestExecutor):
    is_print = True

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 screenshot_cache=None, close_after_done=True,
                 debug_info=None, reftest_screenshot="unexpected", ccov=False,
                 group_metadata=None, capabilities=None, debug=False,
                 reftest_internal=False, **kwargs):
        """Marionette-based executor for reftests"""
        MarionetteRefTestExecutor.__init__(self,
                                           logger,
                                           browser,
                                           server_config,
                                           timeout_multiplier=timeout_multiplier,
                                           screenshot_cache=screenshot_cache,
                                           close_after_done=close_after_done,
                                           debug_info=debug_info,
                                           reftest_screenshot=reftest_screenshot,
                                           reftest_internal=reftest_internal,
                                           ccov=ccov,
                                           group_metadata=group_metadata,
                                           capabilities=capabilities,
                                           debug=debug,
                                           **kwargs)

    def setup(self, runner, protocol=None):
        super().setup(runner, protocol)
        if not isinstance(self.implementation, InternalRefTestImplementation):
            self.protocol.pdf_print.load_runner()

    def get_implementation(self, reftest_internal):
        return (InternalRefTestImplementation if reftest_internal
                else RefTestImplementation)(self)

    def screenshot(self, test, viewport_size, dpi, page_ranges):
        # https://github.com/web-platform-tests/wpt/issues/7140
        assert dpi is None

        self.viewport_size = viewport_size
        timeout = self.timeout_multiplier * test.timeout if self.debug_info is None else None

        test_url = self.test_url(test)
        self.page_ranges = page_ranges.get(test.url)

        return ExecuteAsyncScriptRun(self.logger,
                                     self._render,
                                     self.protocol,
                                     test_url,
                                     timeout,
                                     self.extra_timeout).run()

    def _render(self, protocol, url, timeout):
        protocol.marionette.navigate(url)

        protocol.base.execute_script(self.wait_script, asynchronous=True)

        pdf = protocol.pdf_print.render_as_pdf(*self.viewport_size)
        screenshots = protocol.pdf_print.pdf_to_png(pdf, self.page_ranges)
        for i, screenshot in enumerate(screenshots):
            # strip off the data:img/png, part of the url
            if screenshot.startswith("data:image/png;base64,"):
                screenshots[i] = screenshot.split(",", 1)[1]

        return screenshots


class MarionetteWdspecExecutor(WdspecExecutor):
    def __init__(self, logger, browser, *args, **kwargs):
        super().__init__(logger, browser, *args, **kwargs)

        args = self.capabilities["moz:firefoxOptions"].setdefault("args", [])
        args.extend(["--profile", self.browser.profile])

        for option in ["androidPackage", "androidDeviceSerial", "env"]:
            if hasattr(browser, option):
                self.capabilities["moz:firefoxOptions"][option] = getattr(browser, option)