chromium/third_party/blink/tools/blinkpy/w3c/gerrit.py

# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import base64
import enum
import json
import logging
from datetime import datetime
from requests.exceptions import HTTPError
from typing import Iterator, List, Mapping, Optional, Tuple
from urllib.parse import urlencode, urlsplit, urlunsplit, quote

from blinkpy.common.host import Host
from blinkpy.common.net.network_transaction import NetworkTimeout
from blinkpy.common.path_finder import RELATIVE_WPT_TESTS
from blinkpy.common.net.git_cl import CLRevisionID
from blinkpy.w3c.chromium_commit import ChromiumCommit
from blinkpy.w3c.common import is_file_exportable

_log = logging.getLogger(__name__)
URL_BASE = urlsplit('https://chromium-review.googlesource.com')


class OutputOption(enum.Flag):
    """A mask denoting what data Gerrit should return in a query.

    See [0] for the full list of options, which should be added here as needed.

    [0]: https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#query-options
    """
    CURRENT_FILES = enum.auto()
    CURRENT_REVISION = enum.auto()
    COMMIT_FOOTERS = enum.auto()
    DETAILED_ACCOUNTS = enum.auto()
    MESSAGES = enum.auto()
    ALL_REVISIONS = enum.auto()
    SKIP_DIFFSTAT = enum.auto()

    def __iter__(self) -> Iterator['OutputOption']:
        # TODO(crbug.com/40631540): Remove this handcrafted `__iter__` after
        # python3.11+ when `enum.Flag` instances become iterable over their
        # members.
        for option in self.__class__:
            if option in self:
                yield option


class GerritAPI:
    """A utility class for the Chromium code review API.

    Wraps the API for Chromium's Gerrit instance at chromium-review.googlesource.com.
    """

    DEFAULT_OUTPUT = (OutputOption.CURRENT_FILES
                      | OutputOption.CURRENT_REVISION
                      | OutputOption.COMMIT_FOOTERS
                      | OutputOption.DETAILED_ACCOUNTS)

    def __init__(self,
                 host,
                 user: Optional[str] = None,
                 token: Optional[str] = None):
        self.host = host
        self.project_config = host.project_config
        # Authentication is only needed for mutating CLs (e.g., commenting).
        self.user = user
        self.token = token

    @classmethod
    def from_credentials(cls, host: Host,
                         credentials: Mapping[str, str]) -> 'GerritAPI':
        return cls(host, credentials.get('GERRIT_USER'),
                   credentials.get('GERRIT_TOKEN'))

    def get(self,
            path: str,
            query_params: List[Tuple[str, str]],
            raw: bool = False,
            return_none_on_404: bool = False):
        query_str = urlencode(query_params, safe='":')
        url = urlunsplit(
            (URL_BASE.scheme, URL_BASE.netloc, path, query_str, ''))
        raw_data = self.host.web.get_binary(
            url, return_none_on_404=return_none_on_404, trace='b346392205')
        if raw:
            return raw_data

        if not raw_data:
            return None

        # Gerrit API responses are prefixed by a 5-character JSONP preamble
        return json.loads(raw_data[5:])

    def post(self, path, data):
        """Sends a POST request to path with data as the JSON payload.

        The path has to be prefixed with '/a/':
        https://gerrit-review.googlesource.com/Documentation/rest-api.html#authentication
        """
        assert path.startswith('/a/'), \
            'POST requests need to use authenticated routes.'
        url = urlunsplit((URL_BASE.scheme, URL_BASE.netloc, path, '', ''))
        assert self.user and self.token, 'Gerrit user and token required for authenticated routes.'

        b64auth = base64.b64encode('{}:{}'.format(self.user,
                                                  self.token).encode('utf-8'))
        headers = {
            'Authorization': 'Basic {}'.format(b64auth.decode('utf-8')),
            'Content-Type': 'application/json',
        }
        return self.host.web.request('POST',
                                     url,
                                     data=json.dumps(data).encode('utf-8'),
                                     headers=headers)

    def query_cl_comments_and_revisions(self, change_id: str) -> 'GerritCL':
        """Queries a CL with comments and revisions information."""
        return self.query_cl(
            change_id, OutputOption.MESSAGES | OutputOption.ALL_REVISIONS)

    def query_cl(
        self,
        change_id: str,
        output_options: OutputOption = DEFAULT_OUTPUT,
    ) -> 'GerritCL':
        """Queries a commit information from Gerrit."""
        # Gerrit can uniquely identify CLs by number (i.e., crrev.com/c/<n>) or
        # by `Change-Id` commit footer (i.e., a hex string prepended by "I"):
        # https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#change-id
        change_id_parts = [self.escaped_repo, change_id]
        if not change_id.isdigit():
            change_id_parts.insert(1, self.project_config.gerrit_branch)
        path = '/changes/' + '~'.join(change_id_parts)
        query_params = [('o', option.name) for option in output_options]
        try:
            cl_data = self.get(path, query_params, return_none_on_404=True)
        except NetworkTimeout:
            raise GerritError('Timed out querying CL using Change-Id')

        if not cl_data:
            raise GerritNotFoundError('Cannot find Change-Id')
        cl = GerritCL(data=cl_data, api=self)
        return cl

    def query_cls(
        self,
        query: str,
        limit: int = 500,
        output_options: OutputOption = DEFAULT_OUTPUT,
    ) -> List['GerritCL']:
        """Query Gerrit for CLs that match the given criteria.

        Arguments:
            query: The search criteria written in the syntax given by [0].
            limit: The maximum number of CLs to fetch.
            output_options: Fields to return (see `OutputOption`).

        [0]: https://gerrit-review.googlesource.com/Documentation/user-search.html
        """
        assert limit > 0
        query_params = [('q', query), ('n', str(limit))]
        query_params.extend(('o', option.name) for option in output_options)
        # The underlying host.web.get_binary() automatically retries until it
        # times out, at which point NetworkTimeout is raised.
        try:
            raw_cls = self.get('/changes/', query_params)
        except NetworkTimeout:
            raise GerritError('Timed out querying exportable open CLs.')
        return [GerritCL(data, self) for data in raw_cls]

    def query_exportable_cls(
        self,
        limit: int = 200,
        output_options: OutputOption = DEFAULT_OUTPUT,
    ) -> List['GerritCL']:
        query = ' '.join([
            f'project:"{self.project_config.gerrit_project}"',
            f'branch:{self.project_config.gerrit_branch}',
            'is:submittable',
            '-is:wip',
        ])
        open_cls = self.query_cls(query, limit, output_options)
        return [cl for cl in open_cls if cl.is_exportable()]

    @property
    def escaped_repo(self):
        return quote(self.project_config.gerrit_project, safe='')


