chromium/testing/unexpected_passes_common/queries.py

# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Methods related to querying the ResultDB BigQuery tables."""

import logging
import time
from typing import Collection, Dict, Generator, Iterable, List, Optional, Tuple

from google.cloud import bigquery
from google.cloud import bigquery_storage
import pandas

from typ import expectations_parser
from typ import json_results
from unexpected_passes_common import constants
from unexpected_passes_common import data_types
from unexpected_passes_common import expectations

# Number of builds to pull results from when BigQueryQuerier is constructed
# with a falsy |num_samples| (e.g. 0 or None).
DEFAULT_NUM_SAMPLES = 100

# Subquery for getting all try builds that were used for CL submission. 30 days
# is chosen because the ResultDB tables we pull data from only keep data around
# for 30 days.
# The template contains a single {project_view} placeholder naming the
# commit-queue dataset whose attempts table is read, so it has to be formatted
# before it can be used as part of a query. Each returned ID carries a
# "build-" prefix.
PARTITIONED_SUBMITTED_BUILDS_TEMPLATE = """\
    SELECT
      CONCAT("build-", CAST(unnested_builds.id AS STRING)) as id
    FROM
      `commit-queue.{project_view}.attempts`,
      UNNEST(builds) as unnested_builds,
      UNNEST(gerrit_changes) as unnested_changes
    WHERE
      unnested_builds.host = "cr-buildbucket.appspot.com"
      AND unnested_changes.submit_status = "SUCCESS"
      AND start_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),
                                     INTERVAL 30 DAY)"""

# Type alias for a single row of query results. Rows are produced as
# pandas.Series objects by BigQueryQuerier._GetSeriesForQuery.
QueryResult = pandas.Series


