Source code

Revision control

Copy as Markdown

Other Tools

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Drives the throttling feature when the test calls our
controlled server.
"""
import http.client
import json
import os
import sys
import time
from urllib.parse import urlparse
from mozperftest.test.browsertime import add_option
from mozperftest.utils import get_tc_secret
# Host name of the controlled server for each supported ``sys.platform``.
ENDPOINTS = {
    "linux": "h3.dev.mozaws.net",
    "darwin": "h3.mac.dev.mozaws.net",
    "win32": "h3.win.dev.mozaws.net",
}
# A platform without an entry above fails here with a KeyError.
CTRL_SERVER = ENDPOINTS[sys.platform]

# True when a TASK_ID variable is present in the environment.
TASK_CLUSTER = "TASK_ID" in os.environ

# Throttler settings. The key defaults to the WEBNETEM_KEY environment
# variable; when TASK_CLUSTER is set, whatever get_tc_secret() returns is
# merged on top of these defaults.
_SECRET = dict(
    throttler_host=f"https://{CTRL_SERVER}/_throttler",
    throttler_key=os.environ.get("WEBNETEM_KEY", ""),
)
if TASK_CLUSTER:
    _SECRET.update(get_tc_secret())

# Refuse to load without a key; the message names the source that was used.
if _SECRET["throttler_key"] == "":
    raise Exception(
        "throttler_key not found in secret"
        if TASK_CLUSTER
        else "WEBNETEM_KEY not set"
    )

# Timeout, in seconds, given to each HTTPS connection to the server.
_TIMEOUT = 30
# Longest time, in seconds, Throttler.start() keeps trying to get the server.
WAIT_TIME = 10 * 60
# Pause, in seconds, between two attempts in Throttler.start().
IDLE_TIME = 10
# Pause, in seconds, at the end of after_cycle().
BREATHE_TIME = 20
class Throttler:
    """Client for the throttler API of the controlled server.

    ``start`` waits until no test is running on the server, claims it with
    the ``start_test`` action and optionally sends shaping options.
    ``stop`` sends ``reset`` followed by ``stop_test``.
    """

    def __init__(self, env, host, key):
        """Store the connection settings and pick the logging function.

        Args:
            env: object providing ``get_arg``, ``info`` and ``debug``.
            host: URL of the throttler API; its netloc is the server that is
                contacted and its path is the base path of every action.
            key: value sent in the ``X-WEBNETEM-KEY`` header of each request.
        """
        self.env = env
        self.host = host
        self.key = key
        self.verbose = env.get_arg("verbose", False)
        # Messages go to env.info in verbose mode, env.debug otherwise.
        self.logger = self.env.info if self.verbose else self.env.debug

    def log(self, msg):
        """Log *msg* with a ``[throttler]`` prefix."""
        self.logger("[throttler] " + msg)

    def _request(self, action, data=None):
        """Call *action* on the throttler API and return the decoded JSON reply.

        A GET is sent when *data* is None; otherwise *data* is serialized to
        JSON and sent as the body of a POST. The ``status`` action maps to
        the API base path, any other action to ``<base path>/<action>``.

        Raises:
            Exception: carrying the raw response body when the HTTP status
                is 400 or above.
        """
        headers = {b"X-WEBNETEM-KEY": self.key}
        verb = "GET" if data is None else "POST"
        if data is not None:
            data = json.dumps(data)
            headers[b"Content-type"] = b"application/json"

        parsed = urlparse(self.host)
        path = parsed.path
        if action != "status":
            path += "/" + action

        self.log(f"Calling {verb} {path}")
        conn = http.client.HTTPSConnection(parsed.netloc, timeout=_TIMEOUT)
        try:
            conn.request(verb, path, body=data, headers=headers)
            resp = conn.getresponse()
            res = resp.read()
            status = resp.status
        finally:
            # start() can poll for minutes: do not leave one socket open per
            # call.
            conn.close()

        if status >= 400:
            raise Exception(res)
        return json.loads(res)

    def start(self, data=None):
        """Acquire the controlled server, then apply *data* if provided.

        The server status is polled every IDLE_TIME seconds for at most
        WAIT_TIME seconds. As soon as no test is reported as running, the
        ``start_test`` action is attempted; a failure there is treated as a
        lost race and polling resumes.

        Args:
            data: optional shaping options sent with the ``shape`` action
                once the server is acquired.

        Raises:
            Exception: when the server could not be acquired in time, or
                when a ``status`` or ``shape`` request fails.
        """
        self.log("Starting")
        # Monotonic clock: a wall-clock adjustment must not stretch or cut
        # the waiting period.
        began = time.monotonic()
        acquired = False

        while time.monotonic() - began < WAIT_TIME:
            status = self._request("status")
            if status.get("test_running"):
                # a test is running
                self.log("A test is already controlling the server")
                self.log(f"Waiting {IDLE_TIME} seconds")
            else:
                try:
                    self._request("start_test")
                except Exception:
                    # we got beat in the race
                    self.log("Someone else beat us")
                else:
                    acquired = True
                    break
            time.sleep(IDLE_TIME)

        if not acquired:
            raise Exception("Could not acquire the test server")

        if data is not None:
            self._request("shape", data)

    def stop(self):
        """Send ``reset`` to the server, then ``stop_test``.

        ``stop_test`` is sent even when ``reset`` fails.
        """
        self.log("Stopping")
        try:
            self._request("reset")
        finally:
            self._request("stop_test")
def get_throttler(env):
    """Return a Throttler for *env* built from the module-level ``_SECRET``.

    The key stored in ``_SECRET`` is encoded to bytes before being handed to
    the Throttler.
    """
    return Throttler(
        env,
        host=_SECRET["throttler_host"],
        key=_SECRET["throttler_key"].encode(),
    )
# Protocols and pages that get_test() combines.
_PROTOCOL = ("h2", "h3")
_PAGE = (
    "gallery",
    "news",
    "shopping",
    "photoblog",
)

# Network conditions to test.
# Each item has a name, plus optional netem options:
#
# loss_ratio: percentage of packets that will be lost
# loss_corr: correlation factor for the random packet loss
# dup_ratio: percentage of packets that will be duplicated
# delay: overall delay for each packet
# jitter: amount of jitter in milliseconds
# delay_jitter_corr: correlation factor for the random jitter
# reorder_ratio: percentage of packets that will be reordered
# reorder_corr: correlation factor for the random reordering
#
_THROTTLING = (
    # no throttling.
    {"name": "full"},
    {"name": "one", "delay": "20"},
    {"name": "two", "delay": "50"},
    {"name": "three", "delay": "100"},
    {"name": "four", "delay": "200"},
    {"name": "five", "delay": "300"},
)
def get_test():
    """Iterate on test conditions.

    For each cycle, we yield a combination of: protocol, page, page URL and
    throttling settings. Each throttling setting has a name, which
    before_cycle() combines with the protocol and the page to name the
    script.
    """
    for proto in _PROTOCOL:
        for page in _PAGE:
            # `url` used to be yielded without ever being defined, which
            # raised NameError on the first next() call.
            # NOTE(review): assumes each page is served as "<page>.html" at
            # the root of the controlled server -- confirm the layout.
            url = f"https://{CTRL_SERVER}/{page}.html"
            for throttler_settings in _THROTTLING:
                yield proto, page, url, throttler_settings
# Module-level iterator over the test conditions: before_cycle() takes one
# item per throttlable cycle and rebinds it to a fresh iterator once exhausted.
combo = get_test()
def before_cycle(metadata, env, cycle, script):
    """Set up the next test condition before a cycle runs.

    Scripts without the ``throttlable`` tag are left untouched. For the
    others, the next (protocol, page, url, throttling settings) combination
    is taken from the module-level ``combo`` iterator, which is restarted
    once exhausted. The URL and, for h3, a Firefox preference are set with
    add_option(); the script is renamed after the combination; and the
    controlled server is acquired, with shaping options unless the
    combination is the unthrottled ``full`` one.
    """
    global combo

    tags = script["tags"]
    if "throttlable" not in tags:
        return

    throttler = get_throttler(env)

    try:
        conditions = next(combo)
    except StopIteration:
        # every combination has been used: start over
        combo = get_test()
        conditions = next(combo)
    proto, page, url, throttler_settings = conditions

    # point the browsertime script at the page under test
    add_option(env, "browsertime.url", url, overwrite=True)

    # enable HTTP/3 when this combination calls for it
    if proto == "h3":
        add_option(env, "firefox.preference", "network.http.http3.enable:true")

    # the script name is the prefix that differentiates the metrics
    script["name"] = "{}_{}_{}".format(throttler_settings["name"], proto, page)

    # acquire the controlled server, throttled only when needed
    unthrottled = throttler_settings == {"name": "full"}
    if unthrottled:
        env.info("No throttling for this call")
        throttler.start()
    else:
        env.info("Calling the controlled server")
        throttler.start(throttler_settings)
def after_cycle(metadata, env, cycle, script):
    """Stop the throttler once a throttlable cycle is over, then pause.

    Scripts without the ``throttlable`` tag are ignored. Stopping is best
    effort: a failure is logged through ``env.info`` rather than raised, and
    the BREATHE_TIME pause happens either way.
    """
    if "throttlable" not in script["tags"]:
        return

    throttler = get_throttler(env)
    try:
        throttler.stop()
    except Exception as exc:
        # Do not fail the cycle over this, but leave a trace: a failed stop
        # was previously swallowed silently.
        env.info(f"Could not stop the throttler: {exc!r}")

    # give a chance for a competitive job to take over
    time.sleep(BREATHE_TIME)