chromium/tools/perf/core/results_merger.py

# Copyright 2016 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file was copy-pasted over from:
# //build/scripts/slave/recipe_modules/swarming/resources/results_merger.py

# This file is responsbile for merging JSON test results in both the simplified
# JSON format and the Chromium JSON test results format version 3.

from __future__ import print_function

import copy
import json
import sys

import six

# These fields must appear in the test result output
REQUIRED = {
    'interrupted',
    'num_failures_by_type',
    'seconds_since_epoch',
    'tests',
    }

# These fields are optional, but must have the same value on all shards
OPTIONAL_MATCHING = (
    'builder_name',
    'build_number',
    'chromium_revision',
    'has_pretty_patch',
    'has_wdiff',
    'path_delimiter',
    'pixel_tests_enabled',
    'random_order_seed',
    )

OPTIONAL_IGNORED = (
    'layout_tests_dir',
    )

# These fields are optional and will be summed together
OPTIONAL_COUNTS = (
    'fixable',
    'num_flaky',
    'num_passes',
    'num_regressions',
    'skipped',
    'skips',
    )


class MergeException(Exception):
  pass


def merge_test_results(shard_results_list, test_cross_device=False):
  """ Merge list of results.

  Args:
    shard_results_list: list of results to merge. All the results must have the
      same format. Supported format are simplified JSON format & Chromium JSON
      test results format version 3 (see
      https://www.chromium.org/developers/the-json-test-results-format)
    test_cross_device: If true, some tests are running in multiple shards. This
      requires some extra handling on merging the values under 'tests'.

  Returns:
    a dictionary that represent the merged results. Its format follow the same
    format of all results in |shard_results_list|.
  """
  shard_results_list = [x for x in shard_results_list if x]
  if not shard_results_list:
    return {}

  if 'seconds_since_epoch' in shard_results_list[0]:
    return _merge_json_test_result_format(shard_results_list, test_cross_device)
  return _merge_simplified_json_format(shard_results_list)


def _merge_simplified_json_format(shard_results_list):
  # This code is specialized to the "simplified" JSON format that used to be
  # the standard for recipes.

  # These are the only keys we pay attention to in the output JSON.
  merged_results = {
    'successes': [],
    'failures': [],
    'valid': True,
  }

  for result_json in shard_results_list:
    successes = result_json.get('successes', [])
    failures = result_json.get('failures', [])
    valid = result_json.get('valid', True)

    if (not isinstance(successes, list) or not isinstance(failures, list) or
        not isinstance(valid, bool)):
      raise MergeException(
        'Unexpected value type in %s' % result_json)  # pragma: no cover

    merged_results['successes'].extend(successes)
    merged_results['failures'].extend(failures)
    merged_results['valid'] = merged_results['valid'] and valid
  return merged_results


def _merge_json_test_result_format(shard_results_list, test_cross_device=False):
  # This code is specialized to the Chromium JSON test results format version 3:
  # https://www.chromium.org/developers/the-json-test-results-format

  # These are required fields for the JSON test result format version 3.
  merged_results = {
    'tests': {},
    'interrupted': False,
    'version': 3,
    'seconds_since_epoch': float('inf'),
    'num_failures_by_type': {
    }
  }

  # To make sure that we don't mutate existing shard_results_list.
  shard_results_list = copy.deepcopy(shard_results_list)
  for result_json in shard_results_list:
    # TODO(tansell): check whether this deepcopy is actually necessary.
    result_json = copy.deepcopy(result_json)

    # Check the version first
    version = result_json.pop('version', -1)
    if version != 3:
      raise MergeException(  # pragma: no cover (covered by
                             # results_merger_unittest).
          'Unsupported version %s. Only version 3 is supported' % version)

    # Check the results for each shard have the required keys
    missing = REQUIRED - set(result_json)
    if missing:
      raise MergeException(  # pragma: no cover (covered by
                             # results_merger_unittest).
          'Invalid json test results (missing %s)' % missing)

    # Curry merge_values for this result_json.
    # pylint: disable=cell-var-from-loop
    merge = lambda key, merge_func: merge_value(
        result_json, merged_results, key, merge_func)

    if test_cross_device:
      # Results from the same test(story) may be found on different
      # shards(devices). We need to handle the merging on story level.
      merge('tests', merge_tries_v2)
    else:
      # Traverse the result_json's test trie & merged_results's test tries in
      # DFS order & add the n to merged['tests'].
      merge('tests', merge_tries)

    # If any were interrupted, we are interrupted.
    merge('interrupted', lambda x,y: x|y)

    # Use the earliest seconds_since_epoch value
    merge('seconds_since_epoch', min)

    # Sum the number of failure types
    merge('num_failures_by_type', sum_dicts)

    # Optional values must match
    for optional_key in OPTIONAL_MATCHING:
      if optional_key not in result_json:
        continue

      if optional_key not in merged_results:
        # Set this value to None, then blindly copy over it.
        merged_results[optional_key] = None
        merge(optional_key, lambda src, dst: src)
      else:
        merge(optional_key, ensure_match)

    # Optional values ignored
    for optional_key in OPTIONAL_IGNORED:
      if optional_key in result_json:
        merged_results[optional_key] = result_json.pop(
            # pragma: no cover (covered by
            # results_merger_unittest).
            optional_key)

    # Sum optional value counts
    for count_key in OPTIONAL_COUNTS:
      if count_key in result_json:  # pragma: no cover
        # TODO(mcgreevy): add coverage.
        merged_results.setdefault(count_key, 0)
        merge(count_key, lambda a, b: a+b)

    if result_json:
      raise MergeException(  # pragma: no cover (covered by
          # results_merger_unittest).
          'Unmergable values %s' % list(result_json.keys()))

  return merged_results


