cpython/Lib/test/libregrtest/result.py

import dataclasses
import json
from typing import Any

from .utils import (
    StrJSON, TestName, FilterTuple,
    format_duration, normalize_test_name, print_warning)


@dataclasses.dataclass(slots=True)
class TestStats:
    tests_run: int = 0
    failures: int = 0
    skipped: int = 0

    @staticmethod
    def from_unittest(result):
        return TestStats(result.testsRun,
                         len(result.failures),
                         len(result.skipped))

    @staticmethod
    def from_doctest(results):
        return TestStats(results.attempted,
                         results.failed,
                         results.skipped)

    def accumulate(self, stats):
        self.tests_run += stats.tests_run
        self.failures += stats.failures
        self.skipped += stats.skipped


# Avoid enum.Enum to reduce the number of imports when tests are run
class State:
    """Namespace of string constants naming the possible test outcomes."""

    PASSED = "PASSED"
    FAILED = "FAILED"
    SKIPPED = "SKIPPED"
    UNCAUGHT_EXC = "UNCAUGHT_EXC"
    REFLEAK = "REFLEAK"
    ENV_CHANGED = "ENV_CHANGED"
    RESOURCE_DENIED = "RESOURCE_DENIED"
    INTERRUPTED = "INTERRUPTED"
    WORKER_FAILED = "WORKER_FAILED"   # non-zero worker process exit code
    WORKER_BUG = "WORKER_BUG"         # exception when running a worker
    DID_NOT_RUN = "DID_NOT_RUN"
    TIMEOUT = "TIMEOUT"

    @staticmethod
    def is_failed(state):
        """Return True if *state* is one of the failure states.

        ENV_CHANGED is not part of this set.
        """
        failure_states = {
            State.FAILED,
            State.UNCAUGHT_EXC,
            State.REFLEAK,
            State.WORKER_FAILED,
            State.WORKER_BUG,
            State.TIMEOUT,
        }
        return state in failure_states

    @staticmethod
    def has_meaningful_duration(state):
        """Return True if the duration recorded for *state* is meaningful."""
        # The duration is considered meaningless for the states below.
        # For instance, when a whole test file is skipped, the measured
        # time is unlikely to be the time needed to execute its tests:
        # it is only the time taken by the code which skips the test.
        meaningless_states = {
            State.SKIPPED,
            State.RESOURCE_DENIED,
            State.INTERRUPTED,
            State.WORKER_FAILED,
            State.WORKER_BUG,
            State.DID_NOT_RUN,
        }
        return not (state in meaningless_states)

    @staticmethod
    def must_stop(state):
        """Return True if *state* is INTERRUPTED or WORKER_BUG."""
        stop_states = {
            State.INTERRUPTED,
            State.WORKER_BUG,
        }
        return state in stop_states


# Type aliases describing a source code location.
# A Location is a (file name, line number) pair; it is the element type
# of TestResult.covered_lines.
FileName = str
LineNo = int
Location = tuple[FileName, LineNo]


