# Copyright (C) 2009 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import contextlib
import errno
import hashlib
import io
import os
import re
import unittest
from unittest.mock import patch
import six
from blinkpy.common.system.filesystem import _remove_contents, _sanitize_filename
# Encoding used whenever this module converts between text and the bytes
# stored in the mock filesystem.
_TEXT_ENCODING = 'utf-8'
def _ensure_binary_contents(file_contents):
    """Convert every non-None value of `file_contents` to bytes, in place.

    Args:
        file_contents: A dictionary of paths to file contents. Text values are
            encoded with `_TEXT_ENCODING`; None values are left as None.
    """
    # Snapshot the keys so the mapping can be reassigned while looping.
    for path in list(file_contents):
        current = file_contents[path]
        file_contents[path] = (current if current is None else
                               six.ensure_binary(current, _TEXT_ENCODING))
class MockFileSystem(object):
    """In-memory replacement for the FileSystem class, intended for tests.

    Files live in `self.files`, a dictionary of path -> bytes contents in
    which a value of None marks a path that does not exist (for example, a
    file that was removed). Directories are tracked separately in the
    `self.dirs` set. `self.written_files` records paths modified through this
    object so that tests can inspect what was written.

    Paths use '/' as the separator.
    """
    # pylint: disable=unused-argument

    sep = '/'
    pardir = '..'

    def __init__(self, files=None, dirs=None, cwd='/'):
        """Initializes a "mock" filesystem that can be used to replace the
        FileSystem class in tests.

        Args:
            files: A dictionary of filenames to file contents. A file contents
                value of None indicates that the file does not exist. Text
                contents are converted to bytes in place.
            dirs: An optional iterable of directories that exist in addition
                to the ancestors of `files`.
            cwd: The initial working directory; it is added to the known
                directories.
        """
        self.files = files or {}
        self.executable_files = set()
        self.written_files = {}
        self.last_tmpdir = None
        self.current_tmpno = 0
        self.cwd = cwd
        self.dirs = set(dirs or [])
        self.dirs.add(cwd)
        _ensure_binary_contents(self.files)
        # Register every ancestor directory of every file.
        for file_path in self.files:
            directory = self.dirname(file_path)
            while directory not in self.dirs:
                self.dirs.add(directory)
                directory = self.dirname(directory)

    def clear_written_files(self):
        """Forget which files have been written so far."""
        # This function can be used to track what is written between steps in a test.
        self.written_files = {}

    def _raise_not_found(self, path):
        """Raise FileNotFoundError for `path`."""
        raise FileNotFoundError('%s: %s' % (os.strerror(errno.ENOENT), path))

    def _split(self, path):
        """Return a `(head, tail)` tuple split at the last separator.

        When `path` contains no separator the head is ''.
        """
        # This is not quite a full implementation of os.path.split; see:
        # http://docs.python.org/library/os.path.html#os.path.split
        if self.sep in path:
            return tuple(path.rsplit(self.sep, 1))
        return ('', path)

    def make_executable(self, file_path):
        """Record `file_path` as executable."""
        self.executable_files.add(file_path)

    def abspath(self, path):
        """Return the normalized absolute form of `path`.

        Relative paths are resolved against the mock working directory.
        """
        # Use the mock's own notion of "absolute" so the result does not
        # depend on the host platform's os.path flavor.
        if self.isabs(path):
            return self.normpath(path)
        return self.abspath(self.join(self.cwd, path))

    def realpath(self, path):
        """Return `abspath(path)`; symlinks are not modeled."""
        return self.abspath(path)

    def basename(self, path):
        """Return the part of `path` after the last separator."""
        return self._split(path)[1]

    def expanduser(self, path):
        """Replace a leading '~' component with the mock home directory.

        Paths that do not start with '~' (including the empty string) are
        returned unchanged.
        """
        if not path.startswith('~'):
            return path
        parts = path.split(self.sep, 1)
        home_directory = self.sep + 'Users' + self.sep + 'mock'
        if len(parts) == 1:
            return home_directory
        return home_directory + self.sep + parts[1]

    def path_to_module(self, module_name):
        """Return the mock checkout path of the dotted `module_name`."""
        return ('/mock-checkout/third_party/blink/tools/' +
                module_name.replace('.', '/') + '.py')

    def chdir(self, path):
        """Change the mock working directory.

        Raises:
            OSError: (ENOENT) if `path` is not a known directory.
        """
        path = self.normpath(path)
        if not self.isdir(path):
            # OSError takes (errno, strerror, filename), in that order.
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), path)
        self.cwd = path

    def copyfile(self, source, destination):
        """Copy the contents of the file `source` to `destination`.

        Raises:
            FileNotFoundError: if `source` does not exist.
            OSError: (EISDIR) if `source` or `destination` is a directory, or
                (ENOENT) if the parent of `destination` does not exist.
        """
        if not self.exists(source):
            self._raise_not_found(source)
        # OSError takes (errno, strerror, filename), in that order.
        if self.isdir(source):
            raise OSError(errno.EISDIR, os.strerror(errno.EISDIR), source)
        if self.isdir(destination):
            raise OSError(errno.EISDIR, os.strerror(errno.EISDIR),
                          destination)
        if not self.exists(self.dirname(destination)):
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT),
                          destination)
        self.files[destination] = self.files[source]
        self.written_files[destination] = self.files[source]

    def dirname(self, path):
        """Return the part of `path` before the last separator."""
        return self._split(path)[0]

    def exists(self, path):
        """Return True if `path` is an existing file or a known directory."""
        return self.isfile(path) or self.isdir(path)

    def files_under(self, path, dirs_to_skip=None, file_filter=None):
        """Return the existing files at or below `path`.

        Args:
            path: A file or directory path.
            dirs_to_skip: Directory names whose contents are excluded.
            file_filter: An optional callable `(fs, dirpath, basename)`;
                only files for which it returns a true value are included.
        """
        dirs_to_skip = dirs_to_skip or []
        filter_all = lambda fs, dirpath, basename: True
        file_filter = file_filter or filter_all
        files = []
        if self.isfile(path):
            if (file_filter(self, self.dirname(path), self.basename(path))
                    and self.files[path] is not None):
                files.append(path)
            return files
        if self.basename(path) in dirs_to_skip:
            return []
        if not path.endswith(self.sep):
            path += self.sep
        dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip]
        for filename in self.files:
            if not filename.startswith(path):
                continue
            # Keep the separator that ends `path` so that a skipped directory
            # directly under `path` still matches '/<dir>/'.
            suffix = filename[len(path) - 1:]
            if any(dir_substring in suffix
                   for dir_substring in dir_substrings):
                continue
            dirpath, basename = self._split(filename)
            if (file_filter(self, dirpath, basename)
                    and self.files[filename] is not None):
                files.append(filename)
        return files

    def getcwd(self):
        """Return the mock working directory."""
        return self.cwd

    def glob(self, glob_string):
        """Yield existing files, then directories, matching `glob_string`."""
        # FIXME: This handles '*', but not '?', '[', or ']'.
        glob_string = re.escape(glob_string)
        # Allow zero directories (e.g., `a/**/b` matches `a/b`).
        glob_string = glob_string.replace('/\\*\\*', '(|/.*)')
        glob_string = glob_string.replace('\\*', '[^\\/]*')
        glob_string = glob_string.replace('\\/', '/')
        path_filter = lambda path: re.fullmatch(glob_string, path)
        # We could use fnmatch.fnmatch, but that might not do the right thing on Windows.
        existing_files = [
            path for path, contents in self.files.items()
            if contents is not None
        ]
        yield from filter(path_filter, existing_files)
        yield from filter(path_filter, self.dirs)

    def isabs(self, path):
        """Return True if `path` starts with the separator."""
        return path.startswith(self.sep)

    def isfile(self, path):
        """Return True if `path` is a file whose contents are not None."""
        return path in self.files and self.files[path] is not None

    def isdir(self, path):
        """Return True if the normalized `path` is a known directory."""
        return self.normpath(path) in self.dirs

    def _slow_but_correct_join(self, comp, *comps):
        """Reference implementation of join() built on os.path.join."""
        return re.sub(re.escape(os.path.sep), self.sep,
                      os.path.join(comp, *comps))

    def join(self, *comps):
        """Join path components; returns bytes only if all inputs are bytes."""
        # The real `os.path.join` accepts both strings and bytes:
        #   (*bytes) -> bytes
        #   (*str) -> str
        # Record what type the caller originally passed, perform the join with
        # text strings, then coerce the return value to the original argument
        # type.
        binary_mode = all(isinstance(comp, bytes) for comp in comps)
        # This function is called a lot, so we optimize it; there are
        # unit tests to check that we match _slow_but_correct_join(), above.
        path = ''
        sep = self.sep
        for comp in comps:
            if not comp:
                continue
            comp = six.ensure_text(comp)
            if comp[0] == sep:
                # An absolute component discards everything before it.
                path = comp
                continue
            if path:
                path += sep
            path += comp
        if six.ensure_text(comps[-1]) == '' and path:
            path += '/'
        path = path.replace(sep + sep, sep)
        return path.encode() if binary_mode else path

    def listdir(self, path):
        """Return the names of the directories, then files, directly in `path`.

        Raises:
            OSError: if `path` is not a known directory.
        """
        # Only the first tuple yielded by walk() describes `path` itself, so
        # there is no need to traverse the whole subtree.
        _, directories, files = next(self.walk(path))
        return directories + files

    def walk(self, top):
        """Yield `(dirpath, dirnames, filenames)` tuples like os.walk().

        Subdirectories are discovered from the paths of existing files.

        Raises:
            OSError: if `top` is not a known directory.
        """
        sep = self.sep
        if not self.isdir(top):
            raise OSError('%s is not a directory' % top)
        if not top.endswith(sep):
            top += sep
        directories = []
        files = []
        for file_path in self.files:
            if self.exists(file_path) and file_path.startswith(top):
                remaining = file_path[len(top):]
                if sep in remaining:
                    directory = remaining[:remaining.index(sep)]
                    if directory not in directories:
                        directories.append(directory)
                else:
                    files.append(remaining)
        # The real `os.walk(...)` [0] gives the caller a chance to modify which
        # subdirectories to traverse by mutating the `directories` list, so we
        # should yield here instead of returning a precomputed list.
        #
        # [0]: https://docs.python.org/3/library/os.html#os.walk
        yield (top[:-1], directories, files)
        for directory in directories:
            directory = top + directory
            yield from self.walk(directory)

    def mtime(self, path):
        """Return 0 for any existing path.

        Raises:
            FileNotFoundError: if `path` does not exist.
        """
        if self.exists(path):
            return 0
        self._raise_not_found(path)

    def mktemp(self, suffix='', prefix='tmp', dir=None, **_):  # pylint: disable=redefined-builtin
        """Return a fresh temporary path without creating anything.

        The path is also stored in `self.last_tmpdir`. The default parent
        directory is '/__im_tmp'.
        """
        if dir is None:
            dir = self.sep + '__im_tmp'
        curno = self.current_tmpno
        self.current_tmpno += 1
        self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix))
        return self.last_tmpdir

    def mkdtemp(self, **kwargs):
        """Create a temporary directory and return a context manager for it.

        `str()` of the returned object, and the value bound by `with ... as`,
        is the directory path. `kwargs` are forwarded to mktemp().
        """

        class TemporaryDirectory(object):
            def __init__(self, fs, **kwargs):
                self._kwargs = kwargs
                self._filesystem = fs
                self._directory_path = fs.mktemp(**kwargs)  # pylint: disable=protected-access
                fs.maybe_make_directory(self._directory_path)

            def __str__(self):
                return self._directory_path

            def __enter__(self):
                return self._directory_path

            def __exit__(self, exception_type, exception_value, traceback):
                # Only self-delete if necessary.
                # FIXME: Should we delete non-empty directories?
                if self._filesystem.exists(self._directory_path):
                    self._filesystem.rmtree(self._directory_path)

        return TemporaryDirectory(fs=self, **kwargs)

    def maybe_make_directory(self, *path):
        """Register the joined `path` and any missing ancestors as directories."""
        norm_path = self.normpath(self.join(*path))
        while norm_path and not self.isdir(norm_path):
            self.dirs.add(norm_path)
            norm_path = self.dirname(norm_path)

    def move(self, source, destination):
        """Move a file or a directory tree from `source` to `destination`.

        Raises:
            FileNotFoundError: if `source` does not exist.
        """
        if not self.exists(source):
            self._raise_not_found(source)
        if self.isfile(source):
            self.files[destination] = self.files[source]
            self.written_files[destination] = self.files[destination]
            self.files[source] = None
            self.written_files[source] = None
            return
        self.copytree(source, destination)
        self.rmtree(source)

    def _slow_but_correct_normpath(self, path):
        """Reference implementation of normpath() built on os.path.normpath."""
        return re.sub(re.escape(os.path.sep), self.sep, os.path.normpath(path))

    def normpath(self, path):
        """Return `path` normalized."""
        # This function is called a lot, so we try to optimize the common cases
        # instead of always calling _slow_but_correct_normpath(), above.
        if '..' in path or '/./' in path:
            # This doesn't happen very often; don't bother trying to optimize it.
            return self._slow_but_correct_normpath(path)
        if not path:
            return '.'
        if path == '/':
            return path
        if path == '/.':
            return '/'
        if path.endswith('/.'):
            return path[:-2]
        if path.endswith('/'):
            return path[:-1]
        return path

    def open_binary_tempfile(self, suffix=''):
        """Return `(file handle, path)` for a new temporary binary file."""
        path = self.mktemp(suffix)
        return self.open_binary_file_for_writing(path), path

    def open_binary_file_for_reading(self, path):
        """Return a buffered binary reader over the contents of `path`.

        Raises:
            FileNotFoundError: if `path` does not exist.
        """
        if self.files.get(path) is None:
            self._raise_not_found(path)
        return BufferedReader(WriteThroughBinaryFile(self, path))

    def open_binary_file_for_writing(self, path):
        """Reset `path` to empty contents and return a binary writer for it."""
        self.files[path] = b''
        return WriteThroughBinaryFile(self, path)

    def read_binary_file(self, path):
        """Return the bytes stored at `path`.

        Raises:
            FileNotFoundError: if `path` does not exist.
        """
        maybe_contents = self.files.get(path)
        if maybe_contents is None:
            self._raise_not_found(path)
        return maybe_contents

    def write_binary_file(self, path, contents):
        """Store `contents` at `path`, creating parent directories as needed."""
        # FIXME: should this assert if dirname(path) doesn't exist?
        self.maybe_make_directory(self.dirname(path))
        self.files[path] = contents
        self.written_files[path] = contents

    def open_text_tempfile(self, suffix=''):
        """Return `(file handle, path)` for a new temporary text file."""
        path = self.mktemp(suffix)
        return self.open_text_file_for_writing(path), path

    def open_text_file_for_reading(self, path):
        """Return a text reader over the contents of `path`.

        Raises:
            FileNotFoundError: if `path` does not exist.
        """
        return TextIOWrapper(self.open_binary_file_for_reading(path))

    def open_text_file_for_writing(self, path):
        """Reset `path` to empty contents and return a text writer for it."""
        return TextIOWrapper(self.open_binary_file_for_writing(path))

    def open_text_file_for_appending(self, path):
        """Return a text writer positioned at the end of `path`.

        A missing file is created empty first.
        """
        # A removed file is stored as None rather than being absent from the
        # mapping, so `setdefault` would leave the None in place and the
        # first write would fail.
        if self.files.get(path) is None:
            self.files[path] = b''
        file_handle = TextIOWrapper(WriteThroughBinaryFile(self, path))
        file_handle.seek(0, io.SEEK_END)
        return file_handle

    def read_text_file(self, path):
        """Return the contents of `path` decoded as text.

        Raises:
            FileNotFoundError: if `path` does not exist.
        """
        return self.read_binary_file(path).decode(_TEXT_ENCODING)

    def write_text_file(self, path, contents):
        """Encode `contents` and store it at `path`."""
        return self.write_binary_file(path, contents.encode(_TEXT_ENCODING))

    def sha1(self, path):
        """Return the hex SHA-1 digest of the contents of `path`."""
        contents = self.read_binary_file(path)
        return hashlib.sha1(contents).hexdigest()

    def relpath(self, path, start='.'):
        """Return `path` expressed relative to `start`."""
        # Since os.path.relpath() calls os.path.normpath()
        # (see http://docs.python.org/library/os.path.html#os.path.abspath )
        # it also removes trailing slashes and converts forward and backward
        # slashes to the preferred slash os.sep.
        start = self.abspath(start)
        path = self.abspath(path)
        common_root = start
        dot_dot = ''
        while not common_root == '':
            if path.startswith(common_root):
                break
            common_root = self.dirname(common_root)
            dot_dot += '..' + self.sep
        rel_path = path[len(common_root):]
        if not rel_path:
            return '.'
        if rel_path[0] == self.sep:
            # It is probably sufficient to remove just the first character
            # since os.path.normpath() collapses separators, but we use
            # lstrip() just to be sure.
            rel_path = rel_path.lstrip(self.sep)
        elif not common_root == '/':
            # We are in the case typified by the following example:
            # path = "/tmp/foobar", start = "/tmp/foo" -> rel_path = "bar"
            common_root = self.dirname(common_root)
            dot_dot += '..' + self.sep
            rel_path = path[len(common_root) + 1:]
        return dot_dot + rel_path

    def remove(self, path, retry=True):
        """Mark the file `path` as removed.

        Raises:
            FileNotFoundError: if `path` does not exist.
        """
        if self.files.get(path) is None:
            self._raise_not_found(path)
        self.files[path] = None
        self.written_files[path] = None

    def rmtree(self, path_to_remove, ignore_errors=True, onerror=None):
        """Remove `path_to_remove` and every file and directory below it."""
        path_to_remove = self.normpath(path_to_remove)
        for file_path in self.files:
            # We need to add a trailing separator to path_to_remove to avoid matching
            # cases like path_to_remove='/foo/b' and file_path='/foo/bar/baz'.
            if file_path == path_to_remove or file_path.startswith(
                    path_to_remove + self.sep):
                self.files[file_path] = None

        def should_remove(directory):
            return directory == path_to_remove or directory.startswith(
                path_to_remove + self.sep)

        self.dirs = {d for d in self.dirs if not should_remove(d)}

    def remove_contents(self, dirname):
        """Remove the contents of `dirname` via the shared helper, without sleeping."""
        return _remove_contents(self, dirname, sleep=lambda *args, **kw: None)

    def copytree(self, source, destination):
        """Copy every file entry at or below `source` to `destination`."""
        source = self.normpath(source)
        destination = self.normpath(destination)
        # Match on a whole path component so that source='/foo/b' does not
        # also pick up '/foo/bar/baz'.
        if source.endswith(self.sep):
            source_prefix = source
        else:
            source_prefix = source + self.sep
        for source_file in list(self.files):
            if source_file == source or source_file.startswith(source_prefix):
                destination_path = self.join(destination,
                                             self.relpath(source_file, source))
                self.maybe_make_directory(self.dirname(destination_path))
                self.files[destination_path] = self.files[source_file]

    def split(self, path):
        """Return `(head, tail)` split at the last separator."""
        idx = path.rfind(self.sep)
        if idx == -1:
            return ('', path)
        return (path[:idx], path[(idx + 1):])

    def splitext(self, path):
        """Return `(root, ext)` following the rules of os.path.splitext.

        Only a dot in the last path component starts an extension, and
        leading dots of that component are part of the root, so
        '/a.d/b' -> ('/a.d/b', '') and '.bashrc' -> ('.bashrc', '').
        """
        sep_index = path.rfind(self.sep)
        dot_index = path.rfind('.')
        if dot_index > sep_index:
            # The dot starts an extension only if some character of the last
            # component before it is not itself a dot.
            name_index = sep_index + 1
            while name_index < dot_index:
                if path[name_index] != '.':
                    return (path[:dot_index], path[dot_index:])
                name_index += 1
        return (path, '')

    def symlink(self, source, link_name):
        """Not supported by the mock; always raises NotImplementedError."""
        raise NotImplementedError('Symlink not expected to be called in tests')

    def sanitize_filename(self, filename, replacement='_'):
        """Delegate to the shared `_sanitize_filename` helper."""
        return _sanitize_filename(filename, replacement)

    def _open_mock(self, filename, mode='r', **_kwargs):
        """A mock for Python's built-in `open` backed by this Blink FS.

        Supports reading and writing in text and binary mode, and appending
        in text mode.

        Raises:
            ValueError: if `mode` is not one of the supported modes.
        """
        mode_match = re.match(r'([rwa])(b?)', mode)
        open_func_map = {
            ('r', ''): self.open_text_file_for_reading,
            ('w', ''): self.open_text_file_for_writing,
            ('a', ''): self.open_text_file_for_appending,
            ('r', 'b'): self.open_binary_file_for_reading,
            ('w', 'b'): self.open_binary_file_for_writing,
        }
        open_func = None
        if mode_match:
            open_func = open_func_map.get(mode_match.groups())
        if open_func is None:
            raise ValueError('unsupported mode for mock open(): %r' % (mode, ))
        return open_func(filename)

    @contextlib.contextmanager
    def patch_builtins(self):
        """Context manager that redirects common builtins to this filesystem.

        While active, `open`, several `os` / `os.path` functions and the
        `tempfile` file factories listed below are patched to use this object.
        """
        with contextlib.ExitStack() as stack:
            stack.enter_context(patch('builtins.open', self._open_mock))
            stack.enter_context(patch('os.sep', self.sep))
            stack.enter_context(patch('os.path.sep', self.sep))
            stack.enter_context(patch('os.path.abspath', self.abspath))
            stack.enter_context(patch('os.path.relpath', self.relpath))
            stack.enter_context(patch('os.path.join', self.join))
            stack.enter_context(patch('os.path.isfile', self.isfile))
            stack.enter_context(patch('os.path.isdir', self.isdir))
            stack.enter_context(patch('os.path.exists', self.exists))
            stack.enter_context(patch('os.makedirs',
                                      self.maybe_make_directory))
            stack.enter_context(patch('os.replace', self.move))
            stack.enter_context(patch('os.unlink', self.remove))
            stack.enter_context(
                patch('tempfile.TemporaryFile',
                      lambda *args, **kwargs: self.open_text_tempfile()[0]))
            stack.enter_context(
                patch('tempfile.NamedTemporaryFile',
                      lambda *args, **kwargs: self.open_text_tempfile()[0]))
            yield
