cpython/Tools/c-analyzer/c_common/scriptutil.py

import argparse
import contextlib
import logging
import os
import os.path
import shutil
import sys

from . import fsutil, strutil, iterutil, logging as loggingutil


_NOT_SET = object()


def get_prog(spec=None, *, absolute=False, allowsuffix=True):
    if spec is None:
        _, spec = _find_script()
        # This is more natural for prog than __file__ would be.
        filename = sys.argv[0]
    elif isinstance(spec, str):
        filename = os.path.normpath(spec)
        spec = None
    else:
        filename = spec.origin
    if _is_standalone(filename):
        # Check if "installed".
        if allowsuffix or not filename.endswith('.py'):
            basename = os.path.basename(filename)
            found = shutil.which(basename)
            if found:
                script = os.path.abspath(filename)
                found = os.path.abspath(found)
                if os.path.normcase(script) == os.path.normcase(found):
                    return basename
        # It is only "standalone".
        if absolute:
            filename = os.path.abspath(filename)
        return filename
    elif spec is not None:
        module = spec.name
        if module.endswith('.__main__'):
            module = module[:-9]
        return f'{sys.executable} -m {module}'
    else:
        if absolute:
            filename = os.path.abspath(filename)
        return f'{sys.executable} {filename}'


def _find_script():
    frame = sys._getframe(2)
    while frame.f_globals['__name__'] != '__main__':
        frame = frame.f_back

    # This should match sys.argv[0].
    filename = frame.f_globals['__file__']
    # This will be None if -m wasn't used..
    spec = frame.f_globals['__spec__']
    return filename, spec


def is_installed(filename, *, allowsuffix=True):
    if not allowsuffix and filename.endswith('.py'):
        return False
    filename = os.path.abspath(os.path.normalize(filename))
    found = shutil.which(os.path.basename(filename))
    if not found:
        return False
    if found != filename:
        return False
    return _is_standalone(filename)


def is_standalone(filename):
    filename = os.path.abspath(os.path.normalize(filename))
    return _is_standalone(filename)


def _is_standalone(filename):
    return fsutil.is_executable(filename)


##################################
# logging

VERBOSITY = 3

TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))


logger = logging.getLogger(__name__)


def configure_logger(verbosity, logger=None, **kwargs):
    if logger is None:
        # Configure the root logger.
        logger = logging.getLogger()
    loggingutil.configure_logger(logger, verbosity, **kwargs)


##################################
# selections

class UnsupportedSelectionError(Exception):
    def __init__(self, values, possible):
        self.values = tuple(values)
        self.possible = tuple(possible)
        super().__init__(f'unsupported selections {self.unique}')

    @property
    def unique(self):
        return tuple(sorted(set(self.values)))


def normalize_selection(selected: str, *, possible=None):
    if selected in (None, True, False):
        return selected
    elif isinstance(selected, str):
        selected = [selected]
    elif not selected:
        return ()

    unsupported = []
    _selected = set()
    for item in selected:
        if not item:
            continue
        for value in item.strip().replace(',', ' ').split():
            if not value:
                continue
            # XXX Handle subtraction (leading "-").
            if possible and value not in possible and value != 'all':
                unsupported.append(value)
            _selected.add(value)
    if unsupported:
        raise UnsupportedSelectionError(unsupported, tuple(possible))
    if 'all' in _selected:
        return True
    return frozenset(selected)


##################################
# CLI parsing helpers

class CLIArgSpec(tuple):
    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, (args, kwargs))

    def __repr__(self):
        args, kwargs = self
        args = [repr(arg) for arg in args]
        for name, value in kwargs.items():
            args.append(f'{name}={value!r}')
        return f'{type(self).__name__}({", ".join(args)})'

    def __call__(self, parser, *, _noop=(lambda a: None)):
        self.apply(parser)
        return _noop

    def apply(self, parser):
        args, kwargs = self
        parser.add_argument(*args, **kwargs)


