# chromium/build/fuchsia/test/ffx_integration.py

# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Provide helpers for running Fuchsia's `ffx`."""

import logging
import os
import json
import subprocess
import sys
import tempfile

from contextlib import AbstractContextManager
from typing import IO, Iterable, List, Optional

from common import run_continuous_ffx_command, run_ffx_command, SDK_ROOT

# Schema identifier that a run_summary.json file must declare in its
# `schema_id` field for this module to parse it.
RUN_SUMMARY_SCHEMA = (
    'https://fuchsia.dev/schema/ffx_test/run_summary-8d1dd964.json')


def get_config(name: str) -> Optional[str]:
    """Returns the value reported by `ffx config get` for a property.

    Args:
        name: The name of the property to look up.
    Returns:
        The command's stdout with surrounding whitespace stripped, or None
        when the command exits with return code 2.
    Raises:
        subprocess.CalledProcessError: the command failed with any other
            return code.
    """

    ffx_args = ['config', 'get', name]
    try:
        completed = run_ffx_command(cmd=ffx_args, capture_output=True)
    except subprocess.CalledProcessError as error:
        # A return code of 2 indicates no previous value set; anything else
        # is a genuine failure and is propagated.
        if error.returncode != 2:
            raise
        return None
    return completed.stdout.strip()


