# chromium/tools/perf/core/sharding_map_generator.py

# Copyright 2018 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections

import core.bot_platforms
import core.path_util
import core.cli_utils

core.path_util.AddTelemetryToPath()

# Initialize the duration of all stories to be shard to 10 seconds.
# The reasons are:
# 1) Even if the stories are skipped, they still have non negligible
#    overhead.
# 2) For a case of sharding a set of benchmarks with no existing data about
#    timing, initializing the stories time within a single repeat to 1 leads
#    to a roughly equal distribution of stories on the shards, whereas
#    initializing them to zero will make the algorithm put all the stories
#    into the first shard.
# 3) For the case of adding a new benchmark to a builder that hasn't run
#    it before but has run other benchmarks, 10 seconds is a reasonable
#    amount of time to guess that it would take the stories to run and
#    creates reasonably balanced shard maps.
DEFAULT_STORY_DURATION = 10


def generate_sharding_map(benchmarks_to_shard,
                          timing_data,
                          num_shards,
                          debug,
                          repeat_config=None):
  """Generate sharding map.

    Greedy bin-packing: stories are walked in benchmark-name order and
    appended to the current shard until adding the next story would move the
    shard's total time further from the expected per-shard time; the expected
    time is re-computed over the remaining shards after each shard is closed.

    Args:
      benchmarks_to_shard: a list of bot_platforms.BenchmarkConfig and
      ExecutableConfig objects.
      timing_data: The timing data in json with 'name' and 'duration'
      num_shards: the total number of shards
      debug: if true, print out full list of stories of each shard in shard map.
      repeat_config: dict of the tests which need to repeat on multiple
      shards. A value is either an int (repeat the whole benchmark on that
      many shards) or a dict mapping story name -> repeat count.
    Return:
      The shard map: an OrderedDict keyed by shard index (as a string), with
      an 'extra_infos' entry holding predicted shard timings.
  """
  # Now we have three ways to run the tests:
  # - Run the benchmark once;
  # - Run the benchmark multiple times;
  # - Run the test multiple times.
  # The current format of the shard map (which is using the benchmark and story
  # name as the key) does not directly support repeating the same test
  # on the same shard. Also considering reusing the existing code, now we will
  # allocate the tests in the following fashion: the benchmarks or tests which
  # need repeats are always allocated first, in order to avoid the situation
  # that all of them need to be squeezed together.

  # Sort the list of benchmarks to be sharded by benchmark's name to make the
  # execution of this algorithm deterministic.
  benchmarks_to_shard.sort(key=lambda entry: entry.name)
  benchmark_name_to_config = {b.name: b for b in benchmarks_to_shard}
  stories_by_benchmark = {}
  for b in benchmarks_to_shard:
    stories_by_benchmark[b.name] = b.stories

  # Parse the repeat config into two maps:
  #   benchmarks_to_repeat: benchmark name -> number of shards to repeat the
  #     whole benchmark on (int config values).
  #   stories_to_repeat: 'benchmark/story' -> number of shards to repeat that
  #     single story on (dict config values).
  # Repeat counts are capped at num_shards since each shard runs a given test
  # at most once, and counts of 1 are dropped (no repetition needed).
  benchmarks_to_repeat = {}
  stories_to_repeat = {}
  if repeat_config:
    for b in repeat_config:
      if b not in benchmark_name_to_config:
        continue
      if isinstance(repeat_config[b], int):
        repeats = min(repeat_config[b], num_shards)
        if repeats > 1:
          # repeat the whole benchmark
          benchmarks_to_repeat[b] = repeats
      else:
        # repeat specific stories of the benchmark
        for s in repeat_config[b]:
          repeats = min(repeat_config[b][s], num_shards)
          if repeats > 1:
            stories_to_repeat['%s/%s' % (b, s)] = repeats

  # Generate timing list for each way of repeating.
  # A timing list of tuples of (benchmark_name/story_name, story_duration),
  # where stories are ordered as in the benchmarks.
  benchmarks_to_run_once = [
      b for b in benchmarks_to_shard if b.name not in benchmarks_to_repeat
  ]
  one_time_story_timing_list = _gather_timing_data(benchmarks_to_run_once,
                                                   timing_data, True)

  repeated_benchmark_to_timing_list = {}
  for b in benchmarks_to_repeat:
    timing_list = _gather_timing_data([benchmark_name_to_config[b]],
                                      timing_data, True)
    repeated_benchmark_to_timing_list[b] = timing_list

  # Individually-repeated stories still appear once in the one-time list;
  # remember their durations so the extra copies can be accounted for.
  repeated_story_timing_list = {}
  for s, t in one_time_story_timing_list:
    if s in stories_to_repeat:
      repeated_story_timing_list[s] = t

  # Generate the total timing and story count across all shards, counting
  # every repeated copy, then derive the initial expected per-shard time.
  total_time = sum(t for _, t in one_time_story_timing_list)
  total_story = len(one_time_story_timing_list)
  for benchmark, timing_list in repeated_benchmark_to_timing_list.items():
    total_time += sum(t * benchmarks_to_repeat[benchmark]
                      for s, t in timing_list)
    total_story += len(timing_list) * benchmarks_to_repeat[benchmark]
  for story, time in repeated_story_timing_list.items():
    # The first copy is already counted in the one-time totals; only add the
    # extra (repeats - 1) copies.
    total_time += time * (stories_to_repeat[story] - 1)
    total_story += stories_to_repeat[story] - 1
  expected_shard_time = total_time / num_shards

  sharding_map = collections.OrderedDict()
  num_stories = total_story
  min_shard_time = 0x7fffffff  # maxint
  min_shard_index = None
  max_shard_time = 0
  max_shard_index = None
  predicted_shard_timings = []
  debug_timing = collections.OrderedDict()

  current_repeat_on_benchmark = 0
  repeating_benchmark_timing_list = []
  one_time_story_timing_list.reverse()
  # The algorithm below removes all the stories from story timing lists one by
  # one and adds them to the current shard until the shard's total time is
  # approximately equal to |expected_shard_time|. After that point,
  # it moves to the next shard. To make sure repeating benchmarks are not on the
  # same shard, the stories from repeated_benchmark_to_timing_list have higher
  # priority.
  # For efficient removal of story timing list's elements (pop from the end) &
  # to keep the ordering of benchmarks alphabetically sorted in the shards'
  # assignment, we reverse the |story_timing_list|.
  #TODO(crbug.com/40175917): fix extra story repeat
  for i in range(num_shards):
    # When the queue of repeating-benchmark stories drains, refill it with one
    # more pass of every benchmark that still has repeats left. This staggers
    # the copies so a repeated benchmark lands on different shards.
    if len(repeating_benchmark_timing_list) == 0:
      for benchmark, timing_list in repeated_benchmark_to_timing_list.items():
        if benchmarks_to_repeat[benchmark] > current_repeat_on_benchmark:
          repeating_benchmark_timing_list += timing_list
      repeating_benchmark_timing_list.reverse()
      current_repeat_on_benchmark += 1

    shard_name = 'shard #%i' % i
    sharding_map[str(i)] = {'benchmarks': collections.OrderedDict()}

    # Individually-repeated stories are pre-allocated onto the first |repeats|
    # shards: one copy on each shard whose index is below the repeat count.
    pre_allocated_stories = []
    pre_allocated_time = 0
    for s, t in stories_to_repeat.items():
      if t > i:
        pre_allocated_stories.append(s)
        pre_allocated_time += repeated_story_timing_list[s]

    debug_timing[shard_name] = collections.OrderedDict()
    shard_time = pre_allocated_time
    stories_in_shard = []

    # Keep adding stories to the current shard if:
    # 1. Adding the next story does not makes the shard time further from
    # the expected;
    # Or
    # 2. The current shard is the last shard.
    while one_time_story_timing_list or repeating_benchmark_timing_list:
      # Repeating-benchmark stories take priority over one-time stories.
      if repeating_benchmark_timing_list:
        story_timing_list = repeating_benchmark_timing_list
      else:
        story_timing_list = one_time_story_timing_list

      # Add one story anyway to avoid empty shard
      current_story, current_duration = story_timing_list[-1]
      story_timing_list.pop()
      if current_story not in pre_allocated_stories:
        # Pre-allocated stories already contributed via |pre_allocated_time|;
        # skip them here to avoid double counting.
        shard_time += current_duration
      stories_in_shard.append(current_story)
      debug_timing[shard_name][current_story] = current_duration

      if not one_time_story_timing_list and not repeating_benchmark_timing_list:
        # All stories sharded
        break

      # Peek at the next story: stop if adding it would move this shard's
      # time further from the expected time — unless this is the last shard,
      # which must absorb everything that remains.
      if repeating_benchmark_timing_list:
        _, next_duration = repeating_benchmark_timing_list[-1]
      else:
        _, next_duration = one_time_story_timing_list[-1]
      if (abs(shard_time + next_duration - expected_shard_time) >
          abs(shard_time - expected_shard_time)) and i != num_shards - 1:
        # it is not the last shard and we should not add the next story
        break
    _add_benchmarks_to_shard(sharding_map, i, stories_in_shard,
                             stories_by_benchmark, benchmark_name_to_config)

    # Convert the shard's per-benchmark assignment (plus this shard's
    # pre-allocated stories) into story-index ranges so overlapping or
    # adjacent ranges can be merged into a compact config below.
    sharding_map_benchmarks = sharding_map[str(i)].get(
        'benchmarks', collections.OrderedDict())
    benchmark_sections = collections.OrderedDict()
    for benchmark, config in sharding_map_benchmarks.items():
      if 'sections' in config:
        section_list = [(s.get('begin', 0),
                         s.get('end', len(stories_by_benchmark[benchmark])))
                        for s in config['sections']]
      else:
        section_list = [(config.get('begin', 0),
                         config.get('end',
                                    len(stories_by_benchmark[benchmark])))]
      benchmark_sections[benchmark] = section_list

    for pre_allocated_story in pre_allocated_stories:
      benchmark, story = pre_allocated_story.split('/', 1)
      story_index = stories_by_benchmark[benchmark].index(story)
      if benchmark in benchmark_sections:
        benchmark_sections[benchmark].append((story_index, story_index + 1))
      else:
        benchmark_sections[benchmark] = [(story_index, story_index + 1)]

    # Rebuild each benchmark's config from its merged index ranges: a single
    # range becomes optional 'begin'/'end' keys (omitted when they span the
    # whole story list); multiple ranges become a 'sections' list.
    new_benchmark_configs = collections.OrderedDict()
    for benchmark, sections in benchmark_sections.items():
      merged_sections = core.cli_utils.MergeIndexRanges(sections)
      sections_config = []
      # NOTE(review): if |merged_sections| were ever empty, |benchmark_config|
      # below would be unbound (or stale from the previous iteration). This
      # looks unreachable because every benchmark here contributes at least
      # one section — confirm.
      if len(merged_sections) == 1:
        begin = merged_sections[0][0] if merged_sections[0][0] != 0 else None
        end = merged_sections[0][1] if merged_sections[0][1] != len(
            stories_by_benchmark[benchmark]) else None
        benchmark_config = {}
        if begin:
          benchmark_config['begin'] = begin
        if end:
          benchmark_config['end'] = end
      elif len(merged_sections) > 1:
        for section in merged_sections:
          sections_config.append({'begin': section[0], 'end': section[1]})
        benchmark_config = {
            'sections': sections_config,
            'abridged': benchmark_name_to_config[benchmark].abridged
        }

      abridged = benchmark_name_to_config[benchmark].abridged
      benchmark_config['abridged'] = abridged

      # Carry the per-benchmark pageset repeat override into the shard map so
      # the runner honors it.
      pageset_repeat_override = benchmark_name_to_config[
          benchmark].pageset_repeat_override
      if pageset_repeat_override:
        benchmark_config['pageset_repeat'] = pageset_repeat_override

      new_benchmark_configs[benchmark] = benchmark_config

    sharding_map[str(i)]['benchmarks'] = new_benchmark_configs
    if i != num_shards - 1:
      # Re-balance: spread the remaining total time evenly over the remaining
      # shards so early over/under-shoots do not accumulate.
      total_time -= shard_time
      expected_shard_time = total_time / (num_shards - i - 1)
    if shard_time > max_shard_time:
      max_shard_time = shard_time
      max_shard_index = i
    if shard_time < min_shard_time:
      min_shard_time = shard_time
      min_shard_index = i

    predicted_shard_timings.append((shard_name, shard_time))
    debug_timing[shard_name]['expected_total_time'] = shard_time

  sharding_map['extra_infos'] = collections.OrderedDict([
      ('num_stories', num_stories),
      ('predicted_min_shard_time', min_shard_time),
      ('predicted_min_shard_index', min_shard_index),
      ('predicted_max_shard_time', max_shard_time),
      ('predicted_max_shard_index', max_shard_index),
  ])

  if debug:
    sharding_map['extra_infos'].update(debug_timing)
  else:
    sharding_map['extra_infos'].update(predicted_shard_timings)
  return sharding_map


