# mypy: allow-untyped-defs
import functools
from typing import Dict
from urllib import parse as urlparse

from . import error
from . import protocol
from . import transport
from .bidi.client import BidiSession
def command(func):
    """Decorate a WebDriver command so it runs against a started session.

    The wrapper looks up the session on the object the command is bound to:
    an object exposing a ``session`` attribute delegates to it, anything
    else is treated as the session itself.  When that session has no
    ``session_id`` yet, ``start()`` is called on it before ``func`` runs.

    :param func: Callable taking ``self`` as its first positional argument.
    :return: Wrapper that forwards all arguments to ``func`` and returns
        its result.
    """
    # functools.wraps keeps __name__ and __doc__ as before and additionally
    # carries over __qualname__, __module__, __dict__ and __wrapped__.
    @functools.wraps(func)
    def inner(self, *args, **kwargs):
        session = self.session if hasattr(self, "session") else self

        if session.session_id is None:
            session.start()

        return func(self, *args, **kwargs)

    return inner
class Timeouts:
    """Accessor for the script, page load and implicit timeouts of a session.

    Values are written in seconds (multiplied by 1000 before being sent) and
    read back exactly as the remote end reports them.
    """

    def __init__(self, session):
        """
        :param session: Object providing ``send_session_command``.
        """
        self.session = session

    def _get(self, key=None):
        """Fetch the timeouts from the remote end.

        :param key: Optional name of a single timeout to return.
        :return: The whole timeouts response when ``key`` is None, otherwise
            the entry stored under ``key``.
        """
        timeouts = self.session.send_session_command("GET", "timeouts")
        if key is not None:
            return timeouts[key]
        return timeouts

    def _set(self, key, secs):
        """Set a single timeout.

        :param key: Name of the timeout as used in the request body.
        :param secs: Timeout in seconds.  ``None`` is forwarded unchanged so
            that a null timeout can be requested instead of failing locally
            on ``None * 1000``.
        """
        millis = None if secs is None else secs * 1000
        self.session.send_session_command("POST", "timeouts", {key: millis})
        return None

    @property
    def script(self):
        """The ``script`` timeout as reported by the remote end."""
        return self._get("script")

    @script.setter
    def script(self, secs):
        return self._set("script", secs)

    @property
    def page_load(self):
        """The ``pageLoad`` timeout as reported by the remote end."""
        return self._get("pageLoad")

    @page_load.setter
    def page_load(self, secs):
        return self._set("pageLoad", secs)

    @property
    def implicit(self):
        """The ``implicit`` timeout as reported by the remote end."""
        return self._get("implicit")

    @implicit.setter
    def implicit(self, secs):
        return self._set("implicit", secs)

    def __str__(self):
        name = "%s.%s" % (self.__module__, self.__class__.__name__)
        # One request instead of three, and %s rather than %d so that a
        # timeout reported as None does not raise TypeError.
        timeouts = self._get()
        return "<%s script=%s, load=%s, implicit=%s>" % (
            name, timeouts["script"], timeouts["pageLoad"], timeouts["implicit"])
