chromium/third_party/blink/tools/blinkpy/tool/commands/rebaseline_cl.py

# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A command to fetch new baselines from try jobs for the current CL."""

import collections
import contextlib
import itertools
import json
import logging
import optparse
import re
from concurrent.futures import Executor
from typing import Collection, List, MutableMapping, Optional

from blinkpy.common.memoized import memoized
from blinkpy.common.net.git_cl import BuildStatus, BuildStatuses, GitCL
from blinkpy.common.net.rpc import Build, RPCError
from blinkpy.common.net.web_test_results import (
    IncompleteResultsReason,
    WebTestResults,
)
from blinkpy.common.path_finder import PathFinder
from blinkpy.tool.commands.build_resolver import (
    BuildResolver,
    UnresolvedBuildException,
)
from blinkpy.tool.commands.command import resolve_test_patterns
from blinkpy.tool.commands.rebaseline import AbstractParallelRebaselineCommand
from blinkpy.tool.commands.rebaseline import TestBaselineSet
from blinkpy.tool.grammar import pluralize

_log = logging.getLogger(__name__)


# Maps target builds to rebaseline to the test results to use. Note that
# multiple builds may map to the same `WebTestResults` if the user chooses to
# fill incomplete results with available ones. As a corollary, the target build
# may differ from its `WebTestResults.build`, which represents the source of
# the test results.
ResultsByBuild = MutableMapping[Build, WebTestResults]

ResultsBySuite = MutableMapping[str, ResultsByBuild]