def _add_benchmarks_to_shard(sharding_map, shard_index, stories_in_shard,
    all_stories, benchmark_name_to_config):
  benchmarks = collections.OrderedDict()
  for story in stories_in_shard:
    (b, story) = story.split('/', 1)
    if b not in benchmarks:
      benchmarks[b] = []
    benchmarks[b].append(story)

  # Format the benchmark's stories by indices
  benchmarks_in_shard = collections.OrderedDict()
  executables_in_shard = collections.OrderedDict()
  crossbench_in_shard = collections.OrderedDict()
  for b in benchmarks:
    config = benchmark_name_to_config[b]
    if isinstance(config, core.bot_platforms.CrossbenchConfig):
      crossbench_in_shard[config.crossbench_name] = {
          'display_name': b,
          'arguments': config.arguments,
      }
      first_story = all_stories[b].index(benchmarks[b][0])
      last_story = all_stories[b].index(benchmarks[b][-1]) + 1
      if first_story != 0:
        crossbench_in_shard[config.crossbench_name]['begin'] = first_story
      if last_story != len(all_stories[b]):
        crossbench_in_shard[config.crossbench_name]['end'] = last_story
    elif config.is_telemetry:
      benchmarks_in_shard[b] = {}
      first_story = all_stories[b].index(benchmarks[b][0])
      last_story = all_stories[b].index(benchmarks[b][-1]) + 1
      if first_story != 0:
        benchmarks_in_shard[b]['begin'] = first_story
      if last_story != len(all_stories[b]):
        benchmarks_in_shard[b]['end'] = last_story
      benchmarks_in_shard[b]['abridged'] = benchmark_name_to_config[b].abridged
    else:
      executables_in_shard[b] = {}
      if config.flags:
        executables_in_shard[b]['arguments'] = config.flags
      executables_in_shard[b]['path'] = config.path
  sharding_map[str(shard_index)] = collections.OrderedDict()
  if benchmarks_in_shard:
    sharding_map[str(shard_index)]['benchmarks'] = benchmarks_in_shard
  if executables_in_shard:
    sharding_map[str(shard_index)]['executables'] = executables_in_shard
  if crossbench_in_shard:
    sharding_map[str(shard_index)]['crossbench'] = crossbench_in_shard


