chromium/third_party/blink/tools/blinkpy/tool/commands/rebaseline.py

# Copyright (c) 2010 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import collections
import enum
import functools
import logging
import optparse
import re
from dataclasses import dataclass
from typing import (
    ClassVar,
    Collection,
    Dict,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Set,
    Tuple,
    get_args,
)
from urllib.parse import urlparse

from blinkpy.common import message_pool
from blinkpy.common.checkout.baseline_copier import BaselineCopier
from blinkpy.common.host import Host
from blinkpy.common.path_finder import WEB_TESTS_LAST_COMPONENT
from blinkpy.common.memoized import memoized
from blinkpy.common.net.results_fetcher import Build
from blinkpy.common.net.web_test_results import (
    Artifact,
    BaselineSuffix,
    WebTestResult,
)
from blinkpy.tool.commands.command import (
    Command,
    check_dir_option,
    check_file_option,
)
from blinkpy.tool.grammar import pluralize
from blinkpy.web_tests.models import test_failures, testharness_results
from blinkpy.web_tests.models.test_expectations import SystemConfigurationEditor, TestExpectations
from blinkpy.web_tests.models.typ_types import RESULT_TAGS, ResultType
from blinkpy.web_tests.port import factory
from blinkpy.web_tests.port.base import Port

_log = logging.getLogger(__name__)


class AbstractRebaseliningCommand(Command):
    """Base class for rebaseline-related commands."""
    # pylint: disable=abstract-method; not overriding `execute()`

    # Generic option groups (list of options):
    platform_options = factory.platform_options(use_globs=True)
    wpt_options = factory.wpt_options()

    no_optimize_option = optparse.make_option(
        '--no-optimize',
        dest='optimize',
        action='store_false',
        default=True,
        help=
        ('Do not optimize (de-duplicate) the baselines after rebaselining '
         '(default is to de-dupe automatically). You can use "blink_tool.py '
         'optimize-baselines" to optimize separately.'))
    dry_run_option = optparse.make_option(
        '--dry-run',
        action='store_true',
        default=False,
        help=('Dry run mode. List actions that would be performed '
              'but do not actually write to disk.'))
    results_directory_option = optparse.make_option(
        '--results-directory',
        action='callback',
        callback=check_dir_option,
        type='string',
        help='Local results directory to use.')
    suffixes_option = optparse.make_option(
        '--suffixes',
        default=','.join(get_args(BaselineSuffix)),
        action='store',
        help='Comma-separated list of file types to rebaseline.')
    builder_option = optparse.make_option(
        '--builder',
        help=('Name of the builder to pull new baselines from, '
              'e.g. "Mac11 Tests".'))
    port_name_option = optparse.make_option(
        '--port-name',
        help=('Fully-qualified name of the port that new baselines belong to, '
              'e.g. "mac-mac11". If not given, this is determined based on '
              '--builder.'))
    test_name_file_option = optparse.make_option(
        '--test-name-file',
        action='callback',
        callback=check_file_option,
        type='string',
        help=('Read names of tests to update from this file, '
              'one test per line.'))

    def __init__(self, options=None):
        super().__init__(options=options)
        self.expectation_line_changes = ChangeSet()
        self._tool = None
        self._results_dir = None
        self._dry_run = False

    def check_arguments_and_execute(self,
                                    options: optparse.Values,
                                    args: List[str],
                                    tool: Optional['BlinkTool'] = None) -> int:
        self._tool = tool
        for option, value in vars(options).items():
            self._host_port.set_option_default(option, value)
        return super().check_arguments_and_execute(options, args, tool)

    def baseline_directory(self, builder_name):
        port = self._tool.port_factory.get_from_builder_name(builder_name)
        return port.baseline_version_dir()

    @functools.cached_property
    def _host_port(self):
        # TODO(crbug.com/1498195): This may be changed to `--no-wdspec`.
        return self._tool.port_factory.get(options=optparse.Values({
            'test_types': [
                'testharness', 'reftest', 'wdspec', 'crashtest',
                'print-reftest', 'manual'
            ]
        }))

    def _file_name_for_actual_result(self, test_name, suffix):
        # output_filename takes extensions starting with '.'.
        return self._host_port.output_filename(
            test_name, test_failures.FILENAME_SUFFIX_ACTUAL, '.' + suffix)

    def _file_name_for_expected_result(self, test_name, suffix):
        # output_filename takes extensions starting with '.'.
        return self._host_port.output_filename(
            test_name, test_failures.FILENAME_SUFFIX_EXPECTED, '.' + suffix)

    def _test_can_have_suffix(self, test_name: str,
                              suffix: BaselineSuffix) -> bool:
        wpt_type = self._get_wpt_type(test_name)
        # Text baselines are allowed for WPT testharness and wdspec tests.
        # WPT reftests cannot dump text output (legacy reftests can, and are
        # handled by the fallthrough below).
        if wpt_type in {'testharness', 'wdspec'} and suffix == 'txt':
            return True
        # Some manual tests are run as pixel tests (crbug.com/1114920), so
        # `png` is allowed in that case.
        elif wpt_type == 'manual' and suffix == 'png':
            return True
        elif self._host_port.reference_files(test_name) and suffix == 'png':
            return False
        # No other suffixes are allowed for WPTs; non-WPT tests may use any
        # suffix.
        return not wpt_type
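
    # For instance (hypothetical test name), a WPT testharness test such as
    # 'external/wpt/dom/example.html' can only have a 'txt' baseline, while
    # a legacy (non-WPT) test without reference files may have any suffix.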

    def _get_wpt_type(self, test_name: str) -> Optional[str]:
        wpt_dir, url_from_wpt_dir = self._host_port.split_wpt_dir(test_name)
        if not wpt_dir:
            return None  # Not a WPT.
        manifest = self._host_port.wpt_manifest(wpt_dir)
        return manifest.get_test_type(url_from_wpt_dir)


class ChangeSet(object):
    """A record of TestExpectation lines to remove.

    Note: This class is probably more complicated than necessary; it is mainly
    used to track the list of lines that we want to remove from TestExpectations.
    """

    def __init__(self, lines_to_remove=None):
        self.lines_to_remove = lines_to_remove or {}

    def remove_line(self, test, port_name):
        if test not in self.lines_to_remove:
            self.lines_to_remove[test] = []
        self.lines_to_remove[test].append(port_name)

    def to_dict(self):
        remove_lines = []
        for test in self.lines_to_remove:
            for port_name in self.lines_to_remove[test]:
                remove_lines.append({'test': test, 'port_name': port_name})
        return {'remove-lines': remove_lines}
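
    # Illustrative serialized form (hypothetical test and port names):
    #   {'remove-lines': [{'test': 'fast/dom/example.html',
    #                      'port_name': 'mac-mac11'}]}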

    @staticmethod
    def from_dict(change_dict):
        lines_to_remove = {}
        if 'remove-lines' in change_dict:
            for line_to_remove in change_dict['remove-lines']:
                test = line_to_remove['test']
                port_name = line_to_remove['port_name']
                if test not in lines_to_remove:
                    lines_to_remove[test] = []
                lines_to_remove[test].append(port_name)
        return ChangeSet(lines_to_remove=lines_to_remove)

    def update(self, other):
        assert isinstance(other, ChangeSet)
        assert isinstance(other.lines_to_remove, dict)
        for test in other.lines_to_remove:
            if test not in self.lines_to_remove:
                self.lines_to_remove[test] = []
            self.lines_to_remove[test].extend(other.lines_to_remove[test])


class RebaselineTask(NamedTuple):
    test: str
    build: Build
    step_name: str
    port_name: str


class TestBaselineSet(collections.abc.Set):
    """Represents a collection of tests and platforms that can be rebaselined.

    A TestBaselineSet specifies tests to rebaseline along with information
    about where to fetch the baselines from.
    """

    def __init__(self, builders):
        self._builders = builders
        self._build_steps = set()
        self._test_map = collections.defaultdict(list)

    def __contains__(self, rebaseline_task):
        test, *build_info = rebaseline_task
        return tuple(build_info) in self._test_map.get(test, [])

    def __iter__(self):
        return iter(self._iter_combinations())

    def __len__(self):
        return sum(map(len, self._test_map.values()))

    def __bool__(self):
        return bool(self._test_map)

    def _iter_combinations(self):
        """Iterates through (test, build, step, port) combinations."""
        for test, build_steps in self._test_map.items():
            for build_step in build_steps:
                yield RebaselineTask(test, *build_step)

    def __str__(self):
        if not self._test_map:
            return '<Empty TestBaselineSet>'
        return '<TestBaselineSet with:\n  %s>' % '\n  '.join(
            '%s: %s, %s, %s' % combo for combo in self._iter_combinations())

    def all_tests(self):
        """Returns a sorted list of all tests without duplicates."""
        return sorted(self._test_map)

    def build_port_pairs(self, test):
        # Return a copy in case the caller modifies the returned list.
        return [(build, port) for build, _, port in self._test_map[test]]

    def runs_for_test(self, test: str):
        return list(self._test_map[test])

    def add(self,
            test: str,
            build: Build,
            step_name: str,
            port_name: Optional[str] = None):
        """Adds an entry for baselines to download for some set of tests.

        Args:
            test: A full test path.
            build: A Build object. Along with the step name, this specifies
                where to fetch baselines from.
            step_name: The name of the build step this test was run for.
            port_name: This specifies what platform the baseline is for. It's
                usually deduced from the builder name, but can be overridden.
        """
        if not port_name:
            port_name = self._builders.port_name_for_builder_name(
                build.builder_name)
        self._build_steps.add((build.builder_name, step_name))
        build_step = (build, step_name, port_name)
        self._test_map[test].append(build_step)

    def all_build_steps(self):
        """Returns all builder name, step name pairs in this collection."""
        return self._build_steps
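
    # Illustrative usage (builder, step, and port names are hypothetical):
    #   baseline_set = TestBaselineSet(tool.builders)
    #   baseline_set.add('fast/dom/example.html', Build('Mac11 Tests'),
    #                    'blink_web_tests')
    # Iterating then yields one RebaselineTask per entry, roughly:
    #   RebaselineTask(test='fast/dom/example.html',
    #                  build=Build('Mac11 Tests'),
    #                  step_name='blink_web_tests',
    #                  port_name='mac-mac11')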


class RebaselineFailureReason(enum.Flag):
    TIMEOUT_OR_CRASH = enum.auto()
    REFTEST_IMAGE_FAILURE = enum.auto()
    FLAKY_OUTPUT = enum.auto()
    LOCAL_BASELINE_NOT_FOUND = enum.auto()

    def __iter__(self) -> Iterator['RebaselineFailureReason']:
        # TODO(crbug.com/40209595): Remove this handcrafted `__iter__` after
        # python3.11+ when `enum.Flag` instances become iterable over their
        # members.
        for reason in self.__class__:
            if reason in self:
                yield reason


RebaselineGroup = Dict[RebaselineTask, WebTestResult]
RebaselineFailures = Dict[RebaselineTask, RebaselineFailureReason]
RebaselineFailureReason.DESCRIPTIONS = {
    RebaselineFailureReason.REFTEST_IMAGE_FAILURE:
    'reftest image failure',
    RebaselineFailureReason.FLAKY_OUTPUT:
    'flaky output',
    RebaselineFailureReason.LOCAL_BASELINE_NOT_FOUND:
    'missing from local results directory',
}


class AbstractParallelRebaselineCommand(AbstractRebaseliningCommand):
    """Base class for rebaseline commands that do some tasks in parallel."""
    # pylint: disable=abstract-method; not overriding `execute()`

    MAX_WORKERS: ClassVar[int] = 16

    def __init__(self, options=None):
        super(AbstractParallelRebaselineCommand,
              self).__init__(options=options)
        self.baseline_cache_stats = BaselineCacheStatistics()
        self._total_commands = self._completed_commands = 0
        self._rebaseline_failures = {}

    def _release_builders(self):
        """Returns a list of builder names for continuous release builders.

        The release builders cycle much faster than the debug ones and cover all the platforms.
        """
        release_builders = []
        for builder_name in self._tool.builders.all_continuous_builder_names():
            port = self._tool.port_factory.get_from_builder_name(builder_name)
            if port.test_configuration().build_type == 'release':
                release_builders.append(builder_name)
        return release_builders

    def _filter_baseline_set(
        self,
        test_baseline_set: TestBaselineSet,
    ) -> TestBaselineSet:
        build_steps_to_fetch_from = self.build_steps_to_fetch_from(
            test_baseline_set.all_build_steps())
        rebaselinable_set = TestBaselineSet(self._tool.builders)
        for task in test_baseline_set:
            test, build, step_name, port_name = task
            if (build.builder_name,
                    step_name) not in build_steps_to_fetch_from:
                continue
            result = self._result_for_test(test, build, step_name)
            if result and set(result.actual_results()) & {
                    ResultType.Failure,
                    ResultType.Crash,
                    ResultType.Timeout,
            }:
                rebaselinable_set.add(test, build, step_name, port_name)
        return rebaselinable_set

    def build_steps_to_fetch_from(self, build_steps_to_check):
        """Returns the subset of builder-step pairs that will cover all of the
        baseline search paths used in the input list.

        In particular, if the input list contains both Release and Debug
        versions of a configuration, we *only* return the Release version
        (since we don't save debug versions of baselines).

        Args:
            build_steps_to_check: A collection of builder name, step name
                tuples.

        Returns:
            A subset of the input list that we should fetch from.
        """
        release_build_steps = set()
        debug_build_steps = set()
        for builder, step in build_steps_to_check:
            port = self._tool.port_factory.get_from_builder_name(builder)
            if port.test_configuration().build_type == 'release':
                release_build_steps.add((builder, step))
            else:
                debug_build_steps.add((builder, step))

        port_step_pairs, build_steps = set(), set()
        for builder, step in [
                *sorted(release_build_steps),
                *sorted(debug_build_steps),
        ]:
            port_name = self._tool.builders.port_name_for_builder_name(builder)
            # Assume differently named steps provide unique coverage, even if
            # they have the same fallback path. For example, as of this writing,
            # there are two cases where a pair of suites run disjoint sets of
            # tests, so they should both be included:
            #   * `webdriver_wpt_tests` and `chrome_wpt_tests`
            #   * `blink_web_tests` and `blink_wpt_tests`
            if (port_name, step) not in port_step_pairs:
                port_step_pairs.add((port_name, step))
                build_steps.add((builder, step))
        return build_steps
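
    # Example (hypothetical builder names): given both
    # ('Mac11 Tests', 'blink_web_tests') and
    # ('Mac11 Tests (dbg)', 'blink_web_tests'), only the Release pair is kept,
    # because both map to the same (port name, step name) and Release pairs
    # are considered first.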

    def _copy_baselines(self, groups: Dict[str, TestBaselineSet]) -> None:
        commands = []
        for base_test in sorted(groups):
            group = groups[base_test]
            for suffix in self._suffixes_for_group(group):
                if self._test_can_have_suffix(base_test, suffix):
                    commands.append(
                        ('copy_baselines', base_test, suffix, group))
        self._run_in_message_pool(self._worker_factory, commands)

    def _group_tests_by_base(
        self,
        test_baseline_set: TestBaselineSet,
    ) -> Dict[str, TestBaselineSet]:
        """Partition a given baseline set into per-base test baseline sets.

        The baseline copier/optimizer handles all virtual tests derived from a
        nonvirtual (base) test. When both a virtual test and its base test need
        to be rebaselined, this method coalesces information about where they
        failed into a shared `TestBaselineSet` that the copier/optimizer can
        easily ingest.
        """
        groups = collections.defaultdict(
            functools.partial(TestBaselineSet, self._tool.builders))
        tests = set(test_baseline_set.all_tests())
        for test, build, step_name, port_name in test_baseline_set:
            nonvirtual_test = self._host_port.lookup_virtual_test_base(
                test) or test
            test_for_group = (nonvirtual_test
                              if nonvirtual_test in tests else test)
            groups[test_for_group].add(test, build, step_name, port_name)
        return groups
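
    # For example (hypothetical test names), if both
    # 'virtual/flag/fast/dom/example.html' and its base test
    # 'fast/dom/example.html' appear in the input set, both end up in the
    # group keyed by 'fast/dom/example.html'. A virtual test whose base is
    # absent from the input keeps its own group.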

    def _suffixes_for_group(self,
                            test_baseline_set: TestBaselineSet) -> Set[str]:
        return frozenset().union(
            *(self._suffixes_for_actual_failures(test, build, step_name)
              for test, build, step_name, _ in test_baseline_set))

    def _download_baselines(self, groups: Dict[str, TestBaselineSet]):
        # The same worker should download all the baselines in a group so that
        # its baseline cache is effective.
        commands = [('download_baselines', base_test,
                     self._group_with_results(groups[base_test]))
                    for base_test in sorted(groups)]
        self._run_in_message_pool(self._worker_factory, commands)

    def _group_with_results(
        self,
        test_baseline_set: TestBaselineSet,
    ) -> RebaselineGroup:
        tasks_to_results = {}
        for task in test_baseline_set:
            maybe_result = self._result_for_test(task.test, task.build,
                                                 task.step_name)
            if maybe_result:
                tasks_to_results[task] = maybe_result
        return tasks_to_results

    def _optimize_command(self,
                          tests: Collection[str],
                          verbose: bool = False) -> List[str]:
        """Return a command to de-duplicate baselines."""
        assert tests, 'should not generate a command to optimize no tests'
        command = [
            self._tool.path(),
            'optimize-baselines',
            # The manifest has already been updated when listing tests.
            '--no-manifest-update',
        ]
        if verbose:
            command.append('--verbose')
        command.extend(sorted(tests))
        return command
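
    # A typical generated command (hypothetical paths and test name):
    #   ['/path/to/blink_tool.py', 'optimize-baselines',
    #    '--no-manifest-update', 'fast/dom/example.html']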

    def _update_expectations_files(self, lines_to_remove):
        tests = list(lines_to_remove.keys())
        to_remove = collections.defaultdict(set)
        all_versions = frozenset([
            config.version.lower()
            for config in self._host_port.all_test_configurations()
        ])
        # This is so we remove lines for builders that skip this test.
        # For example, Android skips most tests and we don't want to leave
        # stray [ Android ] lines in TestExpectations.
        # This is only necessary for "blink_tool.py rebaseline".
        for port_name in self._tool.port_factory.all_port_names():
            port = self._tool.port_factory.get(port_name)
            for test in tests:
                if (port.test_configuration().version.lower() in all_versions
                        and port.skips_test(test)):
                    to_remove[test].add(
                        port.test_configuration().version.lower())

        # Get configurations to remove based on builders for each test
        for test, port_names in lines_to_remove.items():
            for port_name in port_names:
                port = self._tool.port_factory.get(port_name)
                if port.test_configuration().version.lower() in all_versions:
                    to_remove[test].add(
                        port.test_configuration().version.lower())

        if self._dry_run:
            for test, versions in to_remove.items():
                _log.debug('Would have removed expectations for %s: %s', test,
                           ', '.join(sorted(versions)))
            return

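        # Any port works here: the path to the generic TestExpectations file
        # does not depend on the port, so reusing the `port` bound by the
        # loops above is safe.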
        path = port.path_to_generic_test_expectations_file()
        test_expectations = TestExpectations(
            self._host_port,
            expectations_dict={
                path: self._tool.filesystem.read_text_file(path)
            })
        system_remover = SystemConfigurationEditor(test_expectations)
        for test, versions in to_remove.items():
            system_remover.remove_os_versions(test, versions)
        system_remover.update_expectations()

    def _run_in_message_pool(self, worker_factory, commands):
        num_workers = min(self.MAX_WORKERS, self._tool.executive.cpu_count())
        self._completed_commands, self._total_commands = 0, len(commands)
        with message_pool.get(self, worker_factory, num_workers) as pool:
            pool.run(commands)

    def _worker_factory(self, worker_connection):
        return Worker(worker_connection, dry_run=self._dry_run)

    def handle(self, name: str, source: str, *args):
        """Handler called when a worker completes a rebaseline task.

        This allows this class to conform to the `message_pool.MessageHandler`
        interface.
        """
        if name == 'report_baseline_cache_stats':
            (stats, ) = args
            self.baseline_cache_stats += stats
        elif name == 'copy_baselines':
            base_test, suffix = args
            self._log_command_completion(
                f'Copied baselines for {base_test!r} ({suffix})')
        elif name == 'download_baselines':
            base_test, rebaseline_failures = args
            self._rebaseline_failures.update(rebaseline_failures)
            self._log_command_completion(
                f'Downloaded baselines for {base_test!r}')

    def _log_command_completion(self, message: str):
        self._completed_commands += 1
        _log.info(
            f'{message} ({self._completed_commands}/{self._total_commands})')

    def rebaseline(self, options, test_baseline_set):
        """Fetches new baselines and removes related test expectation lines.

        Args:
            options: An object with the command line options.
            test_baseline_set: A TestBaselineSet instance, which represents
                a set of tests/platform combinations to rebaseline.
        """
        self._rebaseline_failures.clear()
        self._results_dir = options.results_directory
        if not self._dry_run and self._tool.git(
        ).has_working_directory_changes(pathspec=self._web_tests_dir()):
            _log.error(
                'There are uncommitted changes in the web tests directory; aborting.'
            )
            return 1

        rebaselinable_set = self._filter_baseline_set(test_baseline_set)
        test_count = len(rebaselinable_set.all_tests())
        if test_count == 0:
            _log.info('No tests to rebaseline.')
            return 0

        _log.info('Rebaselining %s.', pluralize('test', test_count))
        groups = self._group_tests_by_base(rebaselinable_set)
        self._copy_baselines(groups)
        self._download_baselines(groups)
        _log.debug('Baseline cache statistics: %s', self.baseline_cache_stats)
        self._warn_about_rebaseline_failures()

        exit_code = 0
        if options.optimize and groups:
            # No point in optimizing during a dry run where no files were
            # downloaded.
            if self._dry_run:
                _log.info('Skipping optimization during dry run.')
            else:
                optimize_command = self._optimize_command(
                    groups, options.verbose)
                exit_code = exit_code or self._tool.main(optimize_command)

        if not self._dry_run:
            unstaged_baselines = self.unstaged_baselines()
            _log.info('Staging %s with git.',
                      pluralize('baseline', len(unstaged_baselines)))
            self._tool.git().add_list(unstaged_baselines)
        return exit_code

    def _warn_about_rebaseline_failures(self):
        if not self._rebaseline_failures:
            return
        tasks_by_exp_file = self._group_tasks_by_exp_file(
            self._rebaseline_failures)
        _log.warning('Some test failures should be suppressed in '
                     'TestExpectations instead of being rebaselined.')
        # TODO(crbug.com/1149035): Fully automate writing to TestExpectations
        # files. This should be done by integrating with the existing
        # TestExpectations updater, which has better handling for:
        #   * Conflicting tag removal
        #   * Specifier merge/split
        for exp_file in sorted(tasks_by_exp_file):
            lines = '\n'.join(
                map(self._format_line, tasks_by_exp_file[exp_file]))
            # Make the TestExpectation lines copy-pastable by logging them
            # in a single statement (i.e., not prefixed by the log's
            # formatting).
            _log.warning('Consider adding the following lines to %s:\n%s',
                         exp_file, lines)

    def _group_tasks_by_exp_file(
        self,
        tasks: Collection[RebaselineTask],
    ) -> Dict[str, Set[RebaselineTask]]:
        tasks_by_exp_file = collections.defaultdict(set)
        for task in tasks:
            flag_spec_option = self._tool.builders.flag_specific_option(
                task.build.builder_name, task.step_name)
            if flag_spec_option:
                exp_file = self._host_port.path_to_flag_specific_expectations_file(
                    flag_spec_option)
            else:
                exp_file = (
                    self._host_port.path_to_generic_test_expectations_file())
            tasks_by_exp_file[exp_file].add(task)
        return tasks_by_exp_file

    def _format_line(self, task: RebaselineTask) -> str:
        results = set(
            self._result_for_test(task.test, task.build,
                                  task.step_name).actual_results())
        # This assertion holds because we only request unexpected non-PASS
        # results from ResultDB. This precondition ensures `result_tags` is not
        # `[ Pass ]` or empty.
        assert results - {ResultType.Pass}
        specifier = self._tool.builders.version_specifier_for_port_name(
            task.port_name)
        reasons = self._rebaseline_failures[task]
        if (reasons == RebaselineFailureReason.TIMEOUT_OR_CRASH
                and ResultType.Failure in results):
            # If no other rebaseline failure reason is present, the test failure
            # was successfully rebaselined and will pass going forward.
            results.remove(ResultType.Failure)
            results.add(ResultType.Pass)
        result_tags = ' '.join(RESULT_TAGS[result]
                               for result in sorted(results))
        line = f'{task.test} [ {result_tags} ]'
        descriptions = list(
            filter(None, map(RebaselineFailureReason.DESCRIPTIONS.get,
                             reasons)))
        if descriptions:
            comment = ', '.join(descriptions).capitalize()
            line += f'  # {comment}'
        return f'[ {specifier} ] {line}' if specifier else line
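
    # A generated line typically looks like (hypothetical test name):
    #   [ Mac11 ] fast/dom/example.html [ Failure ]  # Flaky output
    # The version specifier prefix is omitted for ports without one.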

    def unstaged_baselines(self):
        """Returns absolute paths for unstaged (including untracked) baselines."""
        baseline_re = re.compile(r'.*[\\/]' + WEB_TESTS_LAST_COMPONENT +
                                 r'[\\/].*-expected\.(' +
                                 '|'.join(get_args(BaselineSuffix)) + ')$')
        unstaged_changes = self._tool.git().unstaged_changes()
        return sorted(self._tool.git().absolute_path(path)
                      for path in unstaged_changes
                      if re.match(baseline_re, path))
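
    # The pattern matches baseline paths such as (hypothetical):
    #   third_party/blink/web_tests/fast/dom/example-expected.txt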

    def _web_tests_dir(self):
        return self._host_port.web_tests_dir()

    def _suffixes_for_actual_failures(self, test, build, step_name=None):
        """Gets the baseline suffixes for actual mismatch failures in some results.

        Args:
            test: A full test path string.
            build: A Build object.
            step_name: The name of the build step the test ran in.

        Returns:
            A set of file suffix strings.
        """
        test_result = self._result_for_test(test, build, step_name)
        if not test_result:
            return set()
        return set(test_result.baselines_by_suffix())

    @memoized
    def _result_for_test(self, test, build, step_name):
        results = self._tool.results_fetcher.gather_results(build, step_name)
        if not results:
            _log.debug('No results found for build %s', build)
            return None
        test_result = results.result_for_test(test)
        if not test_result:
            _log.info('No test result for test %s in build %s', test, build)
            return None
        if self._results_dir:
            for artifact_name, suffix in [
                ('actual_text', 'txt'),
                ('actual_image', 'png'),
                ('actual_audio', 'wav'),
            ]:
                filename = self._file_name_for_actual_result(test, suffix)
                path = self._tool.filesystem.join(self._results_dir, filename)
                test_result.artifacts[artifact_name] = [Artifact(path)]
        return test_result


class Rebaseline(AbstractParallelRebaselineCommand):
    name = 'rebaseline'
    help_text = 'Rebaseline tests with results from the continuous builders.'
    show_in_main_help = True
    argument_names = '[TEST_NAMES]'

    def __init__(self):
        super().__init__(options=[
            self.no_optimize_option,
            self.dry_run_option,
            # FIXME: should we support the platform options in addition to (or instead of) --builders?
            self.results_directory_option,
            optparse.make_option(
                '--builders',
                default=None,
                action='append',
                help=
                ('Comma-separated list of builders to pull new baselines from '
                 '(can also be provided multiple times).')),
            *self.wpt_options,
        ])

    def _builders_to_pull_from(self):
        return self._tool.user.prompt_with_list(
            'Which builder to pull results from:',
            self._release_builders(),
            can_choose_multiple=True)

    def execute(self, options, args, tool):
        self._dry_run = options.dry_run
        if not args:
            _log.error('Must list tests to rebaseline.')
            return

        if options.builders:
            builders_to_check = []
            for builder_names in options.builders:
                builders_to_check += builder_names.split(',')
        else:
            builders_to_check = self._builders_to_pull_from()

        test_baseline_set = TestBaselineSet(tool.builders)
        tests = self._host_port.tests(args)
        for builder in builders_to_check:
            build = Build(builder)
            step_names = self._tool.builders.step_names_for_builder(
                build.builder_name)
            for step_name in step_names:
                for test in tests:
                    test_baseline_set.add(test, build, step_name)

        _log.debug('Rebaselining: %s', test_baseline_set)
        return self.rebaseline(options, test_baseline_set)


@dataclass
class BaselineCacheStatistics:
    hit_count: int = 0
    hit_bytes: int = 0
    total_count: int = 0
    total_bytes: int = 0

    def __add__(self,
                other: 'BaselineCacheStatistics') -> 'BaselineCacheStatistics':
        return BaselineCacheStatistics(self.hit_count + other.hit_count,
                                       self.hit_bytes + other.hit_bytes,
                                       self.total_count + other.total_count,
                                       self.total_bytes + other.total_bytes)

    def __str__(self) -> str:
        return (
            f'hit rate: {self.hit_count}/{self.total_count} '
            f'({_percentage(self.hit_count, self.total_count):.1f}%), '
            f'bytes served via cache: {self.hit_bytes}/{self.total_bytes}B '
            f'({_percentage(self.hit_bytes, self.total_bytes):.1f}%)')

    def record(self, num_bytes: int, hit: bool = False):
        self.total_bytes += num_bytes
        self.total_count += 1
        if hit:
            self.hit_bytes += num_bytes
            self.hit_count += 1
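
    # Illustrative usage:
    #   stats = BaselineCacheStatistics()
    #   stats.record(1024, hit=True)
    #   stats.record(2048)
    # `str(stats)` then reports a 1/2 (50.0%) hit rate and 1024/3072B
    # (33.3%) of bytes served via the cache.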


class RebaselineFailure(Exception):
    """Represents a rebaseline task that could not be executed."""
    def __init__(self, reason: RebaselineFailureReason, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.reason = reason


class BaselineLoader:
    """Selects a usable baseline from a list of artifacts.

    This class encompasses:
     1. In-memory baseline caching, keyed on the content's hash digest.
        Baselines likely to be identical (e.g., for a virtual and its base
        test) are downloaded together. The cache takes advantage of this
        temporal locality. For simplicity, the cache is not bounded. It's the
        caller's responsibility to `clear()` the cache as needed.
     2. Finding a "good" baseline for fuzzy-matched pixel tests according to
        some heuristics. See `choose_valid_baseline(...)` for details.
    """

    def __init__(self, host: Host, default_port: Port):
        self._host = host
        self._default_port = default_port
        self._digests_to_contents = {}
        # Image diff statistics are commutative. By canonicalizing the
        # (expected, actual) argument order and caching the result, we can
        # avoid invoking `image_diff` a second time unnecessarily when the
        # arguments are swapped.
        make_cached = functools.lru_cache(maxsize=128)
        self._diff_image = make_cached(self._default_port.diff_image)
        self.stats = BaselineCacheStatistics()

    def _image_diff_stats(self, actual: bytes, expected: bytes):
        # By definition, an image has zero diff with itself.
        if actual == expected:
            return {'maxDifference': 0, 'totalPixels': 0}
        _, stats, _ = self._diff_image(*sorted([actual, expected]))
        return stats

    def _fetch_contents(self, url: str) -> bytes:
        # Assume this is a local file.
        if not urlparse(url).scheme:
            try:
                return bytes(self._host.filesystem.read_binary_file(url))
            except FileNotFoundError:
                raise RebaselineFailure(
                    RebaselineFailureReason.LOCAL_BASELINE_NOT_FOUND)
        # Ensure that this is an immutable bytestring (i.e., not `bytearray`).
        return bytes(self._host.web.get_binary(url))

    def load(self, artifact: Artifact) -> bytes:
        hit = False
        if artifact.digest:
            contents = self._digests_to_contents.get(artifact.digest)
            if contents is None:
                contents = self._fetch_contents(artifact.url)
                self._digests_to_contents[artifact.digest] = contents
            else:
                hit = True
        else:
            contents = self._fetch_contents(artifact.url)
        self.stats.record(len(contents), hit)
        return contents

    def choose_valid_baseline(self, artifacts: List[Artifact], test_name: str,
                              suffix: BaselineSuffix) -> bytes:
        """Choose a baseline that would have allowed the observed runs to pass.

        Usually, this means returning the contents of a non-flaky artifact
        (i.e., has identical contents across all retries). However, for flaky
        fuzzy-matched pixel tests, we can return an image that would have
        matched all observed retries under existing fuzzy parameters.

        Raises:
            RebaselineFailure: If a test cannot be rebaselined (e.g., flakiness
                outside fuzzy parameters).
        """
        if suffix == 'png' and self._default_port.reference_files(test_name):
            raise RebaselineFailure(
                RebaselineFailureReason.REFTEST_IMAGE_FAILURE)
        assert artifacts
        contents_by_run = {
            artifact: self.load(artifact)
            for artifact in artifacts
        }
        contents = set(contents_by_run.values())
        if len(contents) > 1:
            if suffix == 'png':
                # Because we only rebaseline tests whose results were all
                # unexpected failures, a fuzzy-matched test can only reach
                # this point if every retry's image fell outside the
                # acceptable range for the current baseline.
                return self._find_fuzzy_matching_baseline(
                    test_name, contents_by_run)
            raise RebaselineFailure(RebaselineFailureReason.FLAKY_OUTPUT)
        return contents.pop()

    def _find_fuzzy_matching_baseline(
        self,
        test_name: str,
        contents_by_run: Dict[Artifact, bytes],
    ) -> bytes:
        max_diff_range, total_pixels_range = (
            self._default_port.get_wpt_fuzzy_metadata(test_name))
        # No fuzzy parameters present.
        if not max_diff_range or not total_pixels_range:
            raise RebaselineFailure(RebaselineFailureReason.FLAKY_OUTPUT)

        max_diff_min, max_diff_max = max_diff_range
        total_pixels_min, total_pixels_max = total_pixels_range
        # The fuzzy parameters must allow the chosen baseline to match itself.
        # Rebaselining a pixel test with nonzero parameter minimums doesn't make
        # sense because any `actual_image` you select as the new baseline will
        # not match if that actual output is seen again.
        if max_diff_min > 0 or total_pixels_min > 0:
            raise RebaselineFailure(RebaselineFailureReason.FLAKY_OUTPUT)

        match_criteria = {}
        for artifact, contents in contents_by_run.items():
            max_diff_needed, total_pixels_needed = (
                self._find_needed_fuzzy_params(contents, contents_by_run))
            fuzzy_mismatch = (max_diff_needed > max_diff_max
                              or total_pixels_needed > total_pixels_max)
            match_criteria[artifact] = (fuzzy_mismatch, total_pixels_needed,
                                        max_diff_needed)

        chosen_artifact = min(match_criteria, key=match_criteria.get)
        fuzzy_mismatch, _, _ = match_criteria[chosen_artifact]
        if fuzzy_mismatch:
            # TODO(crbug.com/1426622): Here, the existing parameters are
            # insufficient for ensuring the test would have passed consistently.
            # Consider suggesting new fuzzy parameters before suggesting
            # TestExpectations, using the tight bounds in
            # `match_criteria[chosen_artifact]`.
            raise RebaselineFailure(RebaselineFailureReason.FLAKY_OUTPUT)
        return contents_by_run[chosen_artifact]

    def _find_needed_fuzzy_params(
        self,
        candidate: bytes,
        contents_by_run: Dict[Artifact, bytes],
    ) -> Tuple[int, int]:
        """Find parameters needed to fuzzily match every observed image.

        Arguments:
            candidate: A candidate baseline that every other run's image will
                be diffed against.

        Returns:
            The minimum (maxDifference, totalPixels) parameters needed.
        """
        max_diff_max = total_pixels_max = 0
        for run_contents in contents_by_run.values():
            stats = self._image_diff_stats(candidate, run_contents)
            if not stats:
                continue
            max_diff_max = max(max_diff_max, stats['maxDifference'])
            total_pixels_max = max(total_pixels_max, stats['totalPixels'])
        return max_diff_max, total_pixels_max
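
    # Worked example (hypothetical diff stats): if the candidate differs from
    # two other runs with stats {'maxDifference': 3, 'totalPixels': 10} and
    # {'maxDifference': 7, 'totalPixels': 4}, then (7, 10) is returned, i.e.
    # the per-axis maxima across all runs.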

    def clear(self):
        self._digests_to_contents.clear()


class Worker:
    """A worker delegate for running rebaseline tasks in parallel.

    The purpose of this worker is to persist resources across tasks:
      * HTTP(S) connections to ResultDB
      * TestExpectations cache
      * Baseline cache

    See Also:
        crbug.com/1213998#c50
    """

    def __init__(self,
                 connection,
                 dry_run: bool = False):
        self._connection = connection
        self._dry_run = dry_run
        self._commands = {
            'copy_baselines': self._copy_baselines,
            'download_baselines': self._download_baselines,
        }

    def start(self):
        self._copier = BaselineCopier(self._connection.host)
        self._host = self._connection.host
        self._default_port = self._host.port_factory.get()
        self._default_port.set_option_default('manifest_update', False)
        self._fs = self._connection.host.filesystem
        self._baseline_loader = BaselineLoader(self._host, self._default_port)

    def stop(self):
        if hasattr(self, '_baseline_loader'):
            self._connection.post('report_baseline_cache_stats',
                                  self._baseline_loader.stats)

    def handle(self, name: str, source: str, *args):
        response = self._commands[name](*args)
        # Post a message to the managing process to flush this worker's logs.
        if response is not None:
            self._connection.post(name, *response)
        else:
            self._connection.post(name)

    def _copy_baselines(self, test_name: str, suffix: BaselineSuffix,
                        group: TestBaselineSet):
        copies = list(
            self._copier.find_baselines_to_copy(test_name, suffix, group))
        copies.sort(key=lambda copy: copy[1])
        if self._dry_run:
            for source, dest in copies:
                _log.debug('Would have copied %s -> %s', source or '<extra>',
                           dest)
        else:
            # The placeholder is the contents of an "extra baseline" (as
            # defined by `ResultDigest`) that replicates omitting an explicit
            # baseline.
            if self._default_port.is_testharness_test(test_name):
                placeholder = testharness_results.ABBREVIATED_ALL_PASS
            else:
                placeholder = ''
            self._copier.write_copies(copies, placeholder)
        return test_name, suffix

    def _download_baselines(self, base_test: str,
                            group: RebaselineGroup) -> RebaselineFailures:
        self._baseline_loader.clear()
        rebaseline_failures = {}
        for task, result in group.items():
            failure_reason = RebaselineFailureReason(0)
            if set(result.actual_results()) & {
                    ResultType.Crash,
                    ResultType.Timeout,
            }:
                failure_reason |= RebaselineFailureReason.TIMEOUT_OR_CRASH
            for suffix, artifacts in result.baselines_by_suffix().items():
                try:
                    contents = self._baseline_loader.choose_valid_baseline(
                        artifacts, task.test, suffix)
                    self._write_baseline(task, suffix, artifacts[0].url,
                                         contents)
                except RebaselineFailure as error:
                    failure_reason |= error.reason
            if failure_reason:
                rebaseline_failures[task] = failure_reason
        return base_test, rebaseline_failures

    def _write_baseline(self, task: RebaselineTask, suffix: BaselineSuffix,
                        source: str, contents: bytes):
        port = self._host.port_factory.get(task.port_name)
        flag_spec_option = self._host.builders.flag_specific_option(
            task.build.builder_name, task.step_name)
        port.set_option_default('flag_specific', flag_spec_option)
        dest = self._fs.join(
            port.baseline_version_dir(),
            port.output_filename(task.test,
                                 test_failures.FILENAME_SUFFIX_EXPECTED,
                                 '.' + suffix))
        _log.debug('Retrieving source %s for target %s.', source, dest)
        if not self._dry_run:
            self._fs.maybe_make_directory(self._fs.dirname(dest))
            self._fs.write_binary_file(dest, contents)


def _percentage(a: float, b: float) -> float:
    return 100 * (a / b) if b > 0 else float('nan')