# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Shared module for fetching intermittent test failure data from Treeherder and Bugzilla.
"""
import datetime
import re
from typing import Literal, Optional, TypedDict
import requests
from wpt_path_utils import resolve_wpt_path
# Sent as the "User-agent" header on every HTTP request issued by this module.
USER_AGENT = "mach-intermittent-failures/1.0"
class BugzillaFailure(TypedDict):
    """One failure record as consumed from the Treeherder failures response."""

    # Bugzilla bug number the failures are filed against.
    bug_id: int
    # Number of failures reported for that bug in this record; readers in
    # this module fall back to 1 when the key is absent.
    bug_count: int
class BugzillaSummary(TypedDict):
    """Subset of a Bugzilla bug record requested via ``include_fields``."""

    # Bug title.
    summary: str
    # Bugzilla bug number.
    id: int
    # Workflow state and resolution as reported by Bugzilla.
    status: Optional[str]
    resolution: Optional[str]
    # Timestamps exactly as returned by Bugzilla (kept as strings, not parsed).
    creation_time: Optional[str]
    last_change_time: Optional[str]
    # Number of comments on the bug.
    comment_count: Optional[int]
class IntermittentFailure(TypedDict):
    """Combined failure count and bug details for one intermittent failure."""

    # Bugzilla bug number.
    bug_id: int
    # Total failures counted for the bug across all fetched failure records.
    failure_count: int
    # Bug title, status and resolution copied from the Bugzilla record.
    summary: str
    status: str
    resolution: str
    # Test path extracted from the summary of a "single tracking bug";
    # None when the summary carries no such path.
    test_path: Optional[str]
    # Timestamps and comment count passed through from Bugzilla unchanged.
    creation_time: Optional[str]
    last_change_time: Optional[str]
    comment_count: Optional[int]
class IntermittentFailuresFetcher:
    """Fetches intermittent test failure data from Treeherder and Bugzilla APIs.

    Failure records are fetched from Treeherder for the last ``days`` days,
    summed per bug, and every bug whose total reaches ``threshold`` is then
    looked up in Bugzilla for its summary and status.
    """

    # NOTE(review): the request URLs in this class previously started at "?"
    # with no scheme or host, which requests rejects (MissingSchema). The
    # endpoints below are the public Treeherder / Bugzilla ones -- confirm
    # they match the intended deployment.
    TREEHERDER_FAILURES_URL = "https://treeherder.mozilla.org/api/failures/"
    BUGZILLA_BUG_URL = "https://bugzilla.mozilla.org/rest/bug"

    # Seconds to wait on each HTTP request; without a timeout a stalled
    # server would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    # Summary text that identifies a per-test tracking bug.
    SINGLE_TRACKING_MARKER = "single tracking bug"

    # Captures the whitespace-free token ending in ".<extension>" that is
    # immediately followed by " |" in a bug summary.
    _TEST_PATH_RE = re.compile(r" ([^\s]+\/?\.[a-z0-9-A-Z]+) \|")

    def __init__(self, days: int = 7, threshold: int = 30, verbose: bool = False):
        """
        Configure the query window and reporting threshold.

        days: length of the window, counted back from "now".
        threshold: minimum total failure count for a bug to be reported.
        verbose: when True, print the request URLs before fetching.
        """
        self.days = days
        self.threshold = threshold
        self.verbose = verbose
        # NOTE(review): naive local time; only the date part is sent to
        # Treeherder -- confirm whether the server expects UTC dates.
        self.end_date = datetime.datetime.now()
        self.start_date = self.end_date - datetime.timedelta(days=self.days)

    def get_failures(self, branch: str = "trunk") -> list[IntermittentFailure]:
        """
        Fetch intermittent failures that meet the threshold.
        Returns a list of intermittent failures with bug information, in the
        order Bugzilla returned the bugs. Returns an empty list when no bug
        reaches the threshold (Bugzilla is not queried in that case).
        Raises whatever the underlying HTTP requests raise.
        """
        bugzilla_failures = self._get_bugzilla_failures(branch)
        bug_list = self._keep_bugs_above_threshold(bugzilla_failures)
        if not bug_list:
            return []

        # Build the lookup structures once instead of rescanning the lists
        # for every bug returned by Bugzilla.
        wanted = set(bug_list)
        counts = self._count_failures(bugzilla_failures)

        results: list[IntermittentFailure] = []
        for bug in self._get_bugzilla_summaries(bug_list):
            bug_id = bug["id"]
            if bug_id not in wanted:
                continue

            summary = bug["summary"]
            test_path = None
            if self.SINGLE_TRACKING_MARKER in summary:
                test_path = self._extract_test_path(summary)
                # Paths with a leading "/" are handed to resolve_wpt_path.
                if test_path is not None and test_path.startswith("/"):
                    test_path = resolve_wpt_path(test_path)

            result: IntermittentFailure = {
                "bug_id": bug_id,
                "failure_count": counts.get(bug_id, 0),
                "summary": summary,
                # `or` also covers an explicit null, which a .get() default
                # would pass through despite the declared `str` type.
                "status": bug.get("status") or "UNKNOWN",
                "resolution": bug.get("resolution") or "",
                "test_path": test_path,
                "creation_time": bug.get("creation_time"),
                "last_change_time": bug.get("last_change_time"),
                "comment_count": bug.get("comment_count"),
            }
            results.append(result)
        return results

    def get_single_tracking_bugs_with_paths(
        self, branch: str = "trunk"
    ) -> list[tuple[int, str]]:
        """
        Get only single tracking bugs that have test paths.
        This is what high_freq_skipfails uses.
        Returns a list of (bug_id, test_path) tuples.
        """
        return [
            (failure["bug_id"], failure["test_path"])
            for failure in self.get_failures(branch)
            if failure["test_path"]
            and self.SINGLE_TRACKING_MARKER in failure["summary"]
        ]

    @classmethod
    def _extract_test_path(cls, summary: str) -> Optional[str]:
        """Return the first test path found in a bug summary, or None."""
        found = cls._TEST_PATH_RE.search(summary)
        return found.group(1) if found else None

    @staticmethod
    def _count_failures(failure_list: list[BugzillaFailure]) -> dict[int, int]:
        """Sum failure counts per bug id, keeping first-seen bug order."""
        totals: dict[int, int] = {}
        for failure in failure_list:
            bug_id = failure["bug_id"]
            # A record without "bug_count" counts as a single failure.
            totals[bug_id] = totals.get(bug_id, 0) + failure.get("bug_count", 1)
        return totals

    def _get_bugzilla_failures(self, branch: str = "trunk") -> list[BugzillaFailure]:
        """
        Fetch failure data from Treeherder API.
        Only records carrying an integer "bug_id" are returned.
        Raises requests.HTTPError on a non-success response.
        """
        url = (
            f"{self.TREEHERDER_FAILURES_URL}"
            f"?startday={self.start_date.date()}&endday={self.end_date.date()}"
            f"&tree={branch}&failurehash=all"
        )
        if self.verbose:
            print(f"[DEBUG] Fetching failures from Treeherder: {url}")
        response = requests.get(
            url, headers={"User-agent": USER_AGENT}, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return [
            item
            for item in response.json()
            if "bug_id" in item and isinstance(item["bug_id"], int)
        ]

    def _keep_bugs_above_threshold(
        self, failure_list: list[BugzillaFailure]
    ) -> list[int]:
        """Return ids of bugs whose total failure count is >= the threshold."""
        if not failure_list:
            return []
        return [
            bug_id
            for bug_id, count in self._count_failures(failure_list).items()
            if count >= self.threshold
        ]

    def _get_failure_count(
        self, failure_list: list[BugzillaFailure], bug_id: int
    ) -> int:
        """Get the total failure count for a specific bug (0 if it never appears)."""
        return sum(
            failure.get("bug_count", 1)
            for failure in failure_list
            if failure["bug_id"] == bug_id
        )

    def _get_bugzilla_summaries(self, bug_id_list: list[int]) -> list[BugzillaSummary]:
        """
        Fetch bug summaries from Bugzilla REST API.
        Returns an empty list without contacting Bugzilla when no ids are given.
        Raises requests.HTTPError on a non-success response.
        """
        if not bug_id_list:
            return []
        url = (
            f"{self.BUGZILLA_BUG_URL}"
            "?include_fields=summary,id,status,resolution,creation_time,last_change_time,comment_count"
            f"&id={','.join(map(str, bug_id_list))}"
        )
        if self.verbose:
            print(f"[DEBUG] Fetching bug details from Bugzilla: {url}")
        response = requests.get(
            url, headers={"User-agent": USER_AGENT}, timeout=self.REQUEST_TIMEOUT
        )
        response.raise_for_status()
        json_response: dict[Literal["bugs"], list[BugzillaSummary]] = response.json()
        return json_response.get("bugs", [])