class ActionSequence:
    """Builder that queues actions for one input source and performs them.

    Every public action method appends one or more actions to an internal
    queue and returns the sequence so that calls can be chained.  Calling
    perform() sends the queued actions to the session.

    Example::

        ActionSequence(session, "key", id) \\
            .key_down("a") \\
            .key_up("a") \\
            .perform()
    """

    def __init__(self, session, action_type, input_id, pointer_params=None):
        """Create an empty sequence of one action type for one input source.

        :param session: WebDriver session.
        :param action_type: Action type; may be "none", "key", or "pointer".
        :param input_id: ID of input source.
        :param pointer_params: Optional dictionary of pointer parameters.
        """
        self.session = session
        self._type = action_type
        self._id = input_id
        self._pointer_params = pointer_params
        self._actions = []

    @property
    def dict(self):
        """Dictionary form of this sequence as passed to ``Actions.perform``.

        The ``parameters`` entry is only present when pointer parameters
        were supplied.
        """
        sequence = {"type": self._type, "id": self._id, "actions": self._actions}
        params = self._pointer_params
        if params is not None:
            sequence["parameters"] = params
        return sequence

    @command
    def perform(self):
        """Perform all queued actions."""
        payload = [self.dict]
        self.session.actions.perform(payload)

    def _key_action(self, subtype, value):
        # Queue a single key action of the given subtype.
        entry = {"type": subtype, "value": value}
        self._actions.append(entry)

    def _pointer_action(self, subtype, button=None, x=None, y=None, duration=None, origin=None, width=None,
                        height=None, pressure=None, tangential_pressure=None, tilt_x=None,
                        tilt_y=None, twist=None, altitude_angle=None, azimuth_angle=None):
        # Queue a pointer action; only arguments that are not None end up
        # in the action, under the key listed next to them below.
        optional = (
            ("button", button),
            ("x", x),
            ("y", y),
            ("duration", duration),
            ("origin", origin),
            ("width", width),
            ("height", height),
            ("pressure", pressure),
            ("tangentialPressure", tangential_pressure),
            ("tiltX", tilt_x),
            ("tiltY", tilt_y),
            ("twist", twist),
            ("altitudeAngle", altitude_angle),
            ("azimuthAngle", azimuth_angle),
        )
        action = {"type": subtype}
        for key, supplied in optional:
            if supplied is not None:
                action[key] = supplied
        self._actions.append(action)

    def pause(self, duration):
        """Queue a pause action with the given ``duration``."""
        entry = {"type": "pause", "duration": duration}
        self._actions.append(entry)
        return self

    def pointer_move(self, x, y, duration=None, origin=None, width=None, height=None,
                     pressure=None, tangential_pressure=None, tilt_x=None, tilt_y=None,
                     twist=None, altitude_angle=None, azimuth_angle=None):
        """Queue a pointerMove action.

        :param x: Destination x-axis coordinate of pointer in CSS pixels.
        :param y: Destination y-axis coordinate of pointer in CSS pixels.
        :param duration: Number of milliseconds over which to distribute the
                         move. If None, remote end defaults to 0.
        :param origin: Origin of coordinates, either "viewport", "pointer" or
                       an Element. If None, remote end defaults to "viewport".

        The remaining keyword arguments are optional pointer properties;
        each is added to the action only when it is not None.
        """
        self._pointer_action(
            "pointerMove",
            x=x,
            y=y,
            duration=duration,
            origin=origin,
            width=width,
            height=height,
            pressure=pressure,
            tangential_pressure=tangential_pressure,
            tilt_x=tilt_x,
            tilt_y=tilt_y,
            twist=twist,
            altitude_angle=altitude_angle,
            azimuth_angle=azimuth_angle,
        )
        return self

    def pointer_up(self, button=0):
        """Queue a pointerUp action for `button`.

        :param button: Pointer button to perform action with.
                       Default: 0, which represents main device button.
        """
        self._pointer_action("pointerUp", button=button)
        return self

    def pointer_down(self, button=0, width=None, height=None, pressure=None,
                     tangential_pressure=None, tilt_x=None, tilt_y=None,
                     twist=None, altitude_angle=None, azimuth_angle=None):
        """Queue a pointerDown action for `button`.

        :param button: Pointer button to perform action with.
                       Default: 0, which represents main device button.

        The remaining keyword arguments are optional pointer properties;
        each is added to the action only when it is not None.
        """
        self._pointer_action(
            "pointerDown",
            button=button,
            width=width,
            height=height,
            pressure=pressure,
            tangential_pressure=tangential_pressure,
            tilt_x=tilt_x,
            tilt_y=tilt_y,
            twist=twist,
            altitude_angle=altitude_angle,
            azimuth_angle=azimuth_angle,
        )
        return self

    def click(self, element=None, button=0):
        """Queue a click with the specified button.

        If an element is given, move the pointer to that element first,
        otherwise click current pointer coordinates.

        :param element: Optional element to click.
        :param button: Integer representing pointer button to perform action
                       with. Default: 0, which represents main device button.
        """
        if element:
            self.pointer_move(0, 0, origin=element)
        pressed = self.pointer_down(button)
        return pressed.pointer_up(button)

    def key_up(self, value):
        """Queue a keyUp action for `value`.

        :param value: Character to perform key action with.
        """
        self._key_action("keyUp", value)
        return self

    def key_down(self, value):
        """Queue a keyDown action for `value`.

        :param value: Character to perform key action with.
        """
        self._key_action("keyDown", value)
        return self

    def send_keys(self, keys):
        """Queue a keyDown and keyUp action for each character in `keys`.

        :param keys: String of keys to perform key actions with.
        """
        for char in keys:
            self.key_down(char)
            self.key_up(char)
        return self

    def scroll(self, x, y, delta_x, delta_y, duration=None, origin=None):
        """Queue a scroll action.

        :param x: Destination x-axis coordinate of pointer in CSS pixels.
        :param y: Destination y-axis coordinate of pointer in CSS pixels.
        :param delta_x: scroll delta on x-axis in CSS pixels.
        :param delta_y: scroll delta on y-axis in CSS pixels.
        :param duration: Number of milliseconds over which to distribute the
                         scroll. If None, remote end defaults to 0.
        :param origin: Origin of coordinates, either "viewport" or an Element.
                       If None, remote end defaults to "viewport".
        """
        action = {"type": "scroll", "x": x, "y": y, "deltaX": delta_x, "deltaY": delta_y}
        for key, supplied in (("duration", duration), ("origin", origin)):
            if supplied is not None:
                action[key] = supplied
        self._actions.append(action)
        return self
