chromium/chrome/test/variations/drivers/chromeos.py

# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import functools
import os
import socket
import subprocess
import sys
import threading
import time

from contextlib import closing
from contextlib import contextmanager
from typing import List, Optional

import attr
from chrome.test.variations.drivers import DriverFactory
from chrome.test.variations.test_utils import SRC_DIR
from selenium import webdriver
from selenium.common.exceptions import WebDriverException

# The module chromite is under third_party and imported relative to its root.
sys.path.append(os.path.join(SRC_DIR, 'third_party'))
# The module catapult/telemetry is under third_party and imported relative to
# its root.
sys.path.append(os.path.join(SRC_DIR, 'third_party', 'catapult', 'telemetry'))

from chromite.lib import device, vm
from chromite.lib import remote_access
from telemetry.core.exceptions import BrowserConnectionGoneException
from telemetry.internal.browser import browser_options
from telemetry.internal.platform import cros_device
from telemetry.internal.platform.cros_platform_backend import CrosPlatformBackend
from telemetry.internal.backends.chrome import cros_browser_finder


CACHE_DIR = os.path.join(SRC_DIR, "build", "cros_cache")

class _PossibleCrOSBrowser(cros_browser_finder.PossibleCrOSBrowser):
  """The CrOS browser wrapper to filter out start-up args."""

  #override
  def GetBrowserStartupArgs(self, browser_options):
    startup_args = super().GetBrowserStartupArgs(browser_options)
    removed_args = [
      # This flag disables features from the seed file. We need to remove this
      # flag so the browser can load the seed file correctly.
      '--enable-gpu-benchmarking',
    ]
    return [arg for arg in startup_args if arg not in removed_args]


def _launch_browser(browser_args: List[str]) -> 'Browser':
  finder_options = browser_options.BrowserFinderOptions()
  finder_options.browser_type = 'cros-browser'
  finder_options.verbosity = 2
  finder_options.CreateParser().parse_args(args=[])

  b_options = finder_options.browser_options
  b_options.browser_startup_timeout = 15
  b_options.AppendExtraBrowserArgs(browser_args)

  device = cros_device.CrOSDevice(
    host_name='localhost',
    ssh_port=9222,
    ssh_identity=finder_options.ssh_identity,
    is_local=False)
  platform = CrosPlatformBackend.CreatePlatformForDevice(device, None)

  possibleBrowser = _PossibleCrOSBrowser(
    'cros-chrome', finder_options, platform, is_guest=False)
  possibleBrowser.SetUpEnvironment(b_options)

  try:
    browser = possibleBrowser.Create()
  except BrowserConnectionGoneException as e:
    raise WebDriverException from e
  return browser


def _wait_for_port(
    port: int, host: str='localhost', timeout:float=5) -> bool:
  start_time = time.perf_counter()
  with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
    while(time.perf_counter() - start_time <= timeout):
      if sock.connect_ex((host, port)) == 0:
        return True
      else:
        time.sleep(0.01)
  return False


@attr.attrs()
class CrOSDriverFactory(DriverFactory):
  channel: str = attr.attrib()
  board: str = attr.attrib()
  server_port: int = attr.attrib()

  #override
  def __attrs_post_init__(self):
    super().__attrs_post_init__()
    # We use this to check whether we have started the VM before we attempt to
    # shut it down.
    self._vm_started = False

  def _launch_vm(self) -> vm.VM:
    parser = vm.VM.GetParser()
    opts = parser.parse_args([
      f'--board={self.board}',
      f'--cache-dir={CACHE_DIR}',
    ])
    _device = device.Device.Create(opts)

    # VM will usually be started on a test bot already.
    if not _device.IsRunning():
      _device.Start()
    return _device

  def _ssh_forward(self, port: int, server_port: int) -> subprocess.Popen:
    local_pfs = remote_access.PortForwardSpec(local_port=port)
    remote_pfs = remote_access.PortForwardSpec(local_port=server_port)
    tunnel = self.device.remote.agent.CreateTunnel(
      to_local=[local_pfs], to_remote=[remote_pfs])
    if not _wait_for_port(port):
      return None
    return tunnel

  def _copy_seed_file(self, seed_file: str) -> str:
    assert os.path.exists(seed_file)
    remote_seed_path = f'/tmp/{os.path.basename(seed_file)}'
    remote_device = self.device.remote
    assert remote_device.IsDirWritable('/tmp/'), 'tmp dir not writable'
    remote_device.CopyToDevice(src=seed_file,
                               dest=remote_seed_path,
                               mode='scp',
                               verbose=True)
    assert remote_device.IfFileExists(remote_seed_path), (
      'file not pushed to device'
    )

    # The default owner is root, we need to chmod to any user.
    remote_device.run(
      ['chmod', 'a+rw', remote_seed_path], remote_sudo=True, print_cmd=True)
    return remote_seed_path

  #override
  @property
  def supports_startup_timeout(self) -> bool:
    # ChromeOS is a remote driver that doesn't support browser startup timeout.
    return False

  @functools.cached_property
  def device(self) -> device.Device:
    device_ = self._launch_vm()
    self._vm_started = True
    return device_

  @property
  def vm_started(self) -> bool:
    return self._vm_started

  @contextmanager
  def tunnel_context(self, debugging_port, server_port):
    tunnel = self._ssh_forward(debugging_port, server_port)
    if not tunnel:
      raise WebDriverException(f'Unable to forward port: {debugging_port}')

    def poll():
      stat = tunnel.poll()
      while stat == None:
        stat = tunnel.poll()
    threading.Thread(target=poll).start()

    try:
      yield
    finally:
      tunnel.terminate()

  #override
  @contextmanager
  def create_driver(
    self,
    seed_file: Optional[str] = None,
    options: Optional[webdriver.ChromeOptions] = None
    ):
    # This has a side-effect to boot up the VM if not yet already.
    assert self.device, "VM fails to boot."

    browser_args = []
    if seed_file:
      remote_seed_path = self._copy_seed_file(seed_file)
      browser_args.extend([
        f'--variations-test-seed-path="{remote_seed_path}"',
        f'--fake-variations-channel={self.channel}',
        '--disable-variations-safe-mode',
        '--disable-field-trial-config',
      ])

    browser = _launch_browser(browser_args)
    debugging_port, _ = browser._browser_backend._FindDevToolsPortAndTarget()

    options = options or self.default_options
    options.debugger_address=f'localhost:{debugging_port}'

    with self.tunnel_context(debugging_port, self.server_port):
      driver = webdriver.Chrome(service=self.get_driver_service(),
                                options=options)
      # VM may not be fully ready before it returns, wait for window handle
      # to double confirm.
      self.wait_for_window(driver)
      try:
        yield driver
      finally:
        driver.quit()

  def close(self):
    if self.vm_started and self.device.IsRunning():
      self.device.Stop()
    pass