Source code

Revision control

Copy as Markdown

Other Tools

# mypy: allow-untyped-defs
import json
import os
import socket
import threading
import time
import traceback
import uuid
from urllib.parse import urljoin
from .base import (CallbackHandler,
RefTestExecutor,
RefTestImplementation,
TestharnessExecutor,
TimedRunner,
strip_server)
from .protocol import (BaseProtocolPart,
TestharnessProtocolPart,
Protocol,
SelectorProtocolPart,
ClickProtocolPart,
CookiesProtocolPart,
SendKeysProtocolPart,
WindowProtocolPart,
ActionSequenceProtocolPart,
TestDriverProtocolPart)
# Directory containing this module; the helper JavaScript files opened further
# down (e.g. "runner.js") are resolved relative to it.
here = os.path.dirname(__file__)
# Placeholders for the Selenium names used throughout this module. They stay
# None until do_delayed_imports() rebinds them to the real objects.
webdriver = None
exceptions = None
RemoteConnection = None
Command = None
def do_delayed_imports():
    """Import the Selenium client and publish it through module globals.

    The imports happen at call time rather than at module import time, so
    this module can be imported without Selenium being importable. After the
    call, the module-level names ``webdriver``, ``exceptions``,
    ``RemoteConnection`` and ``Command`` refer to the Selenium objects.
    """
    global webdriver, exceptions, RemoteConnection, Command

    from selenium import webdriver
    from selenium.common import exceptions
    from selenium.webdriver.remote.command import Command
    from selenium.webdriver.remote.remote_connection import RemoteConnection
class SeleniumBaseProtocolPart(BaseProtocolPart):
    """Basic browser operations implemented on a Selenium session."""

    def setup(self):
        """Pick up the Selenium session held by the parent protocol."""
        self.webdriver = self.parent.webdriver

    def execute_script(self, script, asynchronous=False):
        """Run ``script`` in the browser and return its result.

        :param script: JavaScript source to execute.
        :param asynchronous: When true, use ``execute_async_script`` instead
                             of ``execute_script``.
        """
        if asynchronous:
            return self.webdriver.execute_async_script(script)
        return self.webdriver.execute_script(script)

    def set_timeout(self, timeout):
        """Set the session's script timeout from ``timeout``.

        NOTE(review): the value is scaled by 1000 before being handed to
        ``set_script_timeout``; Selenium documents that argument in seconds,
        so confirm the scaling is intentional.
        """
        scaled = timeout * 1000
        self.webdriver.set_script_timeout(scaled)

    @property
    def current_window(self):
        """Handle of the window the session is currently focused on."""
        return self.webdriver.current_window_handle

    def set_window(self, handle):
        """Switch the session to the window identified by ``handle``."""
        self.webdriver.switch_to_window(handle)

    def window_handles(self):
        """Return the window handles reported by the session."""
        return self.webdriver.window_handles

    def load(self, url):
        """Navigate the current window to ``url``."""
        self.webdriver.get(url)

    def wait(self):
        """Block until the page fires ``__test_restart`` or the session ends.

        :returns: The async script's result (``true`` is passed to the
                  callback when the event fires), or ``False`` once the
                  connection/window is gone or an unexpected error occurs.
        """
        restart_listener = (
            "let callback = arguments[arguments.length - 1];\n"
            'addEventListener("__test_restart", e => {e.preventDefault(); callback(true)})'
        )
        while True:
            try:
                return self.webdriver.execute_async_script(restart_listener)
            except exceptions.TimeoutException:
                # The script timed out without the event firing; wait again.
                continue
            except (socket.timeout, exceptions.NoSuchWindowException, exceptions.ErrorInResponseException, OSError):
                # These errors end the wait silently.
                return False
            except Exception:
                # Anything else is unexpected: log the traceback, then stop.
                self.logger.error(traceback.format_exc())
                return False