class Actions:
    """Entry point for performing and releasing input actions on a session."""

    def __init__(self, session):
        self.session = session

    @command
    def perform(self, actions=None):
        """Performs actions by tick from each action sequence in `actions`.

        :param actions: List of input source action sequences. A single action
                        sequence may be created with the help of
                        ``ActionSequence.dict``.  None is sent as an empty
                        list.
        :return: Whatever the session returns for the ``POST actions`` command.
        """
        if actions is None:
            actions = []
        return self.session.send_session_command("POST", "actions", {"actions": actions})

    @command
    def release(self):
        """Send ``DELETE actions`` to the session and return its result."""
        return self.session.send_session_command("DELETE", "actions")

    def sequence(self, *args, **kwargs):
        """Return an empty ActionSequence of the designated type.

        See ActionSequence for parameter list.
        """
        return ActionSequence(self.session, *args, **kwargs)
class BrowserWindow:
    """Window-level commands (close, geometry, window state) of a session."""

    def __init__(self, session):
        self.session = session

    @command
    def close(self):
        """Close the current window.

        :return: The value returned for the ``DELETE window`` command.
        """
        remaining = self.session.send_session_command("DELETE", "window")
        no_windows_left = remaining is not None and len(remaining) == 0
        if no_windows_left:
            # With no more open top-level browsing contexts, the session is closed.
            self.session.session_id = None
        return remaining

    @property
    @command
    def rect(self):
        """The value returned for ``GET window/rect``."""
        return self.session.send_session_command("GET", "window/rect")

    @rect.setter
    @command
    def rect(self, new_rect):
        """Send ``new_rect`` as the body of ``POST window/rect``."""
        self.session.send_session_command("POST", "window/rect", new_rect)

    @property
    @command
    def size(self):
        """Gets the window size as a tuple of `(width, height)`."""
        geometry = self.rect
        return geometry["width"], geometry["height"]

    @size.setter
    @command
    def size(self, new_size):
        """Set window size by passing a tuple of `(width, height)`."""
        try:
            width, height = new_size
            self.session.send_session_command(
                "POST", "window/rect", {"width": width, "height": height})
        except (error.UnknownErrorException, error.InvalidArgumentException):
            # silently ignore this error as the command is not implemented
            # for Android. Revert this once it is implemented.
            pass

    @property
    @command
    def position(self):
        """Gets the window position as a tuple of `(x, y)`."""
        geometry = self.rect
        return geometry["x"], geometry["y"]

    @position.setter
    @command
    def position(self, new_position):
        """Set window position by passing a tuple of `(x, y)`."""
        try:
            x, y = new_position
            self.session.send_session_command(
                "POST", "window/rect", {"x": x, "y": y})
        except error.UnknownErrorException:
            # silently ignore this error as the command is not implemented
            # for Android. Revert this once it is implemented.
            pass

    @command
    def maximize(self):
        """Send ``POST window/maximize`` and return its result."""
        return self.session.send_session_command("POST", "window/maximize")

    @command
    def minimize(self):
        """Send ``POST window/minimize`` and return its result."""
        return self.session.send_session_command("POST", "window/minimize")

    @command
    def fullscreen(self):
        """Send ``POST window/fullscreen`` and return its result."""
        return self.session.send_session_command("POST", "window/fullscreen")
