chromium/third_party/blink/tools/blinkpy/common/checkout/git_mock.py

# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import re
from typing import Mapping, NamedTuple, Optional, Union

from blinkpy.common.checkout.git import (
    CommitRange,
    FileStatus,
    FileStatusType,
    Git,
)
from blinkpy.common.system.executive import ScriptError
from blinkpy.common.system.executive_mock import MockExecutive
from blinkpy.common.system.filesystem_mock import MockFileSystem


class MockCommit(NamedTuple):
    message: str
    tree: Mapping[str, bytes]


class MockGit:

    # Arguments are listed below, even if they're unused, in order to match
    # the Git class. pylint: disable=unused-argument

    def __init__(self,
                 cwd=None,
                 filesystem=None,
                 executive=None,
                 platform=None):
        self.checkout_root = '/mock-checkout'
        self.cwd = cwd or self.checkout_root
        self.added_paths = set()
        self._filesystem = filesystem or MockFileSystem()
        self._staging = dict(self._filesystem.files)
        self._executive = executive or MockExecutive()
        self._executable_name = 'git'
        self._local_commits = []
        self._current_branch = 'mock-branch-name'
        self.tracking_branch = 'origin/main'
        self._branch_positions = {}

    def run(self,
            command_args,
            cwd=None,
            stdin=None,
            decode_output=True,
            return_exit_code=False):
        full_command_args = [self._executable_name] + command_args
        cwd = cwd or self.checkout_root
        return self._executive.run_command(
            full_command_args,
            cwd=cwd,
            input=stdin,
            return_exit_code=return_exit_code,
            decode_output=decode_output)

    def add(self, destination_path, return_exit_code=False):
        self.add_list([destination_path], return_exit_code)

    def add_list(self, destination_paths, return_exit_code=False):
        file_paths = []
        for path in destination_paths:
            if self._filesystem.isfile(path):
                file_paths.append(path)
            else:
                file_paths.extend(self._filesystem.files_under(path))
        for path in file_paths:
            self._staging[path] = self._filesystem.read_binary_file(path)
        self.added_paths.update(set(destination_paths))
        if return_exit_code:
            return 0

    def has_working_directory_changes(self, pathspec=None):
        if not self._local_commits:
            return False
        assert not pathspec, "fake doesn't support pathspec currently"
        return self._local_commits[-1].tree != self._filesystem.files

    def current_branch(self):
        return self._current_branch

    def new_branch(self, name: str, stack: bool = True):
        assert stack, 'fake can only support stacking currently'
        assert name not in {self._current_branch, *self._branch_positions}
        last = len(self._local_commits) - 1
        self._branch_positions[self._current_branch] = last
        self.tracking_branch = self._current_branch
        self._current_branch = name

    def exists(self, path):
        return True

    def show_blob(self, path: str, ref: Optional[str] = None) -> bytes:
        commit = self._local_commits[self._get_commit_position(ref)]
        try:
            return commit.tree[self.absolute_path(path)]
        except KeyError:
            raise ScriptError

    def absolute_path(self, *comps):
        return self._filesystem.join(self.checkout_root, *comps)

    def commit_position(self, path):
        return 5678

    def commit_position_from_git_commit(self, git_commit):
        if git_commit == '6469e754a1':
            return 1234
        if git_commit == '624c3081c0':
            return 5678
        if git_commit == '624caaaaaa':
            return 10000
        return None

    def commit_locally_with_message(self, message):
        self._local_commits.append(MockCommit(message, dict(self._staging)))

    def most_recent_log_matching(self,
                                 grep_str: str,
                                 path: Optional[str] = None,
                                 commits: Union[None, str, CommitRange] = None,
                                 format_pattern: Optional[str] = None) -> str:
        start, end = 0, len(self._local_commits)
        if isinstance(commits, str):
            end = self._get_commit_position(commits) + 1
        elif isinstance(commits, CommitRange):
            # Exclude the start, include the end.
            start = self._get_commit_position(commits.start) + 1
            end = self._get_commit_position(commits.end) + 1

        for position in reversed(range(start, end)):
            commit = self._local_commits[position]
            if re.search(grep_str, commit.message):
                # See https://git-scm.com/docs/pretty-formats for the complete
                # list.
                format_specifiers = {
                    # The mock SHA-1 commit hash is simply the position as hex.
                    'H': hex(position)[2:].zfill(40),
                    's': commit.message.splitlines()[0],
                }
                return re.sub(
                    '%(?P<specifier>[a-zA-Z])',
                    lambda match: format_specifiers[match['specifier']],
                    format_pattern) + '\n'
        return ''

    def local_commits(self):
        """Returns the internal recording of commits made via |commit_locally_with_message|.

        This is a testing convenience method; commits are formatted as:
          [ message, commit_all_working_directory_changes, author ].
        """
        return [[commit.message] for commit in self._local_commits]

    def delete(self, path):
        return self.delete_list([path])

    def delete_list(self, paths, ignore_unmatch: bool = False):
        if not self._filesystem:
            return
        for path in paths:
            self._staging.pop(path, None)
            if self._filesystem.exists(path):
                self._filesystem.remove(path)

    def move(self, origin, destination):
        if self._filesystem:
            self._filesystem.move(
                self.absolute_path(origin), self.absolute_path(destination))

    def changed_files(
        self,
        commits: Union[None, str, CommitRange] = None,
        diff_filter: Union[str, FileStatusType] = Git.DEFAULT_DIFF_FILTER,
        path: Optional[str] = None,
        rename_threshold: Optional[float] = None,
    ) -> Mapping[str, FileStatus]:
        if not self._local_commits:
            return []
        if isinstance(commits, CommitRange):
            start_pos = self._get_commit_position(commits.start)
            end_pos = self._get_commit_position(commits.end)
            files_before = self._local_commits[start_pos].tree
            files_after = self._local_commits[end_pos].tree
        else:
            # Pretend this branch is tracking the first commit.
            files_before = self._local_commits[0].tree
            files_after = self._filesystem.files

        if isinstance(diff_filter, str):
            diff_filter = FileStatusType.parse_diff_filter(diff_filter)

        changed_files = {}
        for path in sorted(set(files_before) | set(files_after)):
            before, after = files_before.get(path), files_after.get(path)
            if before == after:
                continue
            elif before is None:
                status = FileStatusType.ADD
            elif after is None:
                status = FileStatusType.DELETE
            else:
                status = FileStatusType.MODIFY
            if status & diff_filter:
                path_from_checkout_root = self._filesystem.relpath(
                    path, self.checkout_root)
                changed_files[path_from_checkout_root] = FileStatus(status)
        return changed_files

    def _get_commit_position(self, ref: str) -> int:
        if ref == '@{u}':
            return self._branch_positions[self.tracking_branch]
        match = re.fullmatch(
            r'(?P<base>HEAD|[\da-fA-F]{40})(~(?P<offset>\d+))?', ref)
        if not match:
            raise NotImplementedError(
                'only the `(HEAD|<sha1>)(~<n>)?` syntax is supported')
        if match['base'] == 'HEAD':
            base_position = len(self._local_commits) - 1
        else:
            base_position = int(match['base'], 16)
        offset_from_base = int(match['offset'] or 0)
        return base_position - offset_from_base

    def unstaged_changes(self):
        return {}

    def uncommitted_changes(self):
        return []