def apply_cli_argspecs(parser, specs):
    processors = []
    for spec in specs:
        if callable(spec):
            procs = spec(parser)
            _add_procs(processors, procs)
        else:
            args, kwargs = spec
            parser.add_argument(args, kwargs)
    return processors


def _add_procs(flattened, procs):
    # XXX Fail on non-empty, non-callable procs?
    if not procs:
        return
    if callable(procs):
        flattened.append(procs)
    else:
        #processors.extend(p for p in procs if callable(p))
        for proc in procs:
            _add_procs(flattened, proc)


def add_verbosity_cli(parser):
    parser.add_argument('-q', '--quiet', action='count', default=0)
    parser.add_argument('-v', '--verbose', action='count', default=0)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'verbosity'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        ns[key] = max(0, VERBOSITY + ns.pop('verbose') - ns.pop('quiet'))
        return key
    return process_args


def add_traceback_cli(parser):
    parser.add_argument('--traceback', '--tb', action='store_true',
                        default=TRACEBACK)
    parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
                        action='store_const', const=False)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'traceback_cm'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        showtb = ns.pop('traceback')

        @contextlib.contextmanager
        def traceback_cm():
            restore = loggingutil.hide_emit_errors()
            try:
                yield
            except BrokenPipeError:
                # It was piped to "head" or something similar.
                pass
            except NotImplementedError:
                raise  # re-raise
            except Exception as exc:
                if not showtb:
                    sys.exit(f'ERROR: {exc}')
                raise  # re-raise
            except KeyboardInterrupt:
                if not showtb:
                    sys.exit('\nINTERRUPTED')
                raise  # re-raise
            except BaseException as exc:
                if not showtb:
                    sys.exit(f'{type(exc).__name__}: {exc}')
                raise  # re-raise
            finally:
                restore()
        ns[key] = traceback_cm()
        return key
    return process_args


def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
#    if opt is True:
#        parser.add_argument(f'--{dest}', action='append', **kwargs)
#    elif isinstance(opt, str) and opt.startswith('-'):
#        parser.add_argument(opt, dest=dest, action='append', **kwargs)
#    else:
#        arg = dest if not opt else opt
#        kwargs.setdefault('nargs', '+')
#        parser.add_argument(arg, dest=dest, action='append', **kwargs)
    if not isinstance(opt, str):
        parser.error(f'opt must be a string, got {opt!r}')
    elif opt.startswith('-'):
        parser.add_argument(opt, dest=dest, action='append', **kwargs)
    else:
        kwargs.setdefault('nargs', '+')
        #kwargs.setdefault('metavar', opt.upper())
        parser.add_argument(opt, dest=dest, action='append', **kwargs)

    def process_args(args, *, argv=None):
        ns = vars(args)

        # XXX Use normalize_selection()?
        if isinstance(ns[dest], str):
            ns[dest] = [ns[dest]]
        selections = []
        for many in ns[dest] or ():
            for value in many.split(sep):
                if value not in choices:
                    parser.error(f'unknown {dest} {value!r}')
                selections.append(value)
        ns[dest] = selections
    return process_args


def add_files_cli(parser, *, excluded=None, nargs=None):
    process_files = add_file_filtering_cli(parser, excluded=excluded)
    parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
    return [
        process_files,
    ]


def add_file_filtering_cli(parser, *, excluded=None):
    parser.add_argument('--start')
    parser.add_argument('--include', action='append')
    parser.add_argument('--exclude', action='append')

    excluded = tuple(excluded or ())

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'iter_filenames'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')

        _include = tuple(ns.pop('include') or ())
        _exclude = excluded + tuple(ns.pop('exclude') or ())
        kwargs = dict(
            start=ns.pop('start'),
            include=tuple(_parse_files(_include)),
            exclude=tuple(_parse_files(_exclude)),
            # We use the default for "show_header"
        )
        def process_filenames(filenames, relroot=None):
            return fsutil.process_filenames(filenames, relroot=relroot, **kwargs)
        ns[key] = process_filenames
    return process_args


def _parse_files(filenames):
    for filename, _ in strutil.parse_entries(filenames):
        yield filename.strip()


