chromium/testing/flake_suppressor_common/expectations.py

# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for interacting with expectation files."""

import base64
import collections
from datetime import timedelta, date
import itertools
import os
import posixpath
import re
from typing import Dict, List, Set, Tuple, Union
import urllib.request

from flake_suppressor_common import common_typing as ct

from typ import expectations_parser

CHROMIUM_SRC_DIR = os.path.realpath(
    os.path.join(os.path.dirname(__file__), '..', '..'))
GITILES_URL = 'https://chromium.googlesource.com/chromium/src/+/refs/heads/main'
TEXT_FORMAT_ARG = '?format=TEXT'

TAG_GROUP_REGEX = re.compile(r'# tags: \[([^\]]*)\]', re.MULTILINE | re.DOTALL)

TestToUrlsType = Dict[str, List[str]]
SuiteToTestsType = Dict[str, TestToUrlsType]
TagOrderedAggregateResultType = Dict[ct.TagTupleType, SuiteToTestsType]


def OverFailedBuildThreshold(failed_result_tuple_list: List[ct.ResultTupleType],
                             build_fail_total_number_threshold: int) -> bool:
  """Check if the number of failed build in |failed_result_tuple_list| is
     equal to or more than |build_fail_total_number_threshold|.

  Args:
    failed_result_tuple_list: A list of ct.ResultTupleType failed test results.
    build_fail_total_number_threshold: Threshold base on the number of failed
      build caused by a test.

  Returns:
      Whether number of failed build in |failed_result_tuple_list| is equal to
      or more than |build_fail_total_number_threshold|.
  """
  unique_build_ids = set()
  for result in failed_result_tuple_list:
    if '/' in result.build_url:
      unique_build_ids.add(result.build_url.split('/')[-1])
      if len(unique_build_ids) >= build_fail_total_number_threshold:
        return True
  return False


def OverFailedBuildByConsecutiveDayThreshold(
    failed_result_tuple_list: List[ct.ResultTupleType],
    build_fail_consecutive_day_threshold: int) -> bool:
  """Check if the max number of build fail in consecutive date
     is equal to or more than |build_fail_consecutive_day_threshold|.

  Args:
    failed_result_tuple_list: A list of ct.ResultTupleType failed test result.
    build_fail_consecutive_day_threshold: Threshold base on the number of
      consecutive days that a test caused build fail.

  Returns:
      Whether the max number of build fail in consecutive date
      is equal to or more than |build_fail_consecutive_day_threshold|.
  """
  dates = {t.date: False for t in failed_result_tuple_list}

  for cur_date, is_checked in dates.items():
    # A beginning point.
    if not is_checked:
      count = 1

      while count < build_fail_consecutive_day_threshold:
        new_date = cur_date + timedelta(days=count)
        if new_date in dates:
          count += 1
          # Mark checked date.
          dates[new_date] = True
        else:
          break

      if count >= build_fail_consecutive_day_threshold:
        return True

  return False


def FailedBuildWithinRecentDayThreshold(
    failed_result_tuple_list: List[ct.ResultTupleType],
    build_fail_recent_day_threshold: int) -> bool:
  """Check if there are any failed builds within the most
    recent |build_fail_latest_day_threshold| days.

  Args:
    failed_result_tuple_list: A list of ct.ResultTupleType failed test result.
    build_fail_recent_day_threshold: Threshold base on the recent day range
      that the test caused build fail.

  Returns:
      Whether the test caused build fail within the recent day.
  """
  recent_check_day = date.today() - timedelta(
      days=build_fail_recent_day_threshold)
  for test in failed_result_tuple_list:
    if test.date >= recent_check_day:
      return True
  return False