@dataclasses.dataclass(slots=True)
class TestResult:
    test_name: TestName
    state: str | None = None
    # Test duration in seconds
    duration: float | None = None
    xml_data: list[str] | None = None
    stats: TestStats | None = None

    # errors and failures copied from support.TestFailedWithDetails
    errors: list[tuple[str, str]] | None = None
    failures: list[tuple[str, str]] | None = None

    # partial coverage in a worker run; not used by sequential in-process runs
    covered_lines: list[Location] | None = None

    def is_failed(self, fail_env_changed: bool) -> bool:
        if self.state == State.ENV_CHANGED:
            return fail_env_changed
        return State.is_failed(self.state)

    def _format_failed(self):
        if self.errors and self.failures:
            le = len(self.errors)
            lf = len(self.failures)
            error_s = "error" + ("s" if le > 1 else "")
            failure_s = "failure" + ("s" if lf > 1 else "")
            return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"

        if self.errors:
            le = len(self.errors)
            error_s = "error" + ("s" if le > 1 else "")
            return f"{self.test_name} failed ({le} {error_s})"

        if self.failures:
            lf = len(self.failures)
            failure_s = "failure" + ("s" if lf > 1 else "")
            return f"{self.test_name} failed ({lf} {failure_s})"

        return f"{self.test_name} failed"

    def __str__(self) -> str:
        match self.state:
            case State.PASSED:
                return f"{self.test_name} passed"
            case State.FAILED:
                return self._format_failed()
            case State.SKIPPED:
                return f"{self.test_name} skipped"
            case State.UNCAUGHT_EXC:
                return f"{self.test_name} failed (uncaught exception)"
            case State.REFLEAK:
                return f"{self.test_name} failed (reference leak)"
            case State.ENV_CHANGED:
                return f"{self.test_name} failed (env changed)"
            case State.RESOURCE_DENIED:
                return f"{self.test_name} skipped (resource denied)"
            case State.INTERRUPTED:
                return f"{self.test_name} interrupted"
            case State.WORKER_FAILED:
                return f"{self.test_name} worker non-zero exit code"
            case State.WORKER_BUG:
                return f"{self.test_name} worker bug"
            case State.DID_NOT_RUN:
                return f"{self.test_name} ran no tests"
            case State.TIMEOUT:
                assert self.duration is not None, "self.duration is None"
                return f"{self.test_name} timed out ({format_duration(self.duration)})"
            case _:
                raise ValueError("unknown result state: {state!r}")

    def has_meaningful_duration(self):
        return State.has_meaningful_duration(self.state)

    def set_env_changed(self):
        if self.state is None or self.state == State.PASSED:
            self.state = State.ENV_CHANGED

    def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
        if State.must_stop(self.state):
            return True
        if fail_fast and self.is_failed(fail_env_changed):
            return True
        return False

    def get_rerun_match_tests(self) -> FilterTuple | None:
        match_tests = []

        errors = self.errors or []
        failures = self.failures or []
        for error_list, is_error in (
            (errors, True),
            (failures, False),
        ):
            for full_name, *_ in error_list:
                match_name = normalize_test_name(full_name, is_error=is_error)
                if match_name is None:
                    # 'setUpModule (test.test_sys)': don't filter tests
                    return None
                if not match_name:
                    error_type = "ERROR" if is_error else "FAIL"
                    print_warning(f"rerun failed to parse {error_type} test name: "
                                  f"{full_name!r}: don't filter tests")
                    return None
                match_tests.append(match_name)

        if not match_tests:
            return None
        return tuple(match_tests)

    def write_json_into(self, file) -> None:
        json.dump(self, file, cls=_EncodeTestResult)

    @staticmethod
    def from_json(worker_json: StrJSON) -> 'TestResult':
        return json.loads(worker_json, object_hook=_decode_test_result)


class _EncodeTestResult(json.JSONEncoder):
    """JSON encoder which knows how to serialize TestResult objects."""

    def default(self, o: Any) -> dict[str, Any]:
        """Encode a TestResult as a dict tagged with "__test_result__".

        Any other object is handed over to json.JSONEncoder.default().
        """
        if not isinstance(o, TestResult):
            return super().default(o)

        # The extra "__test_result__" key is the marker checked by
        # _decode_test_result() to rebuild the TestResult.
        return {
            **dataclasses.asdict(o),
            "__test_result__": o.__class__.__name__,
        }


def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
    """JSON object hook rebuilding a TestResult from its encoded dict.

    A dict without the "__test_result__" marker is returned unchanged.
    """
    if "__test_result__" not in data:
        return data

    # The marker is not a TestResult field: drop it before the call
    data.pop('__test_result__')

    stats = data['stats']
    if stats is not None:
        data['stats'] = TestStats(**stats)

    covered_lines = data['covered_lines']
    if covered_lines is not None:
        # Convert each decoded location back to a tuple
        data['covered_lines'] = [tuple(loc) for loc in covered_lines]

    return TestResult(**data)