Source code

Revision control

Copy as Markdown

Other Tools

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import base64
import datetime
import json
import os
import socket
import sys
import time
import traceback
from contextlib import contextmanager
import six
from six import reraise
from . import errors, transport
from .decorators import do_process_check
from .geckoinstance import GeckoInstance
from .keys import Keys
from .timeout import Timeouts
WEB_ELEMENT_KEY = "element-6066-11e4-a52e-4f735466cecf"
WEB_FRAME_KEY = "frame-075b-4da1-b6ba-e579c2d3230a"
WEB_SHADOW_ROOT_KEY = "shadow-6066-11e4-a52e-4f735466cecf"
WEB_WINDOW_KEY = "window-fcc6-11e5-b4f8-330a88ab9d7f"
class MouseButton(object):
"""Enum-like class for mouse button constants."""
LEFT = 0
MIDDLE = 1
RIGHT = 2
class ActionSequence(object):
r"""API for creating and performing action sequences.
Each action method adds one or more actions to a queue. When perform()
is called, the queued actions fire in order.
May be chained together as in::
ActionSequence(self.marionette, "key", id) \
.key_down("a") \
.key_up("a") \
.perform()
"""
def __init__(self, marionette, action_type, input_id, pointer_params=None):
self.marionette = marionette
self._actions = []
self._id = input_id
self._pointer_params = pointer_params
self._type = action_type
@property
def dict(self):
d = {
"type": self._type,
"id": self._id,
"actions": self._actions,
}
if self._pointer_params is not None:
d["parameters"] = self._pointer_params
return d
def perform(self):
"""Perform all queued actions."""
self.marionette.actions.perform([self.dict])
def _key_action(self, subtype, value):
self._actions.append({"type": subtype, "value": value})
def _pointer_action(self, subtype, button):
self._actions.append({"type": subtype, "button": button})
def pause(self, duration):
self._actions.append({"type": "pause", "duration": duration})
return self
def pointer_move(self, x, y, duration=None, origin=None):
"""Queue a pointerMove action.
:param x: Destination x-axis coordinate of pointer in CSS pixels.
:param y: Destination y-axis coordinate of pointer in CSS pixels.
:param duration: Number of milliseconds over which to distribute the
move. If None, remote end defaults to 0.
:param origin: Origin of coordinates, either "viewport", "pointer" or
an Element. If None, remote end defaults to "viewport".
"""
action = {"type": "pointerMove", "x": x, "y": y}
if duration is not None:
action["duration"] = duration
if origin is not None:
if isinstance(origin, WebElement):
action["origin"] = {origin.kind: origin.id}
else:
action["origin"] = origin
self._actions.append(action)
return self
def pointer_up(self, button=MouseButton.LEFT):
"""Queue a pointerUp action for `button`.
:param button: Pointer button to perform action with.
Default: 0, which represents main device button.
"""
self._pointer_action("pointerUp", button)
return self
def pointer_down(self, button=MouseButton.LEFT):
"""Queue a pointerDown action for `button`.
:param button: Pointer button to perform action with.
Default: 0, which represents main device button.
"""
self._pointer_action("pointerDown", button)
return self
def click(self, element=None, button=MouseButton.LEFT):
"""Queue a click with the specified button.
If an element is given, move the pointer to that element first,
otherwise click current pointer coordinates.
:param element: Optional element to click.
:param button: Integer representing pointer button to perform action
with. Default: 0, which represents main device button.
"""
if element:
self.pointer_move(0, 0, origin=element)
return self.pointer_down(button).pointer_up(button)
def key_down(self, value):
"""Queue a keyDown action for `value`.
:param value: Single character to perform key action with.
"""
self._key_action("keyDown", value)
return self
def key_up(self, value):
"""Queue a keyUp action for `value`.
:param value: Single character to perform key action with.
"""
self._key_action("keyUp", value)
return self
def scroll(self, x, y, delta_x, delta_y, duration=None, origin=None):
"""Queue a scroll action.
:param x: Destination x-axis coordinate of pointer in CSS pixels.
:param y: Destination y-axis coordinate of pointer in CSS pixels.
:param delta_x: Scroll delta for x-axis in CSS pixels.
:param delta_y: Scroll delta for y-axis in CSS pixels.
:param duration: Number of milliseconds over which to distribute the
scroll. If None, remote end defaults to 0.
:param origin: Origin of coordinates, either "viewport", "pointer" or
an Element. If None, remote end defaults to "viewport".
"""
action = {
"type": "scroll",
"x": x,
"y": y,
"deltaX": delta_x,
"deltaY": delta_y,
}
if duration is not None:
action["duration"] = duration
if origin is not None:
if isinstance(origin, WebElement):
action["origin"] = {origin.kind: origin.id}
else:
action["origin"] = origin
self._actions.append(action)
return self
def send_keys(self, keys):
"""Queue a keyDown and keyUp action for each character in `keys`.
:param keys: String of keys to perform key actions with.
"""
for c in keys:
self.key_down(c)
self.key_up(c)
return self
class Actions(object):
def __init__(self, marionette):
self.marionette = marionette
def perform(self, actions=None):
"""Perform actions by tick from each action sequence in `actions`.
:param actions: List of input source action sequences. A single action
sequence may be created with the help of
``ActionSequence.dict``.
"""
body = {"actions": [] if actions is None else actions}
return self.marionette._send_message("WebDriver:PerformActions", body)
def release(self):
return self.marionette._send_message("WebDriver:ReleaseActions")
def sequence(self, *args, **kwargs):
"""Return an empty ActionSequence of the designated type.
See ActionSequence for parameter list.
"""
return ActionSequence(self.marionette, *args, **kwargs)
class WebElement(object):
"""Represents a DOM Element."""
identifiers = (WEB_ELEMENT_KEY,)
def __init__(self, marionette, id, kind=WEB_ELEMENT_KEY):
self.marionette = marionette
assert id is not None
self.id = id
self.kind = kind
def __str__(self):
return self.id
def __eq__(self, other_element):
return self.id == other_element.id
def __hash__(self):
# pylint --py3k: W1641
return hash(self.id)
def find_element(self, method, target):
"""Returns an ``WebElement`` instance that matches the specified
method and target, relative to the current element.
For more details on this function, see the
:func:`~marionette_driver.marionette.Marionette.find_element` method
in the Marionette class.
"""
return self.marionette.find_element(method, target, self.id)
def find_elements(self, method, target):
"""Returns a list of all ``WebElement`` instances that match the
specified method and target in the current context.
For more details on this function, see the
:func:`~marionette_driver.marionette.Marionette.find_elements` method
in the Marionette class.
"""
return self.marionette.find_elements(method, target, self.id)
def get_attribute(self, name):
"""Returns the requested attribute, or None if no attribute
is set.
"""
body = {"id": self.id, "name": name}
return self.marionette._send_message(
"WebDriver:GetElementAttribute", body, key="value"
)
def get_property(self, name):
"""Returns the requested property, or None if the property is
not set.
"""
try:
body = {"id": self.id, "name": name}
return self.marionette._send_message(
"WebDriver:GetElementProperty", body, key="value"
)
except errors.UnknownCommandException:
# Keep backward compatibility for code which uses get_attribute() to
# also retrieve element properties.
# Remove when Firefox 55 is stable.
return self.get_attribute(name)
def click(self):
"""Simulates a click on the element."""
self.marionette._send_message("WebDriver:ElementClick", {"id": self.id})
@property
def text(self):
"""Returns the visible text of the element, and its child elements."""
body = {"id": self.id}
return self.marionette._send_message(
"WebDriver:GetElementText", body, key="value"
)
def send_keys(self, *strings):
"""Sends the string via synthesized keypresses to the element.
If an array is passed in like `marionette.send_keys(Keys.SHIFT, "a")` it
will be joined into a string.
If an integer is passed in like `marionette.send_keys(1234)` it will be
coerced into a string.
"""
keys = Marionette.convert_keys(*strings)
self.marionette._send_message(
"WebDriver:ElementSendKeys", {"id": self.id, "text": keys}
)
def clear(self):
"""Clears the input of the element."""
self.marionette._send_message("WebDriver:ElementClear", {"id": self.id})
def is_selected(self):
"""Returns True if the element is selected."""
body = {"id": self.id}
return self.marionette._send_message(
"WebDriver:IsElementSelected", body, key="value"
)
def is_enabled(self):
"""This command will return False if all the following criteria
are met otherwise return True:
* A form control is disabled.
* A ``WebElement`` has a disabled boolean attribute.
"""
body = {"id": self.id}
return self.marionette._send_message(
"WebDriver:IsElementEnabled", body, key="value"
)
def is_displayed(self):
"""Returns True if the element is displayed, False otherwise."""
body = {"id": self.id}
return self.marionette._send_message(
"WebDriver:IsElementDisplayed", body, key="value"
)
@property
def tag_name(self):
"""The tag name of the element."""
body = {"id": self.id}
return self.marionette._send_message(
"WebDriver:GetElementTagName", body, key="value"
)
@property
def rect(self):
"""Gets the element's bounding rectangle.
This will return a dictionary with the following:
* x and y represent the top left coordinates of the ``WebElement``
relative to top left corner of the document.
* height and the width will contain the height and the width
of the DOMRect of the ``WebElement``.
"""
return self.marionette._send_message(
"WebDriver:GetElementRect", {"id": self.id}
)
def value_of_css_property(self, property_name):
"""Gets the value of the specified CSS property name.
:param property_name: Property name to get the value of.
"""
body = {"id": self.id, "propertyName": property_name}
return self.marionette._send_message(
"WebDriver:GetElementCSSValue", body, key="value"
)
@property
def shadow_root(self):
"""Gets the shadow root of the current element"""
return self.marionette._send_message(
"WebDriver:GetShadowRoot", {"id": self.id}, key="value"
)
@property
def computed_label(self):
"""Gets the computed accessibility label of the current element"""
return self.marionette._send_message(
"WebDriver:GetComputedLabel", {"id": self.id}, key="value"
)
@property
def computed_role(self):
"""Gets the computed accessibility role of the current element"""
return self.marionette._send_message(
"WebDriver:GetComputedRole", {"id": self.id}, key="value"
)
@classmethod
def _from_json(cls, json, marionette):
if isinstance(json, dict):
if WEB_ELEMENT_KEY in json:
return cls(marionette, json[WEB_ELEMENT_KEY], WEB_ELEMENT_KEY)
raise ValueError("Unrecognised web element")
class ShadowRoot(object):
"""A Class to handling Shadow Roots"""
identifiers = (WEB_SHADOW_ROOT_KEY,)
def __init__(self, marionette, id, kind=WEB_SHADOW_ROOT_KEY):
self.marionette = marionette
assert id is not None
self.id = id
self.kind = kind
def __str__(self):
return self.id
def __eq__(self, other_element):
return self.id == other_element.id
def __hash__(self):
# pylint --py3k: W1641
return hash(self.id)
def find_element(self, method, target):
"""Returns a ``WebElement`` instance that matches the specified
method and target, relative to the current shadow root.
For more details on this function, see the
:func:`~marionette_driver.marionette.Marionette.find_element` method
in the Marionette class.
"""
body = {"shadowRoot": self.id, "value": target, "using": method}
return self.marionette._send_message(
"WebDriver:FindElementFromShadowRoot", body, key="value"
)
def find_elements(self, method, target):
"""Returns a list of all ``WebElement`` instances that match the
specified method and target in the current shadow root.
For more details on this function, see the
:func:`~marionette_driver.marionette.Marionette.find_elements` method
in the Marionette class.
"""
body = {"shadowRoot": self.id, "value": target, "using": method}
return self.marionette._send_message(
"WebDriver:FindElementsFromShadowRoot", body
)
@classmethod
def _from_json(cls, json, marionette):
if isinstance(json, dict):
if WEB_SHADOW_ROOT_KEY in json:
return cls(marionette, json[WEB_SHADOW_ROOT_KEY])
raise ValueError("Unrecognised shadow root")
class WebFrame(object):
"""A Class to handle frame windows"""
identifiers = (WEB_FRAME_KEY,)
def __init__(self, marionette, id, kind=WEB_FRAME_KEY):
self.marionette = marionette
assert id is not None
self.id = id
self.kind = kind
def __str__(self):
return self.id
def __eq__(self, other_element):
return self.id == other_element.id
def __hash__(self):
# pylint --py3k: W1641
return hash(self.id)
@classmethod
def _from_json(cls, json, marionette):
if isinstance(json, dict):
if WEB_FRAME_KEY in json:
return cls(marionette, json[WEB_FRAME_KEY])
raise ValueError("Unrecognised web frame")
class WebWindow(object):
"""A Class to handle top-level windows"""
identifiers = (WEB_WINDOW_KEY,)
def __init__(self, marionette, id, kind=WEB_WINDOW_KEY):
self.marionette = marionette
assert id is not None
self.id = id
self.kind = kind
def __str__(self):
return self.id
def __eq__(self, other_element):
return self.id == other_element.id
def __hash__(self):
# pylint --py3k: W1641
return hash(self.id)
@classmethod
def _from_json(cls, json, marionette):
if isinstance(json, dict):
if WEB_WINDOW_KEY in json:
return cls(marionette, json[WEB_WINDOW_KEY])
raise ValueError("Unrecognised web window")
class Alert(object):
"""A class for interacting with alerts.
::
Alert(marionette).accept()
Alert(marionette).dismiss()
"""
def __init__(self, marionette):
self.marionette = marionette
def accept(self):
"""Accept a currently displayed modal dialog."""
self.marionette._send_message("WebDriver:AcceptAlert")
def dismiss(self):
"""Dismiss a currently displayed modal dialog."""
self.marionette._send_message("WebDriver:DismissAlert")
@property
def text(self):
"""Return the currently displayed text in a tab modal."""
return self.marionette._send_message("WebDriver:GetAlertText", key="value")
def send_keys(self, *string):
"""Send keys to the currently displayed text input area in an open
tab modal dialog."""
self.marionette._send_message(
"WebDriver:SendAlertText", {"text": Marionette.convert_keys(*string)}
)
class Marionette(object):
"""Represents a Marionette connection to a browser or device."""
CONTEXT_CHROME = "chrome" # non-browser content: windows, dialogs, etc.
CONTEXT_CONTENT = "content" # browser content: iframes, divs, etc.
DEFAULT_STARTUP_TIMEOUT = 120
DEFAULT_SHUTDOWN_TIMEOUT = (
70 # By default Firefox will kill hanging threads after 60s
)
# Bug 1336953 - Until we can remove the socket timeout parameter it has to be
# set a default value which is larger than the longest timeout as defined by the
# WebDriver spec. In that case its 300s for page load. Also add another minute
# so that slow builds have enough time to send the timeout error to the client.
DEFAULT_SOCKET_TIMEOUT = 360
def __init__(
self,
host="127.0.0.1",
port=2828,
app=None,
bin=None,
baseurl=None,
socket_timeout=None,
startup_timeout=None,
**instance_args
):
"""Construct a holder for the Marionette connection.
Remember to call ``start_session`` in order to initiate the
connection and start a Marionette session.
:param host: Host where the Marionette server listens.
Defaults to 127.0.0.1.
:param port: Port where the Marionette server listens.
Defaults to port 2828.
:param baseurl: Where to look for files served from Marionette's
www directory.
:param socket_timeout: Timeout for Marionette socket operations.
:param startup_timeout: Seconds to wait for a connection with
binary.
:param bin: Path to browser binary. If any truthy value is given
this will attempt to start a Gecko instance with the specified
`app`.
:param app: Type of ``instance_class`` to use for managing app
instance. See ``marionette_driver.geckoinstance``.
:param instance_args: Arguments to pass to ``instance_class``.
"""
self.host = "127.0.0.1" # host
if int(port) == 0:
port = Marionette.check_port_available(port)
self.port = self.local_port = int(port)
self.bin = bin
self.client = None
self.instance = None
self.requested_capabilities = None
self.session = None
self.session_id = None
self.process_id = None
self.profile = None
self.window = None
self.chrome_window = None
self.baseurl = baseurl
self._test_name = None
self.crashed = 0
self.is_shutting_down = False
self.cleanup_ran = False
if socket_timeout is None:
self.socket_timeout = self.DEFAULT_SOCKET_TIMEOUT
else:
self.socket_timeout = float(socket_timeout)
if startup_timeout is None:
self.startup_timeout = self.DEFAULT_STARTUP_TIMEOUT
else:
self.startup_timeout = int(startup_timeout)
self.shutdown_timeout = self.DEFAULT_SHUTDOWN_TIMEOUT
if self.bin:
self.instance = GeckoInstance.create(
app, host=self.host, port=self.port, bin=self.bin, **instance_args
)
self.start_binary(self.startup_timeout)
self.actions = Actions(self)
self.timeout = Timeouts(self)
@property
def profile_path(self):
if self.instance and self.instance.profile:
return self.instance.profile.profile
def start_binary(self, timeout):
try:
self.check_port_available(self.port, host=self.host)
except socket.error:
_, value, tb = sys.exc_info()
msg = "Port {}:{} is unavailable ({})".format(self.host, self.port, value)
reraise(IOError, IOError(msg), tb)
try:
self.instance.start()
self.raise_for_port(timeout=timeout)
except socket.timeout:
# Something went wrong with starting up Marionette server. Given
# that the process will not quit itself, force a shutdown immediately.
self.cleanup()
msg = (
"Process killed after {}s because no connection to Marionette "
"server could be established. Check gecko.log for errors"
)
reraise(IOError, IOError(msg.format(timeout)), sys.exc_info()[2])
def cleanup(self):
if self.session is not None:
try:
self.delete_session()
except (errors.MarionetteException, IOError):
# These exceptions get thrown if the Marionette server
# hit an exception/died or the connection died. We can
# do no further server-side cleanup in this case.
pass
if self.instance:
# stop application and, if applicable, stop emulator
self.instance.close(clean=True)
if self.instance.unresponsive_count >= 3:
raise errors.UnresponsiveInstanceException(
"Application clean-up has failed >2 consecutive times."
)
self.cleanup_ran = True
def __del__(self):
if not self.cleanup_ran:
self.cleanup()
@staticmethod
def check_port_available(port, host=""):
"""Check if "host:port" is available.
Raise socket.error if port is not available.
"""
port = int(port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind((host, port))
port = s.getsockname()[1]
finally:
s.close()
return port
def raise_for_port(self, timeout=None, check_process_status=True):
"""Raise socket.timeout if no connection can be established.
:param timeout: Optional timeout in seconds for the server to be ready.
:param check_process_status: Optional, if `True` the process will be
continuously checked if it has exited, and the connection
attempt will be aborted.
"""
if timeout is None:
timeout = self.startup_timeout
runner = None
if self.instance is not None:
runner = self.instance.runner
poll_interval = 0.1
starttime = datetime.datetime.now()
timeout_time = starttime + datetime.timedelta(seconds=timeout)
client = transport.TcpTransport(self.host, self.port, 0.5)
connected = False
while datetime.datetime.now() < timeout_time:
# If the instance we want to connect to is not running return immediately
if check_process_status and runner is not None and not runner.is_running():
break
try:
client.connect()
return True
except socket.error:
pass
finally:
client.close()
time.sleep(poll_interval)
if not connected:
# There might have been a startup crash of the application
if runner is not None and self.check_for_crash() > 0:
raise IOError("Process crashed (Exit code: {})".format(runner.wait(0)))
raise socket.timeout(
"Timed out waiting for connection on {0}:{1}!".format(
self.host, self.port
)
)
@do_process_check
def _send_message(self, name, params=None, key=None):
"""Send a blocking message to the server.
Marionette provides an asynchronous, non-blocking interface and
this attempts to paper over this by providing a synchronous API
to the user.
:param name: Requested command key.
:param params: Optional dictionary of key/value arguments.
:param key: Optional key to extract from response.
:returns: Full response from the server, or if `key` is given,
the value of said key in the response.
"""
if not self.session_id and name != "WebDriver:NewSession":
raise errors.InvalidSessionIdException("Please start a session")
try:
msg = self.client.request(name, params)
except IOError:
self.delete_session(send_request=False)
raise
res, err = msg.result, msg.error
if err:
self._handle_error(err)
if key is not None:
return self._from_json(res.get(key))
else:
return self._from_json(res)
def _handle_error(self, obj):
error = obj["error"]
message = obj["message"]
stacktrace = obj["stacktrace"]
raise errors.lookup(error)(message, stacktrace=stacktrace)
def check_for_crash(self):
"""Check if the process crashed.
:returns: True, if a crash happened since the method has been called the last time.
"""
crash_count = 0
if self.instance:
name = self.test_name or "marionette.py"
crash_count = self.instance.runner.check_for_crashes(test_name=name)
self.crashed = self.crashed + crash_count
return crash_count > 0
def _handle_socket_failure(self):
"""Handle socket failures for the currently connected application.
If the application crashed then clean-up internal states, or in case of a content
crash also kill the process. If there are other reasons for a socket failure,
wait for the process to shutdown itself, or force kill it.
Please note that the method expects an exception to be handled on the current stack
frame, and is only called via the `@do_process_check` decorator.
"""
exc_cls, exc, tb = sys.exc_info()
# If the application hasn't been launched by Marionette no further action can be done.
# In such cases we simply re-throw the exception.
if not self.instance:
reraise(exc_cls, exc, tb)
else:
# Somehow the socket disconnected. Give the application some time to shutdown
# itself before killing the process.
returncode = self.instance.runner.wait(timeout=self.shutdown_timeout)
if returncode is None:
message = (
"Process killed because the connection to Marionette server is "
"lost. Check gecko.log for errors"
)
# This will force-close the application without sending any other message.
self.cleanup()
else:
# If Firefox quit itself check if there was a crash
crash_count = self.check_for_crash()
if crash_count > 0:
# SIGUSR1 indicates a forced shutdown due to a content process crash
if returncode == 245:
message = "Content process crashed"
else:
message = "Process crashed (Exit code: {returncode})"
else:
message = (
"Process has been unexpectedly closed (Exit code: {returncode})"
)
self.delete_session(send_request=False)
message += " (Reason: {reason})"
reraise(
IOError, IOError(message.format(returncode=returncode, reason=exc)), tb
)
@staticmethod
def convert_keys(*string):
typing = []
for val in string:
if isinstance(val, Keys):
typing.append(val)
elif isinstance(val, int):
val = str(val)
for i in range(len(val)):
typing.append(val[i])
else:
for i in range(len(val)):
typing.append(val[i])
return "".join(typing)
def clear_pref(self, pref):
"""Clear the user-defined value from the specified preference.
:param pref: Name of the preference.
"""
with self.using_context(self.CONTEXT_CHROME):
self.execute_script(
"""
const { Preferences } = ChromeUtils.importESModule(
);
Preferences.reset(arguments[0]);
""",
script_args=(pref,),
)
def get_pref(self, pref, default_branch=False, value_type="unspecified"):
"""Get the value of the specified preference.
:param pref: Name of the preference.
:param default_branch: Optional, if `True` the preference value will be read
from the default branch. Otherwise the user-defined
value if set is returned. Defaults to `False`.
:param value_type: Optional, XPCOM interface of the pref's complex value.
Possible values are: `nsIFile` and
`nsIPrefLocalizedString`.
Usage example::
marionette.get_pref("browser.tabs.warnOnClose")
"""
with self.using_context(self.CONTEXT_CHROME):
pref_value = self.execute_script(
"""
const { Preferences } = ChromeUtils.importESModule(
);
let pref = arguments[0];
let defaultBranch = arguments[1];
let valueType = arguments[2];
prefs = new Preferences({defaultBranch: defaultBranch});
return prefs.get(pref, null, Components.interfaces[valueType]);
""",
script_args=(pref, default_branch, value_type),
)
return pref_value
def set_pref(self, pref, value, default_branch=False):
"""Set the value of the specified preference.
:param pref: Name of the preference.
:param value: The value to set the preference to. If the value is None,
reset the preference to its default value. If no default
value exists, the preference will cease to exist.
:param default_branch: Optional, if `True` the preference value will
be written to the default branch, and will remain until
the application gets restarted. Otherwise a user-defined
value is set. Defaults to `False`.
Usage example::
marionette.set_pref("browser.tabs.warnOnClose", True)
"""
with self.using_context(self.CONTEXT_CHROME):
if value is None:
self.clear_pref(pref)
return
self.execute_script(
"""
const { Preferences } = ChromeUtils.importESModule(
);
let pref = arguments[0];
let value = arguments[1];
let defaultBranch = arguments[2];
prefs = new Preferences({defaultBranch: defaultBranch});
prefs.set(pref, value);
""",
script_args=(pref, value, default_branch),
)
def set_prefs(self, prefs, default_branch=False):
"""Set the value of a list of preferences.
:param prefs: A dict containing one or more preferences and their values
to be set. See :func:`set_pref` for further details.
:param default_branch: Optional, if `True` the preference value will
be written to the default branch, and will remain until
the application gets restarted. Otherwise a user-defined
value is set. Defaults to `False`.
Usage example::
marionette.set_prefs({"browser.tabs.warnOnClose": True})
"""
for pref, value in prefs.items():
self.set_pref(pref, value, default_branch=default_branch)
@contextmanager
def using_prefs(self, prefs, default_branch=False):
"""Set preferences for code executed in a `with` block, and restores them on exit.
:param prefs: A dict containing one or more preferences and their values
to be set. See :func:`set_prefs` for further details.
:param default_branch: Optional, if `True` the preference value will
be written to the default branch, and will remain until
the application gets restarted. Otherwise a user-defined
value is set. Defaults to `False`.
Usage example::
with marionette.using_prefs({"browser.tabs.warnOnClose": True}):
# ... do stuff ...
"""
original_prefs = {p: self.get_pref(p) for p in prefs}
self.set_prefs(prefs, default_branch=default_branch)
try:
yield
finally:
self.set_prefs(original_prefs, default_branch=default_branch)
@do_process_check
def enforce_gecko_prefs(self, prefs):
"""Checks if the running instance has the given prefs. If not,
it will kill the currently running instance, and spawn a new
instance with the requested preferences.
:param prefs: A dictionary whose keys are preference names.
"""
if not self.instance:
raise errors.MarionetteException(
"enforce_gecko_prefs() can only be called "
"on Gecko instances launched by Marionette"
)
pref_exists = True
with self.using_context(self.CONTEXT_CHROME):
for pref, value in six.iteritems(prefs):
if type(value) is not str:
value = json.dumps(value)
pref_exists = self.execute_script(
"""
let prefInterface = Components.classes["@mozilla.org/preferences-service;1"]
.getService(Components.interfaces.nsIPrefBranch);
let pref = '{0}';
let value = '{1}';
let type = prefInterface.getPrefType(pref);
switch(type) {{
case prefInterface.PREF_STRING:
return value == prefInterface.getCharPref(pref).toString();
case prefInterface.PREF_BOOL:
return value == prefInterface.getBoolPref(pref).toString();
case prefInterface.PREF_INT:
return value == prefInterface.getIntPref(pref).toString();
case prefInterface.PREF_INVALID:
return false;
}}
""".format(
pref, value
)
)
if not pref_exists:
break
if not pref_exists:
context = self._send_message("Marionette:GetContext", key="value")
self.delete_session()
self.instance.restart(prefs)
self.raise_for_port()
self.start_session(self.requested_capabilities)
# Restore the context as used before the restart
self.set_context(context)
def _request_in_app_shutdown(self, flags=None, safe_mode=False):
"""Attempt to quit the currently running instance from inside the
application. If shutdown is prevented by some component the quit
will be forced.
This method effectively calls `Services.startup.quit` in Gecko.
Possible flag values are listed at https://bit.ly/3IYcjYi.
:param flags: Optional additional quit masks to include.
:param safe_mode: Optional flag to indicate that the application has to
be restarted in safe mode.
:returns: A dictionary containing details of the application shutdown.
The `cause` property reflects the reason, and `forced` indicates
that something prevented the shutdown and the application had
to be forced to shutdown.
:throws InvalidArgumentException: If there are multiple
`shutdown_flags` ending with `"Quit"`.
"""
body = {}
if flags is not None:
body["flags"] = list(
flags,
)
if safe_mode:
body["safeMode"] = safe_mode
return self._send_message("Marionette:Quit", body)
@do_process_check
def quit(self, clean=False, in_app=True, callback=None):
"""
By default this method will trigger a normal shutdown of the currently running instance.
But it can also be used to force terminate the process.
This command will delete the active marionette session. It also allows
manipulation of eg. the profile data while the application is not running.
To start the application again, :func:`start_session` has to be called.
:param clean: If True a new profile will be used after the next start of
the application. Note that the in_app initiated quit always
maintains the same profile.
:param in_app: If True, marionette will cause a quit from within the
application. Otherwise the application will be restarted
immediately by killing the process.
:param callback: If provided and `in_app` is True, the callback will
be used to trigger the shutdown.
:returns: A dictionary containing details of the application shutdown.
The `cause` property reflects the reason, and `forced` indicates
that something prevented the shutdown and the application had
to be forced to shutdown.
"""
if not self.instance:
raise errors.MarionetteException(
"quit() can only be called " "on Gecko instances launched by Marionette"
)
quit_details = {"cause": "shutdown", "forced": False}
if in_app:
if clean:
raise ValueError(
"An in_app restart cannot be triggered with the clean flag set"
)
if callback is not None and not callable(callback):
raise ValueError(
"Specified callback '{}' is not callable".format(callback)
)
# Block Marionette from accepting new connections
self._send_message("Marionette:AcceptConnections", {"value": False})
try:
self.is_shutting_down = True
if callback is not None:
callback()
quit_details["in_app"] = True
else:
quit_details = self._request_in_app_shutdown()
except IOError:
# A possible IOError should be ignored at this point, given that
# quit() could have been called inside of `using_context`,
# which wants to reset the context but fails sending the message.
pass
except Exception:
# For any other error assume the application is not going to shutdown.
# As such allow Marionette to accept new connections again.
self.is_shutting_down = False
self._send_message("Marionette:AcceptConnections", {"value": True})
raise
try:
self.delete_session(send_request=False)
# Try to wait for the process to end itself before force-closing it.
returncode = self.instance.runner.wait(timeout=self.shutdown_timeout)
if returncode is None:
self.cleanup()
message = "Process still running {}s after quit request"
raise IOError(message.format(self.shutdown_timeout))
finally:
self.is_shutting_down = False
else:
self.delete_session(send_request=False)
self.instance.close(clean=clean)
quit_details.update({"in_app": False, "forced": True})
if quit_details.get("cause") not in (None, "shutdown"):
raise errors.MarionetteException(
"Unexpected shutdown reason '{}' for "
"quitting the process.".format(quit_details["cause"])
)
return quit_details
@do_process_check
def restart(
self, callback=None, clean=False, in_app=True, safe_mode=False, silent=False
):
"""
By default this method will restart the currently running instance by using the same
profile. But it can also be forced to terminate the currently running instance, and
to spawn a new instance with the same or different profile.
:param callback: If provided and `in_app` is True, the callback will be
used to trigger the restart.
:param clean: If True a new profile will be used after the restart. Note
that the in_app initiated restart always maintains the same
profile.
:param in_app: If True, marionette will cause a restart from within the
application. Otherwise the application will be restarted
immediately by killing the process.
:param safe_mode: Optional flag to indicate that the application has to
be restarted in safe mode.
:param silent: Optional flag to indicate that the application should
not open any window after a restart. Note that this flag is only
supported on MacOS and requires "in_app" to be True.
:returns: A dictionary containing details of the application restart.
The `cause` property reflects the reason, and `forced` indicates
that something prevented the shutdown and the application had
to be forced to shutdown.
"""
if not self.instance:
raise errors.MarionetteException(
"restart() can only be called "
"on Gecko instances launched by Marionette"
)
context = self._send_message("Marionette:GetContext", key="value")
restart_details = {"cause": "restart", "forced": False}
# Safe mode and the silent flag require an in_app restart.
if (safe_mode or silent) and not in_app:
raise ValueError("An in_app restart is required for safe or silent mode")
if in_app:
if clean:
raise ValueError(
"An in_app restart cannot be triggered with the clean flag set"
)
if callback is not None and not callable(callback):
raise ValueError(
"Specified callback '{}' is not callable".format(callback)
)
# Block Marionette from accepting new connections
self._send_message("Marionette:AcceptConnections", {"value": False})
try:
self.is_shutting_down = True
if callback is not None:
callback()
restart_details["in_app"] = True
else:
flags = ["eRestart"]
if silent:
flags.append("eSilently")
try:
restart_details = self._request_in_app_shutdown(
flags=flags, safe_mode=safe_mode
)
except Exception as e:
self._send_message(
"Marionette:AcceptConnections", {"value": True}
)
raise e
except IOError:
# A possible IOError should be ignored at this point, given that
# restart() could have been called inside of `using_context`,
# which wants to reset the context but fails sending the message.
pass
timeout_restart = self.shutdown_timeout + self.startup_timeout
try:
# Wait for a new Marionette connection to appear while the
# process restarts itself.
self.raise_for_port(timeout=timeout_restart, check_process_status=False)
except socket.timeout:
exc_cls, _, tb = sys.exc_info()
if self.instance.runner.returncode is None:
# The process is still running, which means the shutdown
# request was not correct or the application ignored it.
# Allow Marionette to accept connections again.
self._send_message("Marionette:AcceptConnections", {"value": True})
message = "Process still running {}s after restart request"
reraise(exc_cls, exc_cls(message.format(timeout_restart)), tb)
else:
# The process shutdown but didn't start again.
self.cleanup()
msg = "Process unexpectedly quit without restarting (exit code: {})"
reraise(
exc_cls,
exc_cls(msg.format(self.instance.runner.returncode)),
tb,
)
finally:
self.is_shutting_down = False
self.delete_session(send_request=False)
else:
self.delete_session()
self.instance.restart(clean=clean)
self.raise_for_port(timeout=self.DEFAULT_STARTUP_TIMEOUT)
restart_details.update({"in_app": False, "forced": True})
if restart_details.get("cause") not in (None, "restart"):
raise errors.MarionetteException(
"Unexpected shutdown reason '{}' for "
"restarting the process".format(restart_details["cause"])
)
self.start_session(self.requested_capabilities)
# Restore the context as used before the restart
self.set_context(context)
if in_app and self.process_id:
# In some cases Firefox restarts itself by spawning into a new process group.
# As long as mozprocess cannot track that behavior (bug 1284864) we assist by
# informing about the new process id.
self.instance.runner.process_handler.check_for_detached(self.process_id)
return restart_details
def absolute_url(self, relative_url):
"""
Returns an absolute url for files served from Marionette's www directory.
:param relative_url: The url of a static file, relative to Marionette's www directory.
"""
return "{0}{1}".format(self.baseurl, relative_url)
@do_process_check
def start_session(self, capabilities=None, timeout=None):
"""Create a new WebDriver session.
This method must be called before performing any other action.
:param capabilities: An optional dictionary of
Marionette-recognised capabilities. It does not
accept a WebDriver conforming capabilities dictionary
(including alwaysMatch, firstMatch, desiredCapabilities,
or requriedCapabilities), and only recognises extension
capabilities that are specific to Marionette.
:param timeout: Optional timeout in seconds for the server to be ready.
:returns: A dictionary of the capabilities offered.
"""
if capabilities is None:
capabilities = {"strictFileInteractability": True}
self.requested_capabilities = capabilities
if timeout is None:
timeout = self.startup_timeout
self.crashed = 0
if self.instance:
returncode = self.instance.runner.returncode
# We're managing a binary which has terminated. Start it again
# and implicitely wait for the Marionette server to be ready.
if returncode is not None:
self.start_binary(timeout)
else:
# In the case when Marionette doesn't manage the binary wait until
# its server component has been started.
self.raise_for_port(timeout=timeout)
self.client = transport.TcpTransport(self.host, self.port, self.socket_timeout)
self.protocol, _ = self.client.connect()
try:
resp = self._send_message("WebDriver:NewSession", capabilities)
except errors.UnknownException:
# Force closing the managed process when the session cannot be
# created due to global JavaScript errors.
exc_type, value, tb = sys.exc_info()
if self.instance and self.instance.runner.is_running():
self.instance.close()
reraise(exc_type, exc_type(value.message), tb)
self.session_id = resp["sessionId"]
self.session = resp["capabilities"]
self.cleanup_ran = False
# fallback to processId can be removed in Firefox 55
self.process_id = self.session.get(
"moz:processID", self.session.get("processId")
)
self.profile = self.session.get("moz:profile")
timeout = self.session.get("moz:shutdownTimeout")
if timeout is not None:
# pylint --py3k W1619
self.shutdown_timeout = timeout / 1000 + 10
return self.session
@property
def test_name(self):
return self._test_name
@test_name.setter
def test_name(self, test_name):
self._test_name = test_name
def delete_session(self, send_request=True):
"""Close the current session and disconnect from the server.
:param send_request: Optional, if `True` a request to close the session on
the server side will be sent. Use `False` in case of eg. in_app restart()
or quit(), which trigger a deletion themselves. Defaults to `True`.
"""
try:
if send_request:
try:
self._send_message("WebDriver:DeleteSession")
except errors.InvalidSessionIdException:
pass
finally:
self.process_id = None
self.profile = None
self.session = None
self.session_id = None
self.window = None
if self.client is not None:
self.client.close()
@property
def session_capabilities(self):
"""A JSON dictionary representing the capabilities of the
current session.
"""
return self.session
@property
def current_window_handle(self):
"""Get the current window's handle.
Returns an opaque server-assigned identifier to this window
that uniquely identifies it within this Marionette instance.
This can be used to switch to this window at a later point.
:returns: unique window handle
:rtype: string
"""
with self.using_context("content"):
self.window = self._send_message("WebDriver:GetWindowHandle", key="value")
return self.window
@property
def current_chrome_window_handle(self):
"""Get the current chrome window's handle. Corresponds to
a chrome window that may itself contain tabs identified by
window_handles.
Returns an opaque server-assigned identifier to this window
that uniquely identifies it within this Marionette instance.
This can be used to switch to this window at a later point.
:returns: unique window handle
:rtype: string
"""
with self.using_context("chrome"):
self.chrome_window = self._send_message(
"WebDriver:GetWindowHandle", key="value"
)
return self.chrome_window
def set_window_rect(self, x=None, y=None, height=None, width=None):
"""Set the position and size of the current window.
The supplied width and height values refer to the window outerWidth
and outerHeight values, which include scroll bars, title bars, etc.
An error will be returned if the requested window size would result
in the window being in the maximised state.
:param x: x coordinate for the top left of the window
:param y: y coordinate for the top left of the window
:param width: The width to resize the window to.
:param height: The height to resize the window to.
"""
if (x is None and y is None) and (height is None and width is None):
raise errors.InvalidArgumentException(
"x and y or height and width need values"
)
body = {"x": x, "y": y, "height": height, "width": width}
return self._send_message("WebDriver:SetWindowRect", body)
@property
def window_rect(self):
return self._send_message("WebDriver:GetWindowRect")
@property
def title(self):
"""Current title of the active window."""
return self._send_message("WebDriver:GetTitle", key="value")
@property
def window_handles(self):
"""Get list of windows in the current context.
If called in the content context it will return a list of
references to all available browser windows.
Each window handle is assigned by the server, and the list of
strings returned does not have a guaranteed ordering.
:returns: Unordered list of unique window handles as strings
"""
with self.using_context("content"):
return self._send_message("WebDriver:GetWindowHandles")
@property
def chrome_window_handles(self):
"""Get a list of currently open chrome windows.
Each window handle is assigned by the server, and the list of
strings returned does not have a guaranteed ordering.
:returns: Unordered list of unique chrome window handles as strings
"""
with self.using_context("chrome"):
return self._send_message("WebDriver:GetWindowHandles")
@property
def page_source(self):
"""A string representation of the DOM."""
return self._send_message("WebDriver:GetPageSource", key="value")
def open(self, type=None, focus=False, private=False):
"""Open a new window, or tab based on the specified context type.
If no context type is given the application will choose the best
option based on tab and window support.
:param type: Type of window to be opened. Can be one of "tab" or "window"
:param focus: If true, the opened window will be focused
:param private: If true, open a private window
:returns: Dict with new window handle, and type of opened window
"""
body = {"type": type, "focus": focus, "private": private}
return self._send_message("WebDriver:NewWindow", body)
def close(self):
"""Close the current window, ending the session if it's the last
window currently open.
:returns: Unordered list of remaining unique window handles as strings
"""
return self._send_message("WebDriver:CloseWindow")
def close_chrome_window(self):
"""Close the currently selected chrome window, ending the session
if it's the last window open.
:returns: Unordered list of remaining unique chrome window handles as strings
"""
return self._send_message("WebDriver:CloseChromeWindow")
def set_context(self, context):
"""Sets the context that Marionette commands are running in.
:param context: Context, may be one of the class properties
`CONTEXT_CHROME` or `CONTEXT_CONTENT`.
Usage example::
marionette.set_context(marionette.CONTEXT_CHROME)
"""
if context not in [self.CONTEXT_CHROME, self.CONTEXT_CONTENT]:
raise ValueError("Unknown context: {}".format(context))
self._send_message("Marionette:SetContext", {"value": context})
@contextmanager
def using_context(self, context):
"""Sets the context that Marionette commands are running in using
a `with` statement. The state of the context on the server is
saved before entering the block, and restored upon exiting it.
:param context: Context, may be one of the class properties
`CONTEXT_CHROME` or `CONTEXT_CONTENT`.
Usage example::
with marionette.using_context(marionette.CONTEXT_CHROME):
# chrome scope
... do stuff ...
"""
scope = self._send_message("Marionette:GetContext", key="value")
self.set_context(context)
try:
yield
finally:
self.set_context(scope)
def switch_to_alert(self):
"""Returns an :class:`~marionette_driver.marionette.Alert` object for
interacting with a currently displayed alert.
::
alert = self.marionette.switch_to_alert()
text = alert.text
alert.accept()
"""
return Alert(self)
def switch_to_window(self, handle, focus=True):
"""Switch to the specified window; subsequent commands will be
directed at the new window.
:param handle: The id of the window to switch to.
:param focus: A boolean value which determins whether to focus
the window that we just switched to.
"""
self._send_message(
"WebDriver:SwitchToWindow", {"handle": handle, "focus": focus}
)
self.window = handle
def switch_to_default_content(self):
"""Switch the current context to page's default content."""
return self.switch_to_frame()
def switch_to_parent_frame(self):
"""
Switch to the Parent Frame
"""
self._send_message("WebDriver:SwitchToParentFrame")
def switch_to_frame(self, frame=None):
"""Switch the current context to the specified frame. Subsequent
commands will operate in the context of the specified frame,
if applicable.
:param frame: A reference to the frame to switch to. This can
be an :class:`~marionette_driver.marionette.WebElement`,
or an integer index. If you call ``switch_to_frame`` without an
argument, it will switch to the top-level frame.
"""
body = {}
if isinstance(frame, WebElement):
body["element"] = frame.id
elif frame is not None:
body["id"] = frame
self._send_message("WebDriver:SwitchToFrame", body)
def get_url(self):
"""Get a string representing the current URL.
On Desktop this returns a string representation of the URL of
the current top level browsing context. This is equivalent to