# llvm/clang/tools/scan-build-py/lib/libscanbuild/__init__.py

# -*- coding: utf-8 -*-
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
""" This module is a collection of methods commonly used in this project. """
import collections
import functools
import json
import logging
import os
import os.path
import re
import shlex
import subprocess
import sys

# Name of the environment variable that carries the JSON encoded wrapper
# settings: written by `wrapper_environment`, read by `compiler_wrapper`.
ENVIRONMENT_KEY = "INTERCEPT_BUILD"

# Record describing one compiler invocation: the process id, the working
# directory and the command (list of tokens) that was executed.
Execution = collections.namedtuple("Execution", "pid cwd cmd")

# NOTE(review): not used in this module; presumably the cross translation
# unit analysis settings consumed by other modules - confirm with callers.
CtuConfig = collections.namedtuple(
    "CtuConfig", "collect analyze dir extdef_map_cmd"
)


def duplicate_check(method):
    """Build a stateful predicate which flags entries that were seen before.

    Entries are expected to be dictionaries, which have no default hash
    method, therefore the caller supplies `method` to compute a hashable
    value for each entry. The values seen so far are collected in a set
    which is stored on the returned function (as its `state` attribute).

    :param method: callable which maps an entry to a hashable value
    :return: a function which returns False the first time a given hash
             value shows up, and True for every later entry with that value
    """

    def predicate(entry):
        key = predicate.unique(entry)
        seen = predicate.state
        if key in seen:
            return True
        # First occurrence: remember it and report "not a duplicate".
        seen.add(key)
        return False

    predicate.state = set()
    predicate.unique = method
    return predicate


def run_build(command, *args, **kwargs):
    """Execute the build command and log both its start and its outcome.

    Every extra positional and keyword argument is forwarded to
    `subprocess.call` untouched.

    :param command: array of tokens
    :return: exit code of the process
    """
    # Looked up for the debug message only; the child process receives the
    # environment through the forwarded keyword arguments.
    if "env" in kwargs:
        effective_env = kwargs["env"]
    else:
        effective_env = os.environ
    logging.debug("run build %s, in environment: %s", command, effective_env)

    status = subprocess.call(command, *args, **kwargs)
    logging.debug("build finished with exit code: %d", status)
    return status


def run_command(command, cwd=None):
    """Run a given command and report the execution.

    Standard error is merged into standard output, so the returned lines
    contain both streams.

    :param command: array of tokens
    :param cwd: the working directory where the command will be executed;
                the current working directory is used when it is not given
    :return: output of the command as a list of lines
    :raises subprocess.CalledProcessError: when the command exits with a
            non-zero code; the `output` attribute of the exception holds
            the decoded output as a list of lines
    """

    def decode_when_needed(result):
        """check_output returns bytes or string depend on python version"""
        if isinstance(result, bytes):
            # The output of an arbitrary tool is not guaranteed to be valid
            # UTF-8. Substitute the undecodable bytes instead of raising
            # UnicodeDecodeError, which would hide the real output (or the
            # CalledProcessError being handled below).
            return result.decode("utf-8", errors="replace")
        return result

    directory = os.path.abspath(cwd) if cwd else os.getcwd()
    logging.debug("exec command %s in %s", command, directory)
    try:
        output = subprocess.check_output(
            command, cwd=directory, stderr=subprocess.STDOUT
        )
    except subprocess.CalledProcessError as ex:
        # Give the caller the same shape of output as the success path.
        ex.output = decode_when_needed(ex.output).splitlines()
        raise
    return decode_when_needed(output).splitlines()


def reconfigure_logging(verbose_level):
    """Adjust the root logger level and message format to the verbosity.

    Nothing is changed when the verbose level is zero. Otherwise every
    verbose step lowers the threshold by 10 starting from WARNING (never
    going below 0), and the handlers of the root logger are replaced by a
    single handler which writes to the standard output.

    :param verbose_level: number of `-v` flags received by the command
    :return: no return value
    """
    # Exit when nothing to do.
    if verbose_level == 0:
        return

    root_logger = logging.getLogger()

    # Tune logging level.
    reduction = min(logging.WARNING, 10 * verbose_level)
    root_logger.setLevel(logging.WARNING - reduction)

    # Be verbose with messages: above level 3 the function name is
    # reported too.
    fmt_string = (
        "%(name)s: %(levelname)s: %(message)s"
        if verbose_level <= 3
        else "%(name)s: %(levelname)s: %(funcName)s: %(message)s"
    )
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(logging.Formatter(fmt=fmt_string))
    root_logger.handlers = [stdout_handler]