class ScopedFfxConfig(AbstractContextManager):
    """Context manager that overrides one `ffx` configuration property for the
    duration of a `with` block and puts the previous value back on exit."""

    def __init__(self, name: str, value: str) -> None:
        """
        Args:
            name: The name of the property to set.
            value: The value to associate with `name`.
        """
        self._name = name
        self._new_value = value
        # Filled in by __enter__ with the value found before overriding.
        self._old_value = None

    def __enter__(self):
        """Remembers the current value, then applies the override."""

        self._old_value = get_config(self._name)
        # Nothing to write when the property already holds the wanted value.
        if self._old_value != self._new_value:
            run_ffx_command(cmd=['config', 'set', self._name, self._new_value])
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Undoes the override, if one was applied. Never suppresses
        exceptions."""

        if self._old_value != self._new_value:
            self._restore_old_value()
        return False

    def _restore_old_value(self) -> None:
        """Removes the overriding value and re-sets the old one if needed."""

        # The removal is allowed to fail; a failure is only logged.
        removal = run_ffx_command(cmd=['config', 'remove', self._name],
                                  check=False)
        if removal.returncode != 0:
            logging.warning('Error when removing ffx config %s', self._name)

        # There was no previous value, so there is nothing to set back.
        if self._old_value is None:
            return

        # Set the value back explicitly only if removing the new value did
        # not already restore the old one.
        if get_config(self._name) != self._old_value:
            run_ffx_command(cmd=['config', 'set', self._name, self._old_value])


class FfxTestRunner(AbstractContextManager):
    """A context manager that manages a session for running a test via `ffx`.

    Upon entry, an instance of this class prepares a directory to hold the
    files generated by a test: the directory given at construction (created if
    it does not exist), or a temporary directory otherwise. On exit, the
    temporary directory, if used, is deleted.

    The prepared directory is passed as `--output-directory` when invoking
    `ffx test run`.
    """

    def __init__(self, results_dir: Optional[str] = None) -> None:
        """
        Args:
            results_dir: Directory on the host where results should be stored.
                A temporary directory is used when this is not given.
        """
        self._results_dir = results_dir
        self._custom_artifact_directory = None
        self._temp_results_dir = None
        self._debug_data_directory = None
        # Becomes True once a run summary has been read and accepted, so that
        # the summary is processed at most once.
        self._outputs_parsed = False

    def __enter__(self):
        """Creates the results directory (or a temporary one)."""
        if self._results_dir:
            os.makedirs(self._results_dir, exist_ok=True)
        else:
            self._temp_results_dir = tempfile.TemporaryDirectory()
            self._results_dir = self._temp_results_dir.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Deletes the temporary results directory, if one was created."""
        if self._temp_results_dir:
            self._temp_results_dir.__exit__(exc_type, exc_val, exc_tb)
            self._temp_results_dir = None

        # Do not suppress exceptions.
        return False

    def run_test(self,
                 component_uri: str,
                 test_args: Optional[Iterable[str]] = None,
                 node_name: Optional[str] = None,
                 test_realm: Optional[str] = None) -> subprocess.Popen:
        """Starts a subprocess to run a test on a target.
        Args:
            component_uri: The test component URI.
            test_args: Arguments to the test package, if any. They are placed
                after a `--` separator on the command line.
            node_name: The target on which to run the test.
            test_realm: If set, passed to `ffx test run` as `--realm`.
        Returns:
            A subprocess.Popen object. The command is started with stdout
            piped and stderr redirected to stdout.
        """
        command = [
            'test', 'run', '--output-directory', self._results_dir,
        ]
        if test_realm:
            command.append("--realm")
            command.append(test_realm)
        command.append(component_uri)
        if test_args:
            command.append('--')
            command.extend(test_args)
        return run_continuous_ffx_command(command,
                                          node_name,
                                          stdout=subprocess.PIPE,
                                          stderr=subprocess.STDOUT)

    def _parse_test_outputs(self) -> None:
        """Parses the run summary written to the results directory.

        The instance's `_debug_data_directory` member is set to the directory
        of the first run-level artifact of type DEBUG, and
        `_custom_artifact_directory` to the directory of the first artifact of
        type CUSTOM in the first suite. A member is left as None when no such
        artifact is listed. A summary file that cannot be read or parsed is
        logged and ignored, and entries missing from the summary's `data` are
        treated as absent instead of raising KeyError.

        This function is idempotent: once a summary has been read and
        accepted, later calls perform no work.

        Raises:
            AssertionError: the summary declares an unsupported schema.
            SystemExit: the summary reports an outcome of NOT_STARTED.
        """
        if self._outputs_parsed:
            return

        run_summary_path = os.path.join(self._results_dir, 'run_summary.json')
        try:
            with open(run_summary_path, encoding='utf-8') as run_summary_file:
                run_summary = json.load(run_summary_file)
        except OSError:
            logging.exception('Error reading run summary file.')
            return
        except ValueError:
            logging.exception('Error parsing run summary file %s',
                              run_summary_path)
            return

        assert run_summary['schema_id'] == RUN_SUMMARY_SCHEMA, \
            'Unsupported version found in %s' % run_summary_path

        # From here on the summary is final for this runner, whatever it
        # turns out to contain.
        self._outputs_parsed = True
        data = run_summary.get('data') or {}

        # Debug data is listed among the artifacts of the run itself.
        run_artifact_dir = data.get('artifact_dir')
        if run_artifact_dir is not None:
            run_artifacts = data.get('artifacts') or {}
            for artifact_path, artifact in run_artifacts.items():
                if artifact.get('artifact_type') == 'DEBUG':
                    self._debug_data_directory = os.path.join(
                        self._results_dir, run_artifact_dir, artifact_path)
                    break

        if data.get('outcome') == "NOT_STARTED":
            logging.critical('Test execution was interrupted. Either the '
                             'emulator crashed while the tests were still '
                             'running or connection to the device was lost.')
            sys.exit(1)

        # There should be precisely one suite for the test that ran.
        suites_list = data.get('suites')
        if not suites_list:
            logging.error('Missing or empty list of suites in %s',
                          run_summary_path)
            return
        suite_summary = suites_list[0]

        # Get the top-level directory holding all artifacts for this suite.
        artifact_dir = suite_summary.get('artifact_dir')
        if not artifact_dir:
            logging.error('Failed to find suite\'s artifact_dir in %s',
                          run_summary_path)
            return

        # Get the path corresponding to custom artifacts.
        suite_artifacts = suite_summary.get('artifacts') or {}
        for artifact_path, artifact in suite_artifacts.items():
            if artifact.get('artifact_type') == 'CUSTOM':
                self._custom_artifact_directory = os.path.join(
                    self._results_dir, artifact_dir, artifact_path)
                break

    def get_custom_artifact_directory(self) -> Optional[str]:
        """Returns the full path to the directory holding custom artifacts
        emitted by the test or None if the directory could not be discovered.
        """
        self._parse_test_outputs()
        return self._custom_artifact_directory

    def get_debug_data_directory(self) -> Optional[str]:
        """Returns the full path to the directory holding debug data
        emitted by the test, or None if the path cannot be determined.
        """
        self._parse_test_outputs()
        return self._debug_data_directory


def run_symbolizer(symbol_paths: List[str],
                   input_fd: IO,
                   output_fd: IO,
                   raw_bytes: bool = False) -> subprocess.Popen:
    """Starts `ffx debug symbolize` reading |input_fd| and writing |output_fd|.

    Args:
        symbol_paths: Paths passed to the symbolizer, each as an `--ids-txt`
            argument.
        input_fd: Stream used as the subprocess's stdin.
        output_fd: Stream used as the subprocess's stdout; stderr is
            redirected to it as well.
        raw_bytes: When True no encoding is requested for the subprocess's
            streams; otherwise 'utf-8' is requested.
    Returns:
        The object returned by run_continuous_ffx_command for the started
        symbolizer.
    """

    build_id_dir = os.path.join(SDK_ROOT, '.build-id')
    ffx_args = [
        'debug', 'symbolize', '--', '--omit-module-lines', '--build-id-dir',
        build_id_dir
    ]
    for ids_txt in symbol_paths:
        ffx_args += ['--ids-txt', ids_txt]

    return run_continuous_ffx_command(
        ffx_args,
        stdin=input_fd,
        stdout=output_fd,
        stderr=subprocess.STDOUT,
        encoding=None if raw_bytes else 'utf-8',
        close_fds=True)