Source code

Revision control

Copy as Markdown

Other Tools

# mypy: allow-untyped-defs
import os
import time
import traceback
from typing import Type
from urllib.parse import urljoin
from webdriver import error
from .base import (
CrashtestExecutor,
TestharnessExecutor,
get_pages,
)
from .executorwebdriver import (
WebDriverCrashtestExecutor,
WebDriverFedCMProtocolPart,
WebDriverProtocol,
WebDriverRefTestExecutor,
WebDriverRun,
WebDriverTestharnessExecutor,
WebDriverTestharnessProtocolPart,
)
from .protocol import PrintProtocolPart
here = os.path.dirname(__file__)
def make_sanitizer_mixin(crashtest_executor_cls: Type[CrashtestExecutor]): # type: ignore[no-untyped-def]
class SanitizerMixin:
def __new__(cls, logger, browser, **kwargs):
# Overriding `__new__` is the least worst way we can force tests to run
# as crashtests at runtime while still supporting:
# * Class attributes (e.g., `extra_timeout`)
# * Pickleability for `multiprocessing` transport
# * The `__wptrunner__` product interface
#
# These requirements rule out approaches with `functools.partial(...)`
# or global variables.
if kwargs.get("sanitizer_enabled"):
executor = crashtest_executor_cls(logger, browser, **kwargs)
def convert_from_crashtest_result(test, result):
if issubclass(cls, TestharnessExecutor):
status = result["status"]
if status == "PASS":
status = "OK"
harness_result = test.make_result(status, result["message"])
# Don't report subtests.
return harness_result, []
# `crashtest` statuses are a subset of `(print-)reftest`
# ones, so no extra conversion necessary.
return cls.convert_result(executor, test, result)
executor.convert_result = convert_from_crashtest_result
return executor
return super().__new__(cls)
return SanitizerMixin
_SanitizerMixin = make_sanitizer_mixin(WebDriverCrashtestExecutor)
class ChromeDriverTestharnessProtocolPart(WebDriverTestharnessProtocolPart):
"""Implementation of `testharness.js` tests controlled by ChromeDriver.
The main difference from the default WebDriver testharness implementation is
that the test window can be reused between tests for better performance.
"""
def setup(self):
super().setup()
# Handle (an alphanumeric string) that may be set if window reuse is
# enabled. This state allows the protocol to distinguish the test
# window from other windows a test itself may create that the "Get
# Window Handles" command also returns.
#
# Because test window persistence is a Chrome-only feature, it's not
# exposed to the base WebDriver testharness executor.
self.test_window = None
self.reuse_window = self.parent.reuse_window
# Company prefix to apply to vendor-specific WebDriver extension commands.
self.cdp_company_prefix = "goog"
def close_test_window(self):
if self.test_window:
self._close_window(self.test_window)
self.test_window = None
def close_old_windows(self):
self.webdriver.actions.release()
for handle in self.webdriver.handles:
if handle not in {self.runner_handle, self.test_window}:
self._close_window(handle)
if not self.reuse_window:
self.close_test_window()
self.webdriver.window_handle = self.runner_handle
return self.runner_handle
def open_test_window(self, window_id):
if self.test_window:
# Try to reuse the existing test window by emulating the `about:blank`
# page with no history you would get with a new window.
try:
self.webdriver.window_handle = self.test_window
# Reset navigation history with Chrome DevTools Protocol:
body = {
"cmd": "Page.resetNavigationHistory",
"params": {},
}
self.webdriver.send_session_command("POST",
self.cdp_company_prefix + "/cdp/execute",
body=body)
self.webdriver.url = "about:blank"
return
except error.NoSuchWindowException:
self.test_window = None
super().open_test_window(window_id)
def get_test_window(self, window_id, parent, timeout=5):
if self.test_window:
return self.test_window
# Poll the handles endpoint for the test window like the base WebDriver
# protocol part, but don't bother checking for the serialized
# WindowProxy (not supported by Chrome currently).
deadline = time.time() + timeout
while time.time() < deadline:
self.test_window = self._poll_handles_for_test_window(parent)
if self.test_window is not None:
assert self.test_window != parent
return self.test_window
time.sleep(0.03)
raise Exception("unable to find test window")
class ChromeDriverPrintProtocolPart(PrintProtocolPart):
def setup(self):
self.webdriver = self.parent.webdriver
self.runner_handle = None
# Company prefix to apply to vendor-specific WebDriver extension commands.
self.cdp_company_prefix = "goog"
def load_runner(self):
url = urljoin(self.parent.executor.server_url("http"), "/print_pdf_runner.html")
self.logger.debug("Loading %s" % url)
try:
self.webdriver.url = url
except Exception as e:
self.logger.critical(
"Loading initial page %s failed. Ensure that the "
"there are no other programs bound to this port and "
"that your firewall rules or network setup does not "
"prevent access.\n%s" % (url, traceback.format_exc(e)))
raise
self.runner_handle = self.webdriver.window_handle
def render_as_pdf(self, width, height):
margin = 0.5
body = {
"cmd": "Page.printToPDF",
"params": {
# Chrome accepts dimensions in inches; we are using cm
"paperWidth": width / 2.54,
"paperHeight": height / 2.54,
"marginLeft": margin,
"marginRight": margin,
"marginTop": margin,
"marginBottom": margin,
"shrinkToFit": False,
"printBackground": True,
}
}
return self.webdriver.send_session_command("POST",
self.cdp_company_prefix + "/cdp/execute",
body=body)["data"]
def pdf_to_png(self, pdf_base64, ranges):
handle = self.webdriver.window_handle
self.webdriver.window_handle = self.runner_handle
try:
rv = self.webdriver.execute_async_script("""
let callback = arguments[arguments.length - 1];
render('%s').then(result => callback(result))""" % pdf_base64)
page_numbers = get_pages(ranges, len(rv))
rv = [item for i, item in enumerate(rv) if i + 1 in page_numbers]
return rv
finally:
self.webdriver.window_handle = handle
class ChromeDriverFedCMProtocolPart(WebDriverFedCMProtocolPart):
def setup(self):
self.webdriver = self.parent.webdriver
# Company prefix to apply to vendor-specific WebDriver extension commands.
self.fedcm_company_prefix = "goog"
def confirm_idp_login(self):
return self.webdriver.send_session_command("POST",
self.fedcm_company_prefix + "/fedcm/confirmidplogin")
class ChromeDriverProtocol(WebDriverProtocol):
implements = [
ChromeDriverFedCMProtocolPart,
ChromeDriverPrintProtocolPart,
ChromeDriverTestharnessProtocolPart,
*(part for part in WebDriverProtocol.implements
if part.name != ChromeDriverTestharnessProtocolPart.name and
part.name != ChromeDriverFedCMProtocolPart.name)
]
reuse_window = False
class ChromeDriverRefTestExecutor(WebDriverRefTestExecutor, _SanitizerMixin): # type: ignore
protocol_cls = ChromeDriverProtocol
class ChromeDriverTestharnessExecutor(WebDriverTestharnessExecutor, _SanitizerMixin): # type: ignore
protocol_cls = ChromeDriverProtocol
def __init__(self, *args, reuse_window=False, **kwargs):
super().__init__(*args, **kwargs)
self.protocol.reuse_window = reuse_window
class ChromeDriverPrintRefTestExecutor(ChromeDriverRefTestExecutor):
protocol_cls = ChromeDriverProtocol
def setup(self, runner):
super().setup(runner)
self.protocol.pdf_print.load_runner()
self.has_window = False
with open(os.path.join(here, "reftest.js")) as f:
self.script = f.read()
def screenshot(self, test, viewport_size, dpi, page_ranges):
assert dpi is None
if not self.has_window:
self.protocol.base.execute_script(self.script)
self.protocol.base.set_window(self.protocol.webdriver.handles[-1])
self.has_window = True
self.viewport_size = viewport_size
self.page_ranges = page_ranges.get(test.url)
timeout = self.timeout_multiplier * test.timeout if self.debug_info is None else None
test_url = self.test_url(test)
return WebDriverRun(self.logger,
self._render,
self.protocol,
test_url,
timeout,
self.extra_timeout).run()
def _render(self, protocol, url, timeout):
protocol.webdriver.url = url
protocol.base.execute_script(self.wait_script, asynchronous=True)
pdf = protocol.pdf_print.render_as_pdf(*self.viewport_size)
screenshots = protocol.pdf_print.pdf_to_png(pdf, self.page_ranges)
for i, screenshot in enumerate(screenshots):
# strip off the data:img/png, part of the url
if screenshot.startswith("data:image/png;base64,"):
screenshots[i] = screenshot.split(",", 1)[1]
return screenshots