Source code

Revision control

Other Tools

1
import json
2
import re
3
import sys
4
5
from mozlog.structured.formatters.base import BaseFormatter
6
from ..executors.base import strip_server
7
8
9
# Matches any single code point in the UTF-16 surrogate range
# (U+D800-U+DFFF). The pattern itself does not distinguish paired from
# unpaired surrogates; the substitution callback selected further down in
# this module decides what happens to each match.
LONE_SURROGATE_RE = re.compile(u"[\uD800-\uDFFF]")
10
11
12
def surrogate_replacement_ucs4(match):
    """Return a printable ``U+xxxx`` escape for the character in *match*.

    :param match: a regex match object whose matched text is exactly one
                  character.
    :returns: the string ``"U+"`` followed by the character's code point
              in lowercase hexadecimal, without padding.
    """
    code_point = ord(match.group())
    return "U+%x" % code_point
14
15
16
class SurrogateReplacementUcs2(object):
    """Stateful substitution callback that escapes only unpaired surrogates.

    Intended for use as the ``repl`` callable of a regex substitution whose
    pattern matches one surrogate code unit at a time. A surrogate in the
    range U+D800-U+DBFF that is immediately followed by one in the range
    U+DC00-U+DFFF is treated as a valid pair and both halves are returned
    unchanged; every other match is replaced by a ``U+xxxx`` escape.

    The instance remembers, between consecutive calls, that the next match
    is the second half of a pair it has already accepted.
    """

    def __init__(self):
        # True when the previous match was the first half of a valid pair,
        # meaning the next match must be passed through untouched.
        self.skip = False

    def __call__(self, match):
        """Return the replacement text for a single matched surrogate.

        :param match: regex match object for one surrogate character.
        :returns: the matched character itself when it belongs to a valid
                  pair, otherwise ``"U+"`` plus its lowercase hex code point.
        """
        current = match.group()

        # Second half of a pair accepted on the previous call: keep it.
        if self.skip:
            self.skip = False
            return current

        if 0xD800 <= ord(current) <= 0xDBFF:
            # Look one character past the match; the slice is empty when the
            # match is at the very end of the string.
            end = match.end()
            following = match.string[end:end + 1]
            if following and 0xDC00 <= ord(following) <= 0xDFFF:
                self.skip = True
                return current

        return "U+" + hex(ord(current))[2:]
42
43
44
# Choose the substitution callback according to how this interpreter stores
# text: when sys.maxunicode is below 0x10FFFF, characters outside the BMP
# appear as surrogate pairs, so the pair-aware callback is required.
if sys.maxunicode != 0x10FFFF:
    surrogate_replacement = SurrogateReplacementUcs2()
else:
    surrogate_replacement = surrogate_replacement_ucs4
48
49
50
def replace_lone_surrogate(data):
    """Return *data* with surrogates rewritten by ``surrogate_replacement``.

    :param data: the text to process.
    :returns: a new string in which every match of ``LONE_SURROGATE_RE``
              has been replaced by whatever ``surrogate_replacement``
              returns for it.
    """
    cleaned = LONE_SURROGATE_RE.sub(surrogate_replacement, data)
    return cleaned
