Source code
Revision control
Copy as Markdown
Other Tools
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
import base64
import io
import json
import os
import sys
import tempfile
import time
import traceback
from mozlog import formatters, handlers, structuredlog
from PIL import Image, ImageChops, ImageDraw
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
class SnapTestsBase:
def __init__(self, exp):
snap_profile_path = tempfile.mkdtemp(
prefix="snap-tests",
dir=os.path.expanduser("~/snap/firefox/common/.mozilla/firefox/"),
)
driver_service = Service(
executable_path=r"/snap/firefox/current/usr/lib/firefox/geckodriver",
log_output=os.path.join(
os.environ.get("ARTIFACT_DIR", ""), "geckodriver.log"
),
)
options = Options()
if "TEST_GECKODRIVER_TRACE" in os.environ.keys():
options.log.level = "trace"
options.binary_location = r"/snap/bin/firefox"
if not "TEST_NO_HEADLESS" in os.environ.keys():
options.add_argument("--headless")
if "MOZ_AUTOMATION" in os.environ.keys():
os.environ["MOZ_LOG_FILE"] = os.path.join(
os.environ.get("ARTIFACT_DIR"), "gecko.log"
)
options.add_argument("-profile")
options.add_argument(snap_profile_path)
self._driver = webdriver.Firefox(service=driver_service, options=options)
self._driver.set_window_position(0, 0)
self._driver.set_window_size(1280, 1024)
self._logger = structuredlog.StructuredLogger(self.__class__.__name__)
self._logger.add_handler(
handlers.StreamHandler(sys.stdout, formatters.TbplFormatter())
)
test_filter = "test_{}".format(os.environ.get("TEST_FILTER", ""))
object_methods = [
method_name
for method_name in dir(self)
if callable(getattr(self, method_name))
and method_name.startswith(test_filter)
]
self._logger.suite_start(object_methods)
assert self._dir is not None
self._update_channel = None
self._version_major = None
self._wait = WebDriverWait(self._driver, self.get_timeout())
self._longwait = WebDriverWait(self._driver, 60)
with open(exp, "r") as j:
self._expectations = json.load(j)
rv = False
first_tab = self._driver.window_handles[0]
channel = self.update_channel()
if self.is_esr_115():
channel = "esr-115"
if self.is_esr_128():
channel = "esr-128"
for m in object_methods:
self._logger.test_start(m)
expectations = (
self._expectations[m]
if not channel in self._expectations[m]
else self._expectations[m][channel]
)
self._driver.switch_to.window(first_tab)
try:
tabs_before = set(self._driver.window_handles)
rv = getattr(self, m)(expectations)
tabs_after = set(self._driver.window_handles)
self._logger.info("tabs_after OK {}".format(tabs_after))
self._driver.switch_to.parent_frame()
if rv:
self._logger.test_end(m, status="OK")
else:
self._logger.test_end(m, status="FAIL")
except Exception as ex:
rv = False
test_status = "ERROR"
if isinstance(ex, AssertionError):
test_status = "FAIL"
elif isinstance(ex, TimeoutException):
test_status = "TIMEOUT"
test_message = repr(ex)
self.save_screenshot(
"screenshot_{}_{}.png".format(m.lower(), test_status.lower())
)
self._driver.switch_to.parent_frame()
self.save_screenshot(
"screenshot_{}_{}_parent.png".format(m.lower(), test_status.lower())
)
self._logger.test_end(m, status=test_status, message=test_message)
traceback.print_exc()
tabs_after = set(self._driver.window_handles)
self._logger.info("tabs_after EXCEPTION {}".format(tabs_after))
finally:
self._logger.info("tabs_before {}".format(tabs_before))
tabs_opened = tabs_after - tabs_before
self._logger.info("opened {} tabs".format(len(tabs_opened)))
self._logger.info("opened {} tabs".format(tabs_opened))
closed = 0
for tab in tabs_opened:
self._logger.info("switch to {}".format(tab))
self._driver.switch_to.window(tab)
self._logger.info("close {}".format(tab))
self._driver.close()
closed += 1
self._logger.info(
"wait EC.number_of_windows_to_be({})".format(
len(tabs_after) - closed
)
)
self._wait.until(
EC.number_of_windows_to_be(len(tabs_after) - closed)
)
self._driver.switch_to.window(first_tab)
if not "TEST_NO_QUIT" in os.environ.keys():
self._driver.quit()
self._logger.info("Exiting with {}".format(rv))
self._logger.suite_end()
sys.exit(0 if rv is True else 1)
def get_screenshot_destination(self, name):
final_name = name
if "MOZ_AUTOMATION" in os.environ.keys():
final_name = os.path.join(os.environ.get("ARTIFACT_DIR"), name)
return final_name
def save_screenshot(self, name):
final_name = self.get_screenshot_destination(name)
self._logger.info("Saving screenshot '{}' to '{}'".format(name, final_name))
self._driver.save_screenshot(final_name)
def get_timeout(self):
if "TEST_TIMEOUT" in os.environ.keys():
return int(os.getenv("TEST_TIMEOUT"))
else:
return 5
def is_debug_build(self):
return "BUILD_IS_DEBUG" in os.environ.keys()
def maybe_collect_reference(self):
return "TEST_COLLECT_REFERENCE" in os.environ.keys()
def open_tab(self, url):
opened_tabs = len(self._driver.window_handles)
self._driver.switch_to.new_window("tab")
self._wait.until(EC.number_of_windows_to_be(opened_tabs + 1))
self._driver.get(url)
return self._driver.current_window_handle
def update_channel(self):
if self._update_channel is None:
self._driver.set_context("chrome")
self._update_channel = self._driver.execute_script(
"return Services.prefs.getStringPref('app.update.channel');"
)
self._logger.info("Update channel: {}".format(self._update_channel))
self._driver.set_context("content")
return self._update_channel
def version_major(self):
if self._version_major is None:
self._driver.set_context("chrome")
self._version_major = self._driver.execute_script(
"return AppConstants.MOZ_APP_VERSION.split('.')[0];"
)
self._logger.info("Version major: {}".format(self._version_major))
self._driver.set_context("content")
return self._version_major
def is_esr_115(self):
return self.update_channel() == "esr" and self.version_major() == "115"
def is_esr_128(self):
return self.update_channel() == "esr" and self.version_major() == "128"
def assert_rendering(self, exp, element_or_driver):
# wait a bit for things to settle down
time.sleep(0.5)
# Convert as RGB otherwise we cannot get difference
png_bytes = (
element_or_driver.screenshot_as_png
if isinstance(element_or_driver, WebElement)
else (
element_or_driver.get_screenshot_as_png()
if isinstance(element_or_driver, WebDriver)
else base64.b64decode(element_or_driver)
)
)
svg_png = Image.open(io.BytesIO(png_bytes)).convert("RGB")
svg_png_cropped = svg_png.crop((0, 35, svg_png.width - 20, svg_png.height - 10))
if self.maybe_collect_reference():
new_ref = "new_{}".format(exp["reference"])
new_ref_file = self.get_screenshot_destination(new_ref)
self._logger.info(
"Collecting new reference screenshot: {} => {}".format(
new_ref, new_ref_file
)
)
with open(new_ref_file, "wb") as current_screenshot:
svg_png_cropped.save(current_screenshot)
return
svg_ref = Image.open(os.path.join(self._dir, exp["reference"])).convert("RGB")
diff = ImageChops.difference(svg_ref, svg_png_cropped)
bbox = diff.getbbox()
if bbox is not None:
(diff_r, diff_g, diff_b) = diff.getextrema()
min_is_black = (diff_r[0] == 0) and (diff_g[0] == 0) and (diff_b[0] == 0)
max_is_low_enough = (
(diff_r[1] <= 15) and (diff_g[1] <= 15) and (diff_b[1] <= 15)
)
draw_ref = ImageDraw.Draw(svg_ref)
draw_ref.rectangle(bbox, outline="red")
draw_rend = ImageDraw.Draw(svg_png_cropped)
draw_rend.rectangle(bbox, outline="red")
draw_diff = ImageDraw.Draw(diff)
draw_diff.rectangle(bbox, outline="red")
# Some differences have been found, let's verify
self._logger.info("Non empty differences bbox: {}".format(bbox))
buffered = io.BytesIO()
diff.save(buffered, format="PNG")
if "TEST_DUMP_DIFF" in os.environ.keys():
diff_b64 = base64.b64encode(buffered.getvalue())
self._logger.info(
"data:image/png;base64,{}".format(diff_b64.decode("utf-8"))
)
with open(
self.get_screenshot_destination("differences.png"), "wb"
) as diff_screenshot:
diff_screenshot.write(buffered.getvalue())
with open(
self.get_screenshot_destination("current_rendering.png"), "wb"
) as current_screenshot:
svg_png_cropped.save(current_screenshot)
with open(
self.get_screenshot_destination("reference_rendering.png"), "wb"
) as current_screenshot:
svg_ref.save(current_screenshot)
(left, upper, right, lower) = bbox
assert right >= left, "Inconsistent boundaries right={} left={}".format(
right, left
)
assert lower >= upper, "Inconsistent boundaries lower={} upper={}".format(
lower, upper
)
if ((right - left) <= 2) or ((lower - upper) <= 2):
self._logger.info("Difference is a <= 2 pixels band, ignoring")
return
# Assume a difference is mismatching if the minimum pixel value in
# image difference is NOT black or if the maximum pixel value is
# not low enough
assert (
min_is_black and max_is_low_enough
), "Mismatching screenshots for {} with extremas: {}".format(
exp["reference"], (diff_r, diff_g, diff_b)
)
class SnapTests(SnapTestsBase):
def __init__(self, exp):
self._dir = "basic_tests"
super(SnapTests, self).__init__(exp)
def test_about_support(self, exp):
self.open_tab("about:support")
version_box = self._wait.until(
EC.visibility_of_element_located((By.ID, "version-box"))
)
self._wait.until(lambda d: len(version_box.text) > 0)
self._logger.info("about:support version: {}".format(version_box.text))
assert version_box.text == exp["version_box"], "version text should match"
distributionid_box = self._wait.until(
EC.visibility_of_element_located((By.ID, "distributionid-box"))
)
self._wait.until(lambda d: len(distributionid_box.text) > 0)
self._logger.info(
"about:support distribution ID: {}".format(distributionid_box.text)
)
assert (
distributionid_box.text == exp["distribution_id"]
), "distribution_id should match"
windowing_protocol = self._driver.execute_script(
"return document.querySelector('th[data-l10n-id=\"graphics-window-protocol\"').parentNode.lastChild.textContent;"
)
self._logger.info(
"about:support windowing protocol: {}".format(windowing_protocol)
)
assert windowing_protocol == "wayland", "windowing protocol should be wayland"
return True
def test_about_buildconfig(self, exp):
self.open_tab("about:buildconfig")
source_link = self._wait.until(
EC.visibility_of_element_located((By.CSS_SELECTOR, "a"))
)
self._wait.until(lambda d: len(source_link.text) > 0)
self._logger.info("about:buildconfig source: {}".format(source_link.text))
assert source_link.text.startswith(
exp["source_repo"]
), "source repo should exists and match"
build_flags_box = self._wait.until(
EC.visibility_of_element_located((By.CSS_SELECTOR, "p:last-child"))
)
self._wait.until(lambda d: len(build_flags_box.text) > 0)
self._logger.info("about:support buildflags: {}".format(build_flags_box.text))
assert (
build_flags_box.text.find(exp["official"]) >= 0
), "official build flag should be there"
return True
def test_youtube(self, exp):
# Wait for the consent dialog and accept it
self._logger.info("Wait for consent form")
try:
self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, "button[aria-label*=Accept]")
)
).click()
except TimeoutException:
self._logger.info("Wait for consent form: timed out, maybe it is not here")
# Find first video and click it
self._logger.info("Wait for one video")
self._wait.until(
EC.visibility_of_element_located((By.ID, "video-title-link"))
).click()
# Wait for duration to be set to something
self._logger.info("Wait for video to start")
video = self._wait.until(
EC.visibility_of_element_located((By.CLASS_NAME, "html5-main-video"))
)
self._wait.until(lambda d: type(video.get_property("duration")) == float)
self._logger.info("video duration: {}".format(video.get_property("duration")))
assert (
video.get_property("duration") > exp["duration"]
), "youtube video should have duration"
self._wait.until(lambda d: video.get_property("currentTime") > exp["playback"])
self._logger.info("video played: {}".format(video.get_property("currentTime")))
assert (
video.get_property("currentTime") > exp["playback"]
), "youtube video should perform playback"
return True
def wait_for_enable_drm(self):
rv = True
self._driver.set_context("chrome")
self._driver.execute_script(
"Services.prefs.setBoolPref('media.gmp-manager.updateEnabled', true);"
)
if self.is_esr_115():
rv = False
else:
enable_drm_button = self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, ".notification-button[label='Enable DRM']")
)
)
self._logger.info("Enabling DRMs")
enable_drm_button.click()
self._wait.until(
EC.invisibility_of_element_located(
(By.CSS_SELECTOR, ".notification-button[label='Enable DRM']")
)
)
self._logger.info("Installing DRMs")
self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, ".infobar[value='drmContentCDMInstalling']")
)
)
self._logger.info("Waiting for DRMs installation to complete")
self._longwait.until(
EC.invisibility_of_element_located(
(By.CSS_SELECTOR, ".infobar[value='drmContentCDMInstalling']")
)
)
self._driver.set_context("content")
return rv
def test_youtube_film(self, exp):
return True
if not self.wait_for_enable_drm():
self._logger.info("Skipped on ESR because cannot enable DRM")
return True
# Wait for duration to be set to something
self._logger.info("Wait for video to start")
video = self._wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, "video.html5-main-video")
)
)
self._wait.until(lambda d: type(video.get_property("duration")) == float)
self._logger.info("video duration: {}".format(video.get_property("duration")))
assert (
video.get_property("duration") > exp["duration"]
), "youtube video should have duration"
self._driver.execute_script("arguments[0].click();", video)
video.send_keys("k")
self._wait.until(lambda d: video.get_property("currentTime") > exp["playback"])
self._logger.info("video played: {}".format(video.get_property("currentTime")))
assert (
video.get_property("currentTime") > exp["playback"]
), "youtube video should perform playback"
return True
if __name__ == "__main__":
SnapTests(exp=sys.argv[1])