class ExpectationProcessor():
  # pylint: disable=too-many-locals
  def IterateThroughResultsForUser(self, result_map: ct.AggregatedResultsType,
                                   group_by_tags: bool,
                                   include_all_tags: bool) -> None:
    """Iterates over |result_map| for the user to provide input.

    For each unique result, user will be able to decide whether to ignore it (do
    nothing), mark as flaky (add RetryOnFailure expectation), or mark as failing
    (add Failure expectation). If the latter two are chosen, they can also
    associate a bug with the new expectation.

    Args:
      result_map: Aggregated query results from results.AggregateResults to
          iterate over.
      group_by_tags: A boolean denoting whether to attempt to group expectations
          by tags or not. If True, expectations will be added after an existing
          expectation whose tags are the largest subset of the produced tags. If
          False, new expectations will be appended to the end of the file.
      include_all_tags: A boolean denoting whether all tags should be used for
          expectations or only the most specific ones.
    """
    typ_tag_ordered_result_map = self._ReorderMapByTypTags(result_map)
    for suite, test_map in result_map.items():
      if self.IsSuiteUnsupported(suite):
        continue
      for test, tag_map in test_map.items():
        for typ_tags, build_url_list in tag_map.items():

          print('')
          print('Suite: %s' % suite)
          print('Test: %s' % test)
          print('Configuration:\n    %s' % '\n    '.join(typ_tags))
          print('Failed builds:\n    %s' % '\n    '.join(build_url_list))

          other_failures_for_test = self.FindFailuresInSameTest(
              result_map, suite, test, typ_tags)
          if other_failures_for_test:
            print('Other failures in same test found on other configurations')
            for (tags, failure_count) in other_failures_for_test:
              print('    %d failures on %s' % (failure_count, ' '.join(tags)))

          other_failures_for_config = self.FindFailuresInSameConfig(
              typ_tag_ordered_result_map, suite, test, typ_tags)
          if other_failures_for_config:
            print('Other failures on same configuration found in other tests')
            for (name, failure_count) in other_failures_for_config:
              print('    %d failures in %s' % (failure_count, name))

          expected_result, bug = self.PromptUserForExpectationAction()
          if not expected_result:
            continue

          self.ModifyFileForResult(suite, test, typ_tags, bug, expected_result,
                                   group_by_tags, include_all_tags)

  # pylint: enable=too-many-locals

  # pylint: disable=too-many-locals,too-many-arguments
  def IterateThroughResultsWithThresholds(
      self, result_map: ct.AggregatedResultsType, group_by_tags: bool,
      result_counts: ct.ResultCountType, ignore_threshold: float,
      flaky_threshold: float, include_all_tags: bool) -> None:
    """Iterates over |result_map| and generates expectations based off
       thresholds.

    Args:
      result_map: Aggregated query results from results.AggregateResults to
          iterate over.
      group_by_tags: A boolean denoting whether to attempt to group expectations
          by tags or not. If True, expectations will be added after an existing
          expectation whose tags are the largest subset of the produced tags. If
          False, new expectations will be appended to the end of the file.
      result_counts: A dict in the format output by queries.GetResultCounts.
      ignore_threshold: A float containing the fraction of failed tests under
          which failures will be ignored.
      flaky_threshold: A float containing the fraction of failed tests under
          which failures will be suppressed with RetryOnFailure and above which
          will be suppressed with Failure.
      include_all_tags: A boolean denoting whether all tags should be used for
          expectations or only the most specific ones.
    """
    assert isinstance(ignore_threshold, float)
    assert isinstance(flaky_threshold, float)
    for suite, test_map in result_map.items():
      if self.IsSuiteUnsupported(suite):
        continue
      for test, tag_map in test_map.items():
        for typ_tags, build_url_list in tag_map.items():
          failure_count = len(build_url_list)
          total_count = result_counts[typ_tags][test]
          fraction = failure_count / total_count
          if fraction < ignore_threshold:
            continue
          expected_result = self.GetExpectedResult(fraction, flaky_threshold)
          if expected_result:
            self.ModifyFileForResult(suite, test, typ_tags, '', expected_result,
                                     group_by_tags, include_all_tags)

  def CreateExpectationsForAllResults(
      self, result_map: ct.AggregatedStatusResultsType, group_by_tags: bool,
      include_all_tags: bool, build_fail_total_number_threshold: int,
      build_fail_consecutive_day_threshold: int,
      build_fail_recent_day_threshold: int) -> None:
    """Iterates over |result_map|, selects tests that hit all
       build-fail*-thresholds and adds expectations for their results. Same
       test in all builders that caused build fail must be over all threshold
       requirement.

    Args:
      result_map: Aggregated query results from results.AggregateResults to
          iterate over.
      group_by_tags: A boolean denoting whether to attempt to group expectations
          by tags or not. If True, expectations will be added after an existing
          expectation whose tags are the largest subset of the produced tags. If
          False, new expectations will be appended to the end of the file.
      include_all_tags: A boolean denoting whether all tags should be used for
          expectations or only the most specific ones.
      build_fail_total_number_threshold: Threshold based on the number of
          failed builds caused by a test. Add to the expectations, if actual
          is equal to or more than this threshold. All build-fail*-thresholds
          must be hit in order for a test to actually be suppressed.
      build_fail_consecutive_day_threshold: Threshold based on the number of
          consecutive days that a test caused build fail. Add to the
          expectations, if the consecutive days that it caused build fail
          are equal to or more than this. All build-fail*-thresholds
          must be hit in order for a test to actually be suppressed.
      build_fail_recent_day_threshold: How many days worth of recent builds
          to check for non-hidden failures. A test will be suppressed if
          it has non-hidden failures within this time span. All
          build-fail*-thresholds must be hit in order for a test to actually
          be suppressed.
    """
    for suite, test_map in result_map.items():
      if self.IsSuiteUnsupported(suite):
        continue
      for test, tag_map in test_map.items():
        # Same test in all builders that caused build fail must be over all
        # threshold requirement.
        all_results = list(itertools.chain(*tag_map.values()))
        if (not OverFailedBuildThreshold(all_results,
                                         build_fail_total_number_threshold)
            or not OverFailedBuildByConsecutiveDayThreshold(
                all_results, build_fail_consecutive_day_threshold)):
          continue
        for typ_tags, result_tuple_list in tag_map.items():
          if not FailedBuildWithinRecentDayThreshold(
              result_tuple_list, build_fail_recent_day_threshold):
            continue
          status = set()
          for test_result in result_tuple_list:
            # Should always add a pass to all flaky web tests in
            # TestsExpectation that have passed runs.
            status.add('Pass')
            if test_result.status == ct.ResultStatus.CRASH:
              status.add('Crash')
            elif test_result.status == ct.ResultStatus.FAIL:
              status.add('Failure')
            elif test_result.status == ct.ResultStatus.ABORT:
              status.add('Timeout')
          if status:
            status_list = list(status)
            status_list.sort()
            self.ModifyFileForResult(suite, test, typ_tags, '',
                                     ' '.join(status_list), group_by_tags,
                                     include_all_tags)

  # pylint: enable=too-many-locals,too-many-arguments

  def FindFailuresInSameTest(self, result_map: ct.AggregatedResultsType,
                             target_suite: str, target_test: str,
                             target_typ_tags: ct.TagTupleType
                             ) -> List[Tuple[ct.TagTupleType, int]]:
    """Finds all other failures that occurred in the given test.

    Ignores the failures for the test on the same configuration.

    Args:
      result_map: Aggregated query results from results.AggregateResults.
      target_suite: A string containing the test suite being checked.
      target_test: A string containing the target test case being checked.
      target_typ_tags: A tuple of strings containing the typ tags that the
          failure took place on.

    Returns:
      A list of tuples (typ_tags, count). |typ_tags| is a list of strings
      defining a configuration the specified test failed on. |count| is how many
      times the test failed on that configuration.
    """
    assert isinstance(target_typ_tags, tuple)
    other_failures = []
    tag_map = result_map.get(target_suite, {}).get(target_test, {})
    for typ_tags, build_url_list in tag_map.items():
      if typ_tags == target_typ_tags:
        continue
      other_failures.append((typ_tags, len(build_url_list)))
    return other_failures

  def FindFailuresInSameConfig(
      self, typ_tag_ordered_result_map: TagOrderedAggregateResultType,
      target_suite: str, target_test: str,
      target_typ_tags: ct.TagTupleType) -> List[Tuple[str, int]]:
    """Finds all other failures that occurred on the given configuration.

    Ignores the failures for the given test on the given configuration.

    Args:
      typ_tag_ordered_result_map: Aggregated query results from
          results.AggregateResults that have been reordered using
          _ReorderMapByTypTags.
      target_suite: A string containing the test suite the original failure was
          found in.
      target_test: A string containing the test case the original failure was
          found in.
      target_typ_tags: A tuple of strings containing the typ tags defining the
          configuration to find failures for.

    Returns:
      A list of tuples (full_name, count). |full_name| is a string containing a
      test suite and test case concatenated together. |count| is how many times
      |full_name| failed on the configuration specified by |target_typ_tags|.
    """
    assert isinstance(target_typ_tags, tuple)
    other_failures = []
    suite_map = typ_tag_ordered_result_map.get(target_typ_tags, {})
    for suite, test_map in suite_map.items():
      for test, build_url_list in test_map.items():
        if suite == target_suite and test == target_test:
          continue
        full_name = '%s.%s' % (suite, test)
        other_failures.append((full_name, len(build_url_list)))
    return other_failures

  def _ReorderMapByTypTags(self, result_map: ct.AggregatedResultsType
                           ) -> TagOrderedAggregateResultType:
    """Rearranges|result_map| to use typ tags as the top level keys.

    Args:
      result_map: Aggregated query results from results.AggregateResults

    Returns:
      A dict containing the same contents as |result_map|, but in the following
      format:
      {
        typ_tags (tuple of str): {
          suite (str): {
            test (str): build_url_list (list of str),
          },
        },
      }
    """
    reordered_map = {}
    for suite, test_map in result_map.items():
      for test, tag_map in test_map.items():
        for typ_tags, build_url_list in tag_map.items():
          reordered_map.setdefault(typ_tags,
                                   {}).setdefault(suite,
                                                  {})[test] = build_url_list
    return reordered_map

  def PromptUserForExpectationAction(
      self) -> Union[Tuple[str, str], Tuple[None, None]]:
    """Prompts the user on what to do to handle a failure.

    Returns:
      A tuple (expected_result, bug). |expected_result| is a string containing
      the expected result to use for the expectation, e.g. RetryOnFailure. |bug|
      is a string containing the bug to use for the expectation. If the user
      chooses to ignore the failure, both will be None. Otherwise, both are
      filled, although |bug| may be an empty string if no bug is provided.
    """
    prompt = ('How should this failure be handled? (i)gnore/(r)etry on '
              'failure/(f)ailure: ')
    valid_inputs = ['f', 'i', 'r']
    response = input(prompt).lower()
    while response not in valid_inputs:
      print('Invalid input, valid inputs are %s' % (', '.join(valid_inputs)))
      response = input(prompt).lower()

    if response == 'i':
      return (None, None)
    expected_result = 'RetryOnFailure' if response == 'r' else 'Failure'

    prompt = ('What is the bug URL that should be associated with this '
              'expectation? E.g. crbug.com/1234. ')
    response = input(prompt)
    return (expected_result, response)

  # pylint: disable=too-many-locals,too-many-arguments
  def ModifyFileForResult(self, suite: str, test: str,
                          typ_tags: ct.TagTupleType, bug: str,
                          expected_result: str, group_by_tags: bool,
                          include_all_tags: bool) -> None:
    """Adds an expectation to the appropriate expectation file.

    Args:
      suite: A string containing the suite the failure occurred in.
      test: A string containing the test case the failure occurred in.
      typ_tags: A tuple of strings containing the typ tags the test produced.
      bug: A string containing the bug to associate with the new expectation.
      expected_result: A string containing the expected result to use for the
          new expectation, e.g. RetryOnFailure.
      group_by_tags: A boolean denoting whether to attempt to group expectations
          by tags or not. If True, expectations will be added after an existing
          expectation whose tags are the largest subset of the produced tags. If
          False, new expectations will be appended to the end of the file.
      include_all_tags: A boolean denoting whether all tags should be used for
          expectations or only the most specific ones.
    """
    expectation_file = self.GetExpectationFileForSuite(suite, typ_tags)
    if not include_all_tags:
      typ_tags = self.FilterToMostSpecificTypTags(typ_tags, expectation_file)
    bug = '%s ' % bug if bug else bug

    def AppendExpectationToEnd():
      expectation_line = '%s[ %s ] %s [ %s ]\n' % (bug, ' '.join(
          self.ProcessTypTagsBeforeWriting(typ_tags)), test, expected_result)
      with open(expectation_file, 'a') as outfile:
        outfile.write(expectation_line)

    if group_by_tags:
      insertion_line, best_matching_tags = (
          self.FindBestInsertionLineForExpectation(typ_tags, expectation_file))
      if insertion_line == -1:
        AppendExpectationToEnd()
      else:
        # If we've already filtered tags, then use those instead of the "best
        # matching" ones.
        tags_to_use = best_matching_tags
        if not include_all_tags:
          tags_to_use = typ_tags
        # enumerate starts at 0 but line numbers start at 1.
        insertion_line -= 1
        tags_to_use = list(self.ProcessTypTagsBeforeWriting(tags_to_use))
        tags_to_use.sort()
        expectation_line = '%s[ %s ] %s [ %s ]\n' % (bug, ' '.join(tags_to_use),
                                                     test, expected_result)
        with open(expectation_file) as infile:
          input_contents = infile.read()
        output_contents = ''
        for lineno, line in enumerate(input_contents.splitlines(True)):
          output_contents += line
          if lineno == insertion_line:
            output_contents += expectation_line
        with open(expectation_file, 'w') as outfile:
          outfile.write(output_contents)
    else:
      AppendExpectationToEnd()

  # pylint: enable=too-many-locals,too-many-arguments

  # pylint: disable=too-many-locals
  def FilterToMostSpecificTypTags(self, typ_tags: ct.TagTupleType,
                                  expectation_file: str) -> ct.TagTupleType:
    """Filters |typ_tags| to the most specific set.

    Assumes that the tags in |expectation_file| are ordered from least specific
    to most specific within each tag group.

    Args:
      typ_tags: A tuple of strings containing the typ tags the test produced.
      expectation_file: A string containing a filepath pointing to the
          expectation file to filter tags with.

    Returns:
      A tuple containing the contents of |typ_tags| with only the most specific
      tag from each tag group remaining.
    """
    with open(expectation_file) as infile:
      contents = infile.read()

    tag_groups = self.GetTagGroups(contents)
    num_matches = 0
    tags_in_same_group = collections.defaultdict(list)
    for tag in typ_tags:
      for index, tag_group in enumerate(tag_groups):
        if tag in tag_group:
          tags_in_same_group[index].append(tag)
          num_matches += 1
          break
    if num_matches != len(typ_tags):
      all_tags = set()
      for group in tag_groups:
        all_tags |= set(group)
      raise RuntimeError('Found tags not in expectation file: %s' %
                         ' '.join(set(typ_tags) - all_tags))

    filtered_tags = []
    for index, tags in tags_in_same_group.items():
      if len(tags) == 1:
        filtered_tags.append(tags[0])
      else:
        tag_group = tag_groups[index]
        best_index = -1
        for t in tags:
          i = tag_group.index(t)
          if i > best_index:
            best_index = i
        filtered_tags.append(tag_group[best_index])

    # Sort to keep order consistent with what we were given.
    filtered_tags.sort()
    return tuple(filtered_tags)

  # pylint: enable=too-many-locals

  def FindBestInsertionLineForExpectation(self, typ_tags: ct.TagTupleType,
                                          expectation_file: str
                                          ) -> Tuple[int, Set[str]]:
    """Finds the best place to insert an expectation when grouping by tags.

    Args:
      typ_tags: A tuple of strings containing typ tags that were produced by the
          failing test.
      expectation_file: A string containing a filepath to the expectation file
      to use.

    Returns:
      A tuple (insertion_line, best_matching_tags). |insertion_line| is an int
      specifying the line number to insert the expectation into.
      |best_matching_tags| is a set containing the tags of an existing
      expectation that was found to be the closest match. If no appropriate
      line is found, |insertion_line| is -1 and |best_matching_tags| is empty.
    """
    best_matching_tags = set()
    best_insertion_line = -1
    with open(expectation_file) as f:
      content = f.read()
    list_parser = expectations_parser.TaggedTestListParser(content)
    for e in list_parser.expectations:
      expectation_tags = e.tags
      if not expectation_tags.issubset(typ_tags):
        continue
      if len(expectation_tags) > len(best_matching_tags):
        best_matching_tags = expectation_tags
        best_insertion_line = e.lineno
      elif len(expectation_tags) == len(best_matching_tags):
        if best_insertion_line < e.lineno:
          best_insertion_line = e.lineno
    return best_insertion_line, best_matching_tags

  def GetOriginExpectationFileContents(self) -> Dict[str, str]:
    """Gets expectation file contents from origin/main.

    Returns:
      A dict of expectation file name (str) -> expectation file contents (str)
      that are available on origin/main. File paths are relative to the
      Chromium src dir and are OS paths.
    """
    # Get the path to the expectation file directory in gitiles, i.e. the POSIX
    # path relative to the Chromium src directory.
    origin_file_contents = {}
    expectation_files = self.ListOriginExpectationFiles()
    for f in expectation_files:
      filepath_posix = f.replace(os.sep, '/')
      origin_filepath_url = posixpath.join(GITILES_URL,
                                           filepath_posix) + TEXT_FORMAT_ARG
      response = urllib.request.urlopen(origin_filepath_url).read()
      decoded_text = base64.b64decode(response).decode('utf-8')
      # After the URL access maintain all the paths as os paths.
      origin_file_contents[f] = decoded_text

    return origin_file_contents

  def GetLocalCheckoutExpectationFileContents(self) -> Dict[str, str]:
    """Gets expectation file contents from the local checkout.

    Returns:
      A dict of expectation file name (str) -> expectation file contents (str)
      that are available from the local checkout. File paths are relative to
      the Chromium src dir and are OS paths.
    """
    local_file_contents = {}
    expectation_files = self.ListLocalCheckoutExpectationFiles()
    for f in expectation_files:
      absolute_filepath = os.path.join(CHROMIUM_SRC_DIR, f)
      with open(absolute_filepath) as infile:
        local_file_contents[f] = infile.read()
    return local_file_contents

  def AssertCheckoutIsUpToDate(self) -> None:
    """Confirms that the local checkout's expectations are up to date."""
    origin_file_contents = self.GetOriginExpectationFileContents()
    local_file_contents = self.GetLocalCheckoutExpectationFileContents()
    if origin_file_contents != local_file_contents:
      raise RuntimeError(
          'Local Chromium checkout expectations are out of date. Please '
          'perform a `git pull`.')

  def GetExpectationFileForSuite(self, suite: str,
                                 typ_tags: ct.TagTupleType) -> str:
    """Finds the correct expectation file for the given suite.

    Args:
      suite: A string containing the test suite to look for.
      typ_tags: A tuple of strings containing typ tags that were produced by
          the failing test.

    Returns:
      A string containing a filepath to the correct expectation file for
      |suite|and |typ_tags|.
    """
    raise NotImplementedError

  def ListGitilesDirectory(self, origin_dir: str) -> List[str]:
    """Gets the list of all files from origin/main under origin_dir.

    Args:
      origin_dir: A string containing the path to the directory containing
      expectation files. Path is relative to the Chromium src dir.

    Returns:
      A list of filename strings under origin_dir.
    """
    origin_dir_url = posixpath.join(GITILES_URL, origin_dir) + TEXT_FORMAT_ARG
    response = urllib.request.urlopen(origin_dir_url).read()
    # Response is a base64 encoded, newline-separated list of files in the
    # directory in the format: `mode file_type hash name`
    files = []
    decoded_text = base64.b64decode(response).decode('utf-8')
    for line in decoded_text.splitlines():
      files.append(line.split()[-1])
    return files

  def IsSuiteUnsupported(self, suite: str) -> bool:
    raise NotImplementedError

  def ListLocalCheckoutExpectationFiles(self) -> List[str]:
    """Finds the list of all expectation files from the local checkout.

    Returns:
      A list of strings containing relative file paths to expectation files.
      OS paths relative to Chromium src dir are returned.
    """
    raise NotImplementedError

  def ListOriginExpectationFiles(self) -> List[str]:
    """Finds the list of all expectation files from origin/main.

    Returns:
      A list of strings containing relative file paths to expectation files.
      OS paths are relative to Chromium src directory.
    """
    raise NotImplementedError

  def GetTagGroups(self, contents: str) -> List[List[str]]:
    tag_groups = []
    for match in TAG_GROUP_REGEX.findall(contents):
      tag_groups.append(match.strip().replace('#', '').split())
    return tag_groups

  def GetExpectedResult(self, fraction: float, flaky_threshold: float) -> str:
    raise NotImplementedError

  def ProcessTypTagsBeforeWriting(self,
                                  typ_tags: ct.TagTupleType) -> ct.TagTupleType:
    return typ_tags