chromium/third_party/blink/tools/blinkpy/w3c/import_notifier.py

# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Sends notifications after automatic imports from web-platform-tests (WPT).

Automatically file bugs for new failures caused by WPT imports for opted-in
directories.

Design doc: https://docs.google.com/document/d/1W3V81l94slAC_rPcTKWXgv3YxRxtlSIAxi3yj6NsbBw/edit?usp=sharing
"""

import logging
import re
import typing
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import PurePath
from typing import (
    List,
    Mapping,
    MutableMapping,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

from blinkpy.common import path_finder
from blinkpy.common.checkout.git import CommitRange, FileStatusType
from blinkpy.common.memoized import memoized
from blinkpy.common.net.git_cl import CLRevisionID
from blinkpy.common.system.executive import ScriptError
from blinkpy.web_tests.models import typ_types
from blinkpy.web_tests.models.test_expectations import (
    ExpectationsChange,
    TestExpectations,
)
from blinkpy.web_tests.models.testharness_results import (
    LineType,
    Status,
    TestharnessLine,
    parse_testharness_baseline,
)
from blinkpy.w3c.buganizer import (
    BuganizerClient,
    BuganizerError,
    BuganizerIssue,
)
from blinkpy.w3c.common import (
    AUTOROLLER_EMAIL,
    WPT_GH_URL,
    WPT_GH_RANGE_URL_TEMPLATE,
)
from blinkpy.w3c.directory_owners_extractor import DirectoryOwnersExtractor
from blinkpy.w3c.gerrit import GerritAPI, GerritCL, OutputOption
from blinkpy.w3c.wpt_results_processor import TestType

_log = logging.getLogger(__name__)

GITHUB_COMMIT_PREFIX = WPT_GH_URL + 'commit/'
CHECKS_URL_TEMPLATE = 'https://chromium-review.googlesource.com/c/chromium/src/+/{}/{}?checksPatchset=1&tab=checks'
BUGANIZER_WPT_COMPONENT = '1456176'

IssuesByDir = Mapping[str, BuganizerIssue]


class ImportNotifier:
    IMPORT_SUBJECT_PREFIX = 'Import wpt@'
    COMMENT_PREAMBLE = 'Filed bugs for failures introduced by this CL: '

    def __init__(self,
                 host,
                 chromium_git,
                 local_wpt,
                 gerrit_api: GerritAPI,
                 buganizer_client: Optional[BuganizerClient] = None):
        self.host = host
        self.git = chromium_git
        self.local_wpt = local_wpt
        self._gerrit_api = gerrit_api
        self._buganizer_client = buganizer_client or BuganizerClient()

        self.finder = path_finder.PathFinder(host.filesystem)
        self.default_port = host.port_factory.get()
        self.default_port.set_option_default('additional_expectations', [
            self.finder.path_from_web_tests('MobileTestExpectations'),
        ])
        self.default_port.set_option_default('test_types',
                                             typing.get_args(TestType))
        self.owners_extractor = DirectoryOwnersExtractor(host)
        self.new_failures_by_directory = defaultdict(DirectoryFailures)

    def main(self, dry_run: bool = False) -> Tuple[IssuesByDir, GerritCL]:
        """Files bug reports for new failures.

        Arguments:
            dry_run: If True, no bugs will be actually filed to crbug.com.

        Returns:
            A map from a WPT directory to a new bug filed.

        Raises:
            GerritError: A network failure when calling Gerrit.
            ImportNotifierError: An invariant violation, which could suggest
                checkout or Gerrit corruption.

        Note: "test names" are paths of the tests relative to web_tests.
        """
        wpt_end_rev, import_rev = self.latest_wpt_import()
        cl = self._cl_for_wpt_revision(wpt_end_rev)
        repo = self.host.project_config.gerrit_project
        _log.info(f'Identifying failures for {repo}@{import_rev} ({cl.url})')
        if self._bugs_already_filed(cl):
            _log.info(f'Bugs have already been filed.')
            return {}, cl
        wpt_start_rev, _ = self.latest_wpt_import(f'{import_rev}~1')

        self.examine_baseline_changes(import_rev, cl.current_revision_id)
        self.examine_new_test_expectations(import_rev)
        wpt_range = CommitRange(wpt_start_rev, wpt_end_rev)
        bugs = self.create_bugs_from_new_failures(wpt_range,
                                                  cl.current_revision_id)
        filed_bugs = self.file_bugs(bugs, dry_run)
        if filed_bugs:
            cl.post_comment(
                self.COMMENT_PREAMBLE +
                ', '.join(sorted(bug.link for bug in filed_bugs.values())))
        return filed_bugs, cl

    @memoized
    def latest_wpt_import(
            self,
            commits: Union[None, str, CommitRange] = None) -> Tuple[str, str]:
        """Get commit hashes for the last WPT import.

        Arguments:
            commits: The range to search. See `Git.most_recent_log_matching()`
                docstring for usage.

        Returns:
            A pair of SHA-1 hex digests (40 hex digits each):
              * A valid commit in the WPT repo denoting how far WPT was rolled
                (inclusive).
              * The corresponding `chromium/src` commit where those changes
                were rolled.
        """
        raw_log = self.git.most_recent_log_matching(
            f'^{self.IMPORT_SUBJECT_PREFIX}',
            commits=commits,
            format_pattern='%s:%H').strip()
        if raw_log.startswith(self.IMPORT_SUBJECT_PREFIX):
            revisions = raw_log[len(self.IMPORT_SUBJECT_PREFIX):]
            wpt_rev, _, chromium_rev = revisions.partition(':')
            assert len(wpt_rev) == 40, wpt_rev
            assert len(chromium_rev) == 40, chromium_rev
            return wpt_rev, chromium_rev
        raise ImportNotifierError(
            f'unable to find latest WPT revision within {commits!r}')

    def _bugs_already_filed(self, cl: GerritCL) -> bool:
        return any(self.COMMENT_PREAMBLE in message['message']
                   for message in cl.messages)

    def examine_baseline_changes(self, import_rev: str,
                                 cl_revision: CLRevisionID):
        """Examines all changed baselines to find new failures.

        Arguments:
            import_rev: A chromium/src revision pointing to the import commit.
            cl_revision: Issue and patchset numbers of the imported CL.
        """
        assert cl_revision.patchset, cl_revision
        sep = re.escape(self.host.filesystem.sep)
        platform_pattern = f'(platform|flag-specific){sep}([^{sep}]+){sep}'
        baseline_pattern = re.compile(f'web_tests{sep}({platform_pattern})?')
        import_range = CommitRange(f'{import_rev}~1', import_rev)
        diff_filter = (FileStatusType.ADD | FileStatusType.MODIFY
                       | FileStatusType.RENAME)
        # Use a fairly high similarity threshold to avoid comparing unrelated
        # baselines, which is worse than missing a rename and filing a duplicate
        # bug.
        changed_files = self.git.changed_files(import_range,
                                               diff_filter=diff_filter,
                                               rename_threshold=0.9)
        for changed_file, status in changed_files.items():
            parts = baseline_pattern.split(changed_file, maxsplit=1)[1:]
            if not parts:
                continue
            test = self.default_port.test_from_output_filename(parts[-1])
            if not test:
                continue
            directory = self.find_directory_for_bug(test)
            if not directory:
                continue
            lines_before = self._read_baseline(status.source or changed_file,
                                               import_range.start)
            lines_after = self._read_baseline(changed_file, import_range.end)
            if self.more_failures_in_baseline(lines_before, lines_after):
                failures = self.new_failures_by_directory[directory]
                failures.baseline_failures.append(
                    BaselineFailure(test, f'{cl_revision}/{changed_file}'))

    def more_failures_in_baseline(
        self,
        old_lines: List[TestharnessLine],
        new_lines: List[TestharnessLine],
    ) -> bool:
        """Determines if a testharness.js baseline file has new failures.

        We recognize two types of failures: FAIL lines, which are output for a
        specific subtest failing, and harness errors, which indicate an uncaught
        error in the test. Increasing numbers of either are considered new
        failures - this includes going from FAIL to error or vice-versa.
        """
        failure_statuses = set(Status) - {Status.PASS, Status.NOTRUN}
        old_failures = [
            line for line in old_lines if line.statuses & failure_statuses
        ]
        new_failures = [
            line for line in new_lines if line.statuses & failure_statuses
        ]

        # TODO(crbug.com/329869593): Consider notifying about any new failure
        # (as determined by subtest name), not just baselines with increasing
        # total failures.
        is_error = lambda line: line.line_type is LineType.HARNESS_ERROR
        if sum(map(is_error, new_failures)) > sum(map(is_error, old_failures)):
            return True
        is_subtest = lambda line: line.line_type is LineType.SUBTEST
        return sum(map(is_subtest, new_failures)) > sum(
            map(is_subtest, old_failures))

    def _read_baseline(self, baseline_path: str,
                       ref: str) -> List[TestharnessLine]:
        try:
            contents = self.git.show_blob(baseline_path, ref)
            return parse_testharness_baseline(
                contents.decode(errors='replace'))
        except ScriptError:
            return []

    def examine_new_test_expectations(self, import_rev: str):
        """Examines new test expectations to find new failures.

        Arguments:
            import_rev: A chromium/src revision pointing to the import commit.
        """
        import_range = CommitRange(f'{import_rev}~1', import_rev)
        exp_files = set(self.default_port.all_expectations_dict())
        for changed_file in self.git.changed_files(import_range):
            abs_changed_file = self.finder.path_from_chromium_base(
                changed_file)
            if abs_changed_file not in exp_files:
                continue
            lines_before = self._read_exp_lines(changed_file,
                                                import_range.start)
            lines_after = self._read_exp_lines(changed_file, import_range.end)
            change = ExpectationsChange(lines_added=lines_after)
            change += ExpectationsChange(lines_removed=lines_before)

            for line in change.lines_added:
                directory = self.find_directory_for_bug(line.test)
                if directory:
                    failures = self.new_failures_by_directory[directory]
                    failures.exp_by_file[changed_file].append(line)

    def _read_exp_lines(self, path: str,
                        ref: str) -> List[typ_types.ExpectationType]:
        abs_path = self.finder.path_from_chromium_base(path)
        expectations = TestExpectations(
            self.default_port,
            {abs_path: self.git.show_blob(path, ref).decode()})
        return expectations.get_updated_lines(abs_path)

    def create_bugs_from_new_failures(
        self,
        wpt_range: CommitRange,
        cl_revision: CLRevisionID,
    ) -> Mapping[str, BuganizerIssue]:
        """Files bug reports for new failures.

        Arguments:
            wpt_range: The imported WPT revision range. The start is exclusive
                (i.e., the last imported revision) and the end is inclusive
                (i.e., the current imported reivision).
            cl_revision: Issue number and patchset of the imported CL.

        Returns:
            A map from a WPT directory to its corresponding issue to file.
        """
        assert cl_revision.patchset, cl_revision
        cl_revision_no_ps = CLRevisionID(cl_revision.issue)
        checks_url = CHECKS_URL_TEMPLATE.format(cl_revision.issue, '1')
        imported_commits = self.local_wpt.commits_in_range(*wpt_range)
        bugs = {}
        for directory, failures in self.new_failures_by_directory.items():
            summary = '[WPT] New failures introduced in {} by import {}'.format(
                directory, cl_revision_no_ps)

            full_directory = self.host.filesystem.join(
                self.finder.web_tests_dir(), directory)
            owners_file = self.host.filesystem.join(full_directory, 'OWNERS')
            metadata = self.owners_extractor.read_dir_metadata(full_directory)
            if not metadata or not metadata.should_notify:
                _log.info("WPT-NOTIFY disabled in %s." % full_directory)
                continue

            cc = []
            if metadata.team_email:
                cc.append(metadata.team_email)
            try:
                cc.extend(self.owners_extractor.extract_owners(owners_file))
            except FileNotFoundError:
                _log.warning(f'{owners_file!r} does not exist and '
                             'was not added to the CC list.')

            prologue = ('WPT import {} introduced new failures in {}:\n\n'
                        'List of new failures:\n'.format(
                            cl_revision_no_ps, directory))
            failure_list = failures.format_for_description(cl_revision)
            checks = '\nSee {} for details.\n'.format(checks_url)

            expectations_statement = (
                '\nExpectations or baseline files [0] have been automatically '
                'added for the failing results to keep the bots green. Please '
                'investigate the new failures and triage as appropriate.\n')
            range_statement = '\nUpstream changes imported:\n'
            range_statement += WPT_GH_RANGE_URL_TEMPLATE.format(
                *wpt_range) + '\n'
            commit_list = self.format_commit_list(imported_commits,
                                                  full_directory)
            links_list = '\n[0]: https://chromium.googlesource.com/chromium/src/+/HEAD/docs/testing/web_test_expectations.md\n'
            dir_metadata_path = self.host.filesystem.join(
                directory, "DIR_METADATA")
            epilogue = (
                '\nThis bug was filed automatically due to a new WPT test '
                'failure for which you are marked an OWNER. '
                'If you do not want to receive these reports, please add '
                '"wpt { notify: NO }"  to the relevant DIR_METADATA file.')

            # TODO(https://crbug.com/40631540): Format the description with
            # `textwrap.dedent(f'...')` so it's easier to tell what the final
            # formatted message looks like.
            description = (prologue + failure_list + checks +
                           expectations_statement + range_statement +
                           commit_list + links_list + epilogue)

            bug = BuganizerIssue(
                title=summary,
                description=description,
                component_id=(metadata.buganizer_public_component
                              or BUGANIZER_WPT_COMPONENT),
                cc=cc)
            _log.info("WPT-NOTIFY enabled in %s; adding the bug to the pending list." % full_directory)
            _log.info(f'{bug}')
            bugs[directory] = bug
        return bugs

    def format_commit_list(self, imported_commits, directory):
        """Formats the list of imported WPT commits.

        Imports affecting the given directory will be highlighted.

        Args:
            imported_commits: A list of (SHA, commit subject) pairs.
            directory: An absolute path of a directory in the Chromium repo, for
                which the list is formatted.

        Returns:
            A multi-line string.
        """
        path_from_wpt = self.host.filesystem.relpath(
            directory, self.finder.path_from_web_tests('external', 'wpt'))
        commit_list = ''
        for sha, subject in imported_commits:
            # subject is a Unicode string and can contain non-ASCII characters.
            line = u'{}: {}'.format(subject, GITHUB_COMMIT_PREFIX + sha)
            if self.local_wpt.is_commit_affecting_directory(
                    sha, path_from_wpt):
                line += ' [affecting this directory]'
            commit_list += line + '\n'
        return commit_list

    def find_directory_for_bug(self, test_name: str) -> Optional[str]:
        """Find the lowest directory with `DIR_METADATA` containing the test.

        Args:
            test_name: The name of the test (a path relative to web_tests).

        Returns:
            The path of the found directory relative to web_tests, if found.
        """
        # Always use non-virtual test names when looking up `DIR_METADATA`.
        if self.default_port.lookup_virtual_test_base(test_name):
            test_name = self.default_port.lookup_virtual_test_base(test_name)
        # `find_dir_metadata_file` takes either a relative path from the *root*
        # of the repository, or an absolute path.
        abs_test_path = self.finder.path_from_web_tests(test_name)
        metadata_file = self.owners_extractor.find_dir_metadata_file(
            self.host.filesystem.dirname(abs_test_path))
        if not metadata_file:
            _log.warning('Cannot find DIR_METADATA for %s.', test_name)
            return None
        owned_directory = self.host.filesystem.dirname(metadata_file)
        short_directory = self.host.filesystem.relpath(
            owned_directory, self.finder.web_tests_dir())
        return short_directory

    def file_bugs(self,
                  bugs: Mapping[str, BuganizerIssue],
                  dry_run: bool = False) -> List[BuganizerIssue]:
        """Files a list of bugs to Buganizer.

        Arguments:
            bugs: A list of bugs to file.
            dry_run: A boolean, whether we are in dry run mode.

        Returns:
            A map from a WPT directory to a bug actually filed successfully
            (i.e., this map may be smaller than the input map).
        """
        if dry_run:
            _log.info(
                '[dry_run] Would have filed the %d bugs in the pending list.',
                len(bugs))
            return []

        _log.info('Filing %d bugs in the pending list to Buganizer', len(bugs))
        filed_bugs = {}
        for index, (directory, bug) in enumerate(bugs.items(), start=1):
            try:
                bug = self._buganizer_client.NewIssue(bug)
                _log.info(f'[{index}] Filed bug: {bug.link}')
                filed_bugs[directory] = bug
            except BuganizerError as error:
                _log.exception('Failed to file bug', exc_info=error)
        return filed_bugs

    def _cl_for_wpt_revision(self, wpt_revision: str) -> GerritCL:
        query = ' '.join([
            f'owner:{AUTOROLLER_EMAIL}',
            f'prefixsubject:"{self.IMPORT_SUBJECT_PREFIX}{wpt_revision}"',
            'status:merged',
        ])
        output = GerritAPI.DEFAULT_OUTPUT | OutputOption.MESSAGES
        cls = self._gerrit_api.query_cls(query, limit=1, output_options=output)
        if not cls:
            raise ImportNotifierError(f'query {query!r} returned no CLs')
        return cls[0]


class ImportNotifierError(Exception):
    """Represents an unsuccessful notification attempt."""


class BaselineFailure(NamedTuple):
    test: str
    url: str

    def __str__(self) -> str:
        message = ''
        platform = re.search(r'/platform/([^/]+)/', self.url)
        if platform:
            message += '[ {} ] '.format(platform.group(1).capitalize())
        message += f'{self.test} new failing tests: {self.url}'
        return message


@dataclass
class DirectoryFailures:
    """A thin container for new failures under a WPT directory.

    This corresponds 1-1 to a filed bug.
    """
    exp_by_file: MutableMapping[str, List[typ_types.ExpectationType]] = field(
        default_factory=lambda: defaultdict(list))
    baseline_failures: List[BaselineFailure] = field(default_factory=list)

    def format_for_description(self, cl_revision: CLRevisionID) -> str:
        assert cl_revision.patchset, cl_revision
        lines = [str(failure) for failure in self.baseline_failures]
        for path in sorted(self.exp_by_file):
            for exp in self.exp_by_file[path]:
                path_for_url = PurePath(path).as_posix()
                url = f'{cl_revision}/{path_for_url}#{exp.lineno}'
                lines.append(f'{exp.to_string()}: {url}')
        return '\n'.join(lines) + '\n'