# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from concurrent import futures
import grpc
import logging
import os
import subprocess
import sys
# if the current directory is in scripts (pwd), then we need to
# add plugin in order to import from that directory
if os.path.split(os.path.dirname(__file__))[1] != 'plugin':
sys.path.append(
os.path.join(os.path.abspath(os.path.dirname(__file__)), 'plugin'))
from plugin_constants import PLUGIN_PROTOS_PATH, PLUGIN_SERVICE_WORKER_COUNT, PLUGIN_SERVICE_ADDRESS, PLUGIN_PROXY_SERVICE_PORT, REMOTE_PLUGIN_PROXY_PORT
sys.path.append(PLUGIN_PROTOS_PATH)
import test_plugin_service_pb2
import test_plugin_service_pb2_grpc
LOGGER = logging.getLogger(__name__)
class TestPluginServicer(test_plugin_service_pb2_grpc.TestPluginServiceServicer
):
"""
Implementation of test plugin service for communication between
iOS test runner and EG tests
"""
def __init__(self, enabled_plugins):
""" Initializes a new instance of this class.
Args:
enabled_plugins: a list of initialized plugins to be used during
different lifecycle stages of test execution. See the list of
available test plugins in test_plugins.py
"""
self.plugins = enabled_plugins
def TestCaseWillStart(self, request, context):
""" Executes plugin tasks when a test case is about to start """
LOGGER.info('Received request for TestCaseWillStart %s', request)
for plugin in self.plugins:
plugin.test_case_will_start(request)
return test_plugin_service_pb2.TestCaseWillStartResponse()
def TestCaseDidFinish(self, request, context):
""" Executes plugin tasks when a test case just finished executing """
LOGGER.info('Received request for TestCaseDidFinish %s', request)
for plugin in self.plugins:
plugin.test_case_did_finish(request)
return test_plugin_service_pb2.TestCaseDidFinishResponse()
def TestCaseDidFail(self, request, context):
""" Executes plugin tasks when a test case failed unexpectedly """
LOGGER.info('Received request for TestCaseDidFail %s', request)
for plugin in self.plugins:
plugin.test_case_did_fail(request)
return test_plugin_service_pb2.TestCaseDidFailResponse()
def TestBundleWillFinish(self, request, context):
""" Executes plugin tasks when a test bundle is about to finish"""
LOGGER.info('Received request for TestBundleWillFinish %s', request)
for plugin in self.plugins:
plugin.test_bundle_will_finish(request)
return test_plugin_service_pb2.TestBundleWillFinishResponse()
def ListEnabledPlugins(self, request, context):
""" Returns the list of enabled plugins """
LOGGER.info('Received request for ListEnabledPlugins %s', request)
plugins_str = [str(p) for p in self.plugins]
return test_plugin_service_pb2.ListEnabledPluginsResponse(
enabled_plugins=plugins_str)
def reset(self):
"""
Runs reset tasks for each plugin. This might be useful between
each attempt of test runs
"""
for plugin in self.plugins:
LOGGER.info('Resetting %s', plugin)
plugin.reset()
class TestPluginServicerWrapper:
""" Wrapper for the test plugin service above.
This class is useful for
managing the plugin service such as start, tear_down, etc...
"""
def __init__(self, servicer, device_proxy=None):
"""Initializes a new instance of this class.
Args:
servicer: an initialized test plugin service above
device_proxy: default to be none. Pass in an instance of
PluginServiceProxyWrapper if the test is running on physical device.
"""
self.servicer = servicer
self.server = grpc.server(
futures.ThreadPoolExecutor(max_workers=PLUGIN_SERVICE_WORKER_COUNT))
self.device_proxy = device_proxy
def start_server(self):
""" Starts a test plugin service, so it can receive gRPC requests """
LOGGER.info('Starting test plugin server...')
test_plugin_service_pb2_grpc.add_TestPluginServiceServicer_to_server(
self.servicer, self.server)
self.server.add_insecure_port(PLUGIN_SERVICE_ADDRESS)
self.server.start()
LOGGER.info('Test plugin server is running!')
if self.device_proxy:
self.device_proxy.start()
LOGGER.info('Test plugin proxy server is running!')
def wait_for_termination(self):
""" Block current thread until the test plugin service stops """
self.server.wait_for_termination()
def tear_down(self):
""" Resets and stop the test plugin service """
LOGGER.info('Tearing down plugin service...')
self.reset()
self.server.stop(grace=None)
if self.device_proxy:
self.device_proxy.tear_down()
def reset(self):
""" Resets the test plugin service. This might be useful between
each attempt of test runs """
LOGGER.info('Resetting plugin service')
self.servicer.reset()
if self.device_proxy:
self.device_proxy.reset()
# for testing purpose only when running locally
if __name__ == '__main__':
server = TestPluginServicerWrapper(TestPluginServicer([]))
server.start_server()
server.wait_for_termination()