chromium/third_party/wpt_tools/wpt/tools/third_party_modified/mozlog/mozlog/formatters/tbplformatter.py

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import functools
from collections import deque
from functools import reduce

from ..handlers import SummaryHandler
from .base import BaseFormatter
from .process import strstatus


def output_subtests(func):
    @functools.wraps(func)
    def inner(self, data):
        if self.subtests_count:
            return self._format_subtests(data.get("component")) + func(self, data)
        else:
            return func(self, data)

    return inner


class TbplFormatter(BaseFormatter):
    """Formatter that formats logs in the legacy formatting format used by TBPL
    This is intended to be used to preserve backward compatibility with existing tools
    hand-parsing this format.
    """

    def __init__(self, compact=False, summary_on_shutdown=False, **kwargs):
        super(TbplFormatter, self).__init__(**kwargs)
        self.suite_start_time = None
        self.test_start_times = {}
        self.buffer = None
        self.compact = compact
        self.subtests_count = 0

        self.summary = SummaryHandler()
        self.summary_on_shutdown = summary_on_shutdown

    def __call__(self, data):
        if self.summary_on_shutdown:
            self.summary(data)
        return super(TbplFormatter, self).__call__(data)

    @property
    def compact(self):
        return self._compact

    @compact.setter
    def compact(self, value):
        self._compact = value
        if value:
            self.buffer = deque([], 10)
        else:
            self.buffer = None

    def _format_subtests(self, component, subtract_context=False):
        count = self.subtests_count
        if subtract_context:
            count -= len(self.buffer)
        self.subtests_count = 0
        return self._log(
            {"level": "INFO", "message": "." * count, "component": component}
        )

    @output_subtests
    def log(self, data):
        return self._log(data)

    def _log(self, data):
        if data.get("component"):
            message = "%s %s" % (data["component"], data["message"])
        else:
            message = data["message"]

        if "stack" in data:
            message += "\n%s" % data["stack"]

        return "%s\n" % message

    @output_subtests
    def process_output(self, data):
        pid = data["process"]
        if pid.isdigit():
            pid = "PID %s" % pid
        return "%s | %s\n" % (pid, data["data"])

    @output_subtests
    def process_start(self, data):
        msg = "TEST-INFO | started process %s" % data["process"]
        if "command" in data:
            msg = "%s (%s)" % (msg, data["command"])
        return msg + "\n"

    @output_subtests
    def process_exit(self, data):
        return "TEST-INFO | %s: %s\n" % (data["process"], strstatus(data["exitcode"]))

    @output_subtests
    def crash(self, data):
        id = data["test"] if "test" in data else "pid: %s" % data["process"]

        if data.get("java_stack"):
            # use "<exception> at <top frame>" as a crash signature for java exception
            sig = data["java_stack"].split("\n")
            sig = " ".join(sig[0:2])
            rv = ["PROCESS-CRASH | %s | %s\n[%s]" % (id, sig, data["java_stack"])]

            if data.get("reason"):
                rv.append("Mozilla crash reason: %s" % data["reason"])

            if data.get("minidump_path"):
                rv.append("Crash dump filename: %s" % data["minidump_path"])

        else:
            signature = data["signature"] if data["signature"] else "unknown top frame"
            reason = data.get("reason", "application crashed")
            rv = ["PROCESS-CRASH | %s [%s] | %s " % (reason, signature, id)]

            if data.get("process_type"):
                rv.append("Process type: {}".format(data["process_type"]))

            rv.append("Process pid: {}".format(data.get("pid", "unknown")))

            if data.get("reason"):
                rv.append("Mozilla crash reason: %s" % data["reason"])

            if data.get("minidump_path"):
                rv.append("Crash dump filename: %s" % data["minidump_path"])

            if data.get("stackwalk_stderr"):
                rv.append("stderr from minidump-stackwalk:")
                rv.append(data["stackwalk_stderr"])
            elif data.get("stackwalk_stdout"):
                rv.append(data["stackwalk_stdout"])

            if data.get("stackwalk_returncode", 0) != 0:
                rv.append(
                    "minidump-stackwalk exited with return code %d"
                    % data["stackwalk_returncode"]
                )

            if data.get("stackwalk_errors"):
                rv.extend(data.get("stackwalk_errors"))

        rv = "\n".join(rv)
        if not rv[-1] == "\n":
            rv += "\n"

        return rv

    def suite_start(self, data):
        self.suite_start_time = data["time"]
        num_tests = reduce(lambda x, y: x + len(y), data["tests"].values(), 0)
        return "SUITE-START | Running %i tests\n" % num_tests

    def test_start(self, data):
        self.test_start_times[self.test_id(data["test"])] = data["time"]

        return "TEST-START | %s\n" % data["test"]

    def test_status(self, data):
        if self.compact:
            if "expected" in data:
                rv = []
                rv.append(
                    self._format_subtests(data.get("component"), subtract_context=True)
                )
                rv.extend(self._format_status(item) for item in self.buffer)
                rv.append(self._format_status(data))
                self.buffer.clear()
                return "".join(rv)
            else:
                self.subtests_count += 1
                self.buffer.append(data)
        else:
            return self._format_status(data)

    def assertion_count(self, data):
        if data["min_expected"] != data["max_expected"]:
            expected = "%i to %i" % (data["min_expected"], data["max_expected"])
        else:
            expected = "%i" % data["min_expected"]

        if data["count"] < data["min_expected"]:
            status, comparison = "TEST-UNEXPECTED-PASS", "is less than"
        elif data["count"] > data["max_expected"]:
            status, comparison = "TEST-UNEXPECTED-FAIL", "is more than"
        elif data["count"] > 0:
            status, comparison = "TEST-KNOWN-FAIL", "matches"
        else:
            return

        return "%s | %s | assertion count %i %s expected %s assertions\n" % (
            status,
            data["test"],
            data["count"],
            comparison,
            expected,
        )

    def _format_status(self, data):
        message = "- " + data["message"] if "message" in data else ""
        if "stack" in data:
            message += "\n%s" % data["stack"]
        if message and message[-1] == "\n":
            message = message[:-1]

        status = data["status"]

        if "expected" in data:
            if status in data.get("known_intermittent", []):
                status = "KNOWN-INTERMITTENT-%s" % status
            else:
                if not message:
                    message = "- expected %s" % data["expected"]
                failure_line = "TEST-UNEXPECTED-%s | %s | %s %s\n" % (
                    status,
                    data["test"],
                    data["subtest"],
                    message,
                )
                if data["expected"] != "PASS":
                    info_line = "TEST-INFO | expected %s\n" % data["expected"]
                    return failure_line + info_line
                return failure_line

        return "TEST-%s | %s | %s %s\n" % (
            status,
            data["test"],
            data["subtest"],
            message,
        )

    def test_end(self, data):
        rv = []
        if self.compact and self.subtests_count:
            print_context = "expected" in data
            rv.append(
                self._format_subtests(
                    data.get("component"), subtract_context=print_context
                )
            )
            if print_context:
                rv.extend(self._format_status(item) for item in self.buffer)
            self.buffer.clear()

        test_id = self.test_id(data["test"])
        duration_msg = ""

        if test_id in self.test_start_times:
            start_time = self.test_start_times.pop(test_id)
            time = data["time"] - start_time
            duration_msg = "took %ims" % time

        screenshot_msg = ""
        extra = data.get("extra", {})
        if "reftest_screenshots" in extra:
            screenshots = extra["reftest_screenshots"]
            if len(screenshots) == 3:
                screenshot_msg = (
                    "\nREFTEST   IMAGE 1 (TEST): data:image/png;base64,%s\n"
                    "REFTEST   IMAGE 2 (REFERENCE): data:image/png;base64,%s"
                ) % (screenshots[0]["screenshot"], screenshots[2]["screenshot"])
            elif len(screenshots) == 1:
                screenshot_msg = (
                    "\nREFTEST   IMAGE: data:image/png;base64,%s"
                    % screenshots[0]["screenshot"]
                )

        status = data["status"]

        if "expected" in data:
            if status in data.get("known_intermittent", []):
                status = "KNOWN-INTERMITTENT-%s" % status
            else:
                message = data.get("message", "")
                if not message:
                    message = "expected %s" % data["expected"]
                if "stack" in data:
                    message += "\n%s" % data["stack"]
                if message and message[-1] == "\n":
                    message = message[:-1]

                message += screenshot_msg

                failure_line = "TEST-UNEXPECTED-%s | %s | %s\n" % (
                    data["status"],
                    test_id,
                    message,
                )

                if data["expected"] not in ("PASS", "OK"):
                    expected_msg = "expected %s | " % data["expected"]
                else:
                    expected_msg = ""
                info_line = "TEST-INFO %s%s\n" % (expected_msg, duration_msg)

                return failure_line + info_line

        sections = ["TEST-%s" % status, test_id]
        if duration_msg:
            sections.append(duration_msg)
        rv.append(" | ".join(sections) + "\n")
        if screenshot_msg:
            rv.append(screenshot_msg[1:] + "\n")
        return "".join(rv)

    def suite_end(self, data):
        start_time = self.suite_start_time
        # pylint --py3k W1619
        # in wpt --repeat mode sometimes we miss suite_start()
        if start_time is None:
            start_time = data["time"]
        time = int((data["time"] - start_time) / 1000)

        return "SUITE-END | took %is\n" % time

    def test_id(self, test_id):
        if isinstance(test_id, str):
            return test_id
        else:
            return tuple(test_id)

    @output_subtests
    def valgrind_error(self, data):
        rv = "TEST-UNEXPECTED-VALGRIND-ERROR | " + data["primary"] + "\n"
        for line in data["secondary"]:
            rv = rv + line + "\n"

        return rv

    def lint(self, data):
        fmt = "TEST-UNEXPECTED-{level} | {path}:{lineno}{column} | {message} ({rule})"
        data["column"] = ":%s" % data["column"] if data["column"] else ""
        data["rule"] = data["rule"] or data["linter"] or ""
        return fmt.append(fmt.format(**data))

    def lsan_leak(self, data):
        frames = data.get("frames")
        allowed_match = data.get("allowed_match")
        frame_list = ", ".join(frames)
        prefix = "TEST-UNEXPECTED-FAIL" if not allowed_match else "TEST-FAIL"
        suffix = (
            ""
            if not allowed_match
            else "INFO | LeakSanitizer | Frame %s matched a expected leak\n"
            % allowed_match
        )
        return "%s | LeakSanitizer | leak at %s\n%s" % (prefix, frame_list, suffix)

    def lsan_summary(self, data):
        level = "INFO" if data.get("allowed", False) else "ERROR"
        return (
            "%s | LeakSanitizer | "
            "SUMMARY: AddressSanitizer: %d byte(s) leaked in %d allocation(s)."
            % (level, data["bytes"], data["allocations"])
        )

    def mozleak_object(self, data):
        return "TEST-INFO | leakcheck | %s leaked %d %s\n" % (
            data["process"],
            data["bytes"],
            data["name"],
        )

    def mozleak_total(self, data):
        if data["bytes"] is None:
            # We didn't see a line with name 'TOTAL'
            if data.get("induced_crash", False):
                return (
                    "TEST-INFO | leakcheck | %s deliberate crash and thus no leak log\n"
                    % data["process"]
                )
            if data.get("ignore_missing", False):
                return (
                    "TEST-INFO | leakcheck | "
                    "%s ignoring missing output line for total leaks\n"
                    % data["process"]
                )

            return (
                "TEST-UNEXPECTED-FAIL | leakcheck | "
                "%s missing output line for total leaks!\n" % data["process"]
            )

        if data["bytes"] == 0:
            return "TEST-PASS | leakcheck | %s no leaks detected!\n" % data["process"]

        # Create a comma delimited string of the first N leaked objects found,
        # to aid with bug summary matching in TBPL. Note: The order of the objects
        # had no significance (they're sorted alphabetically).
        max_objects = 5
        object_summary = ", ".join(data["objects"][:max_objects])
        if len(data["objects"]) > max_objects:
            object_summary += ", ..."

        message = "leakcheck | %s %d bytes leaked (%s)\n" % (
            data["process"],
            data["bytes"],
            object_summary,
        )

        # data["bytes"] will include any expected leaks, so it can be off
        # by a few thousand bytes.
        if data["bytes"] > data["threshold"]:
            return "TEST-UNEXPECTED-FAIL | %s\n" % message
        else:
            return "WARNING | %s\n" % message

    def _format_suite_summary(self, suite, summary):
        counts = summary["counts"]
        logs = summary["unexpected_logs"]
        intermittent_logs = summary["intermittent_logs"]

        total = sum(self.summary.aggregate("count", counts).values())
        expected = sum(self.summary.aggregate("expected", counts).values())
        intermittents = sum(
            self.summary.aggregate("known_intermittent", counts).values()
        )
        known = (
            " ({} known intermittent tests)".format(intermittents)
            if intermittents
            else ""
        )
        status_str = "{}/{}{}".format(expected, total, known)
        rv = ["{}: {}".format(suite, status_str)]

        for results in logs.values():
            for data in results:
                rv.append("  {}".format(self._format_status(data)))

        if intermittent_logs:
            rv.append("Known Intermittent tests:")
            for results in intermittent_logs.values():
                for data in results:
                    data["subtest"] = data.get("subtest", "")
                    rv.append("  {}".format(self._format_status(data)))

        return "\n".join(rv)

    def shutdown(self, data):
        if not self.summary_on_shutdown:
            return

        rv = ["", "Overall Summary"]
        for suite, summary in self.summary:
            rv.append(self._format_suite_summary(suite, summary))
        rv.append("")
        return "\n".join(rv)