chromium/third_party/blink/tools/blinkpy/tool/commands/build_resolver.py

# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import contextlib
import logging
import re
from concurrent.futures import Executor
from typing import Collection, Dict, Optional, Tuple

from requests.exceptions import RequestException

from blinkpy.common import exit_codes
from blinkpy.common.host import Host
from blinkpy.common.net.rpc import Build
from blinkpy.common.net.git_cl import (
    BuildStatus,
    BuildStatuses,
    CLRevisionID,
    GitCL,
)
from blinkpy.tool.grammar import pluralize
from blinkpy.w3c.gerrit import GerritAPI, OutputOption

_log = logging.getLogger(__name__)


class UnresolvedBuildException(Exception):
    """Signal that the tool must stop because some builds are unresolved.

    Raising this does not always mean something went wrong that the user has
    to fix; for example, it is also raised while try jobs are still running
    and the tool simply needs to be re-run later.
    """


class BuildResolver:
    """Resolve builder names and build numbers into statuses.

    This resolver fetches build information from Buildbucket.
    """
    # Build field paths passed as `build_fields` with every Buildbucket
    # get/search request queued by this resolver. They cover what the
    # response handling reads:
    #   * `id`, `number`, and `builder.*` identify the build in
    #     `_build_statuses_from_responses`.
    #   * `status`, `output.properties`, and the step names are inspected by
    #     `_status_if_interrupted`.
    #   * The step log names and URLs are used by `_fetch_swarming_summary`.
    # NOTE(review): the log URL is requested as `view_url` here but read as
    # `viewUrl` from the response -- presumably the JSON response uses
    # camelCase keys; confirm against the Buildbucket client.
    _build_fields = [
        'id',
        'number',
        'builder.builder',
        'builder.bucket',
        'status',
        'output.properties',
        'steps.*.name',
        'steps.*.logs.*.name',
        'steps.*.logs.*.view_url',
    ]

    def __init__(self,
                 host: Host,
                 git_cl: GitCL,
                 io_pool: Optional[Executor] = None,
                 gerrit: Optional[GerritAPI] = None,
                 can_trigger_jobs: bool = False):
        """Set up the resolver's collaborators.

        Arguments:
            host: Supplies the web client (`host.web`) used to download step
                logs, and is used to build a default `GerritAPI`.
            git_cl: Wrapper whose Buildbucket client and CL helpers are used
                to fetch and trigger builds.
            io_pool: Optional executor used to inspect builds concurrently.
                When omitted, builds are inspected with the builtin `map`.
            gerrit: Optional Gerrit client. A new `GerritAPI(host)` is created
                when a falsy value is given.
            can_trigger_jobs: Whether missing try jobs may be triggered.
        """
        self._git_cl = git_cl
        self._io_pool = io_pool
        self._can_trigger_jobs = can_trigger_jobs
        self._web = host.web
        self._gerrit = gerrit or GerritAPI(host)

    def _builder_predicate(self, build: Build) -> Dict[str, str]:
        """Return the builder ID mapping used to search for `build`'s builder.

        The project is always `chromium`; the bucket and builder name are
        taken from `build`.
        """
        return dict(
            project='chromium',
            bucket=build.bucket,
            builder=build.builder_name,
        )

    def _build_statuses_from_responses(self, raw_builds) -> BuildStatuses:
        """Convert raw build responses into a `Build` -> status mapping.

        Each response's status is computed by `_status_if_interrupted`, using
        the I/O pool's `map` when a pool was provided and the builtin `map`
        otherwise.
        """
        # Materialize the responses because they are traversed twice: once to
        # compute statuses and once to build the keys.
        responses = list(raw_builds)
        if self._io_pool:
            statuses = self._io_pool.map(self._status_if_interrupted,
                                         responses)
        else:
            statuses = map(self._status_if_interrupted, responses)

        resolved = {}
        for response, status in zip(responses, statuses):
            builder_id = response['builder']
            resolved_build = Build(builder_id['builder'], response['number'],
                                   response['id'], builder_id['bucket'])
            resolved[resolved_build] = status
        return resolved

    def resolve_builds(self,
                       builds: Collection[Build],
                       patchset: Optional[int] = None) -> BuildStatuses:
        """Resolve builders (maybe with build numbers) into statuses.

        Arguments:
            builds: Builds to fetch. Multiple builds from the same builder
                are allowed. When a build has no build number, one is
                inferred:
                  * Try builders use the build for the specified patchset on
                    the current CL. If there is no such build, one can be
                    triggered.
                  * CI builders use the latest failing build.
            patchset: Patchset that try build results should be fetched from.

        Returns:
            A mapping from each resolved build to its status.

        Raises:
            UnresolvedBuildException: If any build ends up with the
                `TRIGGERED` status, or if `fetch_or_trigger_try_jobs` cannot
                resolve the inferred try builders.
        """
        bb_client = self._git_cl.bb_client
        inferred_try_builders = set()
        for build in builds:
            if build.build_number:
                # Explicit build: fetch exactly that one.
                bb_client.add_get_build_req(build,
                                            build_fields=self._build_fields)
                continue
            if build.bucket == 'try':
                inferred_try_builders.add(build.builder_name)
                continue
            # CI builder without a number: search for its latest failure.
            bb_client.add_search_builds_req(
                {
                    'builder': self._builder_predicate(build),
                    'status': 'FAILURE'
                },
                build_fields=self._build_fields,
                count=1)

        resolved = {}
        # Inferred try jobs go first because they have more failure modes.
        if inferred_try_builders:
            try_statuses = self.fetch_or_trigger_try_jobs(
                inferred_try_builders, patchset)
            resolved.update(try_statuses)
            # Completed try builds are requested again so that their steps
            # can be checked for interruptions.
            for build, status in try_statuses.items():
                if not build.build_number:
                    continue
                if status not in BuildStatus.COMPLETED:
                    continue
                bb_client.add_get_build_req(build,
                                            build_fields=self._build_fields)

        # Explicit, CI, and re-requested try builds arrive in one batch.
        responses = bb_client.execute_batch()
        resolved.update(self._build_statuses_from_responses(responses))
        if resolved:
            self.log_builds(resolved)
        if BuildStatus.TRIGGERED in resolved.values():
            raise UnresolvedBuildException(
                'Once all pending try jobs have finished, '
                'please re-run the tool to fetch new results.')
        return resolved

    def _status_if_interrupted(self, raw_build) -> BuildStatus:
        """Map non-browser-related failures to an infrastructure failure.

        Shard-level timeouts and early exits caused by exceeding the failure
        threshold are examples of such failures. LUCI cannot see them, but
        they can be recognized through `run_web_tests.py` exit code
        conventions.

        Returns:
            `INFRA_FAILURE` if any shard of a matching web test step was
            interrupted. Otherwise, the build's own status, where a `FAILURE`
            is refined by the `failure_type` output property (falling back to
            `OTHER_FAILURE` when that is not a known status name).
        """
        # TODO(crbug.com/352762538):
        #  1. Fetch shard exit codes separately for each suite.
        #  2. Instead of coercing bad shards to `INFRA_FAILURE`, they should be
        #     interpreted to populate `WebTestResults.incomplete_reason`
        #     directly.
        build_status = BuildStatus[raw_build['status']]
        if build_status is BuildStatus.FAILURE:
            # A Buildbucket `FAILURE` covers ordinary test failures (i.e.,
            # needs rebaseline) as well as unrelated compile or result merge
            # failures. The recipe's failure reason tells them apart.
            properties = raw_build.get('output', {}).get('properties', {})
            try:
                build_status = BuildStatus[properties.get('failure_type')]
            except KeyError:
                build_status = BuildStatus.OTHER_FAILURE

        web_test_step = re.compile(
            r'[\w_-]*(webdriver|blink_(web|wpt))_tests.*\(with patch\)[^|]*')
        for step in raw_build.get('steps', []):
            if not web_test_step.fullmatch(step['name']):
                continue
            summary = self._fetch_swarming_summary(step) or {}
            for shard in summary.get('shards', []):
                if _shard_interrupted(shard):
                    return BuildStatus.INFRA_FAILURE
        return build_status

    def _fetch_swarming_summary(self,
                                step,
                                log_name: str = 'chromium_swarming.summary'):
        """Download and parse a step's swarming summary log, if it has one.

        Arguments:
            step: Raw step payload. Its `logs` entries are searched by name.
            log_name: Name of the log that holds the summary.

        Returns:
            The parsed JSON payload of the first log named `log_name` that
            can be fetched and decoded, or `None` if there is no such log.
        """
        # TODO(crbug.com/342409114): Use swarming v2 API to fetch shard status
        # and exit codes, not the potentially unstable
        # `chromium_swarming.summary` log.
        for log in step.get('logs', []):
            if log['name'] != log_name:
                continue
            # The fetch is best effort. Depending on the `requests` version,
            # a body that is not valid JSON surfaces as a plain `ValueError`
            # instead of a `RequestException`, so both are suppressed to keep
            # a bad log from aborting the whole resolution.
            with contextlib.suppress(RequestException, ValueError):
                response = self._web.session.get(log['viewUrl'],
                                                 params={'format': 'raw'})
                return response.json()
        return None

    def fetch_or_trigger_try_jobs(
            self,
            builders: Collection[str],
            patchset: Optional[int] = None,
    ) -> BuildStatuses:
        """Fetch or trigger try jobs for the current CL.

        Arguments:
            builders: Try builder names.
            patchset: Patchset that build results should be fetched from.
                Defaults to the latest patchset that's not a trivial rebase or
                commit message edit.

        Returns:
            Statuses of the latest try jobs for `builders`. Builders without
            any result are added without a build number, as `TRIGGERED` if
            this call triggered them and `MISSING` otherwise.

        Raises:
            UnresolvedBuildException: If the CL issue number is not set or no
                try jobs are available but try jobs cannot be triggered.
        """
        try:
            issue_number = int(self._git_cl.get_issue_number())
        except ValueError as error:
            raise UnresolvedBuildException(
                'No issue number for current branch.') from error
        patchset = patchset or self.latest_nontrivial_patchset(issue_number)
        cl = CLRevisionID(issue_number, patchset)
        _log.info(f'Fetching status for {pluralize("build", len(builders))} '
                  f'from {cl}.')

        build_statuses = self._git_cl.latest_try_jobs(issue_number,
                                                      builder_names=builders,
                                                      patchset=patchset)
        if not (build_statuses or self._can_trigger_jobs):
            raise UnresolvedBuildException(
                "Aborted: no try jobs and '--no-trigger-jobs' or '--dry-run' "
                'passed.')

        builders_with_results = {build.builder_name for build in build_statuses}
        builders_without_results = set(builders) - builders_with_results
        if self._can_trigger_jobs and builders_without_results:
            self._git_cl.trigger_try_jobs(builders_without_results)
            placeholder_status = BuildStatus.TRIGGERED
        else:
            placeholder_status = BuildStatus.MISSING
        # Builders with no result still get an entry, without a build number.
        for builder_name in builders_without_results:
            build_statuses[Build(builder_name)] = placeholder_status
        return build_statuses

    def latest_nontrivial_patchset(self, issue_number: int) -> int:
        """Return the highest patchset number whose revision is a `REWORK`.

        Arguments:
            issue_number: Number of the CL to query on Gerrit.

        Raises:
            UnresolvedBuildException: If no revision of the CL has the
                `REWORK` kind.
        """

        def patchset_number(revision):
            return revision['_number']

        query_options = (OutputOption.ALL_REVISIONS
                         | OutputOption.SKIP_DIFFSTAT)
        cl = self._gerrit.query_cl(str(issue_number), query_options)
        # https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#revision-info
        newest_first = sorted(cl.revisions.values(),
                              key=patchset_number,
                              reverse=True)
        for revision in newest_first:
            if revision.get('kind') != 'REWORK':
                continue
            return patchset_number(revision)
        # Reaching this point is unexpected: the initial upload to Gerrit
        # should always be `REWORK`.
        raise UnresolvedBuildException(
            f'{CLRevisionID(issue_number)} has no nontrivial changes')

    def log_builds(self, build_statuses: BuildStatuses):
        """Log builds in a tabular format.

        Finished builds and scheduled/started builds are listed in separate
        tables; builds with the `MISSING` status are not listed. If every
        build has finished, only a one-line notice is logged.
        """
        finished, in_progress = {}, {}
        for build, status in build_statuses.items():
            if status in BuildStatus.COMPLETED:
                finished[build] = status
            elif status is not BuildStatus.MISSING:
                in_progress[build] = status

        if len(finished) == len(build_statuses):
            _log.info('All builds finished.')
            return

        if not finished:
            _log.info('No finished builds.')
        else:
            _log.info('Finished builds:')
            self._log_build_statuses(finished)

        if in_progress:
            _log.info('Scheduled or started builds:')
            self._log_build_statuses(in_progress)

    def _log_build_statuses(self, build_statuses: BuildStatuses):
        """Log one table row per build, ordered by `_build_sort_key`.

        `build_statuses` must be non-empty. Builds without a build number
        show `--` in the `NUMBER` column.
        """
        assert build_statuses
        # The name column never gets narrower than 20 characters so that the
        # `BUILDER` and `NUMBER` columns stay visually separated.
        name_width = 20
        status_width = 0
        for build, status in build_statuses.items():
            name_width = max(name_width, len(build.builder_name))
            status_width = max(status_width, len(status.name))
        row_format = '  %-{}s %-7s %-{}s %-6s'.format(name_width,
                                                       status_width)
        _log.info(row_format, 'BUILDER', 'NUMBER', 'STATUS', 'BUCKET')
        for build in sorted(build_statuses, key=_build_sort_key):
            number = str(build.build_number or '--')
            status_name = build_statuses[build].name
            _log.info(row_format, build.builder_name, number, status_name,
                      build.bucket)


def _build_sort_key(build: Build) -> Tuple[str, int]:
    """Order builds by builder name, then by build number.

    A build without a build number is keyed as number 0.
    """
    builder_name = build.builder_name
    build_number = build.build_number or 0
    return builder_name, build_number


def _shard_interrupted(shard) -> bool:
    if shard.get('state') not in {'COMPLETED', 'DEDUPED'}:
        return True
    return int(shard.get('exit_code', 0)) in exit_codes.ERROR_CODES