class Find:
    """Element lookup helpers bound to a session."""

    def __init__(self, session):
        self.session = session

    @command
    def css(self, element_selector, all=True):
        """Find by CSS selector.

        :param element_selector: Selector sent as the ``value`` of the request.
        :param all: When true the ``elements`` route is used, otherwise the
            ``element`` route.
        :return: Whatever the session returns for the find command.
        """
        return self._find_element("css selector", element_selector, all)

    def _find_element(self, strategy, selector, all):
        # Pick the plural or singular route, then post the locator.
        if all:
            route = "elements"
        else:
            route = "element"
        locator = {"using": strategy, "value": selector}
        return self.session.send_session_command("POST", route, locator)
class Cookies:
    """Mapping-style access to the cookies of a session."""

    def __init__(self, session):
        """
        :param session: Object providing ``send_session_command``.
        """
        self.session = session

    def __getitem__(self, name):
        """Fetch the cookie called ``name``.

        :return: The value returned for ``GET cookie/<name>``.  (The result
            used to be dropped, so lookups always evaluated to None.)
        """
        return self.session.send_session_command("GET", "cookie/%s" % name, {})

    def __setitem__(self, name, value):
        """Add a cookie called ``name``.

        :param name: Cookie name.
        :param value: Cookie value, or an object whose ``value`` attribute
            holds it.
        """
        cookie = {
            "name": name,
            # Objects carrying a ``value`` attribute are unwrapped; anything
            # else (plain strings in particular) is used as given.
            "value": getattr(value, "value", value),
        }
        # Same route and body shape as Session.set_cookie: the cookie is
        # posted to "cookie" wrapped in a {"cookie": ...} object.
        self.session.send_session_command("POST", "cookie", {"cookie": cookie})
class UserPrompt:
    """Commands for the ``alert/*`` routes of a session."""

    def __init__(self, session):
        self.session = session

    @command
    def dismiss(self):
        """Send ``POST alert/dismiss``."""
        self.session.send_session_command("POST", "alert/dismiss")

    @command
    def accept(self):
        """Send ``POST alert/accept``."""
        self.session.send_session_command("POST", "alert/accept")

    @property
    @command
    def text(self):
        """The value returned for ``GET alert/text``."""
        return self.session.send_session_command("GET", "alert/text")

    @text.setter
    @command
    def text(self, value):
        """Send ``value`` as the ``text`` field of ``POST alert/text``."""
        payload = {"text": value}
        self.session.send_session_command("POST", "alert/text", body=payload)
