chromium/testing/scripts/common.py

# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import codecs
import contextlib
import json
import logging
import os
import platform
import subprocess
import sys
import tempfile
import time
import traceback

logging.basicConfig(level=logging.INFO)

sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
# //testing imports.
import test_env
if sys.platform.startswith('linux'):
  import xvfb

SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
SRC_DIR = os.path.abspath(
    os.path.join(SCRIPT_DIR, os.path.pardir, os.path.pardir))

# Use result_sink.py in //build/util/lib/results/ for uploading the
# results of non-isolated script tests.
BUILD_UTIL_DIR = os.path.join(SRC_DIR, 'build', 'util')
sys.path.insert(0, BUILD_UTIL_DIR)
try:
  from lib.results import result_sink
  from lib.results import result_types
except ImportError:
  # Some build-time scripts import this file and run into issues with
  # result_sink's dependency on requests since we can't depend on vpython
  # during build-time. So silently swallow the error in that case.
  result_sink = None

# run_web_tests.py returns the number of failures as the return code, but
# caps the return code at 101 to avoid overflowing or colliding with
# reserved values from the shell.
MAX_FAILURES_EXIT_STATUS = 101

# Exit code to indicate infrastructure issue.
INFRA_FAILURE_EXIT_CODE = 87

# ACL might be explicitly set or inherited.
CORRECT_ACL_VARIANTS = [
    'APPLICATION PACKAGE AUTHORITY'
    '\\ALL RESTRICTED APPLICATION PACKAGES:(OI)(CI)(RX)',
    'APPLICATION PACKAGE AUTHORITY'
    '\\ALL RESTRICTED APPLICATION PACKAGES:(I)(OI)(CI)(RX)',
]



def set_lpac_acls(acl_dir, is_test_script=False):
  """Sets LPAC ACLs on a directory. Windows 10 only."""
  if platform.release() != '10':
    return
  try:
    existing_acls = subprocess.check_output(['icacls', acl_dir],
                                            stderr=subprocess.STDOUT,
                                            universal_newlines=True)
  except subprocess.CalledProcessError as e:
    logging.error('Failed to retrieve existing ACLs for directory %s', acl_dir)
    logging.error('Command output: %s', e.output)
    sys.exit(e.returncode)
  acls_correct = any(acl in existing_acls for acl in CORRECT_ACL_VARIANTS)
  if not acls_correct:
    try:
      subprocess.check_output(
          ['icacls', acl_dir, '/grant', '*S-1-15-2-2:(OI)(CI)(RX)'],
          stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
      logging.error('Failed to apply ACLs to directory %s', acl_dir)
      logging.error('Command output: %s', e.output)
      sys.exit(e.returncode)
  if not is_test_script:
    return
  # Bots running on LUCI use hardlinks that do not have the correct ACLs, so
  # these must be manually overridden here.
  with temporary_file() as tempfile_path:
    subprocess.check_output(
        ['icacls', acl_dir, '/save', tempfile_path, '/t', '/q', '/c'],
        stderr=subprocess.STDOUT)
    # ACL files look like this, e.g. for c:\a\b\c\d\Release_x64
    #
    # Release_x64
    # D:AI(A;OICI;0x1200a9;;;S-1-15-2-2)(A;OICIID;FA;;;BA)
    # Release_x64\icudtl_extra.dat
    # D:AI(A;ID;0x1200a9;;;S-1-15-2-2)(A;ID;FA;;;BA)(A;ID;0x1301bf;;;BU)
    with codecs.open(tempfile_path, encoding='utf_16_le') as aclfile:
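      # Entries come in pairs of lines: a path relative to the parent of
      # acl_dir, then that file's ACL string.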
      for filename in aclfile:
        acl = next(aclfile).strip()
        full_filename = os.path.abspath(
            os.path.join(acl_dir, os.pardir, filename.strip()))
        if 'S-1-15-2-2' in acl:
          continue
        if os.path.isdir(full_filename):
          continue
        subprocess.check_output(
            ['icacls', full_filename, '/grant', '*S-1-15-2-2:(RX)'],
            stderr=subprocess.STDOUT)


def run_script(argv, funcs):
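  """Parses the common script-test command line and dispatches to |funcs|.

  |funcs| maps the subcommand names 'run' and 'compile_targets' to callables
  that take the parsed args and return an exit code.
  """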

  def parse_json(path):
    with open(path) as f:
      return json.load(f)

  parser = argparse.ArgumentParser()
  parser.add_argument('--build-dir',
                      help='Absolute path to build-dir.',
                      required=True)
  parser.add_argument('--paths', type=parse_json, default={})
  # Properties describe the environment of the build, and are the same per
  # script invocation.
  parser.add_argument('--properties', type=parse_json, default={})
  # Args contains per-invocation arguments that potentially change the
  # behavior of the script.
  parser.add_argument('--args', type=parse_json, default=[])

  subparsers = parser.add_subparsers()

  run_parser = subparsers.add_parser('run')
  run_parser.add_argument('--output',
                          type=argparse.FileType('w'),
                          required=True)
  run_parser.add_argument('--filter-file', type=argparse.FileType('r'))
  run_parser.set_defaults(func=funcs['run'])

  compile_targets_parser = subparsers.add_parser('compile_targets')
  compile_targets_parser.add_argument('--output',
                                      type=argparse.FileType('w'),
                                      required=True)
  compile_targets_parser.set_defaults(func=funcs['compile_targets'])

  args = parser.parse_args(argv)
  return args.func(args)


def run_command(argv, env=None, cwd=None):
  print('Running %r in %r (env: %r)' % (argv, cwd, env), file=sys.stderr)
  rc = test_env.run_command(argv, env=env, cwd=cwd)
  print('Command %r returned exit code %d' % (argv, rc), file=sys.stderr)
  return rc


@contextlib.contextmanager
def temporary_file():
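  """Yields the path to a closed temporary file, deleting it on exit."""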
  fd, path = tempfile.mkstemp()
  os.close(fd)
  try:
    yield path
  finally:
    os.remove(path)


def record_local_script_results(name, output_fd, failures, valid):
  """Records to a local json file and to RDB the results of the script test.

  For legacy reasons, local script tests (ie: script tests that run
  locally and that don't conform to the isolated-test API) are expected to
  record their results using a specific format. This method encapsulates
  that format and also uploads those results to Result DB.
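
  For example (illustrative), a run with one failing check is recorded as:
    {"valid": true, "failures": ["check_foo"]}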

  Args:
    name: Name of the script test.
    output_fd: A .write()-supporting file descriptor to write results to.
    failures: List of strings representing test failures.
    valid: Whether the results are valid.
  """
  local_script_results = {'valid': valid, 'failures': failures}
  json.dump(local_script_results, output_fd)

  if not result_sink:
    return
  result_sink_client = result_sink.TryInitClient()
  if not result_sink_client:
    return
  status = result_types.PASS
  if not valid:
    status = result_types.UNKNOWN
  elif failures:
    status = result_types.FAIL
  test_log = '\n'.join(failures)
  result_sink_client.Post(name, status, None, test_log, None)


def parse_common_test_results(json_results, test_separator='/'):

  def convert_trie_to_flat_paths(trie, prefix=None):
    # Also see blinkpy.web_tests.layout_package.json_results_generator
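    # For example (illustrative), the trie
    #   {'foo': {'bar.html': {'actual': 'PASS', 'expected': 'PASS'}}}
    # flattens to
    #   {'foo/bar.html': {'actual': 'PASS', 'expected': 'PASS'}}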
    result = {}
    for name, data in trie.items():
      if prefix:
        name = prefix + test_separator + name
      if data and 'actual' not in data and 'expected' not in data:
        result.update(convert_trie_to_flat_paths(data, name))
      else:
        result[name] = data
    return result

  results = {
      'passes': {},
      'unexpected_passes': {},
      'failures': {},
      'unexpected_failures': {},
      'flakes': {},
      'unexpected_flakes': {},
  }

  # TODO(dpranke): crbug.com/357866 - we should simplify the handling of
  # both the return code and parsing the actual results, below.

  passing_statuses = ('PASS', 'SLOW', 'NEEDSREBASELINE')

  for test, result in convert_trie_to_flat_paths(json_results['tests']).items():
    key = 'unexpected_' if result.get('is_unexpected') else ''
    data = result['actual']
    actual_results = data.split()
    last_result = actual_results[-1]
    expected_results = result['expected'].split()

    if (len(actual_results) > 1 and
        (last_result in expected_results or last_result in passing_statuses)):
      key += 'flakes'
    elif last_result in passing_statuses:
      key += 'passes'
      # TODO(dpranke): crbug.com/357867 ... Why are we assigning result
      # instead of actual_results here? Do we even need these things to be
      # dicts, or just lists?
      data = result
    else:
      key += 'failures'
    results[key][test] = data

  return results


def write_interrupted_test_results_to(filepath, test_start_time):
  """Writes a test results JSON file* to filepath.

  This JSON file is formatted to explain that something went wrong.

  *src/docs/testing/json_test_results_format.md

  Args:
    filepath: A path to a file to write the output to.
    test_start_time: The start time of the test run expressed as a
      floating-point offset in seconds from the UNIX epoch.
  """
  with open(filepath, 'w') as fh:
    output = {
        'interrupted': True,
        'num_failures_by_type': {},
        'seconds_since_epoch': test_start_time,
        'tests': {},
        'version': 3,
    }
    json.dump(output, fh)


def get_gtest_summary_passes(output):
  """Returns a mapping of test to boolean indicating if the test passed.

  Only partially parses the format. This code is based on code in tools/build,
  specifically
  https://chromium.googlesource.com/chromium/tools/build/+/17fef98756c5f250b20bf716829a0004857235ff/scripts/slave/recipe_modules/test_utils/util.py#189
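
  Example (illustrative):
    get_gtest_summary_passes({'per_iteration_data': [
        {'Suite.Test': [{'status': 'FAILURE'}, {'status': 'SUCCESS'}]}]})
    returns {'Suite.Test': True}, since only the last attempt counts.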
  """
  if not output:
    return {}

  mapping = {}

  for cur_iteration_data in output.get('per_iteration_data', []):
    for test_fullname, results in cur_iteration_data.items():
      # Results is a list with one entry per test try. Last one is the final
      # result.
      last_result = results[-1]

      if last_result['status'] == 'SUCCESS':
        mapping[test_fullname] = True
      elif last_result['status'] != 'SKIPPED':
        mapping[test_fullname] = False

  return mapping


def extract_filter_list(filter_list):
  """Helper for isolated script test wrappers. Parses the
  --isolated-script-test-filter command line argument. Currently, double-colon
  ('::') is used as the separator between test names, because a single colon may
  be used in the names of perf benchmarks, which contain URLs.
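
  Example (illustrative):
    extract_filter_list('MySuite.TestA::MySuite.TestB')
    returns ['MySuite.TestA', 'MySuite.TestB'].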
  """
  return filter_list.split('::')


def add_emulator_args(parser):
  parser.add_argument(
      '--avd-config',
      type=os.path.realpath,
      help=('Path to the avd config. Required for Android products. '
            '(See //tools/android/avd/proto for message definition '
            'and existing *.textpb files.)'))
  parser.add_argument('--emulator-window',
                      action='store_true',
                      default=False,
                      help='Enable graphical window display on the emulator.')


class BaseIsolatedScriptArgsAdapter:
  """The base class for all script adapters that need to translate flags
  set by isolated script test contract into the specific test script's flags.
  """

  def __init__(self):
    self._parser = argparse.ArgumentParser()
    self._options = None
    self._rest_args = None
    self._script_writes_output_json = None
    self._parser.add_argument(
        '--isolated-outdir',
        type=str,
        required=False,
        help='value of $ISOLATED_OUTDIR from swarming task')
    self._parser.add_argument('--isolated-script-test-output',
                              type=os.path.abspath,
                              required=False,
                              help='path to write test results JSON object to')
    self._parser.add_argument('--isolated-script-test-filter',
                              type=str,
                              required=False)
    self._parser.add_argument('--isolated-script-test-repeat',
                              type=int,
                              required=False)
    self._parser.add_argument('--isolated-script-test-launcher-retry-limit',
                              type=int,
                              required=False)
    self._parser.add_argument('--isolated-script-test-also-run-disabled-tests',
                              default=False,
                              action='store_true',
                              required=False)

    self._parser.add_argument(
        '--xvfb',
        help='Start Xvfb. Ignored on unsupported platforms.',
        action='store_true')
    # Used to create the correct subclass.
    self._parser.add_argument('--script-type',
                              choices=['isolated', 'typ', 'bare'],
                              help='Which script adapter to use')

    # Arguments that are ignored, but added here because it's easier to ignore
    # them than to update bot configs to not pass them.
    self._parser.add_argument('--isolated-script-test-chartjson-output')
    self._parser.add_argument('--isolated-script-test-perf-output')

  def parse_args(self, args=None):
    self._options, self._rest_args = self._parser.parse_known_args(args)

  @property
  def parser(self):
    return self._parser

  @property
  def options(self):
    return self._options

  @property
  def rest_args(self):
    return self._rest_args
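
  # The generate_*_args methods below are template hooks: subclasses
  # override the ones the wrapped script supports and return the
  # equivalent command-line flags. The base implementations either emit
  # nothing (output args) or reject the flag as unsupported.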

  def generate_test_output_args(self, output):
    del output  # unused
    return []

  def generate_test_filter_args(self, test_filter_str):
    del test_filter_str  # unused
    raise RuntimeError('Flag not supported.')

  def generate_test_repeat_args(self, repeat_count):
    del repeat_count  # unused
    raise RuntimeError('Flag not supported.')

  def generate_test_launcher_retry_limit_args(self, retry_limit):
    del retry_limit  # unused
    raise RuntimeError('Flag not supported.')

  def generate_sharding_args(self, total_shards, shard_index):
    del total_shards, shard_index  # unused
    raise RuntimeError('Flag not supported.')

  def generate_test_also_run_disabled_tests_args(self):
    raise RuntimeError('Flag not supported.')

  def select_python_executable(self):
    return sys.executable

  def generate_isolated_script_cmd(self):
    isolated_script_cmd = [self.select_python_executable()] + self.rest_args

    if self.options.isolated_script_test_output:
      output_args = self.generate_test_output_args(
          self.options.isolated_script_test_output)
      self._script_writes_output_json = bool(output_args)
      isolated_script_cmd += output_args

    # Augment test filter args if needed
    if self.options.isolated_script_test_filter:
      isolated_script_cmd += self.generate_test_filter_args(
          self.options.isolated_script_test_filter)

    # Augment test repeat if needed
    if self.options.isolated_script_test_repeat is not None:
      isolated_script_cmd += self.generate_test_repeat_args(
          self.options.isolated_script_test_repeat)

    # Augment test launcher retry limit args if needed
    if self.options.isolated_script_test_launcher_retry_limit is not None:
      isolated_script_cmd += self.generate_test_launcher_retry_limit_args(
          self.options.isolated_script_test_launcher_retry_limit)

    # Augment also-run-disabled-tests args if needed
    if self.options.isolated_script_test_also_run_disabled_tests:
      isolated_script_cmd += self.generate_test_also_run_disabled_tests_args()

    # Augment shard args if needed
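    # Swarming communicates sharding through the standard gtest environment
    # variables, e.g. GTEST_TOTAL_SHARDS=4 with GTEST_SHARD_INDEX in 0..3.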
    env = os.environ.copy()

    total_shards = None
    shard_index = None

    if 'GTEST_TOTAL_SHARDS' in env:
      total_shards = int(env['GTEST_TOTAL_SHARDS'])
    if 'GTEST_SHARD_INDEX' in env:
      shard_index = int(env['GTEST_SHARD_INDEX'])
    if total_shards is not None and shard_index is not None:
      isolated_script_cmd += self.generate_sharding_args(
          total_shards, shard_index)

    return isolated_script_cmd

  def clean_up_after_test_run(self):
    pass

  def do_pre_test_run_tasks(self):
    pass

  def do_post_test_run_tasks(self):
    pass

  def _write_simple_test_results(self, start_time, exit_code):
    if exit_code is None:
      failure_type = 'CRASH'
    elif exit_code == 0:
      failure_type = 'PASS'
    else:
      failure_type = 'FAIL'

    test_name = os.path.basename(self._rest_args[0])
    # See //docs/testing/json_test_results_format.md
    results_json = {
        'version': 3,
        'interrupted': False,
        'num_failures_by_type': {
            failure_type: 1
        },
        'path_delimiter': '/',
        'seconds_since_epoch': start_time,
        'tests': {
            test_name: {
                'expected': 'PASS',
                'actual': failure_type,
                'time': time.time() - start_time,
            },
        },
    }
    with open(self.options.isolated_script_test_output, 'w') as fp:
      json.dump(results_json, fp)

  def run_test(self, cwd=None):
    self.parse_args()
    cmd = self.generate_isolated_script_cmd()

    self.do_pre_test_run_tasks()

    env = os.environ.copy()

    # CHROME_HEADLESS signals to the test binaries that they are running
    # unattended on a bot.
    env['CHROME_HEADLESS'] = '1'
    print('Running command: %s\nwith env: %r' % (' '.join(cmd), env))
    sys.stdout.flush()
    start_time = time.time()
    try:
      if self.options.xvfb and sys.platform.startswith('linux'):
        exit_code = xvfb.run_executable(cmd, env, cwd=cwd)
      else:
        exit_code = test_env.run_command(cmd, env=env, cwd=cwd, log=False)
      print('Command returned exit code %d' % exit_code)
      sys.stdout.flush()
      self.do_post_test_run_tasks()
    except Exception:  # pylint: disable=broad-except
      traceback.print_exc()
      exit_code = None
    finally:
      self.clean_up_after_test_run()

    if (self.options.isolated_script_test_output
        and not self._script_writes_output_json):
      self._write_simple_test_results(start_time, exit_code)

    return exit_code if exit_code is not None else 2