class SeleniumTestharnessProtocolPart(TestharnessProtocolPart):
    """Manage the testharness runner window and the per-test windows."""

    def setup(self):
        """Bind the shared Selenium session and read the helper scripts."""
        self.webdriver = self.parent.webdriver
        # Handle of the window showing the runner page; set by load_runner().
        self.runner_handle = None
        with open(os.path.join(here, "runner.js")) as f:
            self.runner_script = f.read()
        with open(os.path.join(here, "window-loaded.js")) as f:
            self.window_loaded_script = f.read()

    def load_runner(self, url_protocol):
        """Load the testharness runner page and remember its window handle.

        :param url_protocol: Protocol name passed to the executor's
                             ``server_url`` to pick the server base URL.
        """
        if self.runner_handle:
            # Reuse the existing runner window rather than whichever window
            # currently has focus.
            self.webdriver.switch_to_window(self.runner_handle)
        url = urljoin(self.parent.executor.server_url(url_protocol),
                      "/testharness_runner.html")
        self.logger.debug("Loading %s" % url)
        self.webdriver.get(url)
        self.runner_handle = self.webdriver.current_window_handle
        # Single quotes in the thread name are replaced before the name is
        # substituted into the runner script.
        format_map = {"title": threading.current_thread().name.replace("'", '"')}
        self.parent.base.execute_script(self.runner_script % format_map)

    def close_old_windows(self):
        """Close every window except the runner and switch back to it.

        :returns: The runner window handle.
        """
        handles = [item for item in self.webdriver.window_handles if item != self.runner_handle]
        for handle in handles:
            try:
                self.webdriver.switch_to_window(handle)
                self.webdriver.close()
            except exceptions.NoSuchWindowException:
                # The window disappeared on its own; nothing left to close.
                pass
        self.webdriver.switch_to_window(self.runner_handle)
        return self.runner_handle

    def get_test_window(self, window_id, parent, timeout=5):
        """Find the test window amongst all the open windows.

        This is assumed to be either the named window or the one after the
        parent in the list of window handles.

        :param window_id: The DOM name of the Window
        :param parent: The handle of the runner window
        :param timeout: The time in seconds to wait for the window to appear.
                        This is because in some implementations there's a race
                        between calling window.open and the window being added
                        to the list of WebDriver accessible windows.
        :returns: The handle of the test window.
        :raises Exception: If no test window is found before the timeout.
        """
        test_window = None
        end_time = time.time() + timeout
        while time.time() < end_time:
            try:
                # Try using the JSON serialization of the WindowProxy object,
                # it's in Level 1 but nothing supports it yet
                win_s = self.webdriver.execute_script("return window['%s'];" % window_id)
                win_obj = json.loads(win_s)
                test_window = win_obj["window-fcc6-11e5-b4f8-330a88ab9d7f"]
            except Exception:
                # Deliberate best-effort: fall back to the handle list below.
                pass

            if test_window is None:
                after = self.webdriver.window_handles
                if len(after) == 2:
                    test_window = next(iter(set(after) - {parent}))
                elif len(after) > 2 and after[0] == parent:
                    # The length is checked before indexing so that an empty
                    # handle list retries instead of raising IndexError.
                    # Hope the first one here is the test window
                    test_window = after[1]

            if test_window is not None:
                assert test_window != parent
                return test_window

            time.sleep(0.1)

        raise Exception("unable to find test window")

    def test_window_loaded(self):
        """Wait until the page in the new window has been loaded.

        JavaScript exceptions thrown when the document has been unloaded
        due to a process change are ignored and the script is retried.
        """
        while True:
            try:
                self.webdriver.execute_async_script(self.window_loaded_script)
                break
            except exceptions.JavascriptException:
                pass
class SeleniumSelectorProtocolPart(SelectorProtocolPart):
    """Look up elements by CSS selector through the Selenium session."""

    def setup(self):
        """Pick up the Selenium session held by the parent protocol."""
        self.webdriver = self.parent.webdriver

    def elements_by_selector(self, selector):
        """Return the elements matching the CSS ``selector``."""
        driver = self.webdriver
        return driver.find_elements_by_css_selector(selector)

    def elements_by_selector_and_frame(self, element_selector, frame):
        """Return the elements matching ``element_selector``.

        ``frame`` is accepted but not used: the lookup runs in whatever
        browsing context the session currently has selected.
        """
        driver = self.webdriver
        return driver.find_elements_by_css_selector(element_selector)
class SeleniumClickProtocolPart(ClickProtocolPart):
    """Click elements through Selenium."""

    def setup(self):
        """Pick up the Selenium session held by the parent protocol."""
        self.webdriver = self.parent.webdriver

    def element(self, element):
        """Click ``element`` and return whatever its ``click()`` returns."""
        outcome = element.click()
        return outcome
class SeleniumCookiesProtocolPart(CookiesProtocolPart):
    """Cookie management through the Selenium session.

    Selenium's WebDriver exposes ``get_cookies()`` and ``get_cookie(name)``;
    it has no ``get_all_cookies`` or ``get_named_cookie`` methods, so those
    Selenium names are used here.
    """

    def setup(self):
        """Pick up the Selenium session held by the parent protocol."""
        self.webdriver = self.parent.webdriver

    def delete_all_cookies(self):
        """Delete every cookie visible to the current session."""
        self.logger.info("Deleting all cookies")
        return self.webdriver.delete_all_cookies()

    def get_all_cookies(self):
        """Return the cookies visible to the current session."""
        self.logger.info("Getting all cookies")
        return self.webdriver.get_cookies()

    def get_named_cookie(self, name):
        """Return the cookie called ``name``, or ``None`` if there is none.

        :param name: Name of the cookie to look up.
        """
        self.logger.info("Getting cookie named %s" % name)
        try:
            return self.webdriver.get_cookie(name)
        except exceptions.NoSuchCookieException:
            # Kept so a missing cookie maps to None even if the client raises.
            return None