class Session:
    """Client for one WebDriver session spoken over the HTTP wire protocol.

    Methods decorated with ``@command`` start the session on first use when
    no session id is set yet.  The object is also a context manager:
    entering starts the session, leaving ends it.
    """

    def __init__(self,
                 host,
                 port,
                 url_prefix="/",
                 enable_bidi=False,
                 capabilities=None,
                 extension=None):
        """
        :param host: Host passed to the HTTP transport.
        :param port: Port passed to the HTTP transport.
        :param url_prefix: URL prefix passed to the HTTP transport.
        :param enable_bidi: When true, ``webSocketUrl: True`` is added to the
            ``alwaysMatch`` capabilities and start() fails unless the
            returned capabilities contain ``webSocketUrl``.
        :param capabilities: Optional capabilities sent when the session is
            created.  Note that this dictionary is updated in place when
            ``enable_bidi`` is set.
        :param extension: Optional callable; start() invokes it with this
            session and stores the result on ``self.extension``.
        """
        # Set first so that __del__ -> end() finds the attribute even when
        # construction fails part-way through.
        self.session_id = None

        if enable_bidi:
            if capabilities is not None:
                capabilities.setdefault("alwaysMatch", {}).update({"webSocketUrl": True})
            else:
                capabilities = {"alwaysMatch": {"webSocketUrl": True}}

        self.transport = transport.HTTPWireProtocol(host, port, url_prefix)
        self.requested_capabilities = capabilities
        self.capabilities = None
        self.enable_bidi = enable_bidi
        self.bidi_session = None
        self.extension = None
        self.extension_cls = extension

        self.timeouts = Timeouts(self)
        self.window = BrowserWindow(self)
        self.find = Find(self)
        self.alert = UserPrompt(self)
        self.actions = Actions(self)

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.session_id or "(disconnected)")

    def __eq__(self, other):
        # Sessions without an id never compare equal, not even to themselves.
        return (self.session_id is not None and isinstance(other, Session) and
                self.session_id == other.session_id)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def __del__(self):
        self.end()

    def match(self, capabilities):
        """Tell whether ``capabilities`` equal the ones this session requested."""
        return self.requested_capabilities == capabilities

    def start(self):
        """Start a new WebDriver session.

        :return: Dictionary with `capabilities` and `sessionId`, or None
            when a session id is already set and nothing was done.

        :raises error.WebDriverException: If the remote end returns
            an error.
        :raises error.SessionNotCreatedException: If bidi was requested but
            the returned capabilities lack ``webSocketUrl``.
        """
        if self.session_id is not None:
            return

        self.transport.close()

        body = {"capabilities": {}}
        if self.requested_capabilities is not None:
            body["capabilities"] = self.requested_capabilities

        value = self.send_command("POST", "session", body=body)
        assert isinstance(value["sessionId"], str)
        assert isinstance(value["capabilities"], Dict)

        self.session_id = value["sessionId"]
        self.capabilities = value["capabilities"]

        if "webSocketUrl" in self.capabilities:
            self.bidi_session = BidiSession.from_http(self.session_id,
                                                      self.capabilities)
        elif self.enable_bidi:
            # The session exists but is unusable for the caller; tear it
            # down before reporting the failure.
            self.end()
            raise error.SessionNotCreatedException(
                "Requested bidi session, but webSocketUrl capability not found")

        if self.extension_cls:
            self.extension = self.extension_cls(self)

        return value

    def end(self):
        """Try to close the active session."""
        if self.session_id is None:
            return

        try:
            self.send_command("DELETE", "session/%s" % self.session_id)
        except (OSError, error.InvalidSessionIdException):
            pass
        finally:
            self.session_id = None
            self.transport.close()

    def send_command(self, method, url, body=None, timeout=None):
        """
        Send a command to the remote end and validate its success.

        :param method: HTTP method to use in request.
        :param url: "Command part" of the HTTP request URL,
            e.g. `window/rect`.
        :param body: Optional body of the HTTP request.
        :param timeout: Optional timeout forwarded to the transport.

        :return: The `value` field of the parsed response body.

        :raises error.WebDriverException: If the remote end returns
            an error.
        :raises ValueError: If the response body does not contain a
            `value` key.
        """
        response = self.transport.send(
            method, url, body,
            encoder=protocol.Encoder, decoder=protocol.Decoder,
            session=self, timeout=timeout)

        if response.status != 200:
            err = error.from_response(response)

            if isinstance(err, error.InvalidSessionIdException):
                # The driver may already have deleted the session.
                self.session_id = None

            raise err

        if "value" not in response.body:
            raise ValueError("Expected 'value' key in response body:\n"
                             "%s" % response)

        value = response.body["value"]

        # Edge does not yet return the w3c session ID.
        # We want the tests to run in Edge anyway to help with REC.
        # In order to run the tests in Edge, we need to hack around
        # bug:
        # https://developer.microsoft.com/en-us/microsoft-edge/platform/issues/14641972
        if url == "session" and method == "POST" and "sessionId" in response.body and "sessionId" not in value:
            value["sessionId"] = response.body["sessionId"]

        return value

    def send_session_command(self, method, uri, body=None, timeout=None):
        """
        Send a command to an established session and validate its success.

        :param method: HTTP method to use in request.
        :param uri: "Command part" of the HTTP request URL,
            e.g. `window/rect`.  It is joined onto ``session/<id>/``.
        :param body: Optional body of the HTTP request. Must be JSON
            serialisable.
        :param timeout: Optional timeout forwarded to send_command().

        :return: The result of send_command() for the joined URL.

        :raises error.WebDriverException: If the remote end returns
            an error.
        """
        url = urlparse.urljoin("session/%s/" % self.session_id, uri)
        return self.send_command(method, url, body, timeout)

    @property
    @command
    def url(self):
        """The value returned for ``GET url``."""
        return self.send_session_command("GET", "url")

    @url.setter
    @command
    def url(self, url):
        """Send ``url`` as the ``url`` field of ``POST url``."""
        # A former ``urlsplit(url).netloc is None`` guard was removed here:
        # netloc is an empty string, never None, when absent, so it could
        # not fire, and its body would have called the getter's result.
        body = {"url": url}
        return self.send_session_command("POST", "url", body)

    @command
    def back(self):
        """Send ``POST back`` and return its result."""
        return self.send_session_command("POST", "back")

    @command
    def forward(self):
        """Send ``POST forward`` and return its result."""
        return self.send_session_command("POST", "forward")

    @command
    def refresh(self):
        """Send ``POST refresh`` and return its result."""
        return self.send_session_command("POST", "refresh")

    @property
    @command
    def title(self):
        """The value returned for ``GET title``."""
        return self.send_session_command("GET", "title")

    @property
    @command
    def source(self):
        """The value returned for ``GET source``."""
        return self.send_session_command("GET", "source")

    @command
    def new_window(self, type_hint="tab"):
        """Send ``POST window/new`` and return the ``handle`` of the result.

        :param type_hint: Sent as the ``type`` field of the request body.
        """
        body = {"type": type_hint}
        value = self.send_session_command("POST", "window/new", body)

        return value["handle"]

    @property
    @command
    def window_handle(self):
        """The value returned for ``GET window``."""
        return self.send_session_command("GET", "window")

    @window_handle.setter
    @command
    def window_handle(self, handle):
        """Send ``handle`` as the ``handle`` field of ``POST window``."""
        body = {"handle": handle}
        return self.send_session_command("POST", "window", body=body)

    def switch_frame(self, frame):
        """Switch frames.

        :param frame: The string ``"parent"`` posts to ``frame/parent``
            without a body; any other value is sent as the ``id`` field of
            ``POST frame``.
        """
        if frame == "parent":
            url = "frame/parent"
            body = None
        else:
            url = "frame"
            body = {"id": frame}

        return self.send_session_command("POST", url, body)

    @property
    @command
    def handles(self):
        """The value returned for ``GET window/handles``."""
        return self.send_session_command("GET", "window/handles")

    @property
    @command
    def active_element(self):
        """The value returned for ``GET element/active``."""
        return self.send_session_command("GET", "element/active")

    @command
    def cookies(self, name=None):
        """Fetch all cookies, or only the one called ``name`` when given."""
        if name is None:
            url = "cookie"
        else:
            url = "cookie/%s" % name
        return self.send_session_command("GET", url, {})

    @command
    def set_cookie(self, name, value, path=None, domain=None,
                   secure=None, expiry=None, http_only=None):
        """Add a cookie.

        ``name`` and ``value`` are always sent; each optional argument is
        included in the cookie only when it is not None.
        """
        body = {
            "name": name,
            "value": value,
        }

        if domain is not None:
            body["domain"] = domain
        if expiry is not None:
            body["expiry"] = expiry
        if http_only is not None:
            body["httpOnly"] = http_only
        if path is not None:
            body["path"] = path
        if secure is not None:
            body["secure"] = secure
        self.send_session_command("POST", "cookie", {"cookie": body})

    def delete_cookie(self, name=None):
        """Delete all cookies, or only the one called ``name`` when given."""
        if name is None:
            url = "cookie"
        else:
            url = "cookie/%s" % name
        self.send_session_command("DELETE", url, {})

    #[...]

    @command
    def execute_script(self, script, args=None):
        """Send ``POST execute/sync`` with ``script`` and ``args``.

        :param args: Script arguments; None is sent as an empty list.
        """
        if args is None:
            args = []

        body = {
            "script": script,
            "args": args
        }
        return self.send_session_command("POST", "execute/sync", body)

    @command
    def execute_async_script(self, script, args=None):
        """Send ``POST execute/async`` with ``script`` and ``args``.

        :param args: Script arguments; None is sent as an empty list.
        """
        if args is None:
            args = []

        body = {
            "script": script,
            "args": args
        }
        return self.send_session_command("POST", "execute/async", body)

    #[...]

    @command
    def screenshot(self):
        """The value returned for ``GET screenshot``."""
        return self.send_session_command("GET", "screenshot")