class RebaselineCL(AbstractParallelRebaselineCommand):
    name = 'rebaseline-cl'
    help_text = 'Fetches new baselines for a CL from test runs on try bots.'
    long_help = (
        'This command downloads new baselines for failing web '
        'tests from archived try job test results. Cross-platform '
        'baselines are deduplicated after downloading.  Without '
        'positional parameters or --test-name-file, all failing tests '
        'are rebaselined. If positional parameters are provided, '
        'they are interpreted as test names to rebaseline.')

    show_in_main_help = True
    argument_names = '[testname,...]'

    only_changed_tests_option = optparse.make_option(
        '--only-changed-tests',
        action='store_true',
        default=False,
        help='Only update files for tests directly modified in the CL.')
    no_trigger_jobs_option = optparse.make_option(
        '--no-trigger-jobs',
        dest='trigger_jobs',
        action='store_false',
        default=True,
        help='Do not trigger any try jobs.')
    patchset_option = optparse.make_option(
        '--patchset',
        default=None,
        type='int',
        help='Patchset number to fetch try results from (defaults to latest).')

    def __init__(self, tool, io_pool: Optional[Executor] = None):
        super().__init__(options=[
            self.only_changed_tests_option,
            self.no_trigger_jobs_option,
            self.test_name_file_option,
            optparse.make_option(
                '--builders',
                default=set(),
                type='string',
                callback=self._check_builders,
                action='callback',
                help=('Comma-separated-list of builders to pull new baselines '
                      'from (can also be provided multiple times).')),
            self.patchset_option,
            self.no_optimize_option,
            self.dry_run_option,
            self.results_directory_option,
            *self.wpt_options,
        ])
        self._tool = tool
        # Use a separate thread pool for parallel network I/O in the main
        # process because `message_pool.get(...)` must know all tasks in
        # advance; it has no API for submitting new tasks after the pool runs.
        # Also, because communication is asynchronous (callback-based), a worker
        # cannot return a value for a specific task without a custom tracking
        # mechanism.
        self._io_pool = io_pool
        self.git_cl = None
        self._builders = []

    def _check_builders(self, option, _opt_str, value, parser):
        selected_builders = getattr(parser.values, option.dest, set())
        # This set includes CQ builders, whereas `builder_for_rebaselining()`
        # does not.
        allowed_builders = self._tool.builders.all_try_builder_names()
        for builder in value.split(','):
            if builder in allowed_builders:
                selected_builders.add(builder)
            else:
                lines = [
                    "'%s' is not a try builder." % builder,
                    '',
                    "The try builders that 'rebaseline-cl' recognizes are:",
                ]
                lines.extend('  * %s' % builder
                             for builder in sorted(allowed_builders))
                raise optparse.OptionValueError('\n'.join(lines))
        setattr(parser.values, option.dest, selected_builders)

    def execute(self, options, args, tool):
        self._dry_run = options.dry_run
        self.git_cl = self.git_cl or GitCL(tool)

        # '--dry-run' implies '--no-trigger-jobs'.
        options.trigger_jobs = options.trigger_jobs and not self._dry_run
        if args and options.test_name_file:
            _log.error('Aborted: Cannot combine --test-name-file and '
                       'positional parameters.')
            return 1

        if not self.check_ok_to_run():
            return 1

        self._builders = options.builders

        build_resolver = BuildResolver(
            self._tool,
            self.git_cl,
            self._io_pool,
            can_trigger_jobs=(options.trigger_jobs and not self._dry_run))
        builds = [Build(builder) for builder in self.selected_try_bots]
        try:
            build_statuses = build_resolver.resolve_builds(
                builds, options.patchset)
        except RPCError as error:
            _log.error('%s', error)
            _log.error('Request payload: %s',
                       json.dumps(error.request_body, indent=2))
            return 1
        except UnresolvedBuildException as error:
            _log.error('%s', error)
            return 1

        results_by_suite = self._fetch_results(build_statuses)
        incomplete_results = self._partition_incomplete_results(
            results_by_suite)
        if incomplete_results:
            self._log_incomplete_results(incomplete_results)
            if self._tool.user.confirm(
                    'Would you like to fill in missing results '
                    'with available results?\n'
                    'This is generally not suggested unless the results are '
                    'platform agnostic or the needed results happen to be not '
                    'missing.'):
                results_by_suite = self.fill_in_missing_results(
                    incomplete_results, results_by_suite)
            else:
                _log.info('Aborting. Please retry builders with no results.')
                return 1

        if options.test_name_file:
            test_baseline_set = self._make_test_baseline_set_from_file(
                options.test_name_file, results_by_suite)
        elif args:
            test_baseline_set = self._make_test_baseline_set_for_tests(
                args, results_by_suite)
        else:
            test_baseline_set = self._make_test_baseline_set(
                results_by_suite, options.only_changed_tests)

        with self._io_pool or contextlib.nullcontext():
            return self.rebaseline(options, test_baseline_set)

    def _partition_incomplete_results(
            self, results_by_suite: ResultsBySuite) -> ResultsBySuite:
        """Remove and return incomplete results from the given results map.

        Arguments:
            results_by_suite: Test results keyed by suite name, then by build.
                This map is mutated to leave only complete results when this
                method returns.
        """
        incomplete_results_by_suite = collections.defaultdict(dict)
        for suite, results_by_build in results_by_suite.items():
            for target_build, results in list(results_by_build.items()):
                if results.incomplete_reason:
                    del results_by_build[target_build]
                    incomplete_results_by_suite[suite][target_build] = results
        return incomplete_results_by_suite

    def _log_incomplete_results(self, results_by_suite: ResultsBySuite):
        _log.warning('Some builds have incomplete results:')
        for suite in sorted(results_by_suite):
            for build in sorted(results_by_suite[suite],
                                key=lambda build: build.builder_name):
                _log.warning(
                    f'  {build}, "{suite}": '
                    f'{results_by_suite[suite][build].incomplete_reason}')
        # TODO(crbug.com/352762538): Link to the document about handling bot
        # timeouts if it's one of the reasons.

    def check_ok_to_run(self):
        unstaged_baselines = self.unstaged_baselines()
        if unstaged_baselines:
            _log.error('Aborting: there are unstaged baselines:')
            for path in unstaged_baselines:
                _log.error('  %s', path)
            return False
        return True

    @property
    def selected_try_bots(self):
        if self._builders:
            return set(self._builders)
        return self._tool.builders.builders_for_rebaselining()

    def _fetch_results(self, build_statuses: BuildStatuses) -> ResultsBySuite:
        """Fetches results for all of the given builds.

        There should be a one-to-one correspondence between Builds, supported
        platforms, and try bots. If not all of the builds can be fetched, then
        continuing with rebaselining may yield incorrect results, when the new
        baselines are deduped, an old baseline may be kept for the platform
        that's missing results.
        """
        results_fetcher = self._tool.results_fetcher
        results_by_suite = collections.defaultdict(dict)
        build_steps_to_fetch = []

        for build, status in build_statuses.items():
            for step in self._tool.builders.step_names_for_builder(
                    build.builder_name):
                if status is BuildStatus.TEST_FAILURE:
                    # Only completed failed builds will contain actual failed
                    # web tests to download baselines for.
                    build_steps_to_fetch.append((build, step))
                    continue

                incomplete_reason = None
                if status is BuildStatus.SUCCESS:
                    _log.debug(
                        f'No baselines to download for passing {build}.')
                else:
                    incomplete_reason = IncompleteResultsReason(status)
                # These empty results are a no-op when constructing the
                # `TestBaselineSet` later.
                results_by_suite[step][build] = WebTestResults(
                    [],
                    step_name=step,
                    build=build,
                    incomplete_reason=incomplete_reason)

        _log.info('Fetching test results for '
                  f'{pluralize("suite", len(build_steps_to_fetch))}.')
        map_fn = self._io_pool.map if self._io_pool else map
        for results in map_fn(
                lambda build_step: results_fetcher.gather_results(*build_step),
                build_steps_to_fetch):
            results_by_suite[results.step_name()][results.build] = results
        return results_by_suite

    def _make_test_baseline_set_from_file(
        self,
        filename: str,
        results_by_suite: ResultsBySuite,
    ) -> TestBaselineSet:
        tests = set()
        try:
            _log.info('Reading list of tests to rebaseline from %s', filename)
            tests = self._host_port.tests_from_file(filename)
        except IOError:
            _log.info('Could not read test names from %s', filename)
        return self._make_test_baseline_set_for_tests(tests, results_by_suite)

    def _make_test_baseline_set_for_tests(
        self,
        test_patterns: Collection[str],
        results_by_suite: ResultsBySuite,
    ) -> TestBaselineSet:
        """Determines the set of test baselines to fetch from a list of tests.

        Args:
            tests_patterns: A list of test patterns (e.g., directories).
        """
        test_baseline_set = TestBaselineSet(self._tool.builders)
        tests = resolve_test_patterns(self._host_port, test_patterns)
        for test, (suite, results_by_build) in itertools.product(
                tests, results_by_suite.items()):
            for target_build, results in results_by_build.items():
                if not results.result_for_test(test):
                    # No need to rebaseline tests that ran expectedly. Expected
                    # results aren't fetched from ResultDB.
                    continue
                target_port = self._tool.builders.port_name_for_builder_name(
                    target_build.builder_name)
                test_baseline_set.add(test, results.build, suite, target_port)
        return test_baseline_set

    def _make_test_baseline_set(
        self,
        results_by_suite: ResultsBySuite,
        only_changed_tests: bool,
    ) -> TestBaselineSet:
        """Determines the set of test baselines to fetch.

        The list of tests are not explicitly provided, so all failing tests or
        modified tests will be rebaselined (depending on only_changed_tests).

        Args:
            only_changed_tests: Whether to only include baselines for tests that
               are changed in this CL. If False, all new baselines for failing
               tests will be downloaded, even for tests that were not modified.
        """
        if only_changed_tests:
            files_in_cl = self._tool.git().changed_files(diff_filter='AM')
            # In the changed files list from Git, paths always use "/" as
            # the path separator, and they're always relative to repo root.
            test_base = self._test_base_path()
            tests_in_cl = {
                f[len(test_base):]
                for f in files_in_cl if f.startswith(test_base)
            }

        builders = self._tool.builders
        test_baseline_set = TestBaselineSet(self._tool.builders)
        for suite, results_by_build in results_by_suite.items():
            for target_build, results in results_by_build.items():
                # Here we have a concrete list of tests so we don't need prefix lookup.
                for test in self._tests_to_rebaseline(results):
                    if only_changed_tests and test not in tests_in_cl:
                        continue
                    target_port = builders.port_name_for_builder_name(
                        target_build.builder_name)
                    test_baseline_set.add(test, results.build, suite,
                                          target_port)

        # Validate test existence, since the builder may run tests not found
        # locally. `Port.tests()` performs an expensive filesystem walk, so
        # filter out all invalid tests here instead of filtering at each build
        # step.
        tests = set(self._host_port.tests(test_baseline_set.all_tests()))
        missing_tests, valid_set = set(), TestBaselineSet(self._tool.builders)
        for task in test_baseline_set:
            if task.test in tests:
                valid_set.add(*task)
            else:
                missing_tests.add(task.test)
        if missing_tests:
            _log.warning(
                'Skipping rebaselining for %s missing from the local checkout:',
                pluralize('test', len(missing_tests)))
            for test in sorted(missing_tests):
                _log.warning(f'  {test}')
            _log.warning('You may want to rebase or trigger new builds.')
        return valid_set

    def _test_base_path(self):
        """Returns the relative path from the repo root to the web tests."""
        finder = PathFinder(self._tool.filesystem)
        return self._tool.filesystem.relpath(
            finder.web_tests_dir(), finder.path_from_chromium_base()) + '/'

    def _tests_to_rebaseline(self,
                             web_test_results: WebTestResults) -> List[str]:
        """Fetches a list of tests that should be rebaselined for some build.

        Returns:
            A sorted list of tests to rebaseline for this build.
        """
        unexpected_results = web_test_results.didnt_run_as_expected_results()
        tests = sorted(r.test_name() for r in unexpected_results
                       if r.is_missing_baseline() or r.has_mismatch())
        if not tests:
            # no need to fetch retry summary in this case
            return []

        test_suite = re.sub('\s*\(.*\)$', '', web_test_results.step_name())
        new_failures = self._fetch_tests_with_new_failures(
            web_test_results.build, test_suite)
        if new_failures is None:
            _log.warning('No retry summary available for (%s, "%s").',
                         web_test_results.build, test_suite)
        else:
            tests = [t for t in tests if t in new_failures]
        return tests

    def _fetch_tests_with_new_failures(self, build, test_suite):
        """For a given test suite in the try job, lists tests that only failed
        with the patch.

        If a test failed only with the patch but not without, then that
        indicates that the failure is actually related to the patch and
        is not failing at HEAD.

        If the list of new failures could not be obtained, this returns None.
        """
        results_fetcher = self._tool.results_fetcher
        content = results_fetcher.fetch_retry_summary_json(build, test_suite)
        if content is None:
            return None
        try:
            retry_summary = json.loads(content)
            return set(retry_summary['failures'])
        except (ValueError, KeyError):
            _log.warning('Unexpected retry summary content:\n%s', content)
            return None

    def fill_in_missing_results(
        self,
        incomplete_results: ResultsBySuite,
        complete_results: ResultsBySuite,
    ) -> ResultsBySuite:
        """Create a map of complete results to rebaseline with.

        Returns:
            The union of `incomplete_results` and `complete_results`, where the
            former's entries have been substituted with those selected from the
            latter. The selection prioritizes same-OS substitutions.
        """
        # Store complete and filled-in results separately to avoid mutating
        # `complete_results` while it's being used to fill in results.
        merged_results = {
            suite: dict(results_by_build)
            for suite, results_by_build in complete_results.items()
        }
        for suite in incomplete_results:
            for incomplete_build in list(incomplete_results[suite]):
                fill_results = self._select_fill_results(
                    incomplete_build, suite, complete_results)
                if fill_results is None:
                    continue
                merged_results[suite][incomplete_build] = fill_results
                _log.debug('Using %s, "%s" for %s, "%s".', fill_results.build,
                           fill_results.step_name(), incomplete_build, suite)
        return merged_results

    def _select_fill_results(
        self,
        incomplete_build: Optional[Build],
        suite: str,
        complete_results: ResultsBySuite,
    ) -> Optional[WebTestResults]:
        # TODO(crbug.com/350775866): Consider allowing substitutions across
        # related suites (e.g., ordinary `blink_wpt_tests` and
        # `not_site_per_process_blink_wpt_tests`).
        complete_results_by_build = complete_results.get(suite, {})
        # Some platforms only run a small subset of tests. Don't allow them to
        # provide results to other platforms that run all tests.
        candidate_builds = {
            build
            for build in complete_results_by_build
            if not self._default_smoke_test_only(build)
        }
        if not candidate_builds:
            return None
        target_os = self._os_name(incomplete_build)
        # If no builds have the same OS, choose an arbitrary one, which is OK
        # for platform-agnostic tests.
        source_build = max(candidate_builds,
                           key=lambda build: self._os_name(build) == target_os)
        return complete_results_by_build[source_build]

    @memoized
    def _default_smoke_test_only(self, build: Build) -> bool:
        port_name = self._tool.builders.port_name_for_builder_name(
            build.builder_name)
        port = self._tool.port_factory.get(port_name)
        return port.default_smoke_test_only()

    @memoized
    def _os_name(self, build: Build) -> str:
        # A full port name should normally always be of the form <os>-<version>;
        # for example "win-win11", or "mac-mac13-arm64". For the test port used
        # in unit tests, though, the full port name may be
        # "test-<os>-<version>".
        port_name = self._tool.builders.port_name_for_builder_name(
            build.builder_name)
        return self._tool.port_factory.get(port_name).operating_system()