def _gather_timing_data(benchmarks_to_shard, timing_data, repeat):
  """Generates a list of story and duration in order.
  Return:
    A list of tuples of (story_name, story_duration), sorted by the order of
    benchmark name + story order within the benchmark.
  """
  timing_data_dict = {}
  for run in timing_data:
    if run['duration']:
      timing_data_dict[run['name']] = float(run['duration'])
  timing_data_list = []
  for b in benchmarks_to_shard:
    run_count = b.repeat if repeat else 1
    for s in b.stories:
      test_name = '%s/%s' % (b.name, s)
      test_duration = getattr(b, 'estimated_runtime', DEFAULT_STORY_DURATION)
      if test_name in timing_data_dict:
        test_duration = timing_data_dict[test_name] * run_count
      timing_data_list.append((test_name, test_duration))
  return timing_data_list


def test_sharding_map(
    sharding_map, benchmarks_to_shard, test_timing_data):
  """Replays a shard map against timing data to report per-shard durations.

  For every shard, lists each assigned story with its (single-run) duration
  and a 'full_time' total for the shard.

  Note: mutates |sharding_map| by dropping its 'extra_infos' entry.
  """
  # Single-run durations keyed by 'benchmark/story'.
  timing_by_test = dict(
      _gather_timing_data(benchmarks_to_shard, test_timing_data, False))

  story_lists = {}
  for config in benchmarks_to_shard:
    story_lists[config.name] = config.stories

  report = collections.OrderedDict()
  sharding_map.pop('extra_infos', None)
  for shard_id in sharding_map:
    shard_report = collections.OrderedDict()
    shard_total = 0
    for bench_name, bench_config in sharding_map[shard_id]['benchmarks'].items():
      # Missing 'begin'/'end' mean the range spans the full story list.
      begin = bench_config.get('begin', 0)
      end = bench_config.get('end', len(story_lists[bench_name]))
      bench_total = 0
      for story in story_lists[bench_name][begin:end]:
        key = bench_name + '/' + story
        duration = timing_by_test[key]
        shard_report[key] = str(duration)
        bench_total += duration
      shard_total += bench_total
    shard_report['full_time'] = shard_total
    report[shard_id] = shard_report
  return report