# chromium/third_party/wpt_tools/wpt/tools/third_party_modified/mozlog/mozlog/formatters/machformatter.py

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import time
from functools import reduce

from mozterm import Terminal

from ..handlers import SummaryHandler
from . import base
from .process import strstatus
from .tbplformatter import TbplFormatter

# Maps each logical style name used by the formatter (the attribute names
# exposed by TerminalColors) to the name of the attribute that is looked up
# on the terminal object to produce that style.
color_dict = {
    "log_test_status_fail": "red",
    "log_process_output": "blue",
    "log_test_status_pass": "green",
    "log_test_status_unexpected_fail": "red",
    "log_test_status_known_intermittent": "yellow",
    "time": "cyan",
    "action": "yellow",
    "pid": "cyan",
    "heading": "bold_yellow",
    "sub_heading": "yellow",
    "error": "red",
    "warning": "yellow",
    "bold": "bold",
    "grey": "grey",
    "normal": "normal",
    "bright_black": "bright_black",
}

# Escape sequence appended after text that was formatted with a raw
# (non-callable) terminal attribute, so that output which follows does not
# inherit the formatting.
DEFAULT = "\x1b(B\x1b[m"


def format_seconds(total):
    """Format a number of seconds in MM:SS.DD form.

    The total is rounded to hundredths of a second *before* it is split
    into minutes and seconds. Splitting first lets a value such as 59.999
    be rendered as " 0:60.00", because the seconds part would only be
    rounded by the format specifier, after the minutes were already fixed.

    :param total: Duration in seconds (int or float).
    :returns: String with space-padded minutes and zero-padded seconds.
    """
    minutes, seconds = divmod(round(total, 2), 60)
    return "%2d:%05.2f" % (minutes, seconds)


class TerminalColors(object):
    """Lookup table from style names to callables that format a string.

    For every ``key: value`` pair in ``color_dict`` the attribute called
    ``value`` is read from ``term`` and exposed on this object as ``key``.
    """

    def __init__(self, term, color_dict):
        """Populate one formatting attribute per entry of ``color_dict``.

        :param term: Terminal object providing the attributes named by the
            values of ``color_dict``.
        :param color_dict: Mapping of style name -> terminal attribute name.
        """
        for key, value in color_dict.items():
            attribute = getattr(term, value)
            # In Blessed, these attributes aren't always callable. We can
            # assume that if they're not, they're just the raw ANSI escape
            # sequences. Everything stored on this object should behave the
            # same way when called, so raw sequences get wrapped in a
            # function that prepends them to the input string.
            if not callable(attribute):
                attribute = self._wrap_raw_sequence(attribute)
            setattr(self, key, attribute)

    @staticmethod
    def _wrap_raw_sequence(sequence):
        """Return a callable that surrounds text with ``sequence`` and a reset.

        A dedicated factory gives each wrapper its own binding of
        ``sequence``. Defining the wrapper directly inside the loop made it
        close over the loop variable, which is rebound on every iteration
        (and to the wrapper itself), so wrappers used the wrong value.
        """

        def apply_formatting(text):
            # DEFAULT resets the formatting so that text which comes
            # afterwards does not inherit the prepended sequence.
            return sequence + text + DEFAULT

        return apply_formatting


