chromium/ios/build/bots/scripts/result_sink_util.py

# Copyright 2020 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import atexit
import base64
import cgi
import json
import logging
import os
import requests
import sys
import traceback

import constants

# import protos for exceptions reporting
THIS_DIR = os.path.abspath(os.path.dirname(__file__))
CHROMIUM_SRC_DIR = os.path.abspath(os.path.join(THIS_DIR, '../../../..'))
sys.path.append(
    os.path.abspath(os.path.join(CHROMIUM_SRC_DIR, 'build/util/lib/proto')))
import exception_occurrences_pb2

from google.protobuf import json_format
from google.protobuf import any_pb2

LOGGER = logging.getLogger(__name__)
# VALID_STATUSES is a list of valid status values for test_result['status'].
# The full list can be obtained at
# https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_result.proto;drc=ca12b9f52b27f064b0fa47c39baa3b011ffa5790;l=151-174
VALID_STATUSES = {"PASS", "FAIL", "CRASH", "ABORT", "SKIP"}


def format_exception_stacktrace(e: Exception):
  exception_trace = traceback.format_exception(type(e), e, e.__traceback__)
  return exception_trace


def _compose_test_result(test_id,
                         status,
                         expected,
                         duration=None,
                         test_log=None,
                         test_loc=None,
                         tags=None,
                         file_artifacts=None):
  """Composes the test_result dict item to be posted to result sink.

  Args:
    test_id: (str) A unique identifier of the test in LUCI context.
    status: (str) Status of the test. Must be one in |VALID_STATUSES|.
    duration: (int) Test duration in milliseconds or None if unknown.
    expected: (bool) Whether the status is expected.
    test_log: (str) Log of the test. Optional.
    tags: (list) List of tags. Each item in list should be a length 2 tuple of
        string as ("key", "value"). Optional.
    test_loc: (dict): Test location metadata as described in
        https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/proto/v1/test_metadata.proto;l=32;drc=37488404d1c8aa8fccca8caae4809ece08828bae
    file_artifacts: (dict) IDs to abs paths mapping of existing files to
        report as artifact.

  Returns:
    A dict of test results with input information, confirming to
      https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto
  """
  tags = tags or []
  file_artifacts = file_artifacts or {}

  assert status in VALID_STATUSES, (
      '%s is not a valid status (one in %s) for ResultSink.' %
      (status, VALID_STATUSES))

  for tag in tags:
    assert len(tag) == 2, 'Items in tags should be length 2 tuples of strings'
    assert isinstance(tag[0], str) and isinstance(
        tag[1], str), ('Items in'
                       'tags should be length 2 tuples of strings')

  test_result = {
      'testId': test_id,
      'status': status,
      'expected': expected,
      'tags': [{
          'key': key,
          'value': value
      } for (key, value) in tags],
      'testMetadata': {
          'name': test_id,
          'location': test_loc,
      }
  }

  test_result['artifacts'] = {
      name: {
          'filePath': file_artifacts[name]
      } for name in file_artifacts
  }
  if test_log:
    message = ''
    if sys.version_info.major < 3:
      message = base64.b64encode(test_log)
    else:
      # Python3 b64encode takes and returns bytes. The result must be
      # serializable in order for the eventual json.dumps to succeed
      message = base64.b64encode(test_log.encode('utf-8')).decode('utf-8')
    test_result['summaryHtml'] = '<text-artifact artifact-id="Test Log" />'
    if constants.CRASH_MESSAGE in test_log:
      test_result['failureReason'] = {
          'primaryErrorMessage': constants.CRASH_MESSAGE
      }
    test_result['artifacts'].update({
        'Test Log': {
            'contents': message
        },
    })
  if not test_result['artifacts']:
    test_result.pop('artifacts')

  if duration:
    test_result['duration'] = '%.9fs' % (duration / 1000.0)

  return test_result


def _compose_exception_occurrence(exception):
  """Composes the exception_occurrence item to be posted to result sink.

  Args:
    exception: (Exception) the exception to be posted to result sink.

  Returns:
    exception_occurrence: Conforming to protos defined in
          //build/util/lib/proto/exception_occurrences.proto
  """

  occurrence = exception_occurrences_pb2.ExceptionOccurrence(
      name=type(exception).__name__,
      stacktrace=format_exception_stacktrace(exception),
  )
  occurrence.occurred_time.GetCurrentTime()
  return occurrence


