# Copyright 2013 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import psutil
import sys
import time
import urllib.parse
import util
import command_executor
from command_executor import Command
from webelement import WebElement
from webshadowroot import WebShadowRoot
from websocket_connection import WebSocketConnection
from exceptions import *
ELEMENT_KEY_W3C = "element-6066-11e4-a52e-4f735466cecf"
ELEMENT_KEY = "ELEMENT"
SHADOW_KEY = "shadow-6066-11e4-a52e-4f735466cecf"
MAX_RETRY_COUNT = 5
def _ExceptionForLegacyResponse(response):
exception_class_map = {
6: InvalidSessionId,
7: NoSuchElement,
8: NoSuchFrame,
9: UnknownCommand,
10: StaleElementReference,
11: ElementNotVisible,
12: InvalidElementState,
13: UnknownError,
17: JavaScriptError,
19: XPathLookupError,
21: Timeout,
23: NoSuchWindow,
24: InvalidCookieDomain,
26: UnexpectedAlertOpen,
27: NoSuchAlert,
28: ScriptTimeout,
32: InvalidSelector,
33: SessionNotCreated,
60: ElementNotInteractable,
61: InvalidArgument,
62: NoSuchCookie,
405: UnsupportedOperation
}
status = response['status']
msg = response['value']['message']
return exception_class_map.get(status, ChromeDriverException)(msg)
def _ExceptionForStandardResponse(response):
error = response['value']['error']
msg = response['value']['message']
stacktrace = response['value']['stacktrace']
if stacktrace:
msg += '\n\nStackTrace:\n\n' + stacktrace
return EXCEPTION_MAP.get(error, ChromeDriverException)(msg)
class ChromeDriver(object):
"""Starts and controls a single Chrome instance on this machine."""
retry_count = 0
retried_tests = []
def __init__(self, server_url, server_pid, **kwargs):
try:
self._InternalInit(server_url, **kwargs)
except Exception as e:
if not str(e).startswith('timed out'):
raise
else:
# Kill ChromeDriver child processes recursively
# (i.e. browsers and their child processes etc)
# when there is a timeout for launching browser
if server_pid:
processes = psutil.Process(server_pid).children(recursive=True)
if len(processes):
print('Terminating', len(processes), 'processes')
for p in processes:
p.terminate()
_, alive = psutil.wait_procs(processes, timeout=3)
if len(alive):
print('Killing', len(alive), 'processes')
for p in alive:
p.kill()
if ChromeDriver.retry_count < MAX_RETRY_COUNT:
ChromeDriver.retried_tests.append(kwargs.get('test_name'))
try:
self._InternalInit(server_url, **kwargs)
except:
# Only count it as retry if failed
print('Retry', ChromeDriver.retry_count, 'failed')
ChromeDriver.retry_count = ChromeDriver.retry_count + 1
raise
else:
raise
def _InternalInit(self, server_url,
chrome_binary=None, android_package=None,
android_activity=None, android_process=None,
android_use_running_app=None, chrome_switches=None,
chrome_extensions=None, chrome_log_path=None,
debugger_address=None, logging_prefs=None,
mobile_emulation=None, experimental_options=None,
download_dir=None, network_connection=None,
send_w3c_capability=True, send_w3c_request=True,
page_load_strategy=None, unexpected_alert_behaviour=None,
devtools_events_to_log=None, accept_insecure_certs=None,
timeouts=None, test_name=None, web_socket_url=None, browser_name=None,
http_timeout=None):
self._executor = command_executor.CommandExecutor(server_url,
http_timeout=http_timeout)
self._server_url = server_url
self.w3c_compliant = False
self.debuggerAddress = None
options = {}
if experimental_options:
assert isinstance(experimental_options, dict)
options = experimental_options.copy()
if android_package:
options['androidPackage'] = android_package
if android_activity:
options['androidActivity'] = android_activity
if android_process:
options['androidProcess'] = android_process
if android_use_running_app:
options['androidUseRunningApp'] = android_use_running_app
elif chrome_binary:
options['binary'] = chrome_binary
if chrome_switches is None:
chrome_switches = []
if sys.platform.startswith('linux') and android_package is None:
# Workaround for crbug.com/611886.
chrome_switches.append('no-sandbox')
# https://bugs.chromium.org/p/chromedriver/issues/detail?id=1695
chrome_switches.append('disable-gpu')
chrome_switches.append('force-color-profile=srgb')
# Resampling can change the distance of a synthetic scroll.
chrome_switches.append('disable-features=ResamplingScrollEvents')
assert type(chrome_switches) is list
options['args'] = chrome_switches
# TODO(crbug.com/40101714): Work around a bug with headless on Mac.
if (util.GetPlatformName() == 'mac' and
browser_name == 'chrome-headless-shell' and
debugger_address is None):
options['excludeSwitches'] = ['--enable-logging']
if mobile_emulation:
assert type(mobile_emulation) is dict
options['mobileEmulation'] = mobile_emulation
if chrome_extensions:
assert type(chrome_extensions) is list
options['extensions'] = chrome_extensions
if chrome_log_path:
assert type(chrome_log_path) is str
options['logPath'] = chrome_log_path
if debugger_address:
assert type(debugger_address) is str
options['debuggerAddress'] = debugger_address
if logging_prefs:
assert type(logging_prefs) is dict
log_types = ['client', 'driver', 'browser', 'server', 'performance',
'devtools']
log_levels = ['ALL', 'DEBUG', 'INFO', 'WARNING', 'SEVERE', 'OFF']
for log_type, log_level in logging_prefs.items():
assert log_type in log_types
assert log_level in log_levels
else:
logging_prefs = {}
if devtools_events_to_log:
assert type(devtools_events_to_log) is list
options['devToolsEventsToLog'] = devtools_events_to_log
if download_dir:
if 'prefs' not in options:
options['prefs'] = {}
if 'download' not in options['prefs']:
options['prefs']['download'] = {}
options['prefs']['download']['default_directory'] = download_dir
if send_w3c_capability is not None:
options['w3c'] = send_w3c_capability
params = {
'goog:chromeOptions': options,
'se:options': {
'loggingPrefs': logging_prefs
}
}
if page_load_strategy:
assert type(page_load_strategy) is str
params['pageLoadStrategy'] = page_load_strategy
if unexpected_alert_behaviour:
assert type(unexpected_alert_behaviour) is str
if send_w3c_request:
params['unhandledPromptBehavior'] = unexpected_alert_behaviour
else:
params['unexpectedAlertBehaviour'] = unexpected_alert_behaviour
if network_connection:
params['networkConnectionEnabled'] = network_connection
if accept_insecure_certs is not None:
params['acceptInsecureCerts'] = accept_insecure_certs
if timeouts is not None:
params['timeouts'] = timeouts
if test_name is not None:
params['goog:testName'] = test_name
if web_socket_url is not None:
params['webSocketUrl'] = web_socket_url
if browser_name is not None:
params['browserName'] = browser_name
if send_w3c_request:
params = {'capabilities': {'alwaysMatch': params}}
else:
params = {'desiredCapabilities': params}
response = self._ExecuteCommand(Command.NEW_SESSION, params)
if len(response.keys()) == 1 and 'value' in response.keys():
self.w3c_compliant = True
self._session_id = response['value']['sessionId']
self.capabilities = self._UnwrapValue(response['value']['capabilities'])
if ('goog:chromeOptions' in self.capabilities
and 'debuggerAddress' in self.capabilities['goog:chromeOptions']):
self.debuggerAddress = str(
self.capabilities['goog:chromeOptions']['debuggerAddress'])
elif isinstance(response['status'], int):
self.w3c_compliant = False
self._session_id = response['sessionId']
self.capabilities = self._UnwrapValue(response['value'])
else:
raise UnknownError("unexpected response")
def _WrapValue(self, value):
"""Wrap value from client side for chromedriver side."""
if isinstance(value, dict):
converted = {}
for key, val in value.items():
converted[key] = self._WrapValue(val)
return converted
elif isinstance(value, WebElement):
if (self.w3c_compliant):
return {ELEMENT_KEY_W3C: value._id}
else:
return {ELEMENT_KEY: value._id}
elif isinstance(value, WebShadowRoot):
return {SHADOW_KEY: value._id}
elif isinstance(value, list):
return list(self._WrapValue(item) for item in value)
else:
return value
def _UnwrapValue(self, value):
if isinstance(value, dict):
if (self.w3c_compliant and len(value) == 1
and ELEMENT_KEY_W3C in value
and isinstance(
value[ELEMENT_KEY_W3C], str)):
return WebElement(self, value[ELEMENT_KEY_W3C])
elif (len(value) == 1 and SHADOW_KEY in value
and isinstance(value[SHADOW_KEY], str)):
return WebShadowRoot(self, value[SHADOW_KEY])
elif (len(value) == 1 and ELEMENT_KEY in value
and isinstance(value[ELEMENT_KEY], str)):
return WebElement(self, value[ELEMENT_KEY])
else:
unwraped = {}
for key, val in value.items():
unwraped[key] = self._UnwrapValue(val)
return unwraped
elif isinstance(value, list):
return list(self._UnwrapValue(item) for item in value)
else:
return value
def _ExecuteCommand(self, command, params={}):
params = self._WrapValue(params)
try:
response = self._executor.Execute(command, params)
except Exception as e:
if str(e).startswith('timed out'):
self._RequestCrash()
raise e
if ('status' in response
and response['status'] != 0):
raise _ExceptionForLegacyResponse(response)
elif (type(response['value']) is dict
and 'error' in response['value']):
raise _ExceptionForStandardResponse(response)
return response
def _RequestCrash(self):
# Can't issue a new command without session_id
if not hasattr(self, '_session_id') or self._session_id == None:
return
tempDriver = ChromeDriver(self._server_url, None,
debugger_address=self.debuggerAddress, test_name='_forceCrash')
try:
tempDriver.SendCommandAndGetResult("Page.crash", {})
# allow time to complete writing the minidump
time.sleep(5)
except Exception as e:
# In some cases, Chrome will not honor the request
# Print the exception as it may give information on the Chrome state
# but Page.crash will also generate exception, so filter that out
message = str(e)
if 'session deleted because of page crash' not in message:
print('\n Exception from Page.crash: ' + message + '\n')
tempDriver.Quit()
def ExecuteCommand(self, command, params={}):
params['sessionId'] = self._session_id
response = self._ExecuteCommand(command, params)
return self._UnwrapValue(response['value'])
def CreateWebSocketConnection(self):
return WebSocketConnection(self._server_url, self._session_id)
def CreateWebSocketConnectionIPv6(self):
url_components = urllib.parse.urlparse(self._server_url)
new_url = urllib.parse.urlunparse(
url_components._replace(
netloc=('%s:%d' % ('[::1]', url_components.port))))
return WebSocketConnection(new_url, self._session_id)
def GetWindowHandles(self):
return self.ExecuteCommand(Command.GET_WINDOW_HANDLES)
def SwitchToWindow(self, handle_or_name):
if self.w3c_compliant:
self.ExecuteCommand(Command.SWITCH_TO_WINDOW, {'handle': handle_or_name})
else:
self.ExecuteCommand(Command.SWITCH_TO_WINDOW, {'name': handle_or_name})
def GetCurrentWindowHandle(self):
return self.ExecuteCommand(Command.GET_CURRENT_WINDOW_HANDLE)
def CloseWindow(self):
return self.ExecuteCommand(Command.CLOSE)
def Load(self, url):
self.ExecuteCommand(Command.GET, {'url': url})
def LaunchApp(self, app_id):
self.ExecuteCommand(Command.LAUNCH_APP, {'id': app_id})
def ExecuteScript(self, script, *args):
converted_args = list(args)
return self.ExecuteCommand(
Command.EXECUTE_SCRIPT, {'script': script, 'args': converted_args})
def CreateVirtualSensor(self, sensor_type, sensor_params=None):
params = {'type': sensor_type}
if sensor_params is not None:
params.update(sensor_params)
return self.ExecuteCommand(Command.CREATE_VIRTUAL_SENSOR, params)
def UpdateVirtualSensor(self, sensor_type, reading):
params = {'type': sensor_type, 'reading': reading}
return self.ExecuteCommand(Command.UPDATE_VIRTUAL_SENSOR, params)
def RemoveVirtualSensor(self, sensor_type):
params = {'type': sensor_type}
return self.ExecuteCommand(Command.REMOVE_VIRTUAL_SENSOR, params)
def GetVirtualSensorInformation(self, sensor_type):
params = {'type': sensor_type}
return self.ExecuteCommand(Command.GET_VIRTUAL_SENSOR_INFORMATION, params)
def SetPermission(self, parameters):
return self.ExecuteCommand(Command.SET_PERMISSION, parameters)
def ExecuteAsyncScript(self, script, *args):
converted_args = list(args)
return self.ExecuteCommand(
Command.EXECUTE_ASYNC_SCRIPT,
{'script': script, 'args': converted_args})
def SwitchToFrame(self, id_or_name):
if isinstance(id_or_name, str) and self.w3c_compliant:
try:
id_or_name = self.FindElement('css selector',
'[id="%s"]' % id_or_name)
except NoSuchElement:
try:
id_or_name = self.FindElement('css selector',
'[name="%s"]' % id_or_name)
except NoSuchElement:
raise NoSuchFrame(id_or_name)
self.ExecuteCommand(Command.SWITCH_TO_FRAME, {'id': id_or_name})
def SwitchToFrameByIndex(self, index):
self.SwitchToFrame(index)
def SwitchToMainFrame(self):
self.SwitchToFrame(None)
def SwitchToParentFrame(self):
self.ExecuteCommand(Command.SWITCH_TO_PARENT_FRAME)
def GetSessions(self):
return self.ExecuteCommand(Command.GET_SESSIONS)
def GetTitle(self):
return self.ExecuteCommand(Command.GET_TITLE)
def GetPageSource(self):
return self.ExecuteCommand(Command.GET_PAGE_SOURCE)
def FindElement(self, strategy, target):
return self.ExecuteCommand(
Command.FIND_ELEMENT, {'using': strategy, 'value': target})
def FindElements(self, strategy, target):
return self.ExecuteCommand(
Command.FIND_ELEMENTS, {'using': strategy, 'value': target})
def GetTimeouts(self):
return self.ExecuteCommand(Command.GET_TIMEOUTS)
def SetTimeouts(self, params):
if (len(params) == 0):
return;
sorted_params = sorted(params.items(), key=lambda x: x[1])
max_kv = sorted_params[-1];
# make sure that we have ms on the both sides of inequality
if (self._executor.HttpTimeout() * 500 < max_kv[1]):
raise ChromeDriverException(
'Timeout "%s" for ChromeDriver exceeds 50%% of the '
'HTTP connection timeout'
% max_kv[0])
return self.ExecuteCommand(Command.SET_TIMEOUTS, params)
def GetCurrentUrl(self):
return self.ExecuteCommand(Command.GET_CURRENT_URL)
def GoBack(self):
return self.ExecuteCommand(Command.GO_BACK)
def GoForward(self):
return self.ExecuteCommand(Command.GO_FORWARD)
def Refresh(self):
return self.ExecuteCommand(Command.REFRESH)
def MouseMoveTo(self, element=None, x_offset=None, y_offset=None):
params = {}
if element is not None:
params['element'] = element._id
if x_offset is not None:
params['xoffset'] = x_offset
if y_offset is not None:
params['yoffset'] = y_offset
self.ExecuteCommand(Command.MOUSE_MOVE_TO, params)
def MouseClick(self, button=0):
self.ExecuteCommand(Command.MOUSE_CLICK, {'button': button})
def MouseButtonDown(self, button=0):
self.ExecuteCommand(Command.MOUSE_BUTTON_DOWN, {'button': button})
def MouseButtonUp(self, button=0):
self.ExecuteCommand(Command.MOUSE_BUTTON_UP, {'button': button})
def MouseDoubleClick(self, button=0):
self.ExecuteCommand(Command.MOUSE_DOUBLE_CLICK, {'button': button})
def TouchDown(self, x, y):
self.ExecuteCommand(Command.TOUCH_DOWN, {'x': x, 'y': y})
def TouchUp(self, x, y):
self.ExecuteCommand(Command.TOUCH_UP, {'x': x, 'y': y})
def TouchMove(self, x, y):
self.ExecuteCommand(Command.TOUCH_MOVE, {'x': x, 'y': y})
def TouchScroll(self, element, xoffset, yoffset):
params = {'element': element._id, 'xoffset': xoffset, 'yoffset': yoffset}
self.ExecuteCommand(Command.TOUCH_SCROLL, params)
def TouchFlick(self, element, xoffset, yoffset, speed):
params = {
'element': element._id,
'xoffset': xoffset,
'yoffset': yoffset,
'speed': speed
}
self.ExecuteCommand(Command.TOUCH_FLICK, params)
def PerformActions(self, actions):
"""
actions: a dictionary containing the specified actions users wish to perform
"""
self.ExecuteCommand(Command.PERFORM_ACTIONS, actions)
def ReleaseActions(self):
self.ExecuteCommand(Command.RELEASE_ACTIONS)
def GetCookies(self):
return self.ExecuteCommand(Command.GET_COOKIES)
def GetNamedCookie(self, name):
return self.ExecuteCommand(Command.GET_NAMED_COOKIE, {'name': name})
def AddCookie(self, cookie):
self.ExecuteCommand(Command.ADD_COOKIE, {'cookie': cookie})
def DeleteCookie(self, name):
self.ExecuteCommand(Command.DELETE_COOKIE, {'name': name})
def DeleteAllCookies(self):
self.ExecuteCommand(Command.DELETE_ALL_COOKIES)
def IsAlertOpen(self):
return self.ExecuteCommand(Command.GET_ALERT)
def GetAlertMessage(self):
return self.ExecuteCommand(Command.GET_ALERT_TEXT)
def HandleAlert(self, accept, prompt_text=''):
if prompt_text:
self.ExecuteCommand(Command.SET_ALERT_VALUE, {'text': prompt_text})
if accept:
cmd = Command.ACCEPT_ALERT
else:
cmd = Command.DISMISS_ALERT
self.ExecuteCommand(cmd)
def IsLoading(self):
return self.ExecuteCommand(Command.IS_LOADING)
def GetWindowPosition(self):
position = self.ExecuteCommand(Command.GET_WINDOW_POSITION,
{'windowHandle': 'current'})
return [position['x'], position['y']]
def SetWindowPosition(self, x, y):
self.ExecuteCommand(Command.SET_WINDOW_POSITION,
{'windowHandle': 'current', 'x': x, 'y': y})
def GetWindowSize(self):
size = self.ExecuteCommand(Command.GET_WINDOW_SIZE,
{'windowHandle': 'current'})
return [size['width'], size['height']]
def NewWindow(self, window_type="window"):
return self.ExecuteCommand(Command.NEW_WINDOW,
{'type': window_type})
def GetWindowRect(self):
rect = self.ExecuteCommand(Command.GET_WINDOW_RECT)
return [rect['width'], rect['height'], rect['x'], rect['y']]
def SetWindowSize(self, width, height):
return self.ExecuteCommand(
Command.SET_WINDOW_SIZE,
{'windowHandle': 'current', 'width': width, 'height': height})
def SetWindowRect(self, width, height, x, y):
return self.ExecuteCommand(
Command.SET_WINDOW_RECT,
{'width': width, 'height': height, 'x': x, 'y': y})
def MaximizeWindow(self):
return self.ExecuteCommand(Command.MAXIMIZE_WINDOW,
{'windowHandle': 'current'})
def MinimizeWindow(self):
return self.ExecuteCommand(Command.MINIMIZE_WINDOW,
{'windowHandle': 'current'})
def FullScreenWindow(self):
return self.ExecuteCommand(Command.FULLSCREEN_WINDOW)
def SetDevicePosture(self, posture):
return self.ExecuteCommand(Command.SET_DEVICE_POSTURE, {'posture': posture})
def ClearDevicePosture(self):
return self.ExecuteCommand(Command.CLEAR_DEVICE_POSTURE)
def TakeScreenshot(self):
return self.ExecuteCommand(Command.SCREENSHOT)
def TakeFullPageScreenshot(self):
return self.ExecuteCommand(Command.FULL_PAGE_SCREENSHOT)
def PrintPDF(self, params={}):
return self.ExecuteCommand(Command.PRINT, params)
def Quit(self):
"""Quits the browser and ends the session."""
self.ExecuteCommand(Command.QUIT)
def GetLog(self, type):
return self.ExecuteCommand(Command.GET_LOG, {'type': type})
def GetAvailableLogTypes(self):
return self.ExecuteCommand(Command.GET_AVAILABLE_LOG_TYPES)
def SetNetworkConditions(self, latency, download_throughput,
upload_throughput, offline=False):
# Until http://crbug.com/456324 is resolved, we'll always set 'offline' to
# False, as going "offline" will sever Chromedriver's connection to Chrome.
params = {
'network_conditions': {
'offline': offline,
'latency': latency,
'download_throughput': download_throughput,
'upload_throughput': upload_throughput
}
}
self.ExecuteCommand(Command.SET_NETWORK_CONDITIONS, params)
def SetNetworkConditionsName(self, network_name):
self.ExecuteCommand(
Command.SET_NETWORK_CONDITIONS, {'network_name': network_name})
def GetNetworkConditions(self):
conditions = self.ExecuteCommand(Command.GET_NETWORK_CONDITIONS)
return {
'latency': conditions['latency'],
'download_throughput': conditions['download_throughput'],
'upload_throughput': conditions['upload_throughput'],
'offline': conditions['offline']
}
def GetNetworkConnection(self):
return self.ExecuteCommand(Command.GET_NETWORK_CONNECTION)
def DeleteNetworkConditions(self):
self.ExecuteCommand(Command.DELETE_NETWORK_CONDITIONS)
def SetNetworkConnection(self, connection_type):
params = {'parameters': {'type': connection_type}}
return self.ExecuteCommand(Command.SET_NETWORK_CONNECTION, params)
def SendCommandAndGetResult(self, cmd, cmd_params):
params = {'cmd': cmd, 'params': cmd_params};
return self.ExecuteCommand(Command.SEND_COMMAND_AND_GET_RESULT, params)
def SendKeys(self, *values):
typing = []
for value in values:
if isinstance(value, int):
value = str(value)
for i in range(len(value)):
typing.append(value[i])
self.ExecuteCommand(Command.SEND_KEYS_TO_ACTIVE_ELEMENT, {'value': typing})
def GenerateTestReport(self, message):
self.ExecuteCommand(Command.GENERATE_TEST_REPORT, {'message': message})
def SetTimeZone(self, timeZone):
return self.ExecuteCommand(Command.SET_TIME_ZONE, {'time_zone': timeZone})
def AddVirtualAuthenticator(self, protocol=None, transport=None,
hasResidentKey=None, hasUserVerification=None,
isUserConsenting=None, isUserVerified=None,
extensions=None, defaultBackupState=None,
defaultBackupEligibility=None):
options = {}
if protocol is not None:
options['protocol'] = protocol
if transport is not None:
options['transport'] = transport
if hasResidentKey is not None:
options['hasResidentKey'] = hasResidentKey
if hasUserVerification is not None:
options['hasUserVerification'] = hasUserVerification
if isUserConsenting is not None:
options['isUserConsenting'] = isUserConsenting
if isUserVerified is not None:
options['isUserVerified'] = isUserVerified
if extensions is not None:
options['extensions'] = extensions
if defaultBackupState is not None:
options['defaultBackupState'] = defaultBackupState
if defaultBackupEligibility is not None:
options['defaultBackupEligibility'] = defaultBackupEligibility
return self.ExecuteCommand(Command.ADD_VIRTUAL_AUTHENTICATOR, options)
def RemoveVirtualAuthenticator(self, authenticatorId):
params = {'authenticatorId': authenticatorId}
return self.ExecuteCommand(Command.REMOVE_VIRTUAL_AUTHENTICATOR, params)
def AddCredential(self, authenticatorId=None, credentialId=None,
isResidentCredential=None, rpId=None, privateKey=None,
userHandle=None, signCount=None, largeBlob=None,
backupState=None, backupEligibility=None):
options = {}
if authenticatorId is not None:
options['authenticatorId'] = authenticatorId
if credentialId is not None:
options['credentialId'] = credentialId
if isResidentCredential is not None:
options['isResidentCredential'] = isResidentCredential
if rpId is not None:
options['rpId'] = rpId
if privateKey is not None:
options['privateKey'] = privateKey
if userHandle is not None:
options['userHandle'] = userHandle
if signCount is not None:
options['signCount'] = signCount
if largeBlob is not None:
options['largeBlob'] = largeBlob
if backupState is not None:
options['backupState'] = backupState
if backupEligibility is not None:
options['backupEligibility'] = backupEligibility
return self.ExecuteCommand(Command.ADD_CREDENTIAL, options)
def GetCredentials(self, authenticatorId):
params = {'authenticatorId': authenticatorId}
return self.ExecuteCommand(Command.GET_CREDENTIALS, params)
def RemoveCredential(self, authenticatorId, credentialId):
params = {'authenticatorId': authenticatorId,
'credentialId': credentialId}
return self.ExecuteCommand(Command.REMOVE_CREDENTIAL, params)
def RemoveAllCredentials(self, authenticatorId):
params = {'authenticatorId': authenticatorId}
return self.ExecuteCommand(Command.REMOVE_ALL_CREDENTIALS, params)
def SetUserVerified(self, authenticatorId, isUserVerified):
params = {'authenticatorId': authenticatorId,
'isUserVerified': isUserVerified}
return self.ExecuteCommand(Command.SET_USER_VERIFIED, params)
def SetCredentialProperties(self, authenticatorId, credentialId,
backupState=None, backupEligibility=None):
params = {'authenticatorId': authenticatorId, 'credentialId': credentialId}
if backupState is not None:
params['backupState'] = backupState
if backupEligibility is not None:
params['backupEligibility'] = backupEligibility
return self.ExecuteCommand(Command.SET_CREDENTIAL_PROPERTIES, params)
def SetSPCTransactionMode(self, mode):
params = {'mode': mode}
return self.ExecuteCommand(Command.SET_SPC_TRANSACTION_MODE, params)
def SetRPHRegistrationMode(self, mode):
params = {'mode': mode}
return self.ExecuteCommand(Command.SET_RPH_REGISTRATION_MODE, params)
def CancelFedCmDialog(self):
return self.ExecuteCommand(Command.CANCEL_FEDCM_DIALOG, {})
def SelectAccount(self, index):
params = {'accountIndex': index}
return self.ExecuteCommand(Command.SELECT_ACCOUNT, params)
def ClickFedCmDialogButton(self, dialogButton, index=None):
params = {'dialogButton': dialogButton}
if index is not None:
params['index'] = index
return self.ExecuteCommand(Command.CLICK_FEDCM_DIALOG_BUTTON, params)
def GetAccounts(self):
return self.ExecuteCommand(Command.GET_ACCOUNTS, {})
def GetFedCmTitle(self):
return self.ExecuteCommand(Command.GET_FEDCM_TITLE, {})
def GetDialogType(self):
return self.ExecuteCommand(Command.GET_DIALOG_TYPE, {})
def SetDelayEnabled(self, enabled):
params = {'enabled': enabled}
return self.ExecuteCommand(Command.SET_DELAY_ENABLED, params)
def ResetCooldown(self):
return self.ExecuteCommand(Command.RESET_COOLDOWN, {})
def RunBounceTrackingMitigations(self):
return self.ExecuteCommand(Command.RUN_BOUNCE_TRACKING_MITIGATIONS, {})
def GetSessionId(self):
if not hasattr(self, '_session_id'):
return None
return self._session_id
def GetCastSinks(self, vendorId):
params = {'vendorId': vendorId}
return self.ExecuteCommand(Command.GET_CAST_SINKS, params)
def CreateVirtualPressureSource(self, type, metadata=None):
params = {'type': type}
if metadata is not None:
params.update(metadata)
return self.ExecuteCommand(Command.CREATE_VIRTUAL_PRESSURE_SOURCE, params)
def UpdateVirtualPressureSource(self, type, sample):
params = {'type': type, 'sample': sample}
return self.ExecuteCommand(Command.UPDATE_VIRTUAL_PRESSURE_SOURCE, params)
def RemoveVirtualPressureSource(self, type):
params = {'type': type}
return self.ExecuteCommand(Command.REMOVE_VIRTUAL_PRESSURE_SOURCE, params)
def __enter__(self):
return self
def __exit__(self, *args):
self.Quit()