class MachFormatter(base.BaseFormatter):
    def __init__(
        self,
        start_time=None,
        write_interval=False,
        write_times=True,
        terminal=None,
        disable_colors=False,
        summary_on_shutdown=False,
        verbose=False,
        enable_screenshot=False,
        **kwargs
    ):
        """Set up formatter state and register the runtime message handlers.

        :param start_time: Reference time in seconds; the current time is
            used when omitted. It is stored in integer milliseconds.
        :param write_interval: Flag consulted by ``_time`` to choose between
            elapsed-since-last-entry and elapsed-since-start timestamps.
        :param write_times: Stored on the instance; not read by the code
            visible in this class.
        :param terminal: Accepted but not used by this constructor.
        :param disable_colors: Passed to ``Terminal`` as ``disable_styling``
            and stored as ``self.disable_colors``.
        :param summary_on_shutdown: When true, ``suite_end`` omits the suite
            summary and ``shutdown`` emits an overall summary instead.
        :param verbose: When true, ``test_status`` returns a line for every
            status and ``test_end`` does not repeat the unexpected ones.
        :param enable_screenshot: When true, ``test_end`` appends the
            ``TbplFormatter`` output for results carrying reftest screenshots.
        :param kwargs: Forwarded to the base formatter constructor.
        """
        super(MachFormatter, self).__init__(**kwargs)

        if start_time is None:
            start_time = time.time()
        start_time = int(start_time * 1000)
        self.start_time = start_time
        self.write_interval = write_interval
        self.write_times = write_times
        self.status_buffer = {}
        self.has_unexpected = {}
        self.last_time = None
        # The "colors" message handlers assign this attribute; initialise it
        # here so it exists (and reflects the constructor argument) before
        # any message has been received.
        self.disable_colors = disable_colors
        self.color_formatter = TerminalColors(
            Terminal(disable_styling=disable_colors), color_dict
        )
        self.verbose = verbose
        self._known_pids = set()
        # Created lazily by test_end the first time screenshots are printed.
        self.tbpl_formatter = None
        self.enable_screenshot = enable_screenshot
        self.summary = SummaryHandler()
        self.summary_on_shutdown = summary_on_shutdown

        message_handlers = {
            "colors": {
                "on": self._enable_colors,
                "off": self._disable_colors,
            },
            "summary_on_shutdown": {
                "on": self._enable_summary_on_shutdown,
                "off": self._disable_summary_on_shutdown,
            },
        }

        # presumably self.message_handler is provided by the base formatter
        # constructor -- it is not created in this class.
        for topic, handlers in message_handlers.items():
            self.message_handler.register_message_handlers(topic, handlers)

    def __call__(self, data):
        """Record ``data`` in the summary and return its timestamped line.

        The base formatter produces the message body. When it yields
        ``None`` nothing is emitted; otherwise the body is prefixed with the
        coloured timestamp and terminated with a newline.
        """
        self.summary(data)

        body = super(MachFormatter, self).__call__(data)
        if body is None:
            return None

        stamp = self.color_formatter.time(format_seconds(self._time(data)))
        return "%s %s\n" % (stamp, body)

    def _enable_colors(self):
        """Handler for the ``colors``/``on`` message: clear ``disable_colors``."""
        # NOTE(review): no code visible in this class reads disable_colors,
        # and the color formatter is not rebuilt here -- confirm whether this
        # message is expected to change the output.
        self.disable_colors = False

    def _disable_colors(self):
        """Handler for the ``colors``/``off`` message: set ``disable_colors``."""
        # NOTE(review): no code visible in this class reads disable_colors,
        # and the color formatter is not rebuilt here -- confirm whether this
        # message is expected to change the output.
        self.disable_colors = True

    def _enable_summary_on_shutdown(self):
        """Handler for ``summary_on_shutdown``/``on``: defer summaries to shutdown."""
        self.summary_on_shutdown = True

    def _disable_summary_on_shutdown(self):
        """Handler for ``summary_on_shutdown``/``off``: summarise at each suite end."""
        self.summary_on_shutdown = False

    def _get_test_id(self, data):
        test_id = data.get("test")
        if isinstance(test_id, list):
            test_id = tuple(test_id)
        return test_id

    def _get_file_name(self, test_id):
        if isinstance(test_id, str):
            return test_id

        if isinstance(test_id, tuple):
            return "".join(test_id)

        assert False, "unexpected test_id"

    def suite_start(self, data):
        num_tests = reduce(lambda x, y: x + len(y), data["tests"].values(), 0)
        action = self.color_formatter.action(data["action"].upper())
        name = ""
        if "name" in data:
            name = " %s -" % (data["name"],)
        return "%s:%s running %i tests" % (action, name, num_tests)

    def suite_end(self, data):
        action = self.color_formatter.action(data["action"].upper())
        rv = [action]
        if not self.summary_on_shutdown:
            rv.append(
                self._format_suite_summary(
                    self.summary.current_suite, self.summary.current
                )
            )
        return "\n".join(rv)

    def _format_expected(self, status, expected, known_intermittent=[]):
        if status == expected:
            color = self.color_formatter.log_test_status_pass
            if expected not in ("PASS", "OK"):
                color = self.color_formatter.log_test_status_fail
                status = "EXPECTED-%s" % status
        else:
            if status in known_intermittent:
                color = self.color_formatter.log_test_status_known_intermittent
                status = "KNOWN-INTERMITTENT-%s" % status
            else:
                color = self.color_formatter.log_test_status_fail
                if status in ("PASS", "OK"):
                    status = "UNEXPECTED-%s" % status
        return color(status)

    def _format_status(self, test, data):
        name = data.get("subtest", test)
        rv = "%s %s" % (
            self._format_expected(
                data["status"],
                data.get("expected", data["status"]),
                data.get("known_intermittent", []),
            ),
            name,
        )
        if "message" in data:
            rv += " - %s" % data["message"]
        if "stack" in data:
            rv += self._format_stack(data["stack"])
        return rv

    def _format_stack(self, stack):
        return "\n%s\n" % self.color_formatter.bright_black(stack.strip("\n"))

    def _format_suite_summary(self, suite, summary):
        """Build the multi-line summary text for one suite.

        :param suite: Suite name, printed as an underlined sub-heading.
        :param summary: Mapping with the keys ``"counts"``,
            ``"unexpected_logs"``, ``"intermittent_logs"`` and
            ``"harness_errors"``. (presumably the per-suite structure kept by
            ``SummaryHandler`` -- confirm against that class.)
        :returns: The summary lines joined with newlines: check counts,
            expected / skipped / unexpected totals, known intermittent
            results, and finally either ``OK`` or the unexpected results and
            harness errors.
        """
        count = summary["counts"]
        logs = summary["unexpected_logs"]
        intermittent_logs = summary["intermittent_logs"]
        harness_errors = summary["harness_errors"]

        rv = [
            "",
            self.color_formatter.sub_heading(suite),
            self.color_formatter.sub_heading("~" * len(suite)),
        ]

        # Format check counts
        checks = self.summary.aggregate("count", count)
        rv.append(
            "Ran {} checks ({})".format(
                sum(checks.values()),
                ", ".join(
                    ["{} {}s".format(v, k) for k, v in sorted(checks.items()) if v]
                ),
            )
        )

        # Format expected counts
        checks = self.summary.aggregate("expected", count, include_skip=False)
        intermittent_checks = self.summary.aggregate(
            "known_intermittent", count, include_skip=False
        )
        intermittents = sum(intermittent_checks.values())
        known = (
            " ({} known intermittents)".format(intermittents) if intermittents else ""
        )
        rv.append("Expected results: {}{}".format(sum(checks.values()), known))

        # Format skip counts
        # Skipped subtests are only reported when there are skipped tests too.
        skip_tests = count["test"]["expected"]["skip"]
        skip_subtests = count["subtest"]["expected"]["skip"]
        if skip_tests:
            skipped = "Skipped: {} tests".format(skip_tests)
            if skip_subtests:
                skipped = "{}, {} subtests".format(skipped, skip_subtests)
            rv.append(skipped)

        # Format unexpected counts
        checks = self.summary.aggregate("unexpected", count)
        unexpected_count = sum(checks.values())
        rv.append("Unexpected results: {}".format(unexpected_count))
        if unexpected_count:
            # One indented line per kind of check that has unexpected
            # results, listing the count of each status.
            for key in ("test", "subtest", "assert"):
                if not count[key]["unexpected"]:
                    continue
                status_str = ", ".join(
                    [
                        "{} {}".format(n, s)
                        for s, n in sorted(count[key]["unexpected"].items())
                    ]
                )
                rv.append(
                    "  {}: {} ({})".format(
                        key, sum(count[key]["unexpected"].values()), status_str
                    )
                )

        # Format intermittents
        if intermittents > 0:
            heading = "Known Intermittent Results"
            rv.extend(
                [
                    "",
                    self.color_formatter.heading(heading),
                    self.color_formatter.heading("-" * len(heading)),
                ]
            )
            if count["subtest"]["count"]:
                # The suite has subtests: print each test name in bold and
                # indent its entries underneath.
                for test_id, results in intermittent_logs.items():
                    test = self._get_file_name(test_id)
                    rv.append(self.color_formatter.bold(test))
                    for data in results:
                        rv.append("  %s" % self._format_status(test, data).rstrip())
            else:
                # No subtests anywhere in the suite: one flat line per entry.
                for test_id, results in intermittent_logs.items():
                    test = self._get_file_name(test_id)
                    for data in results:
                        assert "subtest" not in data
                        rv.append(self._format_status(test, data).rstrip())

        # Format status
        testfailed = any(
            count[key]["unexpected"] for key in ("test", "subtest", "assert")
        )
        if not testfailed and not harness_errors:
            rv.append(self.color_formatter.log_test_status_pass("OK"))
        else:
            # Format test failures
            heading = "Unexpected Results"
            rv.extend(
                [
                    "",
                    self.color_formatter.heading(heading),
                    self.color_formatter.heading("-" * len(heading)),
                ]
            )
            if count["subtest"]["count"]:
                # Same layout as the intermittent section above: grouped by
                # test when the suite has subtests, flat otherwise.
                for test_id, results in logs.items():
                    test = self._get_file_name(test_id)
                    rv.append(self.color_formatter.bold(test))
                    for data in results:
                        rv.append("  %s" % self._format_status(test, data).rstrip())
            else:
                for test_id, results in logs.items():
                    test = self._get_file_name(test_id)
                    for data in results:
                        assert "subtest" not in data
                        rv.append(self._format_status(test, data).rstrip())

            # Format harness errors
            if harness_errors:
                for data in harness_errors:
                    rv.append(self.log(data))

        return "\n".join(rv)

    def test_start(self, data):
        action = self.color_formatter.action(data["action"].upper())
        return "%s: %s" % (action, self._get_test_id(data))

    def test_end(self, data):
        """Format the result line for a finished test.

        The line contains the test status (with the expected status when one
        is recorded and the result is not a known intermittent), the subtest
        totals buffered by ``test_status``, and details of unexpected and
        intermittent results taken from the summary handler. Calling this
        resets the buffered subtest counters for the test.

        :param data: The ``test_end`` message.
        :returns: The formatted text, optionally followed by the
            ``TbplFormatter`` output when screenshots are present and enabled.
        """
        # Read the buffered counters before they are reset below.
        subtests = self._get_subtest_data(data)

        if "expected" in data and data["status"] not in data.get(
            "known_intermittent", []
        ):
            parent_unexpected = True
            expected_str = ", expected %s" % data["expected"]
        else:
            parent_unexpected = False
            expected_str = ""

        has_screenshots = "reftest_screenshots" in data.get("extra", {})

        test = self._get_test_id(data)

        # Reset the counts to 0
        self.status_buffer[test] = {"count": 0, "unexpected": 0, "pass": 0}
        self.has_unexpected[test] = bool(subtests["unexpected"])

        if subtests["count"] != 0:
            rv = "Test %s%s. Subtests passed %i/%i. Unexpected %s" % (
                data["status"],
                expected_str,
                subtests["pass"],
                subtests["count"],
                subtests["unexpected"],
            )
        else:
            rv = "%s%s" % (data["status"], expected_str)

        # NOTE(review): the summary logs are looked up with the raw
        # data["test"] value rather than the tuple-normalised ``test`` id.
        unexpected = self.summary.current["unexpected_logs"].get(data["test"])
        if unexpected:
            if len(unexpected) == 1 and parent_unexpected:
                # The only unexpected entry belongs to the test itself, so
                # its message is appended to the same line.
                message = unexpected[0].get("message", "")
                if message:
                    rv += " - %s" % message
                if "stack" in data:
                    rv += self._format_stack(data["stack"])
            elif not self.verbose:
                # In verbose mode test_status has already returned a line
                # for each of these entries.
                # NOTE(review): entries are concatenated without a separator;
                # only entries that include a stack end with a newline.
                rv += "\n"
                for d in unexpected:
                    rv += self._format_status(data["test"], d)

        intermittents = self.summary.current["intermittent_logs"].get(data["test"])
        if intermittents:
            rv += "\n"
            for d in intermittents:
                rv += self._format_status(data["test"], d)

        # The action label uses the pass colour only when no expected status
        # is recorded on the message and no subtest was unexpected.
        if "expected" not in data and not bool(subtests["unexpected"]):
            color = self.color_formatter.log_test_status_pass
        else:
            color = self.color_formatter.log_test_status_unexpected_fail

        action = color(data["action"].upper())
        rv = "%s: %s" % (action, rv)
        if has_screenshots and self.enable_screenshot:
            if self.tbpl_formatter is None:
                self.tbpl_formatter = TbplFormatter()
            # Create TBPL-like output that can be pasted into the reftest analyser
            rv = "\n".join((rv, self.tbpl_formatter.test_end(data)))
        return rv

    def valgrind_error(self, data):
        rv = " " + data["primary"] + "\n"
        for line in data["secondary"]:
            rv = rv + line + "\n"

        return rv

    def lsan_leak(self, data):
        allowed = data.get("allowed_match")
        if allowed:
            prefix = self.color_formatter.log_test_status_fail("FAIL")
        else:
            prefix = self.color_formatter.log_test_status_unexpected_fail(
                "UNEXPECTED-FAIL"
            )

        return "%s LeakSanitizer: leak at %s" % (prefix, ", ".join(data["frames"]))

    def lsan_summary(self, data):
        allowed = data.get("allowed", False)
        if allowed:
            prefix = self.color_formatter.warning("WARNING")
        else:
            prefix = self.color_formatter.error("ERROR")

        return (
            "%s | LeakSanitizer | "
            "SUMMARY: AddressSanitizer: %d byte(s) leaked in %d allocation(s)."
            % (prefix, data["bytes"], data["allocations"])
        )

    def mozleak_object(self, data):
        data_log = data.copy()
        data_log["level"] = "INFO"
        data_log["message"] = "leakcheck: %s leaked %d %s" % (
            data["process"],
            data["bytes"],
            data["name"],
        )
        return self.log(data_log)

    def mozleak_total(self, data):
        if data["bytes"] is None:
            # We didn't see a line with name 'TOTAL'
            if data.get("induced_crash", False):
                data_log = data.copy()
                data_log["level"] = "INFO"
                data_log["message"] = (
                    "leakcheck: %s deliberate crash and thus no leak log\n"
                    % data["process"]
                )
                return self.log(data_log)
            if data.get("ignore_missing", False):
                return (
                    "%s ignoring missing output line for total leaks\n"
                    % data["process"]
                )

            status = self.color_formatter.log_test_status_pass("FAIL")
            return "%s leakcheck: " "%s missing output line for total leaks!\n" % (
                status,
                data["process"],
            )

        if data["bytes"] == 0:
            return "%s leakcheck: %s no leaks detected!\n" % (
                self.color_formatter.log_test_status_pass("PASS"),
                data["process"],
            )

        message = "leakcheck: %s %d bytes leaked\n" % (data["process"], data["bytes"])

        # data["bytes"] will include any expected leaks, so it can be off
        # by a few thousand bytes.
        failure = data["bytes"] > data["threshold"]
        status = (
            self.color_formatter.log_test_status_unexpected_fail("UNEXPECTED-FAIL")
            if failure
            else self.color_formatter.log_test_status_fail("FAIL")
        )
        return "%s %s\n" % (status, message)

    def test_status(self, data):
        test = self._get_test_id(data)
        if test not in self.status_buffer:
            self.status_buffer[test] = {"count": 0, "unexpected": 0, "pass": 0}
        self.status_buffer[test]["count"] += 1

        if data["status"] == "PASS":
            self.status_buffer[test]["pass"] += 1

        if "expected" in data and data["status"] not in data.get(
            "known_intermittent", []
        ):
            self.status_buffer[test]["unexpected"] += 1

        if self.verbose:
            return self._format_status(test, data).rstrip("\n")

    def assertion_count(self, data):
        if data["min_expected"] <= data["count"] <= data["max_expected"]:
            return

        if data["min_expected"] != data["max_expected"]:
            expected = "%i to %i" % (data["min_expected"], data["max_expected"])
        else:
            expected = "%i" % data["min_expected"]

        action = self.color_formatter.log_test_status_fail("ASSERT")
        return "%s: Assertion count %i, expected %s assertions\n" % (
            action,
            data["count"],
            expected,
        )

    def process_output(self, data):
        rv = []

        pid = data["process"]
        if pid.isdigit():
            pid = "pid:%s" % pid
        pid = self.color_formatter.pid(pid)

        if "command" in data and data["process"] not in self._known_pids:
            self._known_pids.add(data["process"])
            rv.append("%s Full command: %s" % (pid, data["command"]))

        rv.append("%s %s" % (pid, data["data"]))
        return "\n".join(rv)

    def crash(self, data):
        test = self._get_test_id(data)

        if data.get("stackwalk_returncode", 0) != 0 and not data.get(
            "stackwalk_stderr"
        ):
            success = True
        else:
            success = False

        rv = [
            "pid:%s. Process type: %s. Test:%s. Minidump analysed:%s. Signature:[%s]"
            % (
                data.get("pid", "unknown"),
                data.get("process_type", None),
                test,
                success,
                data["signature"],
            )
        ]

        if data.get("java_stack"):
            rv.append("Java exception: %s" % data["java_stack"])
        else:
            if data.get("reason"):
                rv.append("Mozilla crash reason: %s" % data["reason"])

            if data.get("minidump_path"):
                rv.append("Crash dump filename: %s" % data["minidump_path"])

            if data.get("stackwalk_returncode", 0) != 0:
                rv.append(
                    "minidump-stackwalk exited with return code %d"
                    % data["stackwalk_returncode"]
                )

            if data.get("stackwalk_stderr"):
                rv.append("stderr from minidump-stackwalk:")
                rv.append(data["stackwalk_stderr"])
            elif data.get("stackwalk_stdout"):
                rv.append(data["stackwalk_stdout"])

            if data.get("stackwalk_errors"):
                rv.extend(data.get("stackwalk_errors"))

        rv = "\n".join(rv)
        if not rv[-1] == "\n":
            rv += "\n"

        action = self.color_formatter.action(data["action"].upper())
        return "%s: %s" % (action, rv)

    def process_start(self, data):
        rv = "Started process `%s`" % data["process"]
        desc = data.get("command")
        if desc:
            rv = "%s (%s)" % (rv, desc)
        return rv

    def process_exit(self, data):
        """Format a process exit as ``<process>: <status description>``.

        The description is produced by ``strstatus`` from the exit code.
        """
        process = data["process"]
        description = strstatus(data["exitcode"])
        return "%s: %s" % (process, description)

    def log(self, data):
        level = data.get("level").upper()

        if level in ("CRITICAL", "ERROR"):
            level = self.color_formatter.error(level)
        elif level == "WARNING":
            level = self.color_formatter.warning(level)
        elif level == "INFO":
            level = self.color_formatter.log_process_output(level)

        if data.get("component"):
            rv = " ".join([data["component"], level, data["message"]])
        else:
            rv = "%s %s" % (level, data["message"])

        if "stack" in data:
            rv += "\n%s" % data["stack"]

        return rv

    def lint(self, data):
        fmt = (
            "{path}  {c1}{lineno}{column}  {c2}{level}{normal}  {message}"
            "  {c1}{rule}({linter}){normal}"
        )
        message = fmt.format(
            path=data["path"],
            normal=self.color_formatter.normal,
            c1=self.color_formatter.grey,
            c2=self.color_formatter.error
            if data["level"] == "error"
            else (self.color_formatter.log_test_status_fail),
            lineno=str(data["lineno"]),
            column=(":" + str(data["column"])) if data.get("column") else "",
            level=data["level"],
            message=data["message"],
            rule="{} ".format(data["rule"]) if data.get("rule") else "",
            linter=data["linter"].lower() if data.get("linter") else "",
        )

        return message

    def shutdown(self, data):
        if not self.summary_on_shutdown:
            return

        heading = "Overall Summary"
        rv = [
            "",
            self.color_formatter.heading(heading),
            self.color_formatter.heading("=" * len(heading)),
        ]
        for suite, summary in self.summary:
            rv.append(self._format_suite_summary(suite, summary))
        return "\n".join(rv)

    def _get_subtest_data(self, data):
        test = self._get_test_id(data)
        return self.status_buffer.get(test, {"count": 0, "unexpected": 0, "pass": 0})

    def _time(self, data):
        entry_time = data["time"]
        if self.write_interval and self.last_time is not None:
            t = entry_time - self.last_time
            self.last_time = entry_time
        else:
            t = entry_time - self.start_time

        return t / 1000.0