chromium/testing/flake_suppressor_common/results.py

# Copyright 2021 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module for working with BigQuery results."""

import collections
import datetime
import os
from collections import defaultdict
from typing import List, Tuple

from flake_suppressor_common import common_typing as ct
from flake_suppressor_common import data_types
from flake_suppressor_common import expectations
from flake_suppressor_common import tag_utils

from typ import expectations_parser


class ResultProcessor():
  def __init__(self, expectations_processor: expectations.ExpectationProcessor):
    self._expectations_processor = expectations_processor

  def AggregateResults(self,
                       results: ct.QueryJsonType) -> ct.AggregatedResultsType:
    """Aggregates BigQuery results.

    Also filters out any results that have already been suppressed.

    Args:
      results: Parsed JSON results from a BigQuery query.

    Returns:
      A map in the following format:
      {
        'test_suite': {
          'test_name': {
            'typ_tags_as_tuple': [ 'list', 'of', 'urls' ],
          },
        },
      }
    """
    results = self._ConvertJsonResultsToResultObjects(results)
    results = self._FilterOutSuppressedResults(results)
    aggregated_results = {}
    for r in results:
      build_url = 'http://ci.chromium.org/b/%s' % r.build_id

      build_url_list = aggregated_results.setdefault(r.suite, {}).setdefault(
          r.test, {}).setdefault(r.tags, [])
      build_url_list.append(build_url)
    return aggregated_results

  def AggregateTestStatusResults(
      self, results: ct.QueryJsonType) -> ct.AggregatedStatusResultsType:
    """Aggregates BigQuery results.

    Also filters out any results that have already been suppressed.

    Args:
      results: Parsed JSON results from a BigQuery query.

    Returns:
      A map in the following format:
      {
        'test_suite': {
          'test_name': {
            ('typ', 'tags', 'as', 'tuple'):
            [ (status, url, date, is_slow, typ_expectations),
              (status, url, date, is_slow, typ_expectations) ],
          },
        },
      }
    """
    results = self._ConvertJsonResultsToResultObjects(results)
    results = self._FilterOutSuppressedResults(results)
    aggregated_results = defaultdict(
        lambda: defaultdict(lambda: defaultdict(list)))
    for r in results:
      build_url = 'http://ci.chromium.org/b/%s' % r.build_id
      aggregated_results[r.suite][r.test][r.tags].append(
          ct.ResultTupleType(r.status, build_url, r.date, r.is_slow,
                             r.typ_expectations))
    return aggregated_results

  def _ConvertJsonResultsToResultObjects(self, results: ct.QueryJsonType
                                         ) -> List[data_types.Result]:
    """Converts JSON BigQuery results to data_types.Result objects.

    Args:
      results: Parsed JSON results from a BigQuery query

    Returns:
      The contents of |results| as a list of data_types.Result objects.
    """
    object_results = []
    for r in results:
      suite, test_name = self.GetTestSuiteAndNameFromResultDbName(r['name'])
      build_id = r['id'].split('-')[-1]
      typ_tags = tuple(tag_utils.TagUtils.RemoveIgnoredTags(r['typ_tags']))
      status = None
      date = None
      is_slow = None
      typ_expectations = None
      if 'status' in r:
        status = r['status']
      if 'date' in r:
        date = datetime.date.fromisoformat(r['date'])
      if 'is_slow' in r:
        is_slow = r['is_slow']
      if 'typ_expectations' in r:
        typ_expectations = r['typ_expectations']
      object_results.append(
          data_types.Result(suite, test_name, typ_tags, build_id, status, date,
                            is_slow, typ_expectations))
    return object_results

  def _FilterOutSuppressedResults(self, results: List[data_types.Result]
                                  ) -> List[data_types.Result]:
    """Filters out results that have already been suppressed in the repo.

    Args:
      results: A list of data_types.Result objects.

    Returns:
      |results| with any already-suppressed failures removed.
    """
    # Get all the expectations.
    origin_expectation_contents = (
        self._expectations_processor.GetLocalCheckoutExpectationFileContents())
    origin_expectations = collections.defaultdict(list)
    for filename, contents in origin_expectation_contents.items():
      list_parser = expectations_parser.TaggedTestListParser(contents)
      for e in list_parser.expectations:
        expectation = data_types.Expectation(e.test, e.tags, e.raw_results,
                                             e.reason)
        origin_expectations[filename].append(expectation)

    # Discard any results that already have a matching expectation.
    kept_results = []
    for r in results:
      expectation_filename = (
          self._expectations_processor.GetExpectationFileForSuite(
              r.suite, r.tags))
      expectation_filename = os.path.basename(expectation_filename)
      should_keep = True
      for e in origin_expectations[expectation_filename]:
        if e.AppliesToResult(r):
          should_keep = False
          break
      if should_keep:
        kept_results.append(r)

    return kept_results

  def GetTestSuiteAndNameFromResultDbName(self, result_db_name: str
                                          ) -> Tuple[str, str]:
    raise NotImplementedError