chromium/tools/utr/output_adapter.py

# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Adapter for printing legacy recipe output

TODO(https://crbug.com/326904531): This file is intended to be a temporary
workaround and should be replaced once this bug is resolved"""

import json
import logging
import re
import sys

# Create a logger that avoids rich formatting as we can't control recipe
# formatting from here
basic_logger = logging.getLogger('basic_logger')
basic_logger.addHandler(logging.StreamHandler(sys.stdout))
basic_logger.propagate = False

class PassthroughAdapter:
  """Doesn't filter anything, just logs everything from the recipe run."""

  def ProcessLine(self, line):
    basic_logger.log(logging.DEBUG, line)


class LegacyOutputAdapter:
  """Interprets the legacy recipe run mode output to logging

  This will filter, route and in some cases reformat the output to trace levels
  of logging. This will cause specific output (e.g. unfiltered step names) to
  always print to std out or when -v is passed, the stdout will additionally
  be passed to the logging stdout. Note -vv will cause PassthroughAdapter to
  interpret results"""

  SEED_STEP_TEXT = '@@@SEED_STEP@'
  STEP_CLOSED_TEXT = '@@@STEP_CLOSED@@@'
  ANNOTATOR_PREFIX_SUFIX = '@@@'
  TRIGGER_STEP_PREFIX = 'test_pre_run.[trigger] '
  TRIGGER_LINK_TEXT = '@@@STEP_LINK@task UI:'

  def __init__(self):
    self._trigger_link_re = re.compile(r'.+@(https://.+)@@@$')
    self._ninja_status_re = re.compile(r'\[(\d+)\/(\d+)\]')
    self._collect_wait_re = re.compile(
        r'.+prpc call (.+) swarming.v2.Tasks.ListTaskStates, stdin: '
        r'(\{"task_id": .+\})$'
    )
    self._result_links_re = re.compile(
        r'@@@STEP_LINK@shard (#\d+) test results@(https://[^@]+)@@@')

    self._current_proccess_fn = self._StepNameProcessLine
    # The first match is used. This allows us to filter parent steps while still
    # printing child steps by adding the child step name first. By default
    # _StepNameProcessLine will be used which prints the step name and it's
    # stdout
    self._step_to_processors = {
        'compile': self._ProcessCompileLine,
        'reclient compile': self._ProcessCompileLine,
        'test_pre_run.[trigger] ': self._ProcessTriggerLine,
        'collect tasks.wait for tasks': self._ProcessCollectLine,
    }
    # The first match is used. This allows us to filter parent steps while still
    # printing child steps by adding the child step name first. By default INFO
    # will be used which prints in non-verbose mode (i.e. no -v flag)
    self._step_to_log_level = {
        'lookup_builder_gn_args': logging.DEBUG,
        'git rev-parse': logging.DEBUG,
        'git diff to instrument': logging.DEBUG,
        'save paths of affected files': logging.DEBUG,
        'preprocess for reclient.start reproxy via bootstrap': logging.INFO,
        'preprocess for reclient': logging.DEBUG,
        'process clang crashes': logging.DEBUG,
        'compile confirm no-op': logging.DEBUG,
        'postprocess for reclient': logging.DEBUG,
        'setup_build': logging.DEBUG,
        'get compile targets for scripts': logging.DEBUG,
        'lookup GN args': logging.DEBUG,
        'install infra/tools/luci/isolate': logging.DEBUG,
        'find command lines': logging.DEBUG,
        'test_pre_run.install infra/tools/luci/swarming': logging.DEBUG,
        'isolate tests': logging.DEBUG,
        'read GN args': logging.DEBUG,
        'test_pre_run.[trigger] ': logging.INFO,
        'test_pre_run.': logging.DEBUG,
        'collect tasks.wait for tasks': logging.INFO,
        'collect tasks': logging.DEBUG,
        '$debug - all results': logging.DEBUG,
        'Test statistics': logging.DEBUG,
        'read gclient': logging.DEBUG,
        'write output_properties_file': logging.DEBUG,
    }
    # Setup logger for printing to the same line
    logger = logging.getLogger('single_line_logger')
    handler = logging.StreamHandler(sys.stdout)
    handler.terminator = ''
    logger.addHandler(handler)
    logger.propagate = False

    self._last_line = ''
    self._current_log_level = logging.DEBUG
    self._single_line_logger = logger
    self._current_step_name = ''
    self._dot_count = 0

  def _StdoutProcessLine(self, line):
    if not line.startswith(self.ANNOTATOR_PREFIX_SUFIX):
      # Pass through any non-engine text
      is_urlish = re.match(r'^http[s]?://\S+$', line)
      if is_urlish:
        logging.log(self._current_log_level, line)
      else:
        basic_logger.log(self._current_log_level, line)

  def _StepNameProcessLine(self, line):
    if line.startswith(self.SEED_STEP_TEXT):
      # Always print the step name to info
      logging.log(self._current_log_level,
                  '\n[cyan]Running: ' + self._current_step_name + '[/]')
      return
    self._StdoutProcessLine(line)

  def _ProcessTriggerLine(self, line):
    if line.startswith(self.SEED_STEP_TEXT + self.TRIGGER_STEP_PREFIX):
      # The step names for tests don't have any identifying keywords so the
      # result step parsers need to be installed at trigger time
      test_name = line[len(self.SEED_STEP_TEXT +
                           self.TRIGGER_STEP_PREFIX):line.index(' (') if ' (' in
                       line else -len(self.ANNOTATOR_PREFIX_SUFIX)]
      self._step_to_processors[test_name] = self._ProcessResult
      self._step_to_log_level[test_name] = logging.DEBUG
    elif line.startswith(self.TRIGGER_LINK_TEXT):
      matches = self._trigger_link_re.match(line)
      if matches:
        task_name = self._current_step_name[len(self.TRIGGER_STEP_PREFIX):]
        basic_logger.log(self._current_log_level,
                         f'Triggered {task_name}: ' + matches[1])
    else:
      self._StdoutProcessLine(line)

  def _ProcessCompileLine(self, line):
    if line.startswith(self.SEED_STEP_TEXT):
      logging.info('\n[cyan]Running: ' + self._current_step_name + '[/]')
      return
    matches = self._ninja_status_re.match(line)
    if matches:
      self._single_line_logger.log(self._current_log_level, '\33[2K\r' + line)
      return
    if self._last_line.startswith('['):
      basic_logger.log(self._current_log_level, '')
    self._StdoutProcessLine(line)

  def _ProcessCollectLine(self, line):
    if line.startswith(self.SEED_STEP_TEXT):
      logging.info('\n[cyan]Running: ' + self._current_step_name + '[/]')
    matches = self._collect_wait_re.match(line)
    if matches:
      task_ids = json.loads(matches[2])['task_id']
      self._dot_count = (self._dot_count % 5) + 1
      self._single_line_logger.log(
          self._current_log_level,
          f'\33[2K\rStill waiting on: {len(task_ids)} shard(s)' +
          '.' * self._dot_count)
      return
    if line == self.STEP_CLOSED_TEXT:
      self._single_line_logger.log(self._current_log_level,
                                   '\33[2K\rStill waiting on: 0 shard(s)...')
      basic_logger.log(self._current_log_level, '')

  def _ProcessResult(self, line):
    matches = self._result_links_re.match(line)
    if matches:
      basic_logger.log(self._current_log_level,
                       'Test results for %s shard %s: %s',
                       self._current_step_name, matches[1], matches[2])

  def ProcessLine(self, line):
    # If we're in a new step see if it needs to be parsed differently
    if line.startswith(self.SEED_STEP_TEXT):
      self._current_step_name = line[len(self.SEED_STEP_TEXT
                                         ):-len(self.ANNOTATOR_PREFIX_SUFIX)]
      self._current_proccess_fn = self._get_processor(self._current_step_name)
      self._current_log_level = self._get_log_level(self._current_step_name)
    self._current_proccess_fn(line)
    self._last_line = line
    if line.startswith(self.STEP_CLOSED_TEXT):
      # Text outside of steps will use the last processor otherwise
      self._current_log_level = logging.DEBUG
      _current_proccess_fn = self._StepNameProcessLine

  def _get_processor(self, step_name):
    if step_name in self._step_to_processors:
      return self._step_to_processors[step_name]
    for match_name in self._step_to_processors:
      if step_name.startswith(match_name):
        return self._step_to_processors[match_name]
    return self._StepNameProcessLine

  def _get_log_level(self, step_name):
    if step_name in self._step_to_log_level:
      return self._step_to_log_level[step_name]
    for match_name in self._step_to_log_level:
      if step_name.startswith(match_name):
        return self._step_to_log_level[match_name]
    return logging.INFO