class BufferedReader(io.BufferedReader):
    """Buffered binary reader that exposes the raw stream's `fs` attribute."""

    def __init__(self, raw, **options):
        """Wrap `raw`, forwarding `options` to io.BufferedReader.

        Args:
            raw: The underlying binary stream; it must have an `fs` attribute.
            **options: Extra keyword arguments for the base initializer.
        """
        io.BufferedReader.__init__(self, raw, **options)
        # Copy the attribute only once the base class has accepted `raw`.
        owning_fs = raw.fs
        self.fs = owning_fs
class TextIOWrapper(io.TextIOWrapper):
    """Text stream over a binary stream, exposing that stream's `fs` attribute."""

    def __init__(self,
                 raw,
                 encoding=_TEXT_ENCODING,
                 errors='replace',
                 newline='\n',
                 **options):
        """Wrap `raw` with this module's text defaults.

        Args:
            raw: The underlying binary stream; it must have an `fs` attribute.
            encoding: Text encoding; defaults to `_TEXT_ENCODING`.
            errors: Codec error handling; defaults to 'replace'.
            newline: Newline handling; defaults to '\\n'.
            **options: Extra keyword arguments for io.TextIOWrapper.
        """
        text_settings = dict(options,
                             encoding=encoding,
                             errors=errors,
                             newline=newline)
        io.TextIOWrapper.__init__(self, raw, **text_settings)
        self.fs = raw.fs
