# Copyright (c) 2009, 2010, 2011 Google Inc. All rights reserved.
# Copyright (c) 2009 Apple Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import enum
import logging
import re
import os
from typing import List, Mapping, NamedTuple, Optional, Union
from blinkpy.common.memoized import memoized
from blinkpy.common.system.executive import Executive, ScriptError
from blinkpy.common.system.filesystem import FileSystem
_log = logging.getLogger(__name__)
class CommitRange(NamedTuple):
start: str
end: str
def __str__(self) -> str:
return f'{self.start}...{self.end}'
class FileStatusType(enum.Flag):
ADD = enum.auto()
COPY = enum.auto()
DELETE = enum.auto()
MODIFY = enum.auto()
RENAME = enum.auto()
def __str__(self) -> str:
return ''.join(status.name[0] for status in FileStatusType
if status & self)
@classmethod
def parse_diff_filter(cls, pattern: str) -> 'FileStatusType':
"""Parse a parameter to `git diff --diff-filter` [0].
[0]: https://git-scm.com/docs/git-diff#Documentation/git-diff.txt---diff-filterACDMRTUXB82308203
"""
status_by_symbol = {member.name[0]: member for member in cls}
status = FileStatusType(0)
for symbol in pattern:
status |= status_by_symbol[symbol]
return status
class FileStatus(NamedTuple):
status_type: FileStatusType
# Source path for copied and renamed files. Ignored for other files.
source: Optional[str] = None
class Git:
# Unless otherwise specified, methods are expected to return paths relative
# to self.checkout_root.
# Git doesn't appear to document error codes, but seems to return
# 1 or 128, mostly.
ERROR_FILE_IS_MISSING = 128
DEFAULT_DIFF_FILTER = (FileStatusType.ADD | FileStatusType.DELETE
| FileStatusType.MODIFY)
def __init__(self,
cwd=None,
executive=None,
filesystem=None,
platform=None):
self._executive = executive or Executive()
self._filesystem = filesystem or FileSystem()
self._executable_name = self.find_executable_name(
self._executive, platform)
self.cwd = cwd or self._filesystem.abspath(self._filesystem.getcwd())
if not self.in_working_directory(self.cwd):
module_directory = self._filesystem.abspath(
self._filesystem.dirname(
self._filesystem.path_to_module(self.__module__)))
_log.info(
'The current directory (%s) is not in a git repo, trying directory %s.',
cwd, module_directory)
if self.in_working_directory(module_directory):
self.cwd = module_directory
_log.error('Failed to find Git repo for %s or %s', cwd,
module_directory)
self.checkout_root = self.find_checkout_root(self.cwd)
@staticmethod
def find_executable_name(executive, platform):
"""Finds the git executable name which may be different on Windows.
The Win port uses the depot_tools package, which contains a number
of development tools, including Python and git. Instead of using a
real git executable, depot_tools indirects via a batch file, called
"git.bat". This batch file is used because it allows depot_tools to
auto-update the real git executable, which is contained in a
subdirectory.
FIXME: This is a hack and should be resolved in a different way if
possible.
"""
if not platform or not platform.is_win():
return 'git'
try:
executive.run_command(['git', 'help'], debug_logging=False)
return 'git'
except OSError:
_log.debug('Using "git.bat" as git executable.')
return 'git.bat'
def run(self,
command_args,
cwd=None,
stdin=None,
decode_output=True,
return_exit_code=False):
"""Invokes git with the given args."""
full_command_args = [self._executable_name] + command_args
cwd = cwd or self.checkout_root
return self._executive.run_command(full_command_args,
cwd=cwd,
input=stdin,
return_exit_code=return_exit_code,
decode_output=decode_output,
debug_logging=False)
def absolute_path(self, repository_relative_path):
"""Converts repository-relative paths to absolute paths."""
return self._filesystem.join(self.checkout_root,
repository_relative_path)
def in_working_directory(self, path):
return self._executive.run_command(
[self._executable_name, 'rev-parse', '--is-inside-work-tree'],
cwd=path,
error_handler=Executive.ignore_error,
debug_logging=False).rstrip() == 'true'
def find_checkout_root(self, path):
"""Returns the absolute path to the root of the repository."""
if os.getcwd().startswith('/google/cog/cloud'):
return os.getcwd()
return self.run(['rev-parse', '--show-toplevel'], cwd=path).strip()
@classmethod
def read_git_config(cls, key, cwd=None, executive=None):
# FIXME: This should probably use cwd=self.checkout_root.
# Pass --get-all for cases where the config has multiple values
# Pass the cwd if provided so that we can handle the case of running
# blink_tool.py outside of the working directory.
# FIXME: This should use an Executive.
executive = executive or Executive()
return executive.run_command(
[cls.executable_name, 'config', '--get-all', key],
error_handler=Executive.ignore_error,
cwd=cwd).rstrip('\n')
def has_working_directory_changes(self, pathspec=None):
"""Checks whether there are uncommitted changes."""
command = ['diff', 'HEAD', '--no-renames', '--name-only']
if pathspec:
command.extend(['--', pathspec])
output = self.run(command)
if output != '':
_log.error('Has working directory changes:\n%s', output)
return True
return False
def uncommitted_changes(self):
"""List files with uncommitted changes, including untracked files."""
return [path for _, _, path in self._working_changes()]
def unstaged_changes(self):
"""Lists files with unstaged changes, including untracked files.
Returns a dict mapping modified file paths (relative to checkout root)
to one-character codes identifying the change, e.g. 'M' for modified,
'D' for deleted, '?' for untracked.
"""
return {
path: unstaged_status
for _, unstaged_status, path in self._working_changes()
if unstaged_status
}
def _working_changes(self):
# `git status -z` is a version of `git status -s`, that's recommended
# for machine parsing. Lines are terminated with NUL rather than LF.
change_lines = self.run(
['status', '-z', '--no-renames',
'--untracked-files=all']).rstrip('\x00')
if not change_lines:
return
for line in change_lines.split('\x00'):
assert len(line) >= 4, 'Unexpected change line format %s' % line
path = line[3:]
yield line[0].strip(), line[1].strip(), path
def add_list(self, paths: List[str], return_exit_code: bool = False):
return self._run_chunked(['add'],
paths,
return_exit_code=return_exit_code)
def delete_list(self, paths: List[str], ignore_unmatch: bool = False):
command = ['rm', '-f']
if ignore_unmatch:
command.append('--ignore-unmatch')
return self._run_chunked(command, paths)
def _run_chunked(self,
command: List[str],
paths: List[str],
chunk_size: int = 128,
**run_kwargs):
"""Safely run `git` operations on an arbitrary number of paths.
This helper transparently avoids command line length limitations on
Windows by splitting paths across multiple `git` invocations. This only
works for commands that can operate on a variable number of paths.
Arguments:
command: The non-path arguments after `git` but before the paths.
paths: The paths to operate on.
chunk_size: The maximum number of paths to operate on at a time. The
default was picked heuristically.
Returns:
The first truthy value returned by a `run` command. This is usually
stdout or a nonzero exit code.
"""
rv = 0
for chunk_start in range(0, len(paths), chunk_size):
chunk = paths[chunk_start:chunk_start + chunk_size]
rv = rv or self.run(command + chunk, **run_kwargs)
return rv
def move(self, origin, destination):
return self.run(['mv', '-f', origin, destination])
def exists(self, path: str) -> bool:
try:
self.show_blob(path, ref='HEAD')
except ScriptError as error:
return error.exit_code != self.ERROR_FILE_IS_MISSING
return True
def show_blob(self, path: str, ref: Optional[str] = None) -> bytes:
ref = ref or self._merge_base()
return self.run(['show', f'{ref}:{path}'], decode_output=False)
def _branch_from_ref(self, ref):
return ref.replace('refs/heads/', '')
def current_branch(self):
"""Returns the name of the current branch, or empty string if HEAD is detached."""
ref = self.run(['rev-parse', '--symbolic-full-name', 'HEAD']).strip()
if ref == 'HEAD':
# HEAD is detached; return an empty string.
return ''
return self._branch_from_ref(ref)
def current_revision(self):
"""Return the commit hash of HEAD."""
return self.run(['rev-parse', 'HEAD']).strip()
def new_branch(self, name: str, stack: bool = True):
"""Create and switch to a new branch.
Arguments:
stack: If true, track the current branch (if it exists). Otherwise,
track tip-of-tree (origin/main).
"""
if stack and self.current_branch():
self.run(['new-branch', '--upstream-current', name])
else:
self.run(['new-branch', name])
def _upstream_branch(self):
current_branch = self.current_branch()
return self._branch_from_ref(
self.read_git_config('branch.%s.merge' % current_branch,
cwd=self.checkout_root,
executive=self._executive).strip())
def _merge_base(self, git_commit=None):
if git_commit:
# Rewrite UPSTREAM to the upstream branch
if 'UPSTREAM' in git_commit:
upstream = self._upstream_branch()
if not upstream:
raise ScriptError(
message='No upstream/tracking branch set.')
git_commit = git_commit.replace('UPSTREAM', upstream)
# Special-case <refname>.. to include working copy changes, e.g., 'HEAD....' shows only the diffs from HEAD.
if git_commit.endswith('....'):
return git_commit[:-4]
if '..' not in git_commit:
git_commit = git_commit + '^..' + git_commit
return git_commit
return self._remote_merge_base()
def changed_files(
self,
commits: Union[None, str, CommitRange] = None,
diff_filter: Union[str, FileStatusType] = DEFAULT_DIFF_FILTER,
path: Optional[str] = None,
rename_threshold: Optional[float] = None,
) -> Mapping[str, FileStatus]:
if isinstance(commits, CommitRange):
commit_arg = str(commits)
else:
commit_arg = self._merge_base(commits)
status_command = [
'diff',
'-r',
'-z',
'--name-status',
'--no-ext-diff',
'--full-index',
f'--diff-filter={diff_filter}',
commit_arg,
]
if rename_threshold is None:
status_command.append('--no-renames')
else:
status_command.append(f'--find-renames={100 * rename_threshold}%')
if path:
status_command.append(path)
file_statuses = {}
raw_output = self.run(status_command)
if not raw_output:
return file_statuses
values = iter(raw_output.rstrip('\x00').split('\x00'))
while (status_type := next(values, None)) is not None:
status_type = FileStatusType.parse_diff_filter(status_type[0])
affected_file = next(values)
if status_type in FileStatusType.COPY | FileStatusType.RENAME:
file_statuses[next(values)] = FileStatus(
status_type, affected_file)
else:
file_statuses[affected_file] = FileStatus(status_type)
return file_statuses
def added_files(self):
return self._run_status_and_extract_filenames(self.status_command(),
self._status_regexp('A'))
def deleted_files(self):
return self._run_status_and_extract_filenames(self.status_command(),
self._status_regexp('D'))
def _run_status_and_extract_filenames(self, status_command, status_regexp):
filenames = []
# We run with cwd=self.checkout_root so that returned-paths are root-relative.
for line in self.run(status_command,
cwd=self.checkout_root).splitlines():
match = re.search(status_regexp, line)
if not match:
continue
filename = match.group('filename')
filenames.append(filename)
return filenames
def status_command(self):
# git status returns non-zero when there are changes, so we use git diff name --name-status HEAD instead.
# No file contents printed, thus utf-8 autodecoding in self.run is fine.
return ['diff', '--name-status', '--no-renames', 'HEAD']
def _status_regexp(self, expected_types):
return '^(?P<status>[%s])\t(?P<filename>.+)$' % expected_types
def display_name(self):
return 'git'
def most_recent_log_matching(self,
grep_str: str,
path: Optional[str] = None,
commits: Union[None, str, CommitRange] = None,
format_pattern: Optional[str] = None) -> str:
"""Find and return the most recent commit message matching a pattern.
Arguments:
grep_str: A grep-style regular expression.
path: A path that matching commits should modify.
commits: A revision range to search, where:
* `None` searches the full history up to `HEAD` (inclusive).
* `str` searches the history up to that revision (inclusive).
* `CommitRange` searches between the explicit start (exclusive)
and end (inclusive) revisions.
format_pattern: How `git log` should format the message, if found.
"""
# We use '--grep=' + foo rather than '--grep', foo because
# git 1.7.0.4 (and earlier) didn't support the separate arg.
command = [
'log',
'-1',
f'--grep={grep_str}',
'--date=iso',
]
if format_pattern:
command.append(f'--format={format_pattern}')
if commits:
command.append(str(commits))
if path:
command.extend(['--', path])
return self.run(command)
def _commit_position_from_git_log(self, git_log):
match = re.search(
r"^\s*Cr-Commit-Position:.*@\{#(?P<commit_position>\d+)\}",
git_log, re.MULTILINE)
if not match:
return ''
return int(match.group('commit_position'))
def commit_position(self, path):
"""Returns the latest chromium commit position found in the checkout."""
git_log = self.most_recent_log_matching('Cr-Commit-Position:', path)
return self._commit_position_from_git_log(git_log)
def create_patch(self, git_commit=None, changed_files=None):
"""Returns a byte array (str) representing the patch file.
Patch files are effectively binary since they may contain
files of multiple different encodings.
"""
command = [
'diff',
'--binary',
'--no-color',
'--no-ext-diff',
'--full-index',
'-M',
'--src-prefix=a/',
'--dst-prefix=b/',
]
command += [self._merge_base(git_commit), '--']
if changed_files:
command += changed_files
return self.run(command, decode_output=False, cwd=self.checkout_root)
@memoized
def commit_position_from_git_commit(self, git_commit):
git_log = self.git_commit_detail(git_commit)
return self._commit_position_from_git_log(git_log)
def _branch_ref_exists(self, branch_ref):
return self.run(['show-ref', '--quiet', '--verify', branch_ref],
return_exit_code=True) == 0
def _remote_merge_base(self):
return self.run(['merge-base',
self._remote_branch_ref(), 'HEAD']).strip()
def _remote_branch_ref(self):
# Use references so that we can avoid collisions, e.g. we don't want to operate on refs/heads/trunk if it exists.
remote_main_ref = 'refs/remotes/origin/main'
if self._branch_ref_exists(remote_main_ref):
return remote_main_ref
error_msg = "Can't find a branch to diff against. %s does not exist" % remote_main_ref
raise ScriptError(message=error_msg)
def commit_locally_with_message(self, message):
command = ['commit', '--all', '-F', '-']
self.run(command, stdin=message)
def latest_git_commit(self):
return self.run(['log', '-1', '--format=%H']).strip()
def git_commits_since(self, commit):
return self.run(
['log', commit + '..master', '--format=%H', '--reverse']).split()
def git_commit_detail(self, commit, format=None): # pylint: disable=redefined-builtin
args = ['log', '-1', commit]
if format:
args.append('--format=' + format)
return self.run(args)