def merge_tries(source, dest):
  """ Merges test tries.

  This is intended for use as a merge_func parameter to merge_value.

  Args:
      source: A result json test trie.
      dest: A json test trie merge destination.
  """
  # merge_tries merges source into dest by performing a lock-step depth-first
  # traversal of dest and source.
  # pending_nodes contains a list of all sub-tries which have been reached but
  # need further merging.
  # Each element consists of a trie prefix, and a sub-trie from each of dest
  # and source which is reached via that prefix.
  pending_nodes = [('', dest, source)]
  while pending_nodes:
    prefix, dest_node, curr_node = pending_nodes.pop()
    for k, v in curr_node.items():
      if k in dest_node:
        if not isinstance(v, dict):
          raise MergeException(
              '%s:%s: %r not mergable, curr_node: %r\ndest_node: %r' %
              (prefix, k, v, curr_node, dest_node))
        pending_nodes.append(("%s:%s" % (prefix, k), dest_node[k], v))
      else:
        dest_node[k] = v
  return dest


def merge_tries_v2(source, dest):
  """ Merges test tries, and adds support for merging results for the same story
  from different devices, which is not supported on v1.

  This is intended for use as a merge_func parameter to merge_value.

  Args:
      source: A result json test trie.
      dest: A json test trie merge destination.
  """
  # merge_tries merges source into dest by performing a lock-step depth-first
  # traversal of dest and source.
  # pending_nodes contains a list of all sub-tries which have been reached but
  # need further merging.
  # Each element consists of a trie prefix, and a sub-trie from each of dest
  # and source which is reached via that prefix.
  pending_nodes = [('', dest, source)]
  while pending_nodes:
    prefix, dest_node, curr_node = pending_nodes.pop()
    for k, v in curr_node.items():
      if k in dest_node:
        if not isinstance(v, dict):
          raise MergeException(
              '%s:%s: %r not mergable, curr_node: %r\ndest_node: %r' %
              (prefix, k, v, curr_node, dest_node))
        if 'actual' in v and 'expected' in v:
          # v is test result of a story name which is already in dest
          _merging_cross_device_results(v, dest_node[k])
        else:
          pending_nodes.append(("%s:%s" % (prefix, k), dest_node[k], v))
      else:
        dest_node[k] = v
  return dest


def _merging_cross_device_results(src, dest):
  # 1. Merge the 'actual' field and update the is_unexpected based on new values
  dest['actual'] += ' %s' % src['actual']
  if any(actual != dest['expected'] for actual in dest['actual'].split()):
    dest['is_unexpected'] = True
  # 2. append each item under the 'artifacts' and 'times'.
  if 'artifacts' in src:
    if 'artifacts' in dest:
      for artifact, artifact_list in src['artifacts'].items():
        if artifact in dest['artifacts']:
          dest['artifacts'][artifact] += artifact_list
        else:
          dest['artifacts'][artifact] = artifact_list
    else:
      dest['artifacts'] = src['artifacts']
  if 'times' in src:
    if 'times' in dest:
      dest['times'] += src['times']
    else:
      dest['time'] = src['time']
      dest['times'] = src['times']
  # 3. remove the 'shard' because now the results are from multiple shards.
  if 'shard' in dest:
    del dest['shard']


def ensure_match(source, dest):
  """ Returns source if it matches dest.

  This is intended for use as a merge_func parameter to merge_value.

  Raises:
      MergeException if source != dest
  """
  if source != dest:
    raise MergeException(  # pragma: no cover (covered by
                           # results_merger_unittest).
        "Values don't match: %s, %s" % (source, dest))
  return source


def sum_dicts(source, dest):
  """ Adds values from source to corresponding values in dest.

  This is intended for use as a merge_func parameter to merge_value.
  """
  for k, v in source.items():
    dest.setdefault(k, 0)
    dest[k] += v

  return dest


def merge_value(source, dest, key, merge_func):
  """ Merges a value from source to dest.

  The value is deleted from source.

  Args:
    source: A dictionary from which to pull a value, identified by key.
    dest: The dictionary into to which the value is to be merged.
    key: The key which identifies the value to be merged.
    merge_func(src, dst): A function which merges its src into dst,
        and returns the result. May modify dst. May raise a MergeException.

  Raises:
    MergeException if the values can not be merged.
  """
  try:
    dest[key] = merge_func(source[key], dest[key])
  except MergeException as e:
    # The message attribute does not exist in Python 3, but Python 3's exception
    # chaining should get us equivalent functionality.
    if six.PY2:
      e.message = "MergeFailure for %s\n%s" % (key, e.message)  # pylint: disable=attribute-defined-outside-init
      e.args = tuple([e.message] + list(e.args[1:]))
      raise
    raise MergeException('MergeFailure for %s\n%s' % (key, e))
  del source[key]


def main(files):
  if len(files) < 2:
    sys.stderr.write("Not enough JSON files to merge.\n")
    return 1
  sys.stderr.write('Starting with %s\n' % files[0])
  result = json.load(open(files[0]))
  for f in files[1:]:
    sys.stderr.write('Merging %s\n' % f)
    result = merge_test_results([result, json.load(open(f))])
  print(json.dumps(result))
  return 0


if __name__ == "__main__":
  sys.exit(main(sys.argv[1:]))