class WriteThroughBinaryFile(io.BytesIO):
    """In-memory binary stream that mirrors its contents into a filesystem.

    The stream starts with the bytes stored at `fs.files[name]`. After every
    write the full buffer is copied back to `fs.files[name]` and recorded in
    `fs.written_files[name]`.
    """

    def __init__(self, fs, name: str):
        """Open a stream over `fs.files[name]`.

        Args:
            fs: An object with `files` and `written_files` dictionaries.
            name: The key of the file within those dictionaries.
        """
        self.fs = fs
        self.name = name
        super().__init__(self.fs.files[self.name])

    def _write_through(self):
        """Copy the whole buffer to the filesystem and mark it as written."""
        # Mirror the buffer rather than appending the latest chunk: after a
        # seek(), a write overwrites bytes instead of extending the file.
        contents = self.getvalue()
        self.fs.files[self.name] = contents
        self.fs.written_files[self.name] = contents

    def write(self, buf):
        """Write `buf` at the current position; return the byte count."""
        amount_written = super().write(buf)
        self._write_through()
        return amount_written

    def writelines(self, lines):
        """Write each item of `lines` at the current position."""
        # `lines` may be a one-shot iterator, so it is consumed exactly once
        # here and the stored contents are taken from the buffer afterwards.
        super().writelines(lines)
        self._write_through()

    def truncate(self, size=None):
        """Truncate the buffer and update `fs.files`; return the new size.

        `fs.written_files` is left untouched.
        """
        new_size = super().truncate(size)
        self.fs.files[self.name] = self.getvalue()
        return new_size
