# chromium/chrome/test/enterprise/e2e/infra/chrome_ent_test_case.py

# Copyright 2019 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
from posixpath import join
import random
import string
import subprocess
import time

from absl import flags
from chrome_ent_test.infra.core import EnterpriseTestCase

# Command-line flags consumed by the helpers in ChromeEnterpriseTestCase.
FLAGS = flags.FLAGS

# Required: the installer that InstallChrome uploads to and runs on the VM.
flags.DEFINE_string('chrome_installer', None,
                    'The path to the chrome installer')
flags.mark_flag_as_required('chrome_installer')

# Optional: when unset, InstallWebDriver falls back to a chocolatey package.
flags.DEFINE_string(
    'chromedriver', None,
    'The path to the chromedriver executable. If not specified, '
    'a chocolatey chromedriver package will be installed and used.')

# Optional: when unset, InstallGoogleUpdater is a no-op.
flags.DEFINE_string('omaha_installer', None,
                    'The path to the omaha 4 UpdaterSetup.exe')

# Optional: when unset, WakeGoogleUpdater is a no-op.
flags.DEFINE_string('omaha_updater', None,
                    'The path to the omaha 4 Updater.exe')


class ChromeEnterpriseTestCase(EnterpriseTestCase):
  """Base class for Chrome enterprise test cases.

  Provides helpers that install Chrome and the updater, set policies and
  run webdriver / UI tests on the test instances.
  """
  # Host names for each supported Windows Server version. In every config,
  # 'dc' is the domain controller host and 'client' is the client host
  # (for example, GetFileFromGCSBucket runs its command on the 'client' entry).
  win_2012_config = {
      'client': 'client2012',
      'dc': 'win2012-dc',
  }

  win_2016_config = {
      'client': 'client2016',
      'dc': 'win2016-dc',
  }

  win_2019_config = {
      'client': 'client2019',
      'dc': 'win2019-dc',
  }

  win_2022_config = {
      'client': 'client2022',
      'dc': 'win2022-dc',
  }

  # Current Win Server version for testing. Helpers in this class read
  # win_config rather than one of the versioned configs above.
  win_config = win_2022_config

  def AddFirewallExclusion(self, instance_name):
    """Add-MpPreference to exclude some folders from defenser scan."""
    program_file = '"$Env:ProgramFiles"'
    program_file_x86 = '"$Env:ProgramFiles(x86)"'
    local_appdata = '"$Env:LOCALAPPDATA"'
    celab_path = r'"c:\cel\supporting_files"'
    updater_path = r'"c:\temp"'
    cmd = (r'Add-MpPreference -ExclusionPath ' + ', '.join([
        program_file, program_file_x86, local_appdata, celab_path, updater_path
    ]))
    self.clients[instance_name].RunPowershell(cmd)

  def InstallGoogleUpdater(self, instance_name):
    """Installs the Omaha 4 client system-wide on an instance.

    Does nothing (besides a debug log line) when the --omaha_installer flag
    is not set.

    Args:
      instance_name: the name of the instance to install on.
    """
    if not FLAGS.omaha_installer:
      # No omaha installer/updater. Do nothing.
      # The trailing space keeps the two sentences apart once the adjacent
      # literals are concatenated.
      logging.debug('--omaha_installer flag is empty. '
                    'Skip installing google updater.')
      return

    # Create the upload destination before copying the installer over.
    self.clients[instance_name].RunPowershell(
        r'New-Item -ItemType Directory -Force -Path c:\temp')
    installer = self.UploadFile(instance_name, FLAGS.omaha_installer,
                                r'c:\temp')
    self.RunCommand(instance_name, installer + r' --install --system')

  def WakeGoogleUpdater(self, instance_name):
    """Runs updater.exe to wake up Omaha 4 service.

    Does nothing (besides a debug log line) when the --omaha_updater flag is
    not set. The updater is run with verbose logging enabled for the winhttp,
    update_client and updater modules.

    Args:
      instance_name: the name of the instance to run the updater on.
    """
    if not FLAGS.omaha_updater:
      # The trailing space keeps the two sentences apart once the adjacent
      # literals are concatenated.
      logging.debug('--omaha_updater flag is empty. '
                    'Skip run google updater.')
      return

    updater = self.UploadFile(instance_name, FLAGS.omaha_updater, r'c:\temp')
    vmodule = ','.join([
        '*/components/winhttp/*=2',
        '*/components/update_client/*=2',
        '*/chrome/updater/*=2',
    ])
    cmd = '%s --wake --enable-logging --vmodule=%s' % (updater, vmodule)
    self.RunCommand(instance_name, cmd)

  def GetChromeVersion(self, instance_name):
    """Get Chrome Version by querying Windows registry"""
    cmd = (
        r'reg query' +
        r' "HKLM\SOFTWARE\Google\Update\Clients\{8A69D345-D564-463C-AFF1-A69D9E530F96}"'
        + r' /reg:32 /v pv')
    chrome_version = self.RunCommand(instance_name, cmd)

    return chrome_version.decode().split()[-1]

  def RunGoogleUpdaterTaskSchedulerCommand(self, instance_name, cmd):
    """Run task scheduler powershell command to Google Updater"""
    script = r'Get-ScheduledTask -TaskPath \GoogleSystem\GoogleUpdater\ | ' + cmd
    return self.clients[instance_name].RunPowershell(script).decode().strip()

  def WaitForUpdateCheck(self, instance_name):
    """Wait for the updater task to be ready again"""
    max_wait_time_secs = 120
    total_wait_time_secs = 0
    delta_secs = 5

    while total_wait_time_secs < max_wait_time_secs:
      time.sleep(delta_secs)
      total_wait_time_secs += delta_secs
      state = self.RunGoogleUpdaterTaskSchedulerCommand(
          instance_name, 'Select -ExpandProperty "State"')
      if state == 'Ready':
        break

  def InstallChrome(self, instance_name, system_level=False):
    """Uploads the --chrome_installer file to the instance and runs it.

    Two installer flavours are handled:
    - mini_installer.exe: run together with the installer_data file that
      lives next to this module, and
    - anything else: handed to `msiexec /i` (i.e. expected to be an *.msi).

    system_level is False by default for all reporting connector e2e tests
    as they do not need the Omaha client to be installed. For the DTC e2e
    test, however, system_level should be set to True so that Chrome is
    installed together with the Omaha 4 client; a powershell script is then
    called to add the chrome path to $Env:Path.

    After installation the Chromium API keys script is always run.

    Args:
      instance_name: the gcp instance.
      system_level: whether chrome is installed with --system-level or not.
        By default, the value is False.
    """
    temp_dir = r'c:\temp'
    self.clients[instance_name].RunPowershell(
        r'New-Item -ItemType Directory -Force -Path c:\temp')
    installer = self.UploadFile(instance_name, FLAGS.chrome_installer,
                                temp_dir)

    if installer.lower().endswith('mini_installer.exe'):
      here = os.path.dirname(os.path.abspath(__file__))
      self.UploadFile(instance_name, os.path.join(here, 'installer_data'),
                      temp_dir)
      install_cmd = installer + r' --installerdata=c:\temp\installer_data'
      if system_level:
        install_cmd += r' --system-level'
    else:
      install_cmd = 'msiexec /i %s' % installer
    self.RunCommand(instance_name, install_cmd)

    api_keys_cmd = (
        r'powershell -File c:\cel\supporting_files\ensure_chromium_api_keys.ps1'
        r' -Path gs://%s/api/key') % self.gsbucket
    self.RunCommand(instance_name, api_keys_cmd)

    if not system_level:
      return
    # Only the system-level install runs the script that adds chrome to Path.
    self.RunCommand(
        instance_name,
        r'powershell -File c:\cel\supporting_files\add_chrome_path.ps1')

  def SetPolicy(self, instance_name, policy_name, policy_value, policy_type):
    r"""Sets a Google Chrome policy in registry.

    Args:
      policy_name: the policy name.
        The name can contain \. In this case, the last segment will be the
        real policy name, while anything before will be part of the key.
    """
    segments = policy_name.split('\\')
    policy_name = segments[-1]

    # The policy will be set for both Chrome and Chromium, since only
    # googlers can build Chrome-branded executable.
    keys = [
        r'HKLM\Software\Policies\Google\Chrome',
        r'HKLM\Software\Policies\Chromium'
    ]
    for key in keys:
      if len(segments) >= 2:
        key += '\\' + '\\'.join(segments[:-1])

      cmd = (r"Set-GPRegistryValue -Name 'Default Domain Policy' "
             "-Key %s -ValueName %s -Value %s -Type %s") % (
                 key, policy_name, policy_value, policy_type)
      self.clients[instance_name].RunPowershell(cmd)

  def SetOmahaPolicy(self, instance_name, policy_name, policy_value,
                     policy_type):
    key = r'HKLM\Software\Policies\Google\Update'
    cmd = (r"Set-GPRegistryValue -Name 'Default Domain Policy' "
           "-Key %s -ValueName %s -Value %s -Type %s") % (
               key, policy_name, policy_value, policy_type)
    self.clients[instance_name].RunPowershell(cmd)

  def RemoveDeviceTrustKey(self, instance_name):
    """Removes a device trust key in registry.

    Args:
      instance_name: the name of the GCP VM machine.
    """
    cmd = (r'Remove-Item -Path HKLM:\SOFTWARE\Google\Chrome\DeviceTrust '
           '-Force -Verbose')
    self.clients[instance_name].RunPowershell(cmd)

  def RemovePolicy(self, instance_name, policy_name):
    """Removes a Google Chrome policy in registry.

    Args:
      instance_name: the name of the GCP VM machine.
      policy_name: the policy name.
    """
    segments = policy_name.split('\\')
    policy_name = segments[-1]

    keys = [
        r'HKLM\Software\Policies\Google\Chrome',
        r'HKLM\Software\Policies\Chromium'
    ]
    for key in keys:
      if len(segments) >= 2:
        key += '\\' + '\\'.join(segments[:-1])

      cmd = (r"Remove-GPRegistryValue -Name 'Default Domain Policy' "
             "-Key %s -ValueName %s") % (key, policy_name)
      self.clients[instance_name].RunPowershell(cmd)

  def GetFileFromGCSBucket(self, path):
    """Get file from GCS bucket"""
    path = "gs://%s/%s" % (self.gsbucket, path)
    cmd = r'gsutil cat ' + path
    return self.RunCommand(self.win_config['client'], cmd).rstrip().decode()

  def InstallWebDriver(self, instance_name):
    """Prepares an instance for running python webdriver tests.

    Ensures python is installed, installs the required pip packages, places
    chromedriver in C:\\temp\\ (from --chromedriver when given, otherwise
    from the chocolatey package) and uploads test_util.py next to it.

    Args:
      instance_name: the name of the instance to set up.
    """
    temp_dir = 'C:\\temp\\'
    self.RunCommand(instance_name, r'md -Force c:\temp')
    self.EnsurePythonInstalled(instance_name)
    pip_packages = ['selenium', 'absl-py', 'pywin32', 'attrs']
    self.InstallPipPackagesLatest(instance_name, pip_packages)

    if FLAGS.chromedriver is not None:
      self.UploadFile(instance_name, FLAGS.chromedriver, temp_dir)
    else:
      # chromedriver flag is not specified. Install the chocolatey package.
      self.InstallChocolateyPackage(instance_name, 'chromedriver',
                                    '74.0.3729.60')
      packaged_driver = (
          r"C:\ProgramData\chocolatey\lib\chromedriver\tools\chromedriver.exe")
      self.RunCommand(instance_name,
                      "copy %s %s" % (packaged_driver, temp_dir))

    here = os.path.dirname(os.path.abspath(__file__))
    self.UploadFile(instance_name, os.path.join(here, 'test_util.py'), temp_dir)

  def RunWebDriverTest(self, instance_name, test_file, args=[]):
    """Runs a python webdriver test on an instance.

    Args:
      instance_name: name of the instance.
      test_file: the path of the webdriver test file.
      args: the list of arguments passed to the test.

    Returns:
      the output."""
    # upload the test
    file_name = self.UploadFile(instance_name, test_file, r'c:\temp')

    # run the test
    args = subprocess.list2cmdline(args)
    self._pythonExecutablePath[instance_name] = (
        r'C:\ProgramData\chocolatey\lib\python\tools\python.exe')
    cmd = r'%s -u %s %s' % (self._pythonExecutablePath[instance_name],
                            file_name, args)
    return self.RunCommand(instance_name, cmd).decode()

  def EnableHistogramSupport(self, instance_name, base_path):
    """Enable histogram package support on an instance.

    Note that base_path is the path to chrome/test/enterprise/e2e/connector.

    Args:
      instance_name: name of the instance.
      base_path: the base path of the test in the chromium_src.
    """
    dest_path = join('c:', 'temp', 'histogram')
    cmd = r'New-Item -ItemType Directory -Force -Path ' + dest_path
    self.clients[instance_name].RunPowershell(cmd)

    self.UploadFile(
        self.win_config['client'],
        os.path.join(base_path, 'common', 'histogram', '__init__.py'),
        dest_path)
    self.UploadFile(self.win_config['client'],
                    os.path.join(base_path, 'common', 'histogram', 'util.py'),
                    dest_path)

  def EnableDemoAgent(self, instance_name):
    # enterprise/e2e/connector/common/demo_agent
    base_path = dir = os.path.dirname(
        os.path.dirname(os.path.abspath(__file__)))
    agent_path = os.path.join(base_path, 'connector', 'common', 'demo_agent')

    # create dest path
    dest_path = join('c:', 'temp', 'demo_agent')
    cmd = r'New-Item -ItemType Directory -Force -Path ' + dest_path
    self.clients[instance_name].RunPowershell(cmd)

    # Install Visual C++ Redistributable package as demo agent's dependency
    gspath = "gs://%s/%s" % (self.gsbucket, 'secrets/VC_redist.x64.exe')
    cmd = r'gsutil cp ' + gspath + ' ' + dest_path

    self.RunCommand(instance_name, cmd)

    cmd = r'C:\temp\demo_agent\VC_redist.x64.exe /passive'
    self.RunCommand(instance_name, cmd)

    # upload demo agent
    self.UploadFile(self.win_config['client'],
                    os.path.join(agent_path, 'agent.zip'), dest_path)
    cmd = r'Expand-Archive -Path c:\temp\demo_agent\agent.zip -DestinationPath c:\temp\demo_agent'
    self.clients[instance_name].RunPowershell(cmd)

  def RunUITest(self, instance_name, test_file, timeout=300, args=[]):
    """Runs a UI test on an instance.

    Args:
      instance_name: name of the instance.
      test_file: the path of the UI test file.
      timeout: the timeout in seconds. Default is 300,
               i.e. 5 minutes.
      args: the list of arguments passed to the test.

    Returns:
      the output."""
    # upload the test
    file_name = self.UploadFile(instance_name, test_file, r'c:\temp')

    # check for cel_ui_agent.exe running
    self._checkUIAgentRunningOnInstance(instance_name)

    # run the test.
    # note that '-u' flag is passed to enable unbuffered stdout and stderr.
    # Without this flag, if the test is killed because of timeout, we will not
    # get any output from stdout because the output is buffered. When this
    # happens it makes debugging really hard.
    args = subprocess.list2cmdline(args)
    self._pythonExecutablePath[instance_name] = (
        r'C:\ProgramData\chocolatey\lib\python\tools\python.exe')
    ui_test_cmd = r'%s -u %s %s' % (self._pythonExecutablePath[instance_name],
                                    file_name, args)
    cmd = (r'%s c:\cel\supporting_files\run_ui_test.py --timeout %s -- %s') % (
        self._pythonExecutablePath[instance_name], timeout, ui_test_cmd)
    return self.RunCommand(instance_name, cmd, timeout=timeout).decode()

  def _generatePassword(self):
    """Generates a random password."""
    s = [random.choice(string.ascii_lowercase) for _ in range(4)]
    s += [random.choice(string.ascii_uppercase) for _ in range(4)]
    s += [random.choice(string.digits) for _ in range(4)]
    random.shuffle(s)
    return ''.join(s)

  def _checkUIAgentRunningOnInstance(self, instance_name):
    self.RunCommand(instance_name, 'tasklist | findstr cel_ui_agent.exe')

  def _rebootInstance(self, instance_name):
    """Restarts the instance and then blocks for two minutes.

    Args:
      instance_name: name of the instance.
    """
    boot_wait_secs = 2 * 60
    self.RunCommand(instance_name, 'shutdown /r /t 0')

    # wait a while for the instance to boot up; this is a fixed delay, no
    # readiness check is performed.
    time.sleep(boot_wait_secs)

  def EnableUITest(self, instance_name):
    """Configures the instance so that UI tests can be run on it."""
    self.InstallWebDriver(instance_name)
    self.InstallChocolateyPackage(instance_name, 'chocolatey_core_extension',
                                  '1.3.3')
    self.InstallChocolateyPackageLatest(instance_name, 'sysinternals')
    self.InstallPipPackagesLatest(instance_name,
                                  ['pywinauto', 'pyperclip', 'requests'])

    ui_test_user = 'ui_user'
    ui_test_password = self._generatePassword()
    cmd = (r'powershell -File c:\cel\supporting_files\enable_auto_logon.ps1 '
           r'-userName %s -password %s') % (ui_test_user, ui_test_password)
    self.RunCommand(instance_name, cmd)
    self._rebootInstance(instance_name)

    cmd = (r'powershell -File c:\cel\supporting_files\set_ui_agent.ps1 '
           '-username %s') % ui_test_user
    self.RunCommand(instance_name, cmd)
    self._rebootInstance(instance_name)