def command_entry_point(function):
    """Decorator for command entry methods.

    The returned wrapper sets up logging before the call, shuts logging
    down after it, and converts exceptions into exit codes instead of
    letting them escape.

    The decorated method can have arbitrary parameters. The wrapper returns
    what the method returned (meant to be the exit code of the process),
    130 on keyboard interrupt, or 64 when any other exception was raised.
    """

    @functools.wraps(function)
    def guarded(*args, **kwargs):
        """Do housekeeping tasks and execute the wrapped method."""

        try:
            logging.basicConfig(
                stream=sys.stdout,
                level=logging.WARNING,
                format="%(name)s: %(message)s",
            )
            # This hack to get the executable name as %(name).
            executable_name = os.path.basename(sys.argv[0])
            logging.getLogger().name = executable_name
            return function(*args, **kwargs)
        except KeyboardInterrupt:
            logging.warning("Keyboard interrupt")
            return 130  # Signal received exit code for bash.
        except Exception:
            logging.exception("Internal error.")
            # The advice depends on whether debug output is already on.
            if logging.getLogger().isEnabledFor(logging.DEBUG):
                advice = "Please report this bug and attach the output to the bug report"
            else:
                advice = (
                    "Please run this command again and turn on "
                    "verbose mode (add '-vvvv' as argument)."
                )
            logging.error(advice)
            return 64  # Some non used exit code for internal errors.
        finally:
            logging.shutdown()

    return guarded


def compiler_wrapper(function):
    """Implements compiler wrapper base functionality.

    A compiler wrapper executes the real compiler, then implements some
    extra functionality, then returns with the real compiler exit code.

    The settings (verbose level and the real C and C++ compiler commands)
    are read as JSON from the environment variable named by
    ENVIRONMENT_KEY. A missing variable or a failure to start the compiler
    is not handled here and propagates to the caller.

    :param function: the extra functionality the wrapper wants to do on top
    of the compiler call. If it raises an exception (a subclass of
    `Exception`), it will be caught and logged. `KeyboardInterrupt` and
    `SystemExit` are not intercepted.
    :return: the exit code of the real compiler.

    The :param function: will receive the following arguments:

    :param result:       the exit code of the compilation.
    :param execution:    the command executed by the wrapper."""

    def is_cxx_compiler():
        """Find out was it a C++ compiler call. Compiler wrapper names
        contain the compiler type. C++ compiler wrappers ends with `c++`,
        but might have `.exe` extension on windows.

        Returns the match object (truthy) or None. The pattern requires at
        least one character in front of `c++` in the wrapper name."""

        wrapper_command = os.path.basename(sys.argv[0])
        return re.match(r"(.+)c\+\+(.*)", wrapper_command)

    def run_compiler(executable):
        """Execute compilation with the real compiler.

        :param executable: the real compiler command as a list of tokens;
        the arguments of this process are appended to it.
        :return: exit code of the compiler."""

        command = executable + sys.argv[1:]
        logging.debug("compilation: %s", command)
        result = subprocess.call(command)
        logging.debug("compilation exit code: %d", result)
        return result

    # Get relevant parameters from environment.
    parameters = json.loads(os.environ[ENVIRONMENT_KEY])
    reconfigure_logging(parameters["verbose"])
    # Execute the requested compilation. Do crash if anything goes wrong.
    cxx = is_cxx_compiler()
    compiler = parameters["cxx"] if cxx else parameters["cc"]
    result = run_compiler(compiler)
    # Call the wrapped method and ignore its return value.
    try:
        call = Execution(
            pid=os.getpid(),
            cwd=os.getcwd(),
            cmd=["c++" if cxx else "cc"] + sys.argv[1:],
        )
        function(result, call)
    except Exception:
        # A failure of the extra functionality must not change the outcome
        # of the build, so it is only logged. No `return` inside `finally`
        # is used for this: that would silently discard every in-flight
        # exception, interrupts included.
        logging.exception("Compiler wrapper failed complete.")
    # Always return the real compiler exit code.
    return result


def wrapper_environment(args):
    """Create the environment entry consumed by the compiler wrappers.

    :param args: parsed command line arguments; the `verbose`, `cc` and
                 `cxx` attributes are read, the latter two are split into
                 tokens with shell-like rules.
    :return: dictionary with a single item, which maps ENVIRONMENT_KEY to
             the JSON encoded settings.
    """
    settings = {
        "verbose": args.verbose,
        "cc": shlex.split(args.cc),
        "cxx": shlex.split(args.cxx),
    }
    return {ENVIRONMENT_KEY: json.dumps(settings)}