chromium/third_party/blink/tools/blinkpy/web_tests/controllers/web_test_finder.py

# Copyright (C) 2012 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import errno
import fnmatch
import hashlib
import json
import logging
import re

from blinkpy.web_tests.layout_package.json_results_generator import convert_times_trie_to_flat_paths
from blinkpy.web_tests.models.typ_types import ResultType
from blinkpy.web_tests.port.base import Port
from collections import OrderedDict

_log = logging.getLogger(__name__)


class WebTestFinder(object):
    def __init__(self, port, options):
        self._port = port
        self._options = options
        self._filesystem = self._port.host.filesystem
        self.WEB_TESTS_DIRECTORIES = ('src', 'third_party', 'blink',
                                      'web_tests')

    def find_tests(
        self,
        args,
        test_lists=None,
        filter_files=None,
        inverted_filter_files=None,
        fastest_percentile=None,
        filters=None,
    ):
        filters = filters or []
        paths = self._strip_test_dir_prefixes(args)
        if test_lists:
            new_paths = self._read_test_list_files(
                test_lists, self._port.TEST_PATH_SEPARATOR)
            new_paths = [
                self._strip_test_dir_prefix(new_path) for new_path in new_paths
            ]
            paths += new_paths

        all_tests = []
        if not paths or fastest_percentile:
            all_tests = self._port.tests(None)

        path_tests = []
        if paths:
            path_tests = self._port.tests(paths)

        test_files = None
        running_all_tests = False
        if fastest_percentile:
            times_trie = self._times_trie()
            if times_trie:
                fastest_tests = self._fastest_tests(times_trie, all_tests,
                                                    fastest_percentile)
                test_files = list(set(fastest_tests).union(path_tests))
            else:
                _log.warning(
                    'Running all the tests the first time to generate timing data.'
                )
                test_files = all_tests
                running_all_tests = True
        elif paths:
            test_files = path_tests
        else:
            test_files = all_tests
            running_all_tests = True

        all_filters = []
        if filters:
            all_filters = [f.split('::') for f in filters]
        if filter_files:
            file_filters = self._read_filter_files(
                filter_files, self._port.TEST_PATH_SEPARATOR)
            all_filters = all_filters + file_filters
        if inverted_filter_files:
            file_filters = self._read_filter_files(
                inverted_filter_files, self._port.TEST_PATH_SEPARATOR)
            all_filters = all_filters + self._invert_filters(file_filters)

        test_files = filter_tests(test_files, all_filters)

        # de-dupe the test list and paths here before running them.
        test_files = list(OrderedDict.fromkeys(test_files))
        paths = list(OrderedDict.fromkeys(paths))
        return (paths, test_files, running_all_tests)

    def _times_trie(self):
        times_ms_path = self._port.bot_test_times_path()
        if self._filesystem.exists(times_ms_path):
            return json.loads(self._filesystem.read_text_file(times_ms_path))
        else:
            return {}

    # The following line should run the fastest 50% of tests *and*
    # the css3/flexbox tests. It should *not* run the fastest 50%
    # of the css3/flexbox tests.
    #
    # run_web_tests.py --fastest=50 css3/flexbox
    def _fastest_tests(self, times_trie, all_tests, fastest_percentile):
        times = convert_times_trie_to_flat_paths(times_trie)

        # Ignore tests with a time==0 because those are skipped tests.
        sorted_times = sorted([test for (test, time) in times.items() if time],
                              key=lambda t: (times[t], t))
        clamped_percentile = max(0, min(100, fastest_percentile))
        number_of_tests_to_return = int(
            len(sorted_times) * clamped_percentile / 100)
        fastest_tests = set(sorted_times[:number_of_tests_to_return])

        # Don't try to run tests in the times_trie that no longer exist,
        fastest_tests = fastest_tests.intersection(all_tests)

        # For fastest tests, include any tests not in the times_ms.json so that
        # new tests get run in the fast set.
        unaccounted_tests = set(all_tests) - set(times.keys())

        # Using a set to dedupe here means that --order=None won't work, but that's
        # ok because --fastest already runs in an arbitrary order.
        return list(fastest_tests.union(unaccounted_tests))

    def _strip_test_dir_prefixes(self, paths):
        return [self._strip_test_dir_prefix(path) for path in paths if path]

    def _strip_test_dir_prefix(self, path):
        # Remove src/third_party/blink/web_tests/ from the front of the test path,
        # or any subset of these.
        for i in range(len(self.WEB_TESTS_DIRECTORIES)):
            # Handle both "web_tests/foo/bar.html" and "web_tests\foo\bar.html" if
            # the filesystem uses '\\' as a directory separator
            for separator in (self._port.TEST_PATH_SEPARATOR,
                              self._filesystem.sep):
                directory_prefix = separator.join(
                    self.WEB_TESTS_DIRECTORIES[i:]) + separator
                if path.startswith(directory_prefix):
                    return path[len(directory_prefix):]
        return path

    def _read_filter_files(self, filenames, test_path_separator):
        fs = self._filesystem
        filters = []
        for filename in filenames:
            file_lines = []
            try:
                if test_path_separator != fs.sep:
                    filename = filename.replace(test_path_separator, fs.sep)
                file_contents = fs.read_text_file(filename).split('\n')
                for line in file_contents:
                    line = self._strip_comments(line)
                    if not line:
                        continue
                    file_lines.append(line)
                filters.append(file_lines)
            except IOError as error:
                if error.errno == errno.ENOENT:
                    _log.critical('')
                    _log.critical('--test-launcher-filter-file "%s" not found',
                                  filename)
                raise
        return filters

    def _invert_filters(self, filters):
        inverted_filters = []
        for terms in filters:
            inverted_filter = []
            for term in terms:
                if term.startswith('-'):
                    inverted_filter.append(term[1:])
                elif term.startswith('+'):
                    inverted_filter.append('-' + term[1:])
                else:
                    inverted_filter.append('-' + term)
            inverted_filters.append(inverted_filter)
        return inverted_filters

    def _read_test_list_files(self, filenames, test_path_separator):
        fs = self._filesystem
        positive_matches = []
        for filename in filenames:
            try:
                if test_path_separator != fs.sep:
                    filename = filename.replace(test_path_separator, fs.sep)
                file_contents = fs.read_text_file(filename).split('\n')
                for line in file_contents:
                    line = self._strip_comments(line)
                    if not line:
                        continue
                    if line[0] == '-':
                        _log.debug(
                            'test-list %s contains a negative filter %s' %
                            (filename, line))
                    positive_matches.append(line)
            except IOError as error:
                if error.errno == errno.ENOENT:
                    _log.critical('')
                    _log.critical('--test-list file "%s" not found', filename)
                raise
        return positive_matches

    @staticmethod
    def _strip_comments(line):
        commentIndex = line.find('#')
        if commentIndex == -1:
            commentIndex = len(line)

        line = re.sub(r'\s+', ' ', line[:commentIndex].strip())
        if line == '':
            return None
        else:
            return line

    def skip_tests(self, paths, all_tests_list, expectations):
        """Given a list of tests, returns the ones that should be skipped.

        A test may be skipped for many reasons, depending on the expectation
        files and options selected. The most obvious is SKIP entries in
        TestExpectations, but we also e.g. skip idlharness tests on MSAN/ASAN
        due to https://crbug.com/856601. Note that for programmatically added
        SKIPs, this function will modify the input expectations to include the
        SKIP expectation (but not write it to disk)

        Args:
            paths: the paths passed on the command-line to run_web_tests.py
            all_tests_list: all tests that we are considering running
            expectations: parsed TestExpectations data

        Returns: a set of tests that should be skipped (not run).
        """
        all_tests = set(all_tests_list)
        tests_to_skip = set()
        tests_always_skipped = set()
        for test in all_tests:
            # Manual tests and virtual tests skipped by platform config are
            # always skipped and not affected by the --skip parameter
            if (self._port.skipped_due_to_manual_test(test)
                    or self._port.virtual_test_skipped_due_to_platform_config(
                        test)
                    or self._port.skipped_due_to_exclusive_virtual_tests(test)
                    or self._port.skipped_due_to_skip_base_tests(test)):
                tests_always_skipped.update({test})
                continue

            # We always skip idlharness tests for MSAN/ASAN, even when running
            # with --no-expectations (https://crbug.com/856601). Note we will
            # run the test anyway if it is explicitly specified on the command
            # line; paths are removed from the skip list after this loop.
            if self._options.enable_sanitizer and Port.is_wpt_idlharness_test(
                    test):
                tests_to_skip.update({test})
                continue

            if self._options.no_expectations:
                # do not skip anything from TestExpectations
                continue

            expected_results = expectations.get_expectations(test).results
            if ResultType.Skip in expected_results:
                tests_to_skip.update({test})
            if self._options.skip_timeouts and ResultType.Timeout in expected_results:
                tests_to_skip.update({test})
            if self._options.skip_failing_tests and ResultType.Failure in expected_results:
                tests_to_skip.update({test})

        if self._options.skipped == 'only':
            tests_to_skip = all_tests - tests_to_skip
        elif self._options.skipped == 'ignore':
            tests_to_skip = set()
        elif self._options.skipped != 'always':
            # make sure we're explicitly running any tests passed on the command line; equivalent to 'default'.
            tests_to_skip -= set(paths)

        tests_to_skip.update(tests_always_skipped)

        return tests_to_skip

    def split_into_chunks(self, test_names):
        """split into a list to run and a set to skip, based on --shard_index and --total_shards."""
        if self._options.shard_index is None and self._options.total_shards is None:
            return test_names

        if self._options.shard_index is None:
            raise ValueError(
                'Must provide --shard-index or GTEST_SHARD_INDEX when sharding.'
            )
        if self._options.total_shards is None:
            raise ValueError(
                'Must provide --total-shards or GTEST_TOTAL_SHARDS when sharding.'
            )
        if self._options.shard_index >= self._options.total_shards:
            raise ValueError(
                'Shard index (%d) should be less than total shards (%d)!' %
                (self._options.shard_index, self._options.total_shards))

        return self._split_into_chunks(test_names, self._options.shard_index,
                                       self._options.total_shards)

    @staticmethod
    def _split_into_chunks(test_names, index, count):
        tests_and_indices = [
            (test_name,
             int(hashlib.sha256(test_name.encode('utf-8')).hexdigest(), 16) %
             count) for test_name in test_names
        ]

        tests_to_run = [
            test_name for test_name, test_index in tests_and_indices
            if test_index == index
        ]

        _log.debug('chunk %d of %d contains %d tests of %d', index, count,
                   len(tests_to_run), len(test_names))

        return tests_to_run