def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
    parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
    parser.add_argument('--no-progress', dest='track_progress', action='store_false')
    parser.set_defaults(track_progress=True)

    def process_args(args, *, argv=None):
        if args.track_progress:
            ns = vars(args)
            verbosity = ns.get('verbosity', VERBOSITY)
            if verbosity <= threshold:
                args.track_progress = track_progress_compact
            else:
                args.track_progress = track_progress_flat
    return process_args


def add_failure_filtering_cli(parser, pool, *, default=False):
    parser.add_argument('--fail', action='append',
                        metavar=f'"{{all|{"|".join(sorted(pool))}}},..."')
    parser.add_argument('--no-fail', dest='fail', action='store_const', const=())

    def process_args(args, *, argv=None):
        ns = vars(args)

        fail = ns.pop('fail')
        try:
            fail = normalize_selection(fail, possible=pool)
        except UnsupportedSelectionError as exc:
            parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
        else:
            if fail is None:
                fail = default

            if fail is True:
                def ignore_exc(_exc):
                    return False
            elif fail is False:
                def ignore_exc(_exc):
                    return True
            else:
                def ignore_exc(exc):
                    for err in fail:
                        if type(exc) == pool[err]:
                            return False
                    else:
                        return True
            args.ignore_exc = ignore_exc
    return process_args


def add_kind_filtering_cli(parser, *, default=None):
    parser.add_argument('--kinds', action='append')

    def process_args(args, *, argv=None):
        ns = vars(args)

        kinds = []
        for kind in ns.pop('kinds') or default or ():
            kinds.extend(kind.strip().replace(',', ' ').split())

        if not kinds:
            match_kind = (lambda k: True)
        else:
            included = set()
            excluded = set()
            for kind in kinds:
                if kind.startswith('-'):
                    kind = kind[1:]
                    excluded.add(kind)
                    if kind in included:
                        included.remove(kind)
                else:
                    included.add(kind)
                    if kind in excluded:
                        excluded.remove(kind)
            if excluded:
                if included:
                    ...  # XXX fail?
                def match_kind(kind, *, _excluded=excluded):
                    return kind not in _excluded
            else:
                def match_kind(kind, *, _included=included):
                    return kind in _included
        args.match_kind = match_kind
    return process_args


COMMON_CLI = [
    add_verbosity_cli,
    add_traceback_cli,
    #add_dryrun_cli,
]


def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
    arg_processors = {}
    if isinstance(subset, str):
        cmdname = subset
        try:
            _, argspecs, _ = commands[cmdname]
        except KeyError:
            raise ValueError(f'unsupported subset {subset!r}')
        parser.set_defaults(cmd=cmdname)
        arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
    else:
        if subset is None:
            cmdnames = subset = list(commands)
        elif not subset:
            raise NotImplementedError
        elif isinstance(subset, set):
            cmdnames = [k for k in commands if k in subset]
            subset = sorted(subset)
        else:
            cmdnames = [n for n in subset if n in commands]
        if len(cmdnames) < len(subset):
            bad = tuple(n for n in subset if n not in commands)
            raise ValueError(f'unsupported subset {bad}')

        common = argparse.ArgumentParser(add_help=False)
        common_processors = apply_cli_argspecs(common, commonspecs)
        subs = parser.add_subparsers(dest='cmd')
        for cmdname in cmdnames:
            description, argspecs, _ = commands[cmdname]
            sub = subs.add_parser(
                cmdname,
                description=description,
                parents=[common],
            )
            cmd_processors = _add_cmd_cli(sub, (), argspecs)
            arg_processors[cmdname] = common_processors + cmd_processors
    return arg_processors


def _add_cmd_cli(parser, commonspecs, argspecs):
    processors = []
    argspecs = list(commonspecs or ()) + list(argspecs or ())
    for argspec in argspecs:
        if callable(argspec):
            procs = argspec(parser)
            _add_procs(processors, procs)
        else:
            if not argspec:
                raise NotImplementedError
            args = list(argspec)
            if not isinstance(args[-1], str):
                kwargs = args.pop()
                if not isinstance(args[0], str):
                    try:
                        args, = args
                    except (TypeError, ValueError):
                        parser.error(f'invalid cmd args {argspec!r}')
            else:
                kwargs = {}
            parser.add_argument(*args, **kwargs)
            # There will be nothing to process.
    return processors