class BigQueryQuerier:
  """Class to handle all BigQuery queries for a script invocation."""

  def __init__(self, suite: Optional[str], project: str, num_samples: int,
               keep_unmatched_results: bool):
    """
    Args:
      suite: A string containing the name of the suite that is being queried
          for. Can be None if there is no differentiation between different
          suites.
      project: A string containing the billing project to use for BigQuery.
      num_samples: An integer containing the number of builds to pull results
          from.
      keep_unmatched_results: Whether to store and return unmatched results
          for debugging purposes.
    """
    self._suite = suite
    self._project = project
    self._num_samples = num_samples or DEFAULT_NUM_SAMPLES
    self._keep_unmatched_results = keep_unmatched_results

    assert self._num_samples > 0

  def FillExpectationMapForBuilders(
      self, expectation_map: data_types.TestExpectationMap,
      builders: Collection[data_types.BuilderEntry]
  ) -> Dict[str, data_types.ResultListType]:
    """Fills |expectation_map| with results from |builders|.

    Args:
      expectation_map: A data_types.TestExpectationMap. Will be modified
          in-place.
      builders: An iterable of data_types.BuilderEntry containing the builders
          to query.

    Returns:
      A dict containing any results that were retrieved that did not have a
      matching expectation in |expectation_map| in the following format:
      {
        |builder_type|:|builder_name| (str): [
          result1 (data_types.Result),
          result2 (data_types.Result),
          ...
        ],
      }
    """
    start_time = time.time()
    logging.debug('Starting to fill expectation map for %d builders',
                  len(builders))
    assert isinstance(expectation_map, data_types.TestExpectationMap)
    # Ensure that all the builders are of the same type since we make some
    # assumptions about that later on.
    assert builders
    builder_type = None
    for b in builders:
      if builder_type is None:
        builder_type = b.builder_type
      else:
        assert b.builder_type == builder_type

    internal_statuses = set()
    for b in builders:
      internal_statuses.add(b.is_internal_builder)

    matched_builders = set()
    all_unmatched_results = {}
    for internal in internal_statuses:
      for builder_name, results, expectation_files in (
          self.GetBuilderGroupedQueryResults(builder_type, internal)):
        matching_builder = None
        for b in builders:
          if b.name == builder_name and b.is_internal_builder == internal:
            matching_builder = b
            break

        if not matching_builder:
          logging.warning(
              'Did not find a matching builder for name %s and '
              'internal status %s. This is normal if the builder '
              'is no longer running tests (e.g. it was '
              'experimental).', builder_name, internal)
          continue

        if matching_builder in matched_builders:
          raise RuntimeError(
              f'Got query result batches matched to builder '
              f'{matching_builder} twice - this is indicative of a malformed '
              f'query returning results that are not sorted by builder')
        matched_builders.add(matching_builder)

        prefixed_builder_name = '%s/%s:%s' % (matching_builder.project,
                                              matching_builder.builder_type,
                                              matching_builder.name)
        unmatched_results = expectation_map.AddResultList(
            prefixed_builder_name, results, expectation_files)
        if self._keep_unmatched_results:
          if unmatched_results:
            all_unmatched_results[prefixed_builder_name] = unmatched_results
        else:
          logging.info('Dropping %d unmatched results', len(unmatched_results))

    logging.debug('Filling expectation map took %f', time.time() - start_time)
    return all_unmatched_results

  def GetBuilderGroupedQueryResults(
      self, builder_type: str, is_internal: bool
  ) -> Generator[Tuple[str, data_types.ResultListType, Optional[List[str]]],
                 None, None]:
    """Generates results for all relevant builders grouped by builder name.

    Args:
      builder_type: Whether the builders are CI or try builders.
      is_internal: Whether the builders are internal.

    Yields:
      A tuple (builder_name, results). |builder_name| is a string specifying the
      builder that |results| came from. |results| is a data_types.ResultListType
      containing all the results for |builder_name|.
    """
    if builder_type == constants.BuilderTypes.CI:
      if is_internal:
        query = self._GetInternalCiQuery()
      else:
        query = self._GetPublicCiQuery()
    elif builder_type == constants.BuilderTypes.TRY:
      if is_internal:
        query = self._GetInternalTryQuery()
      else:
        query = self._GetPublicTryQuery()
    else:
      raise RuntimeError(f'Unknown builder type {builder_type}')

    current_builder = None
    rows_for_builder = []
    for row in self._GetSeriesForQuery(query):
      if current_builder is None:
        current_builder = row.builder_name
      if row.builder_name != current_builder:
        results_for_builder, expectation_files = self._ProcessRowsForBuilder(
            rows_for_builder)
        # The processing should have cleared out all the stored rows.
        assert not rows_for_builder
        yield current_builder, results_for_builder, expectation_files
        current_builder = row.builder_name
      rows_for_builder.append(row)

    if current_builder is None:
      logging.warning(
          'Did not get any results for builder type %s and internal status %s. '
          'Depending on where tests are run and how frequently trybots are '
          'used for submission, this may be benign.', builder_type, is_internal)

    if current_builder is not None and rows_for_builder:
      results_for_builder, expectation_files = self._ProcessRowsForBuilder(
          rows_for_builder)
      assert not rows_for_builder
      yield current_builder, results_for_builder, expectation_files

  def _GetSeriesForQuery(self,
                         query: str) -> Generator[pandas.Series, None, None]:
    """Generates results for |query|.

    Args:
      query: A string containing the BigQuery query to run.

    Yields:
      A pandas.Series object for each row returned by the query. Columns can be
      accessed directly as attributes.
    """
    client = bigquery.Client(project=self._project)
    job = client.query(query)
    row_iterator = job.result()
    # Using a Dataframe iterator instead of directly using |row_iterator| allows
    # us to use the BigQuery Storage API, which results in ~10x faster query
    # result retrieval at the cost of a few more dependencies.
    dataframe_iterator = row_iterator.to_dataframe_iterable(
        bigquery_storage.BigQueryReadClient())
    for df in dataframe_iterator:
      for _, row in df.iterrows():
        yield row

  def _GetPublicCiQuery(self) -> str:
    """Returns the BigQuery query for public CI builder results."""
    raise NotImplementedError()

  def _GetInternalCiQuery(self) -> str:
    """Returns the BigQuery query for internal CI builder results."""
    raise NotImplementedError()

  def _GetPublicTryQuery(self) -> str:
    """Returns the BigQuery query for public try builder results."""
    raise NotImplementedError()

  def _GetInternalTryQuery(self) -> str:
    """Returns the BigQuery query for internal try builder results."""
    raise NotImplementedError()

  def _ProcessRowsForBuilder(
      self, rows: List[QueryResult]
  ) -> Tuple[data_types.ResultListType, Optional[List[str]]]:
    """Processes rows from a query into data_types.Result representations.

    Args:
      rows: A list of rows from a BigQuery query.

    Returns:
      A tuple (results, expectation_files). |results| is a list of
      data_types.Result objects. |expectation_files| is the list of expectation
      files that are used by the tests in |results|, but can be None to specify
      that all expectation files should be considered.
    """
    # It's possible that a builder runs multiple versions of a test with
    # different expectation files for each version. So, find a result for each
    # unique step and get the expectation files from all of them.
    results_for_each_step = {}
    for r in rows:
      step_name = r.step_name
      if step_name not in results_for_each_step:
        results_for_each_step[step_name] = r

    expectation_files = set()
    for r in results_for_each_step.values():
      # None is a special value indicating "use all expectation files", so
      # handle that.
      ef = self._GetRelevantExpectationFilesForQueryResult(r)
      if ef is None:
        expectation_files = None
        break
      expectation_files |= set(ef)
    if expectation_files is not None:
      expectation_files = list(expectation_files)

    # The query result list is potentially very large, so reduce the list as we
    # iterate over it instead of using a standard for/in so that we don't
    # temporarily end up with a ~2x increase in memory.
    results = []
    while rows:
      r = rows.pop()
      if self._ShouldSkipOverResult(r):
        continue
      results.append(self._ConvertBigQueryRowToResultObject(r))

    return results, expectation_files

  def _ConvertBigQueryRowToResultObject(self,
                                        row: QueryResult) -> data_types.Result:
    """Converts a single BigQuery result row to a data_types.Result.

    Args:
      row: A single row from BigQuery.

    Returns:
      A data_types.Result object containing the information from |row|.
    """
    build_id = _StripPrefixFromBuildId(row.id)
    test_name = self._StripPrefixFromTestId(row.test_id)
    actual_result = _ConvertActualResultToExpectationFileFormat(row.status)
    tags = expectations.GetInstance().FilterToKnownTags(row.typ_tags)
    step = row.step_name
    return data_types.Result(test_name, tags, actual_result, step, build_id)

  def _GetRelevantExpectationFilesForQueryResult(
      self, query_result: QueryResult) -> Optional[Iterable[str]]:
    """Gets the relevant expectation file names for a given query result.

    Args:
      query_result: An object representing a row/result from a query. Columns
          can be accessed via .column_name.

    Returns:
      An iterable of strings containing expectation file names that are
      relevant to |query_result|, or None if all expectation files should be
      considered relevant.
    """
    raise NotImplementedError()

  def _ShouldSkipOverResult(self, result: QueryResult) -> bool:
    """Whether |result| should be ignored and skipped over.

    Args:
      result: A dict containing a single BigQuery result row.

    Returns:
      True if the result should be skipped over/ignored, otherwise False.
    """
    del result
    return False

  def _StripPrefixFromTestId(self, test_id: str) -> str:
    """Strips the prefix from a test ID, leaving only the test case name.

    Args:
      test_id: A string containing a full ResultDB test ID, e.g.
          ninja://target/directory.suite.class.test_case

    Returns:
      A string containing the test cases name extracted from |test_id|.
    """
    raise NotImplementedError()


def _StripPrefixFromBuildId(build_id: str) -> str:
  """Removes the prefix from a build ID provided by ResultDB.

  Args:
    build_id: A string containing a build ID. Build IDs provided by ResultDB
        are prefixed with "build-", so the ID must contain exactly one "-".

  Returns:
    A string containing the part of |build_id| that follows the "-".
  """
  id_components = build_id.split('-')
  assert len(id_components) == 2
  _, bare_id = id_components
  return bare_id


def _ConvertActualResultToExpectationFileFormat(actual_result: str) -> str:
  """Converts a ResultDB status into the format used by expectation files.

  Args:
    actual_result: A string containing the status reported to ResultDB, which
        is in the format PASS/FAIL.

  Returns:
    A string containing the equivalent result in the format used by
    expectation files, i.e. Pass/Failure.
  """
  # Web tests use ResultDB's ABORT value for both test timeouts and device
  # failures, but Abort is not defined in typ. So, treat it as a timeout.
  lookup_key = (json_results.ResultType.Timeout
                if actual_result == 'ABORT' else actual_result)
  return expectations_parser.RESULT_TAGS[lookup_key]