chromium/third_party/wpt_tools/wpt/tools/third_party_modified/mozlog/mozlog/formatters/grouping.py

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import collections
import os
import platform
import subprocess
import sys

from mozlog.formatters import base

DEFAULT_MOVE_UP_CODE = u"\x1b[A"
DEFAULT_CLEAR_EOL_CODE = u"\x1b[K"


class GroupingFormatter(base.BaseFormatter):
    """Formatter designed to produce unexpected test results grouped
    together in a readable format."""

    def __init__(self):
        super(GroupingFormatter, self).__init__()
        self.number_of_tests = 0
        self.completed_tests = 0
        self.need_to_erase_last_line = False
        self.current_display = ""
        self.running_tests = {}
        self.test_output = collections.defaultdict(str)
        self.subtest_failures = collections.defaultdict(list)
        self.test_failure_text = ""
        self.tests_with_failing_subtests = []
        self.interactive = os.isatty(sys.stdout.fileno())
        self.show_logs = False

        self.message_handler.register_message_handlers(
            "show_logs",
            {
                "on": self._enable_show_logs,
                "off": self._disable_show_logs,
            },
        )

        # TODO(mrobinson, 8313): We need to add support for Windows terminals here.
        if self.interactive:
            self.move_up, self.clear_eol = self.get_move_up_and_clear_eol_codes()
            if platform.system() != "Windows":
                self.line_width = int(
                    subprocess.check_output(["stty", "size"]).split()[1]
                )
            else:
                # Until we figure out proper Windows support,
                # this makes things work well enough to run.
                self.line_width = 80

        self.expected = {
            "OK": 0,
            "PASS": 0,
            "FAIL": 0,
            "PRECONDITION_FAILED": 0,
            "ERROR": 0,
            "TIMEOUT": 0,
            "SKIP": 0,
            "CRASH": 0,
        }

        self.unexpected_tests = {
            "OK": [],
            "PASS": [],
            "FAIL": [],
            "PRECONDITION_FAILED": [],
            "ERROR": [],
            "TIMEOUT": [],
            "CRASH": [],
        }

        # Follows the format of {(<subsuite>, <test>, <subtest>): <data>}, where
        # (<subsuite>, <test>, None) represents a top level test.
        self.known_intermittent_results = {}

    def _enable_show_logs(self):
        """Turn on output of all log messages.

        Registered in __init__ as the handler for the "on" action of the
        "show_logs" message; see log() for where the flag is consulted.
        """
        self.show_logs = True

    def _disable_show_logs(self):
        """Turn off output of ordinary log messages.

        Registered in __init__ as the handler for the "off" action of the
        "show_logs" message; see log() for where the flag is consulted.
        """
        self.show_logs = False

    def get_move_up_and_clear_eol_codes(self):
        """Return the (move cursor up, clear to end of line) control codes.

        The codes come from the optional ``blessed`` package when it can be
        imported and a terminal object can be created; the terminal is kept
        on ``self.terminal``. Otherwise the module-level default escape
        sequences are returned.
        """
        fallback_codes = (DEFAULT_MOVE_UP_CODE, DEFAULT_CLEAR_EOL_CODE)

        try:
            import blessed
        except ImportError:
            return fallback_codes

        try:
            self.terminal = blessed.Terminal()
            terminal_codes = (self.terminal.move_up, self.terminal.clear_eol)
        except Exception as exception:
            # Best effort only: report the problem and keep going with the
            # default escape sequences.
            sys.stderr.write(
                "GroupingFormatter: Could not get terminal "
                "control characters: %s\n" % exception
            )
            return fallback_codes
        return terminal_codes

    def text_to_erase_display(self):
        if not self.interactive or not self.current_display:
            return ""
        return (self.move_up + self.clear_eol) * self.current_display.count("\n")

    def generate_output(self, text=None, new_display=None):
        if not self.interactive:
            return text

        output = self.text_to_erase_display()
        if text:
            output += text
        if new_display is not None:
            self.current_display = new_display
        return output + self.current_display

    def build_status_line(self):
        if self.number_of_tests == 0:
            new_display = "  [%i] " % self.completed_tests
        else:
            new_display = "  [%i/%i] " % (self.completed_tests, self.number_of_tests)

        if self.running_tests:
            indent = " " * len(new_display)
            if self.interactive:
                max_width = self.line_width - len(new_display)
            else:
                max_width = sys.maxsize
            return (
                new_display +
                ("\n%s" % indent).join(
                    f"{self.get_test_name_output(subsuite, test_name)}"[:max_width]
                    for subsuite, test_name in self.running_tests.values()
                ) + "\n"
            )
        else:
            return new_display + "No tests running.\n"

    def suite_start(self, data):
        self.number_of_tests = sum(
            len(tests) for tests in data["tests"].values()
        )
        self.start_time = data["time"]

        if self.number_of_tests == 0:
            return "Running tests in %s\n\n" % data[u"source"]
        else:
            return "Running %i tests in %s\n\n" % (
                self.number_of_tests,
                data[u"source"],
            )

    def test_start(self, data):
        self.running_tests[data["thread"]] = (data.get("subsuite"), data["test"])
        return self.generate_output(text=None, new_display=self.build_status_line())

    def wrap_and_indent_lines(self, lines, indent):
        assert len(lines) > 0

        output = indent + u"\u25B6 %s\n" % lines[0]
        for line in lines[1:-1]:
            output += indent + u"\u2502 %s\n" % line
        if len(lines) > 1:
            output += indent + u"\u2514 %s\n" % lines[-1]
        return output

    def get_lines_for_unexpected_result(
        self, test_name, status, expected, message, stack
    ):
        # Test names sometimes contain control characters, which we want
        # to be printed in their raw form, and not their interpreted form.
        test_name = test_name.encode("unicode-escape").decode("utf-8")

        if expected:
            expected_text = u" [expected %s]" % expected
        else:
            expected_text = u""

        lines = [u"%s%s %s" % (status, expected_text, test_name)]
        if message:
            lines.append(u"  \u2192 %s" % message)
        if stack:
            lines.append("")
            lines += [stackline for stackline in stack.splitlines()]
        return lines

    def get_lines_for_known_intermittents(self, known_intermittent_results):
        lines = []

        for (subsuite, test, subtest), data in self.known_intermittent_results.items():
            status = data["status"]
            known_intermittent = ", ".join(data["known_intermittent"])
            expected = " [expected %s, known intermittent [%s]" % (
                data["expected"],
                known_intermittent,
            )
            lines += [
                u"%s%s %s%s"
                % (
                    status,
                    expected,
                    self.get_test_name_output(subsuite, test),
                    (", %s" % subtest) if subtest is not None else "",
                )
            ]
        output = self.wrap_and_indent_lines(lines, "  ") + "\n"
        return output

    def get_test_name_output(self, subsuite, test_name):
        # Generate human readable test name from subsuite and test_name.
        # Vendors can override this function to produce output in a different
        # format that suites their need.
        return f"{subsuite}:{test_name}" if subsuite else test_name

    def get_output_for_unexpected_subtests(self, subsuite, test_name, unexpected_subtests):
        if not unexpected_subtests:
            return ""

        def add_subtest_failure(lines, subtest, stack=None):
            lines += self.get_lines_for_unexpected_result(
                subtest.get("subtest", None),
                subtest.get("status", None),
                subtest.get("expected", None),
                subtest.get("message", None),
                stack,
            )

        def make_subtests_failure(subsuite, test_name, subtests, stack=None):
            lines = [u"Unexpected subtest result in %s:"
                     % self.get_test_name_output(subsuite, test_name)]
            for subtest in subtests[:-1]:
                add_subtest_failure(lines, subtest, None)
            add_subtest_failure(lines, subtests[-1], stack)
            return self.wrap_and_indent_lines(lines, "  ") + "\n"

        # Organize the failures by stack trace so we don't print the same stack trace
        # more than once. They are really tall and we don't want to flood the screen
        # with duplicate information.
        output = ""
        failures_by_stack = collections.defaultdict(list)
        for failure in unexpected_subtests:
            # Print stackless results first. They are all separate.
            if "stack" not in failure:
                output += make_subtests_failure(subsuite, test_name, [failure], None)
            else:
                failures_by_stack[failure["stack"]].append(failure)

        for (stack, failures) in failures_by_stack.items():
            output += make_subtests_failure(subsuite, test_name, failures, stack)
        return output

    def test_end(self, data):
        self.completed_tests += 1
        test_status = data["status"]
        test_name = data["test"]
        subsuite = data.get("subsuite")
        known_intermittent_statuses = data.get("known_intermittent", [])
        subtest_failures = self.subtest_failures.pop((subsuite, test_name), [])
        if "expected" in data and test_status not in known_intermittent_statuses:
            had_unexpected_test_result = True
        else:
            had_unexpected_test_result = False

        del self.running_tests[data["thread"]]
        new_display = self.build_status_line()

        if not had_unexpected_test_result and not subtest_failures:
            self.expected[test_status] += 1
            if self.interactive:
                return self.generate_output(text=None, new_display=new_display)
            else:
                return self.generate_output(
                    text="  %s\n" % self.get_test_name_output(subsuite, test_name),
                    new_display=new_display
                )

        if test_status in known_intermittent_statuses:
            self.known_intermittent_results[(subsuite, test_name, None)] = data

        # If the test crashed or timed out, we also include any process output,
        # because there is a good chance that the test produced a stack trace
        # or other error messages.
        if test_status in ("CRASH", "TIMEOUT"):
            stack = self.test_output[(subsuite, test_name)] + data.get("stack", "")
        else:
            stack = data.get("stack", None)

        output = ""
        if had_unexpected_test_result:
            self.unexpected_tests[test_status].append(data)
            lines = self.get_lines_for_unexpected_result(
                self.get_test_name_output(subsuite, test_name),
                test_status,
                data.get("expected", None),
                data.get("message", None),
                stack,
            )
            output += self.wrap_and_indent_lines(lines, "  ") + "\n"

        if subtest_failures:
            self.tests_with_failing_subtests.append((subsuite, test_name))
            output += self.get_output_for_unexpected_subtests(
                subsuite, test_name, subtest_failures
            )
        self.test_failure_text += output

        return self.generate_output(text=output, new_display=new_display)

    def test_status(self, data):
        if "expected" in data and data["status"] not in data.get(
            "known_intermittent", []
        ):
            key = (data.get("subsuite"), data["test"])
            self.subtest_failures[key].append(data)
        elif data["status"] in data.get("known_intermittent", []):
            key = (data.get("subsuite"), data["test"], data["subtest"])
            self.known_intermittent_results[key] = data

    def suite_end(self, data):
        self.end_time = data["time"]

        if not self.interactive:
            output = u"\n"
        else:
            output = ""

        output += u"Ran %i tests finished in %.1f seconds.\n" % (
            self.completed_tests,
            (self.end_time - self.start_time) / 1000.0,
        )
        output += u"  \u2022 %i ran as expected. %i tests skipped.\n" % (
            sum(self.expected.values()),
            self.expected["SKIP"],
        )
        if self.known_intermittent_results:
            output += u"  \u2022 %i known intermittent results.\n" % (
                len(self.known_intermittent_results)
            )

        def text_for_unexpected_list(text, section):
            tests = self.unexpected_tests[section]
            if not tests:
                return u""
            return u"  \u2022 %i tests %s\n" % (len(tests), text)

        output += text_for_unexpected_list(u"crashed unexpectedly", "CRASH")
        output += text_for_unexpected_list(u"had errors unexpectedly", "ERROR")
        output += text_for_unexpected_list(u"failed unexpectedly", "FAIL")
        output += text_for_unexpected_list(
            u"precondition failed unexpectedly", "PRECONDITION_FAILED"
        )
        output += text_for_unexpected_list(u"timed out unexpectedly", "TIMEOUT")
        output += text_for_unexpected_list(u"passed unexpectedly", "PASS")
        output += text_for_unexpected_list(u"unexpectedly okay", "OK")

        num_with_failing_subtests = len(self.tests_with_failing_subtests)
        if num_with_failing_subtests:
            output += (
                u"  \u2022 %i tests had unexpected subtest results\n"
                % num_with_failing_subtests
            )
        output += "\n"

        # Repeat failing test output, so that it is easier to find, since the
        # non-interactive version prints all the test names.
        if not self.interactive and self.test_failure_text:
            output += u"Tests with unexpected results:\n" + self.test_failure_text

        if self.known_intermittent_results:
            results = self.get_lines_for_known_intermittents(
                self.known_intermittent_results
            )
            output += u"Tests with known intermittent results:\n" + results

        return self.generate_output(text=output, new_display="")

    def process_output(self, data):
        if data["thread"] not in self.running_tests:
            return
        test_key = self.running_tests[data["thread"]]
        self.test_output[test_key] += data["data"] + "\n"

    def log(self, data):
        if data.get("component"):
            message = "%s %s %s" % (data["component"], data["level"], data["message"])
        else:
            message = "%s %s" % (data["level"], data["message"])
        if "stack" in data:
            message += "\n%s" % data["stack"]

        # We are logging messages that begin with STDERR, because that is how exceptions
        # in this formatter are indicated.
        if data["message"].startswith("STDERR"):
            return self.generate_output(text=message + "\n")

        if data["level"] in ("CRITICAL", "ERROR"):
            return self.generate_output(text=message + "\n")
        # Show all messages if show_logs switched on.
        if self.show_logs:
            return self.generate_output(text=message + "\n")