class ExceptionResults:
  results = []  # Static variable to hold exception results

  @staticmethod
  def add_result(exception):
    ExceptionResults.results.append(exception)


class ResultSinkClient(object):
  """Stores constants and handles posting to ResultSink."""

  def __init__(self):
    """Initiates and stores constants to class."""
    self.sink = None
    luci_context_file = os.environ.get('LUCI_CONTEXT')
    if not luci_context_file:
      logging.warning('LUCI_CONTEXT not found in environment. ResultDB'
                      ' integration disabled.')
      return

    with open(luci_context_file) as f:
      self.sink = json.load(f).get('result_sink')
      if not self.sink:
        logging.warning('ResultSink constants not found in LUCI context.'
                        ' ResultDB integration disabled.')
        return

      self.url = ('http://%s/prpc/luci.resultsink.v1.Sink/ReportTestResults' %
                  self.sink['address'])
      self.headers = {
          'Content-Type': 'application/json',
          'Accept': 'application/json',
          'Authorization': 'ResultSink %s' % self.sink['auth_token'],
      }
      self._session = requests.Session()

      # Ensure session is closed at exit.
      atexit.register(self.close)

    logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)

  def close(self):
    """Closes the connection to result sink server."""
    if not self.sink:
      return
    LOGGER.info('Closing connection with result sink server.')
    # Reset to default logging level of test runner scripts.
    logging.getLogger("urllib3.connectionpool").setLevel(logging.DEBUG)
    self._session.close()

  def post(self, test_id, status, expected, **kwargs):
    """Composes and posts a test and status to result sink.

    Args:
      test_id: (str) A unique identifier of the test in LUCI context.
      status: (str) Status of the test. Must be one in |VALID_STATUSES|.
      expected: (bool) Whether the status is expected.
      **kwargs: Optional keyword args. Namely:
        duration: (int) Test duration in milliseconds or None if unknown.
        test_log: (str) Log of the test. Optional.
        tags: (list) List of tags. Each item in list should be a length 2 tuple
          of string as ("key", "value"). Optional.
        file_artifacts: (dict) IDs to abs paths mapping of existing files to
          report as artifact.
    """
    if not self.sink:
      return
    self._post_test_result(
        _compose_test_result(test_id, status, expected, **kwargs))

  def post_exceptions(self, exceptions):
    """Composes and posts exception result to server.

    Args:
      exception: [Exception] list of exceptions to be posted to result sink.
    """
    if not self.sink:
      return
    exception_occurrences = []
    for exception in exceptions:
      exception_occurrences.append(_compose_exception_occurrence(exception))
    self._post_exceptions(exception_occurrences)

  def _post_test_result(self, test_result):
    """Posts single test result to server.

    This method assumes |self.sink| is not None.

    Args:
        test_result: (dict) Confirming to protocol defined in
          https://source.chromium.org/chromium/infra/infra/+/main:go/src/go.chromium.org/luci/resultdb/sink/proto/v1/test_result.proto
    """
    res = self._session.post(
        url=self.url,
        headers=self.headers,
        data=json.dumps({'testResults': [test_result]}),
    )
    res.raise_for_status()

  def _post_exceptions(self, exception_occurrences):
    """Posts exception result to server.

    This method assumes |self.sink| is not None.

    Args:
        exception_occurrences: list of exception_occurrences,
          conforming to protos defined in
          //build/util/lib/proto/exception_occurrences.proto
    """

    occurrences = exception_occurrences_pb2.ExceptionOccurrences()
    occurrences.datapoints.extend(exception_occurrences)
    any_msg = any_pb2.Any()
    any_msg.Pack(occurrences)
    inv_data = json.dumps(
        {
            'invocation': {
                'extended_properties': {
                    'exception_occurrences':
                        json_format.MessageToDict(
                            any_msg, preserving_proto_field_name=True)
                }
            },
            'update_mask': {
                'paths': ['extended_properties.exception_occurrences'],
            }
        },
        sort_keys=True)

    LOGGER.info(inv_data)

    updateInvo_url = (
        'http://%s/prpc/luci.resultsink.v1.Sink/UpdateInvocation' %
        self.sink['address'])
    res = self._session.post(
        url=updateInvo_url,
        headers=self.headers,
        data=inv_data,
    )
    res.raise_for_status()