chromium/third_party/wpt_tools/wpt/tools/wptrunner/wptrunner/formatters/wptreport.py

# mypy: allow-untyped-defs

import json
import re

from mozlog.structured.formatters.base import BaseFormatter
from ..executors.base import strip_server


LONE_SURROGATE_RE = re.compile("[\uD800-\uDFFF]")


def surrogate_replacement(match):
    return "U+" + hex(ord(match.group()))[2:]


def replace_lone_surrogate(data):
    return LONE_SURROGATE_RE.subn(surrogate_replacement, data)[0]


class WptreportFormatter(BaseFormatter):  # type: ignore
    """Formatter that produces results in the format that wptreport expects."""

    def __init__(self):
        self.raw_results = {}
        self.results = {}

    def suite_start(self, data):
        self.results['run_info'] = data.get('run_info', {})
        self.results['time_start'] = data['time']
        self.results["results"] = []
        self.results["subsuites"] = {}

    def add_subsuite(self, data):
        self.results["subsuites"][data["name"]] = data.get("run_info", {})

    def suite_end(self, data):
        self.results['time_end'] = data['time']
        for subsuite, results in self.raw_results.items():
            for test_name, result in results.items():
                result["test"] = test_name
                result["subsuite"] = subsuite
                self.results["results"].append(result)
        return json.dumps(self.results) + "\n"

    def find_or_create_test(self, data):
        subsuite = data.get("subsuite", "")
        test_name = data["test"]
        subsuite_results = self.raw_results.setdefault(subsuite, {})
        return subsuite_results.setdefault(test_name, {"subtests": [],
                                                      "status": "",
                                                      "message": None})

    def test_start(self, data):
        test = self.find_or_create_test(data)
        test["start_time"] = data["time"]

    def create_subtest(self, data):
        test = self.find_or_create_test(data)
        subtest_name = replace_lone_surrogate(data["subtest"])

        subtest = {
            "name": subtest_name,
            "status": "",
            "message": None
        }
        test["subtests"].append(subtest)

        return subtest

    def test_status(self, data):
        subtest = self.create_subtest(data)
        subtest["status"] = data["status"]
        if "expected" in data:
            subtest["expected"] = data["expected"]
        if "known_intermittent" in data:
            subtest["known_intermittent"] = data["known_intermittent"]
        if "message" in data:
            subtest["message"] = replace_lone_surrogate(data["message"])

    def test_end(self, data):
        test = self.find_or_create_test(data)
        start_time = test.pop("start_time")
        test["duration"] = data["time"] - start_time
        test["status"] = data["status"]
        if "expected" in data:
            test["expected"] = data["expected"]
        if "known_intermittent" in data:
            test["known_intermittent"] = data["known_intermittent"]
        if "message" in data:
            test["message"] = replace_lone_surrogate(data["message"])
        if "reftest_screenshots" in data.get("extra", {}):
            test["screenshots"] = {
                strip_server(item["url"]): "sha1:" + item["hash"]
                for item in data["extra"]["reftest_screenshots"]
                if isinstance(item, dict)
            }
        test_name = data["test"]
        subsuite = data.get("subsuite", "")
        result = {"test": test_name,
                  "subsuite": subsuite}
        result.update(self.raw_results[subsuite][test_name])
        self.results["results"].append(result)
        self.raw_results[subsuite].pop(test_name)

    def assertion_count(self, data):
        test = self.find_or_create_test(data)
        test["asserts"] = {
            "count": data["count"],
            "min": data["min_expected"],
            "max": data["max_expected"]
        }

    def lsan_leak(self, data):
        if "lsan_leaks" not in self.results:
            self.results["lsan_leaks"] = []
        lsan_leaks = self.results["lsan_leaks"]
        lsan_leaks.append({"frames": data["frames"],
                           "scope": data["scope"],
                           "allowed_match": data.get("allowed_match"),
                           "subsuite": data.get("subsuite", "")})

    def find_or_create_mozleak(self, data):
        if "mozleak" not in self.results:
            self.results["mozleak"] = {}
        scope = data["scope"]
        if scope not in self.results["mozleak"]:
            self.results["mozleak"][scope] = {"objects": [], "total": []}
        return self.results["mozleak"][scope]

    def mozleak_object(self, data):
        scope_data = self.find_or_create_mozleak(data)
        scope_data["objects"].append({"process": data["process"],
                                      "name": data["name"],
                                      "allowed": data.get("allowed", False),
                                      "bytes": data["bytes"],
                                      "subsuite": data.get("subsuite", "")})

    def mozleak_total(self, data):
        scope_data = self.find_or_create_mozleak(data)
        scope_data["total"].append({"bytes": data["bytes"],
                                    "threshold": data.get("threshold", 0),
                                    "process": data["process"],
                                    "subsuite": data.get("subsuite", "")})