Source code
Revision control
Copy as Markdown
Other Tools
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
import argparse
import glob
import io
import json
import os
import platform
import sys
from mach.decorators import Command
from mozbuild.base import BuildEnvironmentNotFoundException, MozbuildObject
here = os.path.abspath(os.path.dirname(__file__))
GVE = "org.mozilla.geckoview_example"
def get(url):
import requests
resp = requests.get(url)
resp.raise_for_status()
return resp
def untar(fileobj, dest):
import tarfile
with tarfile.open(fileobj=fileobj, mode="r") as tar_data:
tar_data.extractall(path=dest)
def unzip(fileobj, dest):
import zipfile
with zipfile.ZipFile(fileobj) as zip_data:
zip_data.extractall(path=dest)
def writable_dir(path):
if not os.path.isdir(path):
raise argparse.ArgumentTypeError(f"{path} is not a valid dir")
if os.access(path, os.W_OK):
return path
else:
raise argparse.ArgumentTypeError(f"{path} is not a writable dir")
def create_parser_interventions():
from mozlog import commandline
parser = argparse.ArgumentParser()
parser.add_argument("--webdriver-binary", help="Path to webdriver binary")
parser.add_argument(
"--webdriver-port",
action="store",
default="4444",
help="Port on which to run WebDriver",
)
parser.add_argument(
"--webdriver-ws-port",
action="store",
default="9222",
help="Port on which to run WebDriver BiDi websocket",
)
parser.add_argument("-b", "--bugs", nargs="*", help="Bugs to run tests for")
parser.add_argument(
"--do2fa",
action="store_true",
default=False,
help="Do two-factor auth live in supporting tests",
)
parser.add_argument(
"--config", help="Path to JSON file containing logins and other settings"
)
parser.add_argument(
"--debug", action="store_true", default=False, help="Debug failing tests"
)
parser.add_argument(
"-H",
"--headless",
action="store_true",
default=False,
help="Run firefox in headless mode",
)
parser.add_argument(
"--interventions",
action="store",
default="both",
choices=["enabled", "disabled", "both", "none"],
help="Enable webcompat interventions",
)
parser.add_argument(
"--shims",
action="store",
default="none",
choices=["enabled", "disabled", "both", "none"],
help="Enable SmartBlock shims",
)
parser.add_argument(
"--platform",
action="store",
choices=["android", "desktop"],
help="Platform to target",
)
parser.add_argument(
"--failure-screenshots-dir",
action="store",
type=writable_dir,
help="Path to save failure screenshots",
)
parser.add_argument(
"-s",
"--no-failure-screenshots",
action="store_true",
default=False,
help="Do not save a screenshot for each test failure",
)
parser.add_argument(
"-P",
"--platform-override",
action="store",
choices=["android", "linux", "mac", "windows"],
help="Override key navigator properties to match the given platform and/or use responsive design mode to mimic the given platform",
)
desktop_group = parser.add_argument_group("Desktop-specific arguments")
desktop_group.add_argument("--binary", help="Path to browser binary")
android_group = parser.add_argument_group("Android-specific arguments")
android_group.add_argument(
"--device-serial",
action="store",
help="Running Android instances to connect to, if not emulator-5554",
)
android_group.add_argument(
"--package-name",
action="store",
default=GVE,
help="Android package name to use",
)
commandline.add_logging_group(parser)
return parser
class InterventionTest(MozbuildObject):
def set_default_kwargs(self, logger, command_context, kwargs):
platform = kwargs["platform"]
binary = kwargs["binary"]
device_serial = kwargs["device_serial"]
try:
is_gve_build = command_context.substs.get("MOZ_APP_NAME") == "fennec"
except BuildEnvironmentNotFoundException:
# If we don't have a build, just use the logic below to choose between
# desktop and Android
is_gve_build = False
if platform == "android" or (
platform is None and binary is None and (device_serial or is_gve_build)
):
kwargs["platform"] = "android"
else:
kwargs["platform"] = "desktop"
if kwargs["platform"] == "desktop" and kwargs["binary"] is None:
kwargs["binary"] = self.get_binary_path()
if kwargs["webdriver_binary"] is None:
webdriver_binary = self.get_binary_path(
"geckodriver", validate_exists=False
)
if not os.path.exists(webdriver_binary):
webdriver_binary = self.install_geckodriver(
logger, dest=os.path.dirname(webdriver_binary)
)
if not os.path.exists(webdriver_binary):
logger.error("Can't find geckodriver")
sys.exit(1)
kwargs["webdriver_binary"] = webdriver_binary
def platform_string_geckodriver(self):
uname = platform.uname()
platform_name = {"Linux": "linux", "Windows": "win", "Darwin": "macos"}.get(
uname[0]
)
arch = uname[4].lower()
if arch in {"aarch64", "arm64"}:
bits = "-aarch64"
elif platform_name in ("linux", "win"):
bits = "64" if arch == "x86_64" else "32"
elif platform_name == "macos":
bits = ""
else:
raise ValueError(f"No precompiled geckodriver for platform {uname}")
return f"{platform_name}{bits}"
def install_geckodriver(self, logger, dest):
"""Install latest Geckodriver."""
if dest is None:
dest = os.path.join(self.distdir, "dist", "bin")
is_windows = platform.uname()[0] == "Windows"
release = get(
).json()
ext = "zip" if is_windows else "tar.gz"
platform_name = self.platform_string_geckodriver()
name_suffix = f"-{platform_name}.{ext}"
for item in release["assets"]:
if item["name"].endswith(name_suffix):
url = item["browser_download_url"]
break
else:
raise ValueError(f"Failed to find geckodriver for platform {platform_name}")
logger.info(f"Installing geckodriver from {url}")
data = io.BytesIO(get(url).content)
data.seek(0)
decompress = unzip if ext == "zip" else untar
decompress(data, dest=dest)
exe_ext = ".exe" if is_windows else ""
path = os.path.join(dest, f"geckodriver{exe_ext}")
return path
def setup_device(self, command_context, kwargs):
if kwargs["platform"] != "android":
return
app = kwargs["package_name"]
device_serial = kwargs["device_serial"]
if not device_serial:
from mozrunner.devices.android_device import (
InstallIntent,
verify_android_device,
)
# verify_android_device sets up device/emulator and records selected
# one to DEVICE_SERIAL environment.
verify_android_device(
command_context, app=app, network=True, install=InstallIntent.YES
)
kwargs["device_serial"] = os.environ.get("DEVICE_SERIAL")
# GVE does not have the webcompat addon by default. Add it.
if app == GVE:
kwargs["addon"] = "/data/local/tmp/webcompat.xpi"
push_to_device(
command_context.substs["ADB"],
device_serial,
webcompat_addon(command_context),
kwargs["addon"],
)
def run(self, command_context, **kwargs):
import mozlog
import runner
mozlog.commandline.setup_logging(
"test-interventions", kwargs, {"mach": sys.stdout}
)
logger = mozlog.get_default_logger("test-interventions")
log_level = "INFO"
# It's not trivial to get a single log level out of mozlog, because we might have
# different levels going to different outputs. We look for the maximum (i.e. most
# verbose) level of any handler with an attached formatter.
configured_level_number = None
for handler in logger.handlers:
if hasattr(handler, "formatter") and hasattr(handler.formatter, "level"):
formatter_level = handler.formatter.level
configured_level_number = (
formatter_level
if configured_level_number is None
else max(configured_level_number, formatter_level)
)
if configured_level_number is not None:
for level, number in mozlog.structuredlog.log_levels.items():
if number == configured_level_number:
log_level = level
break
status_handler = mozlog.handlers.StatusHandler()
logger.add_handler(status_handler)
self.set_default_kwargs(logger, command_context, kwargs)
self.setup_device(command_context, kwargs)
if kwargs["interventions"] != "none":
interventions = (
["enabled", "disabled"]
if kwargs["interventions"] == "both"
else [kwargs["interventions"]]
)
for interventions_setting in interventions:
runner.run(
logger,
os.path.join(here, "interventions"),
kwargs["webdriver_binary"],
kwargs["webdriver_port"],
kwargs["webdriver_ws_port"],
browser_binary=kwargs.get("binary"),
device_serial=kwargs.get("device_serial"),
package_name=kwargs.get("package_name"),
addon=kwargs.get("addon"),
bugs=kwargs["bugs"],
debug=kwargs["debug"],
interventions=interventions_setting,
config=kwargs["config"],
headless=kwargs["headless"],
do2fa=kwargs["do2fa"],
log_level=log_level,
failure_screenshots_dir=kwargs.get("failure_screenshots_dir"),
no_failure_screenshots=kwargs.get("no_failure_screenshots"),
platform_override=kwargs.get("platform_override"),
)
if kwargs["shims"] != "none":
shims = (
["enabled", "disabled"]
if kwargs["shims"] == "both"
else [kwargs["shims"]]
)
for shims_setting in shims:
runner.run(
logger,
os.path.join(here, "shims"),
kwargs["webdriver_binary"],
kwargs["webdriver_port"],
kwargs["webdriver_ws_port"],
browser_binary=kwargs.get("binary"),
device_serial=kwargs.get("device_serial"),
package_name=kwargs.get("package_name"),
addon=kwargs.get("addon"),
bugs=kwargs["bugs"],
debug=kwargs["debug"],
shims=shims_setting,
config=kwargs["config"],
headless=kwargs["headless"],
do2fa=kwargs["do2fa"],
failure_screenshots_dir=kwargs.get("failure_screenshots_dir"),
no_failure_screenshots=kwargs.get("no_failure_screenshots"),
platform_override=kwargs.get("platform_override"),
)
summary = status_handler.summarize()
passed = (
summary.unexpected_statuses == 0
and summary.log_level_counts.get("ERROR", 0) == 0
and summary.log_level_counts.get("CRITICAL", 0) == 0
)
return passed
def webcompat_addon(command_context):
import shutil
import tempfile
src = os.path.join(command_context.topsrcdir, "browser", "extensions", "webcompat")
# We use #include directives in the system addon's moz.build (to inject our JSON config
# into interventions.js), so we must do that here to make a working XPI.
tmpdir_kwargs = {}
if sys.version_info.major >= 3 and sys.version_info.minor >= 10:
tmpdir_kwargs["ignore_cleanup_errors"] = True
with tempfile.TemporaryDirectory(**tmpdir_kwargs) as src_copy:
def process_includes(path):
fullpath = os.path.join(src_copy, path)
in_lines = None
with open(fullpath) as f:
in_lines = f.readlines()
with open(fullpath, "w") as f:
addonpath = os.path.dirname(fullpath)
for line in in_lines:
if not line.startswith("const AVAILABLE_INTERVENTIONS = {}"):
f.write(line)
continue
jsons = f"{addonpath}/data/interventions/*.json"
interventions = {}
for ipath in glob.glob(jsons):
bug_number = os.path.splitext(os.path.basename(ipath))[0].split(
"-"
)[0]
with open(ipath) as fd:
interventions[bug_number] = json.load(fd)
str = json.dumps(dict(sorted(interventions.items())), indent=2)
f.write(f"const AVAILABLE_INTERVENTIONS = {str};\n")
shutil.copytree(src, src_copy, dirs_exist_ok=True)
process_includes("run.js")
dst = os.path.join(
command_context.virtualenv_manager.virtualenv_root, "webcompat.xpi"
)
shutil.make_archive(dst, "zip", src_copy)
shutil.move(f"{dst}.zip", dst)
return dst
def push_to_device(adb_path, device_serial, local_path, remote_path):
from mozdevice import ADBDeviceFactory
device = ADBDeviceFactory(adb=adb_path, device=device_serial)
device.push(local_path, remote_path)
device.chmod(remote_path)
@Command(
"test-interventions",
category="testing",
description="Test the webcompat interventions",
parser=create_parser_interventions,
virtualenv_name="webcompat",
)
def test_interventions(command_context, **params):
here = os.path.abspath(os.path.dirname(__file__))
command_context.virtualenv_manager.activate()
command_context.virtualenv_manager.install_pip_requirements(
os.path.join(here, "requirements.txt"),
require_hashes=False,
)
intervention_test = command_context._spawn(InterventionTest)
return 0 if intervention_test.run(command_context, **params) else 1