Source code

Revision control

Copy as Markdown

Other Tools

import json
import pipes
import re
from .progressbar import NullProgressBar, ProgressBar
from .structuredlog import TestLogger
# subprocess.list2cmdline does not properly escape for sh-like shells
def escape_cmdline(args):
return " ".join([pipes.quote(a) for a in args])
class TestOutput:
"""Output from a test run."""
def __init__(self, test, cmd, out, err, rc, dt, timed_out, extra=None):
self.test = test # Test
self.cmd = cmd # str: command line of test
self.out = out # str: stdout
self.err = err # str: stderr
self.rc = rc # int: return code
self.dt = dt # float: run time
self.timed_out = timed_out # bool: did the test time out
self.extra = extra # includes the pid on some platforms
def describe_failure(self):
if self.timed_out:
return "Timeout"
lines = self.err.splitlines()
for line in lines:
# Skip the asm.js compilation success message.
if "Successfully compiled asm.js code" not in line:
return line
return "Unknown"
class NullTestOutput:
"""Variant of TestOutput that indicates a test was not run."""
def __init__(self, test):
self.test = test
self.cmd = ""
self.out = ""
self.err = ""
self.rc = 0
self.dt = 0.0
self.timed_out = False
class TestResult:
PASS = "PASS"
FAIL = "FAIL"
CRASH = "CRASH"
"""Classified result from a test run."""
def __init__(self, test, result, results, wpt_results=None):
self.test = test
self.result = result
self.results = results
self.wpt_results = wpt_results # Only used for wpt tests.
@classmethod
def from_wpt_output(cls, output):
"""Parse the output from a web-platform test that uses testharness.js.
(The output is written to stdout in js/src/tests/testharnessreport.js.)
"""
from wptrunner.executors.base import testharness_result_converter
rc = output.rc
stdout = output.out.split("\n")
if rc != 0:
if rc == 3:
harness_status = "ERROR"
harness_message = "Exit code reported exception"
else:
harness_status = "CRASH"
harness_message = "Exit code reported crash"
tests = []
else:
for idx, line in enumerate(stdout):
if line.startswith("WPT OUTPUT: "):
msg = line[len("WPT OUTPUT: ") :]
data = [output.test.wpt.url] + json.loads(msg)
harness_status_obj, tests = testharness_result_converter(
output.test.wpt, data
)
harness_status = harness_status_obj.status
harness_message = "Reported by harness: %s" % (
harness_status_obj.message,
)
del stdout[idx]
break
else:
harness_status = "ERROR"
harness_message = "No harness output found"
tests = []
stdout.append("Harness status: %s (%s)" % (harness_status, harness_message))
result = cls.PASS
results = []
subtests = []
expected_harness_status = output.test.wpt.expected()
if harness_status != expected_harness_status:
if harness_status == "CRASH":
result = cls.CRASH
else:
result = cls.FAIL
else:
for test in tests:
test_output = 'Subtest "%s": ' % (test.name,)
expected = output.test.wpt.expected(test.name)
if test.status == expected:
test_result = (cls.PASS, "")
test_output += "as expected: %s" % (test.status,)
else:
test_result = (cls.FAIL, test.message)
result = cls.FAIL
test_output += "expected %s, found %s" % (expected, test.status)
if test.message:
test_output += ' (with message: "%s")' % (test.message,)
subtests.append(
{
"test": output.test.wpt.id,
"subtest": test.name,
"status": test.status,
"expected": expected,
}
)
results.append(test_result)
stdout.append(test_output)
output.out = "\n".join(stdout) + "\n"
wpt_results = {
"name": output.test.wpt.id,
"status": harness_status,
"expected": expected_harness_status,
"subtests": subtests,
}
return cls(output.test, result, results, wpt_results)
@classmethod
def from_output(cls, output):
test = output.test
result = None # str: overall result, see class-level variables
results = [] # (str,str) list: subtest results (pass/fail, message)
if test.wpt:
return cls.from_wpt_output(output)
out, err, rc = output.out, output.err, output.rc
failures = 0
passes = 0
expected_rcs = []
if test.path.endswith("-n.js"):
expected_rcs.append(3)
for line in out.split("\n"):
if line.startswith(" FAILED!"):
failures += 1
msg = line[len(" FAILED! ") :]
results.append((cls.FAIL, msg))
elif line.startswith(" PASSED!"):
passes += 1
msg = line[len(" PASSED! ") :]
results.append((cls.PASS, msg))
else:
m = re.match(
"--- NOTE: IN THIS TESTCASE, WE EXPECT EXIT CODE"
" ((?:-|\\d)+) ---",
line,
)
if m:
expected_rcs.append(int(m.group(1)))
if test.error is not None:
expected_rcs.append(3)
if test.error not in err:
failures += 1
results.append(
(cls.FAIL, "Expected uncaught error: {}".format(test.error))
)
if rc and rc not in expected_rcs:
if rc == 3:
result = cls.FAIL
else:
result = cls.CRASH
else:
if (rc or passes > 0) and failures == 0:
result = cls.PASS
else:
result = cls.FAIL
return cls(test, result, results)
class TestDuration:
def __init__(self, test, duration):
self.test = test
self.duration = duration
class ResultsSink:
def __init__(self, testsuite, options, testcount):
self.options = options
self.fp = options.output_fp
if self.options.format == "automation":
self.slog = TestLogger(testsuite)
self.slog.suite_start()
self.wptreport = None
if self.options.wptreport:
try:
from .wptreport import WptreportHandler
self.wptreport = WptreportHandler(self.options.wptreport)
self.wptreport.suite_start()
except ImportError:
pass
self.groups = {}
self.output_dict = {}
self.counts = {"PASS": 0, "FAIL": 0, "TIMEOUT": 0, "SKIP": 0}
self.slow_tests = []
self.n = 0
if options.hide_progress:
self.pb = NullProgressBar()
else:
fmt = [
{"value": "PASS", "color": "green"},
{"value": "FAIL", "color": "red"},
{"value": "TIMEOUT", "color": "blue"},
{"value": "SKIP", "color": "brightgray"},
]
self.pb = ProgressBar(testcount, fmt)
def push(self, output):
if self.options.show_slow and output.dt >= self.options.slow_test_threshold:
self.slow_tests.append(TestDuration(output.test, output.dt))
if output.timed_out:
self.counts["TIMEOUT"] += 1
if isinstance(output, NullTestOutput):
if self.options.format == "automation":
self.print_automation_result(
"TEST-KNOWN-FAIL", output.test, time=output.dt, skip=True
)
self.counts["SKIP"] += 1
self.n += 1
else:
result = TestResult.from_output(output)
if self.wptreport is not None and result.wpt_results:
self.wptreport.test(result.wpt_results, output.dt)
tup = (result.result, result.test.expect, result.test.random)
dev_label = self.LABELS[tup][1]
if self.options.check_output:
if output.test.path in self.output_dict.keys():
if self.output_dict[output.test.path] != output:
self.counts["FAIL"] += 1
self.print_automation_result(
"TEST-UNEXPECTED-FAIL",
result.test,
time=output.dt,
message="Same test with different flag producing different output",
)
else:
self.output_dict[output.test.path] = output
if output.timed_out:
dev_label = "TIMEOUTS"
self.groups.setdefault(dev_label, []).append(result)
if dev_label == "REGRESSIONS":
show_output = (
self.options.show_output or not self.options.no_show_failed
)
elif dev_label == "TIMEOUTS":
show_output = self.options.show_output
else:
show_output = self.options.show_output and not self.options.failed_only
if dev_label in ("REGRESSIONS", "TIMEOUTS"):
show_cmd = self.options.show_cmd
else:
show_cmd = self.options.show_cmd and not self.options.failed_only
if show_output or show_cmd:
self.pb.beginline()
if show_output:
print(
"## {}: rc = {:d}, run time = {}".format(
output.test.path, output.rc, output.dt
),
file=self.fp,
)
if show_cmd:
print(escape_cmdline(output.cmd), file=self.fp)
if show_output:
def write_with_fallback(fp, data):
try:
fp.write(data)
except UnicodeEncodeError as e:
# In case the data contains something not directly
# encodable, use \uXXXX.
fp.write(
"WARNING: Falling back from exception: {}\n".format(e)
)
fp.write("WARNING: The following output is escaped, ")
fp.write("and may be different than original one.\n")
fp.write(
data.encode("ascii", "namereplace").decode("ascii")
)
write_with_fallback(self.fp, output.out)
write_with_fallback(self.fp, output.err)
self.n += 1
if result.result == TestResult.PASS and not result.test.random:
self.counts["PASS"] += 1
elif result.test.expect and not result.test.random:
self.counts["FAIL"] += 1
else:
self.counts["SKIP"] += 1
if self.options.format == "automation":
if result.result != TestResult.PASS and len(result.results) > 1:
for sub_ok, msg in result.results:
tup = (sub_ok, result.test.expect, result.test.random)
label = self.LABELS[tup][0]
if label == "TEST-UNEXPECTED-PASS":
label = "TEST-PASS (EXPECTED RANDOM)"
self.print_automation_result(
label, result.test, time=output.dt, message=msg
)
tup = (result.result, result.test.expect, result.test.random)
self.print_automation_result(
self.LABELS[tup][0],
result.test,
time=output.dt,
extra=getattr(output, "extra", None),
)
return
if dev_label:
def singular(label):
return "FIXED" if label == "FIXES" else label[:-1]
self.pb.message("{} - {}".format(singular(dev_label), output.test.path))
self.pb.update(self.n, self.counts)
def finish(self, completed):
self.pb.finish(completed)
if self.options.format == "automation":
self.slog.suite_end()
else:
self.list(completed)
if self.wptreport is not None:
self.wptreport.suite_end()
# Conceptually, this maps (test result x test expectation) to text labels.
# key is (result, expect, random)
# value is (automation label, dev test category)
LABELS = {
(TestResult.CRASH, False, False): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
(TestResult.CRASH, False, True): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
(TestResult.CRASH, True, False): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
(TestResult.CRASH, True, True): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
(TestResult.FAIL, False, False): ("TEST-KNOWN-FAIL", ""),
(TestResult.FAIL, False, True): ("TEST-KNOWN-FAIL (EXPECTED RANDOM)", ""),
(TestResult.FAIL, True, False): ("TEST-UNEXPECTED-FAIL", "REGRESSIONS"),
(TestResult.FAIL, True, True): ("TEST-KNOWN-FAIL (EXPECTED RANDOM)", ""),
(TestResult.PASS, False, False): ("TEST-UNEXPECTED-PASS", "FIXES"),
(TestResult.PASS, False, True): ("TEST-PASS (EXPECTED RANDOM)", ""),
(TestResult.PASS, True, False): ("TEST-PASS", ""),
(TestResult.PASS, True, True): ("TEST-PASS (EXPECTED RANDOM)", ""),
}
def list(self, completed):
for label, results in sorted(self.groups.items()):
if label == "":
continue
print(label)
for result in results:
print(
" {}".format(
" ".join(
result.test.jitflags
+ result.test.options
+ [result.test.path]
)
)
)
if self.options.failure_file:
failure_file = open(self.options.failure_file, "w")
if not self.all_passed():
if "REGRESSIONS" in self.groups:
for result in self.groups["REGRESSIONS"]:
print(result.test.path, file=failure_file)
if "TIMEOUTS" in self.groups:
for result in self.groups["TIMEOUTS"]:
print(result.test.path, file=failure_file)
failure_file.close()
suffix = "" if completed else " (partial run -- interrupted by user)"
if self.all_passed():
print("PASS" + suffix)
else:
print("FAIL" + suffix)
if self.options.show_slow:
min_duration = self.options.slow_test_threshold
print("Slow tests (duration > {}s)".format(min_duration))
slow_tests = sorted(self.slow_tests, key=lambda x: x.duration, reverse=True)
any = False
for test in slow_tests:
print("{:>5} {}".format(round(test.duration, 2), test.test))
any = True
if not any:
print("None")
def all_passed(self):
return "REGRESSIONS" not in self.groups and "TIMEOUTS" not in self.groups
def print_automation_result(
self, label, test, message=None, skip=False, time=None, extra=None
):
result = label
result += " | " + test.path
args = []
if self.options.shell_args:
args.append(self.options.shell_args)
args += test.jitflags
result += ' | (args: "{}")'.format(" ".join(args))
if message:
result += " | " + message
if skip:
result += " | (SKIP)"
if time > self.options.timeout:
result += " | (TIMEOUT)"
result += " [{:.1f} s]".format(time)
print(result)
details = {"extra": extra.copy() if extra else {}}
if self.options.shell_args:
details["extra"]["shell_args"] = self.options.shell_args
details["extra"]["jitflags"] = test.jitflags
if message:
details["message"] = message
status = "FAIL" if "TEST-UNEXPECTED" in label else "PASS"
self.slog.test(test.path, status, time or 0, **details)