class SeleniumWindowProtocolPart(WindowProtocolPart):
    """Window geometry operations through the Selenium session.

    Selenium's WebDriver has no ``window`` attribute and no ``minimize``
    method; the equivalent Selenium calls are ``get_window_rect()``,
    ``set_window_rect()`` and ``minimize_window()``.
    """

    def setup(self):
        """Pick up the Selenium session held by the parent protocol."""
        self.webdriver = self.parent.webdriver

    def minimize(self):
        """Minimize the window, remembering its rect in ``previous_rect``.

        :returns: Whatever Selenium's ``minimize_window()`` returns.
        """
        self.previous_rect = self.webdriver.get_window_rect()
        self.logger.info("Minimizing")
        return self.webdriver.minimize_window()

    def set_rect(self, rect):
        """Move/resize the window.

        :param rect: Mapping describing the new rect.
                     NOTE(review): assumed to carry the WebDriver window-rect
                     keys ``x``, ``y``, ``width`` and ``height`` — confirm
                     against callers. Missing keys are passed as ``None``.
        """
        self.logger.info("Setting window rect")
        self.webdriver.set_window_rect(x=rect.get("x"),
                                       y=rect.get("y"),
                                       width=rect.get("width"),
                                       height=rect.get("height"))

    def get_rect(self):
        """Return the current window rect as reported by Selenium."""
        self.logger.info("Getting window rect")
        return self.webdriver.get_window_rect()
class SeleniumSendKeysProtocolPart(SendKeysProtocolPart):
    """Send key input to elements through Selenium."""

    def setup(self):
        """Pick up the Selenium session held by the parent protocol."""
        self.webdriver = self.parent.webdriver

    def send_keys(self, element, keys):
        """Send ``keys`` to ``element`` and return the call's result."""
        outcome = element.send_keys(keys)
        return outcome
class SeleniumActionSequenceProtocolPart(ActionSequenceProtocolPart):
    """Dispatch action sequences through raw Selenium commands."""

    def setup(self):
        """Pick up the Selenium session held by the parent protocol."""
        self.webdriver = self.parent.webdriver

    def send_actions(self, actions):
        """Execute ``Command.W3C_ACTIONS`` with ``actions`` as its payload."""
        payload = {"actions": actions}
        self.webdriver.execute(Command.W3C_ACTIONS, payload)

    def release(self):
        """Execute ``Command.W3C_CLEAR_ACTIONS`` with an empty payload."""
        self.webdriver.execute(Command.W3C_CLEAR_ACTIONS, {})
class SeleniumTestDriverProtocolPart(TestDriverProtocolPart):
    """Post testdriver replies into the current window."""

    def setup(self):
        """Pick up the Selenium session held by the parent protocol."""
        self.webdriver = self.parent.webdriver

    def send_message(self, cmd_id, message_type, status, message=None):
        """Post a testdriver message to the page via ``window.postMessage``.

        :param cmd_id: Identifier copied verbatim into the message.
        :param message_type: Stringified and prefixed with ``testdriver-``.
        :param status: Stringified into the ``status`` field.
        :param message: Optional payload; it is stringified and included
                        only when truthy.
        """
        payload = {
            "cmd_id": cmd_id,
            "type": "testdriver-%s" % str(message_type),
            "status": str(status)
        }
        if message:
            payload["message"] = str(message)
        script = "window.postMessage(%s, '*')" % json.dumps(payload)
        self.webdriver.execute_script(script)