class FileSystemTestCase(unittest.TestCase):
    """TestCase base class with assertions about a mock filesystem."""
    # pylint: disable=invalid-name
    # Use assertFilesAdded to be consistent with unittest.

    class _AssertFilesAddedContext(object):
        """Internal class used by FileSystemTestCase.assertFilesAdded()."""

        def __init__(self, test_case, mock_filesystem, expected_files):
            """Remember what is expected to be added.

            Args:
                test_case: The TestCase used to report failures.
                mock_filesystem: The filesystem object whose `files` mapping
                    is inspected.
                expected_files: A dictionary of paths to expected contents.
            """
            self.test_case = test_case
            self.mock_filesystem = mock_filesystem
            # Work on a copy so the caller's dictionary is not rewritten when
            # the contents are converted to bytes.
            self.expected_files = dict(expected_files)
            _ensure_binary_contents(self.expected_files)

        def __enter__(self):
            # Make sure that the expected_files aren't already in the mock
            # file system. An explicit failure is used instead of `assert`
            # so the check still runs under `python -O`.
            for filepath in self.expected_files:
                if filepath in self.mock_filesystem.files:
                    self.test_case.fail(
                        "%s was already in mock file system (%r)" %
                        (filepath, self.mock_filesystem.files))
            return self

        def __exit__(self, exc_type, exc_value, tb):
            # Exception already occurring, just exit.
            if exc_type is not None:
                return
            for filepath in sorted(self.expected_files):
                self.test_case.assertIn(filepath, self.mock_filesystem.files)
                self.test_case.assertEqual(
                    self.expected_files[filepath],
                    self.mock_filesystem.files[filepath])

    def assertFilesAdded(self, mock_filesystem, files):
        """Assert that the given files were added to the mock_filesystem.

        Use in a similar manner to self.assertRaises;

            with self.assertFilesAdded(mock_filesystem, {'/newfile': 'contents'}):
                code(mock_filesystem)
        """
        return self._AssertFilesAddedContext(self, mock_filesystem, files)