class ShadowRoot:
    """Reference to a shadow root, addressed by id within a session."""

    identifier = "shadow-6066-11e4-a52e-4f735466cecf"

    def __init__(self, session, id):
        """
        Construct a new shadow root representation.

        :param session: Current ``webdriver.Session``.
        :param id: Shadow root UUID which must be unique across
            all browsing contexts.
        """
        self.session = session
        self.id = id

    @classmethod
    def from_json(cls, json, session):
        """Build an instance from a mapping keyed by ``ShadowRoot.identifier``."""
        return cls(session, json[ShadowRoot.identifier])

    def send_shadow_command(self, method, uri, body=None):
        """Send a session command below ``shadow/<id>/``."""
        route = "shadow/%s/%s" % (self.id, uri)
        return self.session.send_session_command(method, route, body)

    @command
    def find_element(self, strategy, selector):
        """Post a locator to the ``element`` route of this shadow root."""
        locator = {"using": strategy, "value": selector}
        return self.send_shadow_command("POST", "element", locator)

    @command
    def find_elements(self, strategy, selector):
        """Post a locator to the ``elements`` route of this shadow root."""
        locator = {"using": strategy, "value": selector}
        return self.send_shadow_command("POST", "elements", locator)
class WebElement:
    """
    Representation of a web element.

    A web element is an abstraction used to identify an element when
    it is transported via the protocol, between remote- and local ends.
    """
    identifier = "element-6066-11e4-a52e-4f735466cecf"

    def __init__(self, session, id):
        """
        Construct a new web element representation.

        :param session: Current ``webdriver.Session``.
        :param id: Web element UUID which must be unique across all browsing contexts.
        """
        self.session = session
        self.id = id

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.id}>"

    def __eq__(self, other):
        # Equal only to another WebElement with the same id and session.
        if not isinstance(other, WebElement):
            return False
        return self.id == other.id and self.session == other.session

    @classmethod
    def from_json(cls, json, session):
        """Build an instance from a mapping keyed by ``WebElement.identifier``."""
        return cls(session, json[WebElement.identifier])

    def send_element_command(self, method, uri, body=None):
        """Send a session command below ``element/<id>/``."""
        route = f"element/{self.id}/{uri}"
        return self.session.send_session_command(method, route, body)

    @command
    def find_element(self, strategy, selector):
        """Post a locator to the ``element`` route of this element."""
        locator = {"using": strategy, "value": selector}
        return self.send_element_command("POST", "element", locator)

    @command
    def click(self):
        """Send ``POST click`` with an empty body."""
        self.send_element_command("POST", "click", {})

    @command
    def tap(self):
        """Send ``POST tap`` with an empty body."""
        self.send_element_command("POST", "tap", {})

    @command
    def clear(self):
        """Send ``POST clear`` with an empty body."""
        self.send_element_command("POST", "clear", {})

    @command
    def send_keys(self, text):
        """Send ``text`` as the ``text`` field of ``POST value``."""
        payload = {"text": text}
        return self.send_element_command("POST", "value", payload)

    @property
    @command
    def text(self):
        """The value returned for ``GET text``."""
        return self.send_element_command("GET", "text")

    @property
    @command
    def name(self):
        """The value returned for ``GET name``."""
        return self.send_element_command("GET", "name")

    @command
    def style(self, property_name):
        """The value returned for ``GET css/<property_name>``."""
        return self.send_element_command("GET", "css/%s" % property_name)

    @property
    @command
    def rect(self):
        """The value returned for ``GET rect``."""
        return self.send_element_command("GET", "rect")

    @property
    @command
    def selected(self):
        """The value returned for ``GET selected``."""
        return self.send_element_command("GET", "selected")

    @command
    def screenshot(self):
        """The value returned for ``GET screenshot``."""
        return self.send_element_command("GET", "screenshot")

    @property
    @command
    def shadow_root(self):
        """The value returned for ``GET shadow``."""
        return self.send_element_command("GET", "shadow")

    @command
    def attribute(self, name):
        """The value returned for ``GET attribute/<name>``."""
        return self.send_element_command("GET", "attribute/%s" % name)

    @command
    def get_computed_label(self):
        """The value returned for ``GET computedlabel``."""
        return self.send_element_command("GET", "computedlabel")

    @command
    def get_computed_role(self):
        """The value returned for ``GET computedrole``."""
        return self.send_element_command("GET", "computedrole")

    # This MUST come last because otherwise @property decorators above
    # will be overridden by this.
    @command
    def property(self, name):
        """The value returned for ``GET property/<name>``."""
        return self.send_element_command("GET", "property/%s" % name)
class WebFrame:
    """Reference to a frame, identified by id within a session."""

    identifier = "frame-075b-4da1-b6ba-e579c2d3230a"

    def __init__(self, session, id):
        self.session = session
        self.id = id

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.id}>"

    def __eq__(self, other):
        # Equal only to another WebFrame with the same id and session.
        if not isinstance(other, WebFrame):
            return False
        return self.id == other.id and self.session == other.session

    @classmethod
    def from_json(cls, json, session):
        """Build an instance from a mapping keyed by ``WebFrame.identifier``."""
        return cls(session, json[WebFrame.identifier])
class WebWindow:
    """Reference to a window, identified by id within a session."""

    identifier = "window-fcc6-11e5-b4f8-330a88ab9d7f"

    def __init__(self, session, id):
        self.session = session
        self.id = id

    def __repr__(self):
        return f"<{self.__class__.__name__} {self.id}>"

    def __eq__(self, other):
        # Equal only to another WebWindow with the same id and session.
        if not isinstance(other, WebWindow):
            return False
        return self.id == other.id and self.session == other.session

    @classmethod
    def from_json(cls, json, session):
        """Build an instance from a mapping keyed by ``WebWindow.identifier``."""
        return cls(session, json[WebWindow.identifier])