def _flatten_processors(processors):
    for proc in processors:
        if proc is None:
            continue
        if callable(proc):
            yield proc
        else:
            yield from _flatten_processors(proc)


def process_args(args, argv, processors, *, keys=None):
    processors = _flatten_processors(processors)
    ns = vars(args)
    extracted = {}
    if keys is None:
        for process_args in processors:
            for key in process_args(args, argv=argv):
                extracted[key] = ns.pop(key)
    else:
        remainder = set(keys)
        for process_args in processors:
            hanging = process_args(args, argv=argv)
            if isinstance(hanging, str):
                hanging = [hanging]
            for key in hanging or ():
                if key not in remainder:
                    raise NotImplementedError(key)
                extracted[key] = ns.pop(key)
                remainder.remove(key)
        if remainder:
            raise NotImplementedError(sorted(remainder))
    return extracted


def process_args_by_key(args, argv, processors, keys):
    extracted = process_args(args, argv, processors, keys=keys)
    return [extracted[key] for key in keys]


##################################
# commands

def set_command(name, add_cli):
    """A decorator factory to set CLI info."""
    def decorator(func):
        if hasattr(func, '__cli__'):
            raise Exception(f'already set')
        func.__cli__ = (name, add_cli)
        return func
    return decorator


##################################
# main() helpers

def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    # We expect each filename to be a normalized, absolute path.
    for filename, _, check, _ in _iter_filenames(filenames, process_filenames, relroot):
        if (reason := check()):
            logger.debug(f'{filename}: {reason}')
            continue
        yield filename


def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
    for filename, relfile, check, show in _iter_filenames(filenames, process_filenames, relroot):
        if show:
            print()
            print(relfile)
            print('-------------------------------------------')
        if (reason := check()):
            print(reason)
            continue
        yield filename, relfile


def _iter_filenames(filenames, process, relroot):
    if process is None:
        yield from fsutil.process_filenames(filenames, relroot=relroot)
        return

    onempty = Exception('no filenames provided')
    items = process(filenames, relroot=relroot)
    items, peeked = iterutil.peek_and_iter(items)
    if not items:
        raise onempty
    if isinstance(peeked, str):
        if relroot and relroot is not fsutil.USE_CWD:
            relroot = os.path.abspath(relroot)
        check = (lambda: True)
        for filename, ismany in iterutil.iter_many(items, onempty):
            relfile = fsutil.format_filename(filename, relroot, fixroot=False)
            yield filename, relfile, check, ismany
    elif len(peeked) == 4:
        yield from items
    else:
        raise NotImplementedError


def track_progress_compact(items, *, groups=5, **mark_kwargs):
    last = os.linesep
    marks = iter_marks(groups=groups, **mark_kwargs)
    for item in items:
        last = next(marks)
        print(last, end='', flush=True)
        yield item
    if not last.endswith(os.linesep):
        print()


def track_progress_flat(items, fmt='<{}>'):
    for item in items:
        print(fmt.format(item), flush=True)
        yield item


def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
    mark = mark or ''
    group = group if group and group > 1 else 1
    groups = groups if groups and groups > 1 else 1

    sep = f'{mark}{sep}' if sep else mark
    end = f'{mark}{os.linesep}'
    div = os.linesep
    perline = group * groups
    if lines is _NOT_SET:
        # By default we try to put about 100 in each line group.
        perlines = 100 // perline * perline
    elif not lines or lines < 0:
        perlines = None
    else:
        perlines = perline * lines

    if perline == 1:
        yield end
    elif group == 1:
        yield sep

    count = 1
    while True:
        if count % perline == 0:
            yield end
            if perlines and count % perlines == 0:
                yield div
        elif count % group == 0:
            yield sep
        else:
            yield mark
        count += 1