class SeleniumProtocol(Protocol):
    """Protocol implementation that talks to the browser through Selenium."""

    # Protocol part classes provided by this protocol.
    implements = [
        SeleniumBaseProtocolPart,
        SeleniumTestharnessProtocolPart,
        SeleniumSelectorProtocolPart,
        SeleniumClickProtocolPart,
        SeleniumCookiesProtocolPart,
        SeleniumSendKeysProtocolPart,
        SeleniumTestDriverProtocolPart,
        SeleniumWindowProtocolPart,
        SeleniumActionSequenceProtocolPart,
    ]

    def __init__(self, executor, browser, capabilities, **kwargs):
        """Record connection settings; no session is opened here.

        :param executor: Executor that owns this protocol.
        :param browser: Browser object; its ``webdriver_url`` is the server
                        to connect to.
        :param capabilities: Capabilities passed to Selenium on connect.
        """
        # Selenium must be imported before any of its names are used.
        do_delayed_imports()

        super().__init__(executor, browser)
        self.capabilities = capabilities
        self.url = browser.webdriver_url
        self.webdriver = None

    def connect(self):
        """Connect to browser via Selenium's WebDriver implementation."""
        self.logger.debug("Connecting to Selenium on URL: %s" % self.url)

        connection = RemoteConnection(self.url.strip("/"), resolve_ip=False)
        self.webdriver = webdriver.Remote(command_executor=connection,
                                          desired_capabilities=self.capabilities)

    def teardown(self):
        """Quit the Selenium session (best effort) and drop the attribute."""
        self.logger.debug("Hanging up on Selenium session")
        try:
            self.webdriver.quit()
        except Exception:
            # Any failure while quitting is ignored.
            pass
        del self.webdriver

    def is_alive(self):
        """Return whether the session still answers a trivial request."""
        try:
            # Get a simple property over the connection
            self.webdriver.current_window_handle
        # TODO what exception?
        except (socket.timeout, exceptions.ErrorInResponseException):
            return False
        else:
            return True

    def after_connect(self):
        """Load the testharness runner for the executor's last environment."""
        url_protocol = self.executor.last_environment["protocol"]
        self.testharness.load_runner(url_protocol)
class SeleniumRun(TimedRunner):
    """TimedRunner that maps Selenium failures onto result statuses."""

    def set_timeout(self):
        """Apply the test timeout plus the extra allowance to the session.

        :returns: ``None`` on success, or an ``("INTERNAL-ERROR", message)``
                  tuple when the WebDriver connection has been lost.
        """
        timeout = self.timeout

        try:
            self.protocol.base.set_timeout(timeout + self.extra_timeout)
        except exceptions.ErrorInResponseException:
            msg = "Lost WebDriver connection"
            self.logger.error(msg)
            return ("INTERNAL-ERROR", msg)
        return None

    def run_func(self):
        """Run the wrapped function and store ``(success, data)`` in ``result``.

        ``result_flag`` is always set at the end, whatever the outcome.
        """
        try:
            outcome = self.func(self.protocol, self.url, self.timeout)
            self.result = True, outcome
        except exceptions.TimeoutException:
            self.result = False, ("EXTERNAL-TIMEOUT", None)
        except (socket.timeout, exceptions.ErrorInResponseException):
            self.result = False, ("CRASH", None)
        except Exception as e:
            # Prefix the traceback with the exception's own message, if any.
            pieces = []
            own_message = str(getattr(e, "message", ""))
            if own_message:
                pieces.append(own_message)
            pieces.append(traceback.format_exc())
            self.result = False, ("INTERNAL-ERROR", "\n".join(pieces))
        finally:
            self.result_flag.set()
class SeleniumTestharnessExecutor(TestharnessExecutor):
    """Selenium-based executor for testharness.js tests"""

    supports_testdriver = True

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 close_after_done=True, capabilities=None, debug_info=None,
                 **kwargs):
        """Create the executor, its protocol, and read the resume script.

        :param capabilities: Capabilities forwarded to SeleniumProtocol.
        :param close_after_done: Stored on the instance as-is.
        """
        TestharnessExecutor.__init__(self, logger, browser, server_config,
                                     timeout_multiplier=timeout_multiplier,
                                     debug_info=debug_info)
        self.protocol = SeleniumProtocol(self, browser, capabilities)

        resume_path = os.path.join(here, "testharness_webdriver_resume.js")
        with open(resume_path) as f:
            self.script_resume = f.read()

        self.close_after_done = close_after_done
        # Name passed to window.open for the test window.
        self.window_id = str(uuid.uuid4())

    def is_alive(self):
        """Report whether the underlying protocol connection is alive."""
        return self.protocol.is_alive()

    def on_environment_change(self, new_environment):
        """Reload the runner page when the environment's protocol changes."""
        wanted = new_environment["protocol"]
        if wanted != self.last_environment["protocol"]:
            self.protocol.testharness.load_runner(wanted)

    def do_test(self, test):
        """Run ``test`` under a SeleniumRun and convert the outcome.

        :returns: The converted harness result on success, otherwise a
                  result built from the failure data with no subtests.
        """
        url = self.test_url(test)
        scaled_timeout = test.timeout * self.timeout_multiplier

        runner = SeleniumRun(self.logger,
                             self.do_testharness,
                             self.protocol,
                             url,
                             scaled_timeout,
                             self.extra_timeout)
        success, data = runner.run()

        if not success:
            return (test.make_result(*data), [])
        return self.convert_result(test, data)

    def do_testharness(self, protocol, url, timeout):
        """Open a fresh test window, load ``url`` and collect the results.

        :param protocol: Protocol object used to drive the browser.
        :param url: URL of the test to load.
        :param timeout: Unused here.
        :returns: The value produced by the callback handler once it reports
                  that the test is done.
        """
        format_map = {"url": strip_server(url)}

        parent_window = protocol.testharness.close_old_windows()

        # Now start the test harness
        open_script = "window.open('about:blank', '%s', 'noopener')" % self.window_id
        protocol.base.execute_script(open_script)
        test_window = protocol.testharness.get_test_window(
            self.window_id,
            parent_window,
            timeout=5*self.timeout_multiplier)
        self.protocol.base.set_window(test_window)
        protocol.testharness.test_window_loaded()

        protocol.base.load(url)

        handler = CallbackHandler(self.logger, protocol, test_window)
        resume_script = self.script_resume % format_map
        # Keep resuming the page script until the handler says we are done.
        while True:
            result = protocol.base.execute_script(resume_script, asynchronous=True)
            done, rv = handler(result)
            if done:
                return rv