52
53
54
class WptreportFormatter(BaseFormatter):
    """Formatter that produces results in the format that wptreport expects.

    Each handler method receives one log message as a dictionary and folds
    it into ``self.results``. Only ``suite_end`` returns output: the whole
    accumulated report as a single line of JSON.
    """

    def __init__(self):
        # Tests that have been seen but not yet finished, keyed by test name.
        self.raw_results = {}
        # The report being built; serialized by suite_end.
        self.results = {}

    def suite_start(self, data):
        """Store run info (if present) and start time; reset the result list."""
        if "run_info" in data:
            self.results["run_info"] = data["run_info"]
        self.results["time_start"] = data["time"]
        self.results["results"] = []

    def suite_end(self, data):
        """Store the end time and return the report as a JSON line.

        Tests still held in ``raw_results`` (those for which ``test_end``
        was never handled) are appended to the result list first.

        :returns: the JSON-encoded report followed by a newline.
        """
        self.results["time_end"] = data["time"]
        for test_name, test in self.raw_results.items():
            result = {"test": test_name}
            result.update(test)
            self.results["results"].append(result)
        return json.dumps(self.results) + "\n"

    def find_or_create_test(self, data):
        """Return the pending record for ``data["test"]``, creating it if new."""
        test_name = data["test"]
        if test_name not in self.raw_results:
            self.raw_results[test_name] = {
                "subtests": [],
                "status": "",
                "message": None,
            }
        return self.raw_results[test_name]

    def test_start(self, data):
        """Remember when the test started so ``test_end`` can compute duration."""
        test = self.find_or_create_test(data)
        test["start_time"] = data["time"]

    def create_subtest(self, data):
        """Append a new, empty subtest record to its parent test and return it.

        The subtest name is passed through ``replace_lone_surrogate``. A new
        record is appended on every call; existing records are not reused.
        """
        test = self.find_or_create_test(data)
        subtest = {
            "name": replace_lone_surrogate(data["subtest"]),
            "status": "",
            "message": None,
        }
        test["subtests"].append(subtest)
        return subtest

    def test_status(self, data):
        """Record a subtest result, copying the optional fields that are present."""
        subtest = self.create_subtest(data)
        subtest["status"] = data["status"]
        if "expected" in data:
            subtest["expected"] = data["expected"]
        if "known_intermittent" in data:
            subtest["known_intermittent"] = data["known_intermittent"]
        if "message" in data:
            subtest["message"] = replace_lone_surrogate(data["message"])

    def test_end(self, data):
        """Finalize a test and move it from ``raw_results`` to the result list.

        :raises KeyError: if no start time was recorded for the test, or if
                          the result list has not been created yet.
        """
        test = self.find_or_create_test(data)
        # start_time is internal bookkeeping; only the duration is reported.
        start_time = test.pop("start_time")
        test["duration"] = data["time"] - start_time
        test["status"] = data["status"]
        if "expected" in data:
            test["expected"] = data["expected"]
        if "known_intermittent" in data:
            test["known_intermittent"] = data["known_intermittent"]
        if "message" in data:
            test["message"] = replace_lone_surrogate(data["message"])
        if "reftest_screenshots" in data.get("extra", {}):
            # Only mapping entries describe a screenshot; other entries in
            # the list (presumably the comparison relation) are skipped.
            # isinstance also accepts dict subclasses, which an exact
            # type() comparison would silently drop.
            test["screenshots"] = {
                strip_server(item["url"]): "sha1:" + item["hash"]
                for item in data["extra"]["reftest_screenshots"]
                if isinstance(item, dict)
            }
        test_name = data["test"]
        result = {"test": test_name}
        result.update(test)
        self.results["results"].append(result)
        del self.raw_results[test_name]

    def assertion_count(self, data):
        """Attach the assertion count and its expected bounds to the test."""
        test = self.find_or_create_test(data)
        test["asserts"] = {
            "count": data["count"],
            "min": data["min_expected"],
            "max": data["max_expected"],
        }

    def lsan_leak(self, data):
        """Append an LSan leak entry to the report's ``lsan_leaks`` list."""
        self.results.setdefault("lsan_leaks", []).append({
            "frames": data["frames"],
            "scope": data["scope"],
            "allowed_match": data.get("allowed_match"),
        })

    def find_or_create_mozleak(self, data):
        """Return the mozleak record for ``data["scope"]``, creating it if new."""
        mozleak = self.results.setdefault("mozleak", {})
        return mozleak.setdefault(data["scope"], {"objects": [], "total": []})

    def mozleak_object(self, data):
        """Append a leaked-object entry to the record for the message's scope."""
        scope_data = self.find_or_create_mozleak(data)
        scope_data["objects"].append({
            "process": data["process"],
            "name": data["name"],
            "allowed": data.get("allowed", False),
            "bytes": data["bytes"],
        })

    def mozleak_total(self, data):
        """Append a leak-total entry to the record for the message's scope."""
        scope_data = self.find_or_create_mozleak(data)
        scope_data["total"].append({
            "bytes": data["bytes"],
            "threshold": data.get("threshold", 0),
            "process": data["process"],
        })