chromium/chrome/test/webapps/test_analysis.py

#!/usr/bin/env python3
# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Test analysis functions for the testing framework.
"""

from collections import defaultdict
import datetime
import logging
import re
import os
from typing import List, Set

from models import Action
from models import CoverageTest
from models import CoverageTestsByPlatform
from models import ActionType
from models import CoverageTestsByPlatformSet
from models import TestId
from models import TestIdsTestNamesByPlatformSet
from models import TestIdTestNameTuple
from models import TestPartitionDescription
from models import TestPlatform


def filter_tests_for_partition(tests: List[CoverageTest],
                               partition: TestPartitionDescription
                               ) -> List[CoverageTest]:
    def DoesTestHaveActionWithPrefixes(test: CoverageTest):
        """Returns if the given tests has any actions with the given prefixes"""
        nonlocal partition
        for action in test.actions:
            for prefix in partition.action_name_prefixes:
                if action.name.startswith(prefix):
                    return True

    return list(filter(DoesTestHaveActionWithPrefixes, tests))


def compare_and_print_tests_to_remove_and_add(
        existing_tests: TestIdsTestNamesByPlatformSet,
        required_tests: CoverageTestsByPlatformSet,
        test_partitions: List[TestPartitionDescription],
        default_partition: TestPartitionDescription, add_to_file: bool):
    """
    Given the existing tests on disk and the required tests, print out the
    changes that need to happen to make them match. This also takes into account
    test partitioning, so tests are asked to be written to the appropriate test
    partition file.
    Note: This does NOT support moving tests between partition files. If a test
    was found in any partition file, then it is ignored.
    """
    def print_tests(filename: str, tests: List[CoverageTest],
                    partition: TestPartitionDescription, add_to_file: bool):
        new_test_str: str = ""
        for test in tests:
            new_test_str += ("\n" + test.generate_browsertest(partition) +
                             "\n")
        if add_to_file:
            if os.path.exists(filename):
                with open(filename, "r") as f:
                    test_file = f.read()
                # Find the last test in the test file
                if re.search(r"IN_PROC_BROWSER_TEST_F(.|\n)*?}\n", test_file):
                    res = re.finditer(r"IN_PROC_BROWSER_TEST_F(.|\n)*?}\n",
                                      test_file)
                    last_test_end_index = list(res)[-1].end()
                # Find the first closing parenthesis (end of namespace) if
                # there is no test in the file
                elif "}" in test_file:
                    last_test_end_index = test_file.find("}") - 1
                else:
                    last_test_end_index = len(test_file)
                new_content = (test_file[:last_test_end_index] + new_test_str +
                               test_file[last_test_end_index:])
                with open(filename, "w") as f:
                    f.write(new_content)
            else:
                print(f"\n\nCreate a new test file: {filename}\n"
                      "Remember to add the new test file to the BUILD file.\n"
                      "Add the following tests to the new test file:\n"
                      f"{new_test_str}")
        else:
            print(f"\n\nAdd the following tests to {filename}:\n"
                  f"{new_test_str}")

    test_ids_to_keep: TestIdsByPlatformSet = defaultdict(lambda: set())
    for platforms, tests in required_tests.items():
        tests_to_add: List[CoverageTest] = []
        for test in tests:
            if platforms in existing_tests:
                existing_test_set = set(
                    [test_id for (test_id, _) in existing_tests[platforms]])
                if test.id not in existing_test_set:
                    tests_to_add.append(test)
                else:
                    test_ids_to_keep[platforms].add(test.id)
            else:
                tests_to_add.append(test)
        tests_added_to_partition: Set[TestId] = set()
        for partition in test_partitions:
            tests_to_add_partition = filter_tests_for_partition(
                tests_to_add, partition)
            if not tests_to_add_partition:
                continue
            # Record all tests to ensure we don't have duplicates in different
            # files, and to output remaining tests to the default partition.
            for test in tests_to_add_partition:
                if test.id in tests_added_to_partition:
                    raise ValueError(
                        "Cannot have a test written to multiple test files.")
                tests_added_to_partition.add(test.id)
            filename = partition.generate_browsertest_filepath(platforms)
            print_tests(filename, tests_to_add_partition, partition,
                        add_to_file)

        # All remaining tests go into the default partition
        default_tests: List[CoverageTest] = [
            test for test in tests_to_add
            if test.id not in tests_added_to_partition
        ]
        if not default_tests:
            continue
        filename = default_partition.generate_browsertest_filepath(platforms)
        print_tests(filename, default_tests, default_partition, add_to_file)
    # Print out all tests to remove. To keep the algorithm simple the partition
    # is not kept track of.
    for platforms, test_ids_names in existing_tests.items():
        tests_to_remove = []
        prompt_str = ""
        nice_platform_str = ", ".join(
            [f"{platform}" for platform in platforms])
        if platforms not in test_ids_to_keep:
            prompt_str = (f"\n\nRemove ALL tests from the file for the "
                          f"platforms [{nice_platform_str}]:\n")
            tests_to_remove = [test_name for (_, test_name) in test_ids_names]
        else:
            prompt_str = (f"\n\nRemove these tests from the file for the "
                          f"platforms [{nice_platform_str}]:\n")
            tests_to_remove = [
                test_name for (test_id, test_name) in test_ids_names
                if test_id not in test_ids_to_keep[platforms]
            ]

        if not tests_to_remove:
            continue
        print(f"{prompt_str}{', '.join(tests_to_remove)}")


def expand_parameterized_tests(coverage_tests: List[CoverageTest]
                               ) -> List[CoverageTest]:
    """
    Takes a list of coverage tests that contain parameterized actions, and
    expands all of the tests with those actions to result in a list of tests
    without parameterized actions.
    """

    def get_all_parameterized_tests(test_actions: List[Action]
                                    ) -> List[List[Action]]:
        """
        Takes a list of actions with possible parameterized actions, and outputs
        a list of resulting tests with all parameterized actions expanded.
        """
        if not test_actions:
            return [[]]
        for i, action in enumerate(test_actions):
            if action.type is not ActionType.PARAMETERIZED:
                continue
            actions_before_parameterized = test_actions[:i]
            actions_after_parameterized = test_actions[i + 1:]
            resulting_tests = []
            for output_action in action.output_actions:
                remaining_expanded_tests = get_all_parameterized_tests(
                    actions_after_parameterized)
                for remaining_test in remaining_expanded_tests:
                    test = (actions_before_parameterized + [output_action] +
                            remaining_test)
                    resulting_tests.append(test)
            return resulting_tests
        # No parameterized actions were found, so just return the test actions.
        return [test_actions]

    result_tests = []
    for test in coverage_tests:
        expanded_tests = get_all_parameterized_tests(test.actions)
        logging.info(f"Generated {len(expanded_tests)} test/s from {test.id}")
        for resulting_test in get_all_parameterized_tests(test.actions):
            result_tests.append(CoverageTest(resulting_test, test.platforms))
    return result_tests


def filter_coverage_tests_for_platform(tests: List[CoverageTest],
                                       platform: TestPlatform
                                       ) -> List[CoverageTest]:
    def IsSupportedOnPlatform(test: CoverageTest):
        return platform in test.platforms

    return list(filter(IsSupportedOnPlatform, tests))


def partition_framework_tests_per_platform_combination(
        generated_tests_per_platform: CoverageTestsByPlatform
) -> CoverageTestsByPlatformSet:
    test_id_to_platforms = defaultdict(lambda: set())
    test_id_to_test = {}
    platform_set_to_tests = defaultdict(lambda: list())
    for platform, tests in generated_tests_per_platform.items():
        for test in tests:
            test_id_to_platforms[test.id].add(platform)
            if test.id not in test_id_to_test:
                test_id_to_test[test.id] = CoverageTest(test.actions, set())
            test_id_to_test[test.id].platforms.add(platform)
    for test_id, platforms in test_id_to_platforms.items():
        platforms = frozenset(platforms)
        platform_set_to_tests[platforms].append(test_id_to_test[test_id])
    return platform_set_to_tests