class SeleniumRefTestExecutor(RefTestExecutor):
    """Selenium WebDriver-based executor for reftests"""

    def __init__(self, logger, browser, server_config, timeout_multiplier=1,
                 screenshot_cache=None, close_after_done=True,
                 debug_info=None, capabilities=None, **kwargs):
        """Create the executor, its protocol, and read the wait script.

        :param capabilities: Capabilities forwarded to SeleniumProtocol.
        :param close_after_done: Stored on the instance as-is.
        """
        RefTestExecutor.__init__(self,
                                 logger,
                                 browser,
                                 server_config,
                                 screenshot_cache=screenshot_cache,
                                 timeout_multiplier=timeout_multiplier,
                                 debug_info=debug_info)
        self.protocol = SeleniumProtocol(self, browser,
                                         capabilities=capabilities)
        self.implementation = RefTestImplementation(self)
        self.close_after_done = close_after_done
        self.has_window = False

        wait_path = os.path.join(here, "test-wait.js")
        with open(wait_path) as f:
            # The script template is specialised for the "reftest-wait" class.
            self.wait_script = f.read() % {"classname": "reftest-wait"}

    def reset(self):
        """Reset the reftest implementation's state."""
        self.implementation.reset()

    def is_alive(self):
        """Report whether the underlying protocol connection is alive."""
        return self.protocol.is_alive()

    def do_test(self, test):
        """Size the window for an 800x600 viewport and run the reftest."""
        self.logger.info("Test requires OS-level window focus")

        driver = self.protocol.webdriver
        # Measure the window chrome so the *inner* size ends up 800x600.
        chrome_script = (
            "return [window.outerWidth - window.innerWidth,\n"
            "window.outerHeight - window.innerHeight];"
        )
        width_offset, height_offset = driver.execute_script(chrome_script)
        self.protocol.webdriver.set_window_position(0, 0)
        self.protocol.webdriver.set_window_size(800 + width_offset, 600 + height_offset)

        result = self.implementation.run_test(test)
        return self.convert_result(test, result)

    def screenshot(self, test, viewport_size, dpi, page_ranges):
        """Take a screenshot of ``test`` under a SeleniumRun.

        ``viewport_size`` and ``dpi`` must be ``None``; ``page_ranges`` is
        not used.

        NOTE(review): the timeout passed here is ``test.timeout`` without the
        ``timeout_multiplier`` scaling that SeleniumTestharnessExecutor.do_test
        applies — confirm this is intended.
        """
        assert viewport_size is None
        assert dpi is None

        runner = SeleniumRun(self.logger,
                             self._screenshot,
                             self.protocol,
                             self.test_url(test),
                             test.timeout,
                             self.extra_timeout)
        return runner.run()

    def _screenshot(self, protocol, url, timeout):
        """Load ``url``, wait for it to settle, and return a base64 PNG."""
        driver = protocol.webdriver
        driver.get(url)

        driver.execute_async_script(self.wait_script)

        screenshot = driver.get_screenshot_as_base64()

        # strip off the data:image/png;base64, part of the url
        data_url_prefix = "data:image/png;base64,"
        if screenshot.startswith(data_url_prefix):
            screenshot = screenshot[len(data_url_prefix):]

        return screenshot