class GerritCL(object):
    """A data wrapper for a Chromium Gerrit CL."""

    def __init__(self, data, api):
        assert data['change_id']
        self._data = data
        self.api = api

    @property
    def number(self):
        return self._data['_number']

    @property
    def url(self):
        return '{}/{}'.format(urlunsplit(URL_BASE), self.number)

    @property
    def subject(self):
        return self._data['subject']

    @property
    def change_id(self):
        return self._data['change_id']

    @property
    def id(self):
        branch = self.api.project_config.gerrit_branch
        return f"{self.api.escaped_repo}~{branch}~{self.change_id}"

    @property
    def owner_email(self):
        return self._data['owner']['email']

    @property
    def current_revision_sha(self):
        return self._data['current_revision']

    @property
    def current_revision(self):
        return self._data['revisions'][self.current_revision_sha]

    @property
    def current_revision_id(self) -> CLRevisionID:
        patchset = int(self.current_revision['_number'])
        return CLRevisionID(self.number, patchset)

    @property
    def has_review_started(self):
        return self._data.get('has_review_started')

    @property
    def current_revision_description(self):
        # A patchset may have no description.
        return self.current_revision.get('description', '')

    @property
    def status(self):
        return self._data['status']

    @property
    def updated(self):
        # Timestamps are given in UTC and have the format "'yyyy-mm-dd hh:mm:ss.fffffffff'"
        # where "'ffffffffff'" represents nanoseconds.
        return datetime.strptime(self._data["updated"][:-3] + " +0000",
                                 "%Y-%m-%d %H:%M:%S.%f %z")

    @property
    def messages(self):
        return self._data['messages']

    @property
    def revisions(self):
        return self._data['revisions']

    def post_comment(self, message):
        """Posts a comment to the CL."""
        path = '/a/changes/{id}/revisions/current/review'.format(id=self.id)
        try:
            return self.api.post(path, {'message': message})
        except HTTPError as e:
            message = 'Failed to post a comment to issue {}'.format(
                self.change_id)
            if hasattr(e, 'response'):
                message += ' (code {})'.format(e.response.status_code)
            else:
                message += ' (error {})'.format(e.response.status_code)
            raise GerritError(message)

    def is_exportable(self):
        # TODO(robertma): Consolidate with the related part in chromium_exportable_commits.py.

        try:
            files = list(self.current_revision['files'].keys())
        except KeyError:
            # Empty (deleted) CL is not exportable.
            return False

        # Guard against accidental CLs that touch thousands of files.
        if len(files) > 1000:
            _log.info('Rejecting CL with over 1000 files: %s (ID: %s) ',
                      self.subject, self.change_id)
            return False

        if 'No-Export: true' in self.current_revision['commit_with_footers']:
            return False

        if 'NOEXPORT=true' in self.current_revision['commit_with_footers']:
            return False

        files_in_wpt = [f for f in files if f.startswith(RELATIVE_WPT_TESTS)]
        if not files_in_wpt:
            return False

        exportable_files = [
            f for f in files_in_wpt
            if is_file_exportable(f, self.api.project_config)
        ]

        if not exportable_files:
            return False

        return True

    def fetch_current_revision_commit(self, host):
        """Fetches the git commit for the latest revision of CL.

        This method fetches the commit corresponding to the latest revision of
        CL to local Chromium repository, but does not checkout the commit to the
        working tree. All changes in the CL are squashed into this one commit,
        regardless of how many revisions have been uploaded.

        Args:
            host: A Host object for git invocation.

        Returns:
            A ChromiumCommit object (the fetched commit).
        """
        git = host.git(host.project_config.project_root)
        url = self.current_revision['fetch']['http']['url']
        ref = self.current_revision['fetch']['http']['ref']
        git.run(['fetch', url, ref])
        sha = git.run(['rev-parse', 'FETCH_HEAD']).strip()
        return ChromiumCommit(host, sha=sha)


class GerritError(Exception):
    """Raised when Gerrit returns a non-OK response or times out."""
    pass


class GerritNotFoundError(GerritError):
    """Raised when Gerrit returns a resource not found response."""
    pass