def filter_tests(tests, filters):
    """Returns a filtered list of tests to run.

    The test-filtering semantics are documented in
    https://bit.ly/chromium-test-runner-api and
    https://bit.ly/chromium-test-list-format, but are as follows:

    A test should be run only if it would be run when every flag (ie filter) is
    evaluated individually. A test should be skipped if it would be skipped if
    any flag (filter) was evaluated individually.

    Each filter is a list of potentially glob expressions, with each expression
    optionally prefixed by a "-" or "+". If the glob starts with a "-", it is a
    negative term, otherwise it is a positive term.

    If multiple filter terms in a flag (filter) match a given test name, the
    longest match takes priority (longest match wins). The order of the filters
    should not matter. It is an error to have multiple expressions of the same
    length that conflict.

    Globbing is fairly limited; "?" is not allowed, and unescaped "*" must only
    appear at the end of the glob. If multiple globs match a test, the longest
    match wins. If both globs are the same length, an error is raised.

    A test will be run only if it passes every filter.
    """

    def glob_sort_key(k):
        if k and k[0] == '-':
            return (len(k[1:]), k[1:])
        elif k and k[0] == '+':
            return (len(k[1:]), k[1:])
        else:
            return (len(k), k)

    for terms in filters:
        # Validate the filter
        for term in terms:
            if (term.startswith('-') and not term[1:]) or not term:
                raise ValueError('Empty filter entry "%s"' % (term, ))
            for i, c in enumerate(term):
                if i == len(term) - 1:
                    continue
                if c == '*' and (i == 0 or term[i - 1] != '\\'):
                    raise ValueError('Bad test filter "%s" specified; '
                                     'unescaped wildcards are only allowed at '
                                     'the end' % (term, ))
            if term.startswith('-') and term[1:] in terms:
                raise ValueError('Both "%s" and "%s" specified in test '
                                 'filter' % (term, term[1:]))

        # Separate the negative/positive globless terms and glob terms
        include_by_default = all(term.startswith('-') for term in terms)
        exact_neg_terms, exact_pos_terms, glob_terms = _extract_terms(terms)

        filtered_tests = []
        for test in tests:
            # Check for the exact test name
            if test in exact_neg_terms:
                continue
            if test in exact_pos_terms:
                filtered_tests.append(test)
                continue

            # Check globs, this could be done with a trie to avoid this loop
            # but glob terms should be lower volume than exact terms
            include = include_by_default
            for glob in sorted(glob_terms, key=glob_sort_key):
                if glob.startswith('-'):
                    include = include and not fnmatch.fnmatch(test, glob[1:])
                else:
                    include = include or fnmatch.fnmatch(
                        test, glob[1:] if glob[0] == '+' else glob)
            if include:
                filtered_tests.append(test)
        tests = filtered_tests

    return tests


def _extract_terms(filter):
    """Extract terms of the filter into exact +/- terms and glob terms.

    The +/- prefix char for exact terms will also be stripped as they are no
    longer needed to identify the term as a positive or negative filter. Any
    prefix in globbed terms (terms with a *) will preserve the sign.
    """
    exact_negative_terms = set()
    exact_positive_terms = set()
    glob_terms = set()
    for term in filter:
        is_glob = False
        for i, c in enumerate(term):
            if c == '*' and (i == 0 or term[i - 1] != '\\'):
                glob_terms.add(term)
                is_glob = True
                break
        if is_glob:
            continue
        elif term[0] == '-':
            exact_negative_terms.add(term[1:])
        elif term[0] == '+':
            exact_positive_terms.add(term[1:])
        else:
            exact_positive_terms.add(term)
    return exact_negative_terms, exact_positive_terms, list(glob_terms)