chromium/ios/build/bots/scripts/plugin/test_plugin_service.py

# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from concurrent import futures
import grpc
import logging
import os
import subprocess
import sys

# if the current directory is in scripts (pwd), then we need to
# add plugin in order to import from that directory
if os.path.split(os.path.dirname(__file__))[1] != 'plugin':
  sys.path.append(
      os.path.join(os.path.abspath(os.path.dirname(__file__)), 'plugin'))
from plugin_constants import PLUGIN_PROTOS_PATH, PLUGIN_SERVICE_WORKER_COUNT, PLUGIN_SERVICE_ADDRESS, PLUGIN_PROXY_SERVICE_PORT, REMOTE_PLUGIN_PROXY_PORT

sys.path.append(PLUGIN_PROTOS_PATH)
import test_plugin_service_pb2
import test_plugin_service_pb2_grpc

LOGGER = logging.getLogger(__name__)

class TestPluginServicer(test_plugin_service_pb2_grpc.TestPluginServiceServicer
                        ):
  """
  Implementation of test plugin service for communication between
  iOS test runner and EG tests
  """

  def __init__(self, enabled_plugins):
    """ Initializes a new instance of this class.

    Args:
      enabled_plugins: a list of initialized plugins to be used during
        different lifecycle stages of test execution. See the list of
        available test plugins in test_plugins.py

    """
    self.plugins = enabled_plugins

  def TestCaseWillStart(self, request, context):
    """ Executes plugin tasks when a test case is about to start """
    LOGGER.info('Received request for TestCaseWillStart %s', request)
    for plugin in self.plugins:
      plugin.test_case_will_start(request)
    return test_plugin_service_pb2.TestCaseWillStartResponse()

  def TestCaseDidFinish(self, request, context):
    """ Executes plugin tasks when a test case just finished executing """
    LOGGER.info('Received request for TestCaseDidFinish %s', request)
    for plugin in self.plugins:
      plugin.test_case_did_finish(request)
    return test_plugin_service_pb2.TestCaseDidFinishResponse()

  def TestCaseDidFail(self, request, context):
    """ Executes plugin tasks when a test case failed unexpectedly """
    LOGGER.info('Received request for TestCaseDidFail %s', request)
    for plugin in self.plugins:
      plugin.test_case_did_fail(request)
    return test_plugin_service_pb2.TestCaseDidFailResponse()

  def TestBundleWillFinish(self, request, context):
    """ Executes plugin tasks when a test bundle is about to finish"""
    LOGGER.info('Received request for TestBundleWillFinish %s', request)
    for plugin in self.plugins:
      plugin.test_bundle_will_finish(request)
    return test_plugin_service_pb2.TestBundleWillFinishResponse()

  def ListEnabledPlugins(self, request, context):
    """ Returns the list of enabled plugins """
    LOGGER.info('Received request for ListEnabledPlugins %s', request)
    plugins_str = [str(p) for p in self.plugins]
    return test_plugin_service_pb2.ListEnabledPluginsResponse(
        enabled_plugins=plugins_str)

  def reset(self):
    """
    Runs reset tasks for each plugin. This might be useful between
    each attempt of test runs
    """
    for plugin in self.plugins:
      LOGGER.info('Resetting %s', plugin)
      plugin.reset()


class TestPluginServicerWrapper:
  """ Wrapper for the test plugin service above.

  This class is useful for
  managing the plugin service such as start, tear_down, etc...
  """

  def __init__(self, servicer, device_proxy=None):
    """Initializes a new instance of this class.

    Args:
      servicer: an initialized test plugin service above
      device_proxy: default to be none. Pass in an instance of
        PluginServiceProxyWrapper if the test is running on physical device.

    """
    self.servicer = servicer
    self.server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=PLUGIN_SERVICE_WORKER_COUNT))
    self.device_proxy = device_proxy

  def start_server(self):
    """ Starts a test plugin service, so it can receive gRPC requests """
    LOGGER.info('Starting test plugin server...')
    test_plugin_service_pb2_grpc.add_TestPluginServiceServicer_to_server(
        self.servicer, self.server)
    self.server.add_insecure_port(PLUGIN_SERVICE_ADDRESS)
    self.server.start()
    LOGGER.info('Test plugin server is running!')

    if self.device_proxy:
      self.device_proxy.start()
      LOGGER.info('Test plugin proxy server is running!')

  def wait_for_termination(self):
    """ Block current thread until the test plugin service stops """
    self.server.wait_for_termination()

  def tear_down(self):
    """ Resets and stop the test plugin service """
    LOGGER.info('Tearing down plugin service...')
    self.reset()
    self.server.stop(grace=None)

    if self.device_proxy:
      self.device_proxy.tear_down()

  def reset(self):
    """ Resets the test plugin service. This might be useful between
    each attempt of test runs """
    LOGGER.info('Resetting plugin service')
    self.servicer.reset()

    if self.device_proxy:
      self.device_proxy.reset()


# for testing purpose only when running locally
if __name__ == '__main__':
  server = TestPluginServicerWrapper(TestPluginServicer([]))
  server.start_server()
  server.wait_for_termination()