Source code

Revision control

Copy as Markdown

Other Tools

#!/usr/bin/env python3
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
##########################################################################
#
# This is a collection of helper tools to get stuff done in NSS.
#
import sys
import argparse
import fnmatch
import glob
import io
import logging
import re
import subprocess
import os
import platform
import shutil
import tarfile
import tempfile
import urllib.request
import zipfile
from hashlib import sha256
DEVNULL = open(os.devnull, 'wb')
cwd = os.path.dirname(os.path.abspath(__file__))
# --- Logging setup for coverage commands -------------------------------------
def _colors_supported():
"""Return True if stderr supports ANSI color codes."""
if os.environ.get("NO_COLOR"):
return False
if os.environ.get("FORCE_COLOR"):
return True
return hasattr(sys.stderr, "isatty") and sys.stderr.isatty()
class _ColorFormatter(logging.Formatter):
"""Formatter that adds ANSI colors based on log level."""
RESET = "\033[0m"
COLORS = {
logging.CRITICAL: "\033[1;31m", # Bold red
logging.ERROR: "\033[31m", # Red
logging.WARNING: "\033[33m", # Yellow
logging.DEBUG: "\033[90m", # Dark gray
}
def format(self, record):
if record.levelno == logging.INFO:
return record.getMessage()
color = self.COLORS.get(record.levelno, "")
return "{}{}:{} {}".format(
color, record.levelname, self.RESET, record.getMessage())
class _PlainFormatter(logging.Formatter):
"""Plain formatter without colors."""
def format(self, record):
if record.levelno == logging.INFO:
return record.getMessage()
return "{}: {}".format(record.levelname, record.getMessage())
def _setup_coverage_logger():
"""Create and configure the logger for coverage commands."""
logger = logging.getLogger("nss.coverage")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stderr)
handler.setLevel(logging.INFO)
if _colors_supported():
handler.setFormatter(_ColorFormatter())
else:
handler.setFormatter(_PlainFormatter())
logger.addHandler(handler)
return logger
cov_log = _setup_coverage_logger()
# -----------------------------------------------------------------------------
def run_tests(test=None, cycles="standard", env={}, silent=False, logfile=None):
domsuf = os.getenv('DOMSUF', "localdomain")
host = os.getenv('HOST', "localhost")
env = env.copy()
env.update({
"NSS_CYCLES": cycles,
"DOMSUF": domsuf,
"HOST": host
})
if test is not None:
env["NSS_TESTS"] = test
os_env = os.environ
os_env.update(env)
kwargs = {"env": os_env, "cwd": os.path.join(cwd, "tests")}
if logfile:
kwargs["stdout"] = open(logfile, 'w')
kwargs["stderr"] = subprocess.STDOUT
elif silent:
kwargs["stdout"] = kwargs["stderr"] = DEVNULL
subprocess.check_call("./all.sh", **kwargs)
class cfAction(argparse.Action):
docker_command = None
restorecon = None
def __call__(self, parser, args, values, option_string=None):
self.setDockerCommand(args)
if values:
files = [os.path.relpath(os.path.abspath(x), start=cwd) for x in values]
else:
files = self.modifiedFiles()
# First check if we can run docker.
try:
with open(os.devnull, "w") as f:
subprocess.check_call(
self.docker_command + ["images"], stdout=f)
except:
self.docker_command = None
if self.docker_command is None:
print("warning: running clang-format directly, which isn't guaranteed to be correct")
command = [cwd + "/automation/clang-format/run_clang_format.sh"] + files
repr(command)
subprocess.call(command)
return
files = [os.path.join('/home/worker/nss', x) for x in files]
docker_image = 'clang-format-service:latest'
cf_docker_folder = cwd + "/automation/clang-format"
# Build the image if necessary.
if self.filesChanged(cf_docker_folder):
self.buildImage(docker_image, cf_docker_folder)
# Check if we have the docker image.
try:
command = self.docker_command + [
"image", "inspect", "clang-format-service:latest"
]
with open(os.devnull, "w") as f:
subprocess.check_call(command, stdout=f)
except:
print("I have to build the docker image first.")
self.buildImage(docker_image, cf_docker_folder)
command = self.docker_command + [
'run', '-v', cwd + ':/home/worker/nss:Z', '--rm', '-ti', docker_image
]
# The clang format script returns 1 if something's to do. We don't
# care.
subprocess.call(command + files)
if self.restorecon is not None:
subprocess.call([self.restorecon, '-R', cwd])
def filesChanged(self, path):
hash = sha256()
for dirname, dirnames, files in os.walk(path):
for file in files:
with open(os.path.join(dirname, file), "rb") as f:
hash.update(f.read())
chk_file = cwd + "/.chk"
old_chk = ""
new_chk = hash.hexdigest()
if os.path.exists(chk_file):
with open(chk_file) as f:
old_chk = f.readline()
if old_chk != new_chk:
with open(chk_file, "w+") as f:
f.write(new_chk)
return True
return False
def buildImage(self, docker_image, cf_docker_folder):
command = self.docker_command + [
"build", "-t", docker_image, cf_docker_folder
]
subprocess.check_call(command)
return
def setDockerCommand(self, args):
if platform.system() == "Linux":
self.restorecon = shutil.which("restorecon")
dcmd = shutil.which("docker")
if dcmd is not None:
self.docker_command = [dcmd]
if not args.noroot:
self.docker_command = ["sudo"] + self.docker_command
else:
self.docker_command = None
def modifiedFiles(self):
files = []
if os.path.exists(os.path.join(cwd, '.hg')):
st = subprocess.Popen(['hg', 'status', '-m', '-a'],
cwd=cwd, stdout=subprocess.PIPE, universal_newlines=True)
for line in iter(st.stdout.readline, ''):
files += [line[2:].rstrip()]
elif os.path.exists(os.path.join(cwd, '.git')):
st = subprocess.Popen(['git', 'status', '--porcelain'],
cwd=cwd, stdout=subprocess.PIPE)
for line in iter(st.stdout.readline, ''):
if line[1] == 'M' or line[1] != 'D' and \
(line[0] == 'M' or line[0] == 'A' or
line[0] == 'C' or line[0] == 'U'):
files += [line[3:].rstrip()]
elif line[0] == 'R':
files += [line[line.index(' -> ', beg=4) + 4:]]
else:
print('Warning: neither mercurial nor git detected!')
def isFormatted(x):
return x[-2:] == '.c' or x[-3:] == '.cc' or x[-2:] == '.h'
return [x for x in files if isFormatted(x)]
class buildAction(argparse.Action):
def __call__(self, parser, args, values, option_string=None):
subprocess.check_call([cwd + "/build.sh"] + values)
class testAction(argparse.Action):
def __call__(self, parser, args, values, option_string=None):
run_tests(values)
class covAction(argparse.Action):
def runSslGtests(self, outdir):
env = {
"GTESTFILTER": "*", # Prevent parallel test runs.
"ASAN_OPTIONS": "coverage=1:coverage_dir=" + outdir,
"NSS_DEFAULT_DB_TYPE": "sql",
"NSS_DISABLE_UNLOAD": "1"
}
run_tests("ssl_gtests", env=env)
def findSanCovFile(self, outdir):
for file in os.listdir(outdir):
if fnmatch.fnmatch(file, 'ssl_gtest.*.sancov'):
return os.path.join(outdir, file)
return None
def __call__(self, parser, args, values, option_string=None):
outdir = args.outdir
print("Output directory: " + outdir)
print("\nBuild with coverage sanitizers...\n")
sancov_args = "edge,no-prune,trace-pc-guard,trace-cmp"
subprocess.check_call([
os.path.join(cwd, "build.sh"), "-c", "--clang", "--asan", "--enable-legacy-db",
"--sancov=" + sancov_args
])
print("\nRun ssl_gtests to get a coverage report...")
self.runSslGtests(outdir)
print("Done.")
sancov_file = self.findSanCovFile(outdir)
if not sancov_file:
print("Couldn't find .sancov file.")
sys.exit(1)
symcov_file = os.path.join(outdir, "ssl_gtest.symcov")
out = open(symcov_file, 'wb')
# Don't exit immediately on error
symbol_retcode = subprocess.call([
"sancov",
"-ignorelist=" + os.path.join(cwd, ".sancov-blacklist"),
"-symbolize", sancov_file,
os.path.join(cwd, "../dist/Debug/bin/ssl_gtest")
], stdout=out)
out.close()
print("\nCopying ssl_gtests to artifacts...")
shutil.copyfile(os.path.join(cwd, "../dist/Debug/bin/ssl_gtest"),
os.path.join(outdir, "ssl_gtest"))
print("\nCoverage report: " + symcov_file)
if symbol_retcode > 0:
print("sancov failed to symbolize with return code {}".format(symbol_retcode))
sys.exit(symbol_retcode)
class sourceCovAction(argparse.Action):
# Regex for filenames to exclude from coverage reports.
IGNORE_REGEX = "gtests/google_test|lib/sqlite"
JOB_NAME = "NSS Test Coverage"
@staticmethod
def _vcs():
"""Return 'hg', 'git', or None."""
if os.path.exists(os.path.join(cwd, '.hg')):
return 'hg'
if os.path.exists(os.path.join(cwd, '.git')):
return 'git'
return None
@staticmethod
def _run_cmd(cmd):
"""Run cmd, return stripped stdout or None on failure."""
try:
r = subprocess.run(cmd, cwd=cwd, capture_output=True,
text=True, check=True)
return r.stdout.strip() or None
except (subprocess.CalledProcessError, FileNotFoundError):
return None
@staticmethod
def getCommitInfo():
"""Return 'shorthash: first line of commit' for the working directory.
Supports both Mercurial and Git. Returns None on failure.
"""
vcs = sourceCovAction._vcs()
if vcs == 'hg':
return sourceCovAction._run_cmd(
['hg', 'log', '-r', '.', '--template',
'{node|short}: {desc|firstline}'])
if vcs == 'git':
return sourceCovAction._run_cmd(
['git', 'log', '-1', '--format=%h: %s'])
return None
@staticmethod
def detectLlvmVersion():
"""Detect the major LLVM version from the clang on PATH."""
output = sourceCovAction._run_cmd(['clang', '--version'])
if output:
m = re.search(r'clang version (\d+)', output)
if m:
return m.group(1)
return "18" # Fallback for CI images where clang may not be on PATH.
_llvm_version_cached = None
def _llvm_version(self):
if type(self)._llvm_version_cached is None:
type(self)._llvm_version_cached = self.detectLlvmVersion()
return type(self)._llvm_version_cached
def detectBaseRev(self):
"""Auto-detect the base revision for diff coverage.
For Mercurial, returns the most recent public ancestor (i.e. the
boundary between pushed and local draft changes).
For Git, returns the merge-base with the upstream tracking branch,
falling back to origin/main or origin/master.
Returns None if detection fails.
"""
vcs = self._vcs()
if vcs == 'hg':
return self._run_cmd(
['hg', 'log', '-r', 'last(ancestors(.) and public())',
'--template', '{node|short}'])
if vcs == 'git':
for ref in ['@{upstream}', 'origin/main', 'origin/master']:
rev = self._run_cmd(['git', 'merge-base', 'HEAD', ref])
if rev:
return rev
return None
@staticmethod
def _cleanup_tmp_git():
"""Remove the temporary git repo."""
shutil.rmtree(os.path.join(cwd, '.git'), ignore_errors=True)
def runDiffCover(self, lcov_file, outdir, base_rev, fail_under=None):
"""Run diff-cover against the given base revision.
Generates a diff, runs diff-cover on it, and reports coverage of
changed lib/ lines. Requires the diff_cover package to be installed.
Skips gracefully if it is not available.
If fail_under is set, diff-cover exits non-zero when coverage of
changed lines is below that percentage (used by CI).
"""
if not shutil.which("diff-cover"):
cov_log.error(
"\ndiff-cover is not installed; skipping diff coverage.")
cov_log.info("Install with: pipx install diff-cover")
return
if not base_rev:
base_rev = self.detectBaseRev()
if not base_rev:
cov_log.error(
"\nCould not determine base revision; "
"skipping diff coverage.")
cov_log.info("Use --base-rev to specify manually.")
return
cov_log.info("\nGenerating diff coverage against base revision "
"%s...\n", base_rev)
is_hg = self._vcs() == 'hg'
tmp_git = False
if is_hg:
# diff-cover requires a real git repo. Build a temporary one
# with two commits (base -> current) so that diff-cover can
# resolve source paths and compute its diff natively.
diff_file = os.path.join(outdir, "changes.diff")
with open(diff_file, 'w') as f:
subprocess.check_call(
['hg', 'diff', '--git', '-r', base_rev],
cwd=cwd, stdout=f
)
if (not os.path.exists(diff_file)
or os.path.getsize(diff_file) == 0):
cov_log.info("No changes detected against base revision; "
"skipping diff coverage.")
return
cov_log.info("Creating temporary git repo for diff-cover...")
subprocess.check_call(
['git', 'init'], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
tmp_git = True
# Ensure git can commit in CI where no identity is configured.
for gcfg in [['user.email', 'nss@ci'], ['user.name', 'nss']]:
subprocess.check_call(
['git', 'config'] + gcfg, cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
# Exclude hg metadata from the temporary git repo.
with open(os.path.join(cwd, '.git', 'info', 'exclude'), 'a') as f:
f.write('.hg\n')
# Commit the current (post-change) working-tree state.
subprocess.check_call(
['git', 'add', '-A'], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
subprocess.check_call(
['git', 'commit', '-m', 'current'], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
# Create a 'base' branch and reverse-apply the diff so that
# it represents the pre-change state.
subprocess.check_call(
['git', 'checkout', '-b', 'base'], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
try:
subprocess.check_call(
['git', 'apply', '--reverse', diff_file], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
except subprocess.CalledProcessError:
# Restore working tree and bail out.
subprocess.call(
['git', 'checkout', '--', '.'], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
cov_log.error(
"Could not reconstruct base revision in "
"temporary git repo; skipping diff coverage.")
self._cleanup_tmp_git()
return
subprocess.check_call(
['git', 'add', '-A'], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
subprocess.check_call(
['git', 'commit', '-m', 'base'], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
# Switch back so the working tree matches the current source.
subprocess.check_call(
['git', 'checkout', '-'], cwd=cwd,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
compare_branch = 'base'
else:
# Native git repo – just check whether there are changes.
result = subprocess.run(
['git', 'diff', '--quiet', base_rev],
cwd=cwd, capture_output=True
)
if result.returncode == 0:
cov_log.info("No changes detected against base revision; "
"skipping diff coverage.")
return
compare_branch = base_rev
try:
# Save the diff as an artifact for inspection.
diff_file = os.path.join(outdir, "changes.diff")
with open(diff_file, 'w') as f:
subprocess.call(
['git', 'diff', compare_branch + '..HEAD'],
cwd=cwd, stdout=f
)
# Run diff-cover.
diff_html = os.path.join(outdir, "diff_coverage.html")
cmd = [
'diff-cover',
lcov_file,
'--compare-branch', compare_branch,
'--diff-range-notation', '..',
'--format', 'html:' + diff_html,
'--include=lib/**',
]
if fail_under is not None:
cmd.append('--fail-under={}'.format(fail_under))
rc = subprocess.call(cmd, stderr=subprocess.STDOUT)
if rc == 1 and fail_under is not None:
cov_log.error(
"Coverage of changed lines is below %d%%.",
fail_under)
sys.exit(rc)
elif rc != 0:
cov_log.error("diff-cover exited with code %d.", rc)
if fail_under is not None:
sys.exit(rc)
cov_log.info("Diff coverage report: %s", diff_html)
cov_log.info("Diff file: %s", diff_file)
finally:
if tmp_git:
self._cleanup_tmp_git()
def generateReports(self, outdir, profraw_dir, args):
"""Merge profiles, generate coverage reports, and run diff-cover."""
# Merge raw profiles.
cov_log.info("\nMerging profile data...\n")
profraw_files = glob.glob(os.path.join(profraw_dir, "*.profraw"))
if not profraw_files:
cov_log.error("No .profraw files found in %s", profraw_dir)
sys.exit(1)
profdata = os.path.join(outdir, "merged.profdata")
filelist = os.path.join(outdir, "profraw_files.txt")
with open(filelist, "w") as f:
f.write("\n".join(profraw_files) + "\n")
try:
subprocess.check_call([
"llvm-profdata-" + self._llvm_version(), "merge", "-sparse",
"-f", filelist, "-o", profdata
])
finally:
os.unlink(filelist)
# Discover instrumented objects.
objects = self.findObjects()
if not objects:
cov_log.error("No instrumented objects found.")
sys.exit(1)
cov_log.info("Found %d instrumented objects.", len(objects))
# Scope reports to lib/ only.
lib_dir = os.path.join(cwd, "lib")
# Build a descriptive title for the HTML report.
commit_info = self.getCommitInfo()
if commit_info:
title = "{} - {}".format(self.JOB_NAME, commit_info)
else:
title = self.JOB_NAME
# Generate HTML report.
cov_log.info("\nGenerating HTML coverage report...\n")
html_dir = os.path.join(outdir, "coverage_html")
self.runLlvmCov("show", objects, profdata, extra_args=[
"--format=html",
"--output-dir=" + html_dir,
"--show-directory-coverage",
"--project-title=" + title,
lib_dir,
])
# Generate LCOV report.
cov_log.info("Generating LCOV report...\n")
lcov_file = os.path.join(outdir, "coverage.lcov")
self.runLlvmCov("export", objects, profdata, outfile=lcov_file,
extra_args=["--format=lcov", lib_dir])
# Rewrite LCOV source paths from absolute to repo-relative so that
# diff-cover can match them against git diff paths.
prefix = cwd + os.sep
with open(lcov_file, 'r') as f:
lcov_data = f.read()
lcov_data = lcov_data.replace('SF:' + prefix, 'SF:')
with open(lcov_file, 'w') as f:
f.write(lcov_data)
cov_log.info("Coverage LCOV data: %s", lcov_file)
# Clean up raw profile data now that reports have been generated.
shutil.rmtree(profraw_dir)
# Bundle large directories into zip archives to reduce artifact
# clutter in CI.
cov_log.info("\nBundling artifacts...\n")
# Zip coverage HTML report.
html_zip = os.path.join(outdir, "full_coverage_html_report.zip")
with zipfile.ZipFile(html_zip, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, _dirs, files in os.walk(html_dir):
for fname in files:
fpath = os.path.join(root, fname)
zf.write(fpath, os.path.relpath(fpath, outdir))
shutil.rmtree(html_dir)
cov_log.info("Coverage HTML zip: %s", html_zip)
# Diff coverage (optional).
base_rev = getattr(args, 'base_rev', None)
fail_under = getattr(args, 'diff_fail_under', None)
self.runDiffCover(lcov_file, outdir, base_rev, fail_under)
def findDistDir(self):
"""Find the dist output directory via dist/latest."""
latest = os.path.join(cwd, "..", "dist", "latest")
with open(latest) as f:
objdir = f.read().strip()
return os.path.join(cwd, "..", "dist", objdir)
@staticmethod
def _is_elf(path):
"""Check whether a file is an ELF binary."""
try:
with open(path, 'rb') as f:
return f.read(4) == b'\x7fELF'
except (IOError, OSError):
return False
def _find_shared_libs(self):
"""Find instrumented shared libraries in the dist lib directory."""
dist_dir = self.findDistDir()
return glob.glob(os.path.join(dist_dir, "lib", "*.so"))
def findObjects(self):
"""Find instrumented shared libraries and executables."""
objects = self._find_shared_libs()
bin_dir = os.path.join(self.findDistDir(), "bin")
# ELF executables (skip scripts like nspr-config).
objects += [os.path.join(bin_dir, f) for f in os.listdir(bin_dir)
if os.path.isfile(os.path.join(bin_dir, f))
and self._is_elf(os.path.join(bin_dir, f))]
return objects
def runLlvmCov(self, command, objects, profdata, outfile=None,
extra_args=None):
"""Run an llvm-cov subcommand with the given objects and profile."""
obj_args = [a for obj in objects[1:] for a in ("--object", obj)]
cmd = [
"llvm-cov-" + self._llvm_version(), command,
"--instr-profile=" + profdata,
"--ignore-filename-regex=" + self.IGNORE_REGEX,
objects[0],
] + obj_args + (extra_args or [])
if outfile:
with open(outfile, 'w') as f:
subprocess.check_call(cmd, stdout=f)
else:
subprocess.check_call(cmd)
def __call__(self, parser, args, values, option_string=None):
outdir = os.path.abspath(args.outdir)
profraw_dir = os.path.join(outdir, "profraw")
os.makedirs(profraw_dir, exist_ok=True)
cov_log.info("Output directory: %s", outdir)
# Build with source-based coverage instrumentation.
cov_log.info("\nBuild with source-based coverage...\n")
subprocess.check_call([
os.path.join(cwd, "build.sh"), "-c", "--clang", "--opt",
"--sourcecov"
])
# Run test suites. If --test was given, run only that suite;
# otherwise run all suites (uses defaults from tests/all.sh).
# Failures are logged but do not abort — we still want coverage
# data from whichever tests did execute.
test_suite = getattr(args, 'test_suite', None)
test_logfile = os.path.join(outdir, "test_output.log")
if test_suite:
cov_log.info("\nRunning test suite: %s...\n", test_suite)
else:
cov_log.info("\nRunning all test suites...\n")
cov_log.warning("Test output is being written to %s", test_logfile)
cov_log.info("This will take approximately 40 minutes. "
"No output will appear until tests complete.\n")
env = {
"LLVM_PROFILE_FILE": os.path.join(profraw_dir, "%p-%m.profraw"),
"GTESTFILTER": "*", # Prevent parallel test runs.
"NSS_DEFAULT_DB_TYPE": "sql",
"NSS_DISABLE_UNLOAD": "1",
}
try:
run_tests(test=test_suite, cycles="standard", env=env,
logfile=test_logfile)
cov_log.info("Done running tests.")
except subprocess.CalledProcessError as e:
cov_log.warning(
"Test suite exited with code %d; "
"continuing to generate coverage reports.", e.returncode)
self.generateReports(outdir, profraw_dir, args)
class fuzzCovAction(sourceCovAction):
IGNORE_REGEX = "fuzz/targets|lib/sqlite"
JOB_NAME = "NSS Fuzz Coverage"
GCS_CORPUS_URL = ("https://storage.googleapis.com/"
"nss-backup.clusterfuzz-external.appspot.com/"
"corpus/libFuzzer/nss_{corpus}/public.zip")
def _find_fuzz_binaries(self):
"""Find nssfuzz-* binaries in the dist bin directory."""
bin_dir = os.path.join(self.findDistDir(), "bin")
return [f for f in glob.glob(os.path.join(bin_dir, "nssfuzz-*"))
if os.path.isfile(f)]
def findObjects(self):
"""Find instrumented shared libraries and fuzz binaries."""
return self._find_shared_libs() + self._find_fuzz_binaries()
def discoverTargets(self):
"""Find fuzz target binaries in the build output."""
return {os.path.basename(f).replace("nssfuzz-", "", 1): f
for f in sorted(self._find_fuzz_binaries())}
def getCorpusName(self, target):
"""Determine the base corpus name for a target.
Targets with a -no_fuzzer_mode suffix share the same corpus
as their base target (e.g. pkcs12-no_fuzzer_mode uses pkcs12).
"""
return target.removesuffix("-no_fuzzer_mode")
def downloadCorpus(self, corpus, corpus_dir):
"""Download corpus from GCS if the local directory is empty."""
if os.listdir(corpus_dir):
cov_log.info(" Using existing local corpus (%d files)",
len(os.listdir(corpus_dir)))
return
url = self.GCS_CORPUS_URL.format(corpus=corpus)
zip_path = os.path.join(corpus_dir, "public.zip")
cov_log.info(" Downloading corpus from GCS...")
try:
urllib.request.urlretrieve(url, zip_path)
with zipfile.ZipFile(zip_path, 'r') as z:
z.extractall(corpus_dir)
except Exception as e:
cov_log.warning("Could not download corpus for %s: %s",
corpus, e)
finally:
if os.path.exists(zip_path):
os.unlink(zip_path)
def getTargetOptions(self, corpus):
"""Parse libFuzzer options for a target."""
options_file = os.path.join(cwd, "fuzz", "options",
corpus + ".options")
if not os.path.isfile(options_file):
return []
result = subprocess.run(
[sys.executable, os.path.join(cwd, "fuzz", "config",
"libfuzzer_options.py"),
options_file],
capture_output=True, text=True, check=True
)
return [opt for opt in result.stdout.strip().split('\n') if opt]
def __call__(self, parser, args, values, option_string=None):
outdir = os.path.abspath(args.outdir)
max_time = args.max_total_time
target_filter = None
if args.targets:
target_filter = set(args.targets.split(','))
skip_build = args.skip_build
skip_download = args.skip_download
profraw_dir = os.path.join(outdir, "profraw")
os.makedirs(profraw_dir, exist_ok=True)
cov_log.info("Output directory: %s", outdir)
# Build with source-based coverage + fuzzing instrumentation.
if not skip_build:
cov_log.info("\nBuild with source-based coverage + fuzzing...\n")
subprocess.check_call([
os.path.join(cwd, "build.sh"), "-c", "--clang",
"--sourcecov", "--fuzz", "--disable-tests"
])
else:
cov_log.info("\nSkipping build (--skip-build).\n")
# Set up profile data collection.
os.environ["LLVM_PROFILE_FILE"] = os.path.join(
profraw_dir, "%p-%m.profraw")
# Discover fuzz targets.
targets = self.discoverTargets()
if not targets:
cov_log.error("No fuzz targets found.")
sys.exit(1)
if target_filter:
unknown = target_filter - set(targets.keys())
if unknown:
cov_log.warning("Unknown target(s): %s",
", ".join(sorted(unknown)))
targets = {k: v for k, v in targets.items()
if k in target_filter}
if not targets:
cov_log.error("None of the specified targets were found.")
sys.exit(1)
cov_log.info("Found %d fuzz target(s): %s",
len(targets), ", ".join(sorted(targets.keys())))
# Run each fuzz target.
fuzz_failures = 0
for target, binary in sorted(targets.items()):
corpus = self.getCorpusName(target)
corpus_dir = os.path.join(cwd, "fuzz", "corpus", corpus)
os.makedirs(corpus_dir, exist_ok=True)
cov_log.info("\n=== Fuzz target: %s (corpus: %s) ===",
target, corpus)
if not skip_download:
self.downloadCorpus(corpus, corpus_dir)
options = self.getTargetOptions(target)
cmd = [binary, corpus_dir] + options + [
"-max_total_time=" + str(max_time),
"-print_final_stats=1",
"-close_fd_mask=3",
"-verbosity=0",
"-fork=8",
]
cov_log.info("Running: %s", " ".join(cmd))
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
cov_log.warning("Fuzz target %s failed (exit code %d); "
"continuing.", target, e.returncode)
fuzz_failures += 1
if fuzz_failures > 0:
cov_log.warning("\n%d fuzz target(s) had failures.",
fuzz_failures)
self.generateReports(outdir, profraw_dir, args)
class docLintAction(argparse.Action):
def __call__(self, parser, args, values, option_string=None):
env = os.environ.copy()
env['VCS_PATH'] = os.path.dirname(cwd)
script = os.path.join(cwd, 'automation', 'taskcluster', 'scripts', 'check_doc_lint.sh')
try:
subprocess.check_call([script], env=env)
except subprocess.CalledProcessError as e:
print("doc-lint failed with exit status {}".format(e.returncode), file=sys.stderr)
sys.exit(e.returncode)
class commandsAction(argparse.Action):
commands = []
def __call__(self, parser, args, values, option_string=None):
for c in commandsAction.commands:
print(c)
def parse_arguments():
parser = argparse.ArgumentParser(
description='NSS helper script. ' +
'Make sure to separate sub-command arguments with --.')
subparsers = parser.add_subparsers()
parser_build = subparsers.add_parser(
'build', help='All arguments are passed to build.sh')
parser_build.add_argument(
'build_args', nargs='*', help="build arguments", action=buildAction)
parser_cf = subparsers.add_parser(
'clang-format',
help="""
Run clang-format.
By default this runs against any files that you have modified. If
there are no modified files, it checks everything.
""")
parser_cf.add_argument(
'--noroot',
help='On linux, suppress the use of \'sudo\' for running docker.',
action='store_true')
parser_cf.add_argument(
'<file/dir>',
nargs='*',
help="Specify files or directories to run clang-format on",
action=cfAction)
parser_test = subparsers.add_parser(
'tests', help='Run tests through tests/all.sh.')
tests = [
"cipher", "lowhash", "chains", "cert", "dbtests", "tools", "fips",
"sdr", "crmf", "smime", "ssl", "ocsp", "merge", "pkits", "ec",
"gtests", "ssl_gtests", "bogo", "interop", "policy"
]
parser_test.add_argument(
'test', choices=tests, help="Available tests", action=testAction)
parser_cov = subparsers.add_parser(
'coverage', help='Generate coverage report')
cov_modules = ["ssl_gtests"]
parser_cov.add_argument(
'--outdir', help='Output directory for coverage report data.',
default=tempfile.mkdtemp())
parser_cov.add_argument(
'module', choices=cov_modules, help="Available coverage modules",
action=covAction)
parser_tcov = subparsers.add_parser(
'test-coverage',
help='Generate source-based test coverage report '
'(HTML, LCOV, summary)')
parser_tcov.add_argument(
'--outdir', help='Output directory for coverage report data.',
default=tempfile.mkdtemp())
parser_tcov.add_argument(
'--base-rev',
help='Base revision for diff coverage (auto-detected if omitted)')
parser_tcov.add_argument(
'--diff-fail-under', type=int, default=90,
help='Fail if diff coverage is below this percentage (e.g. 90)')
parser_tcov.add_argument(
'--test', choices=tests, dest='test_suite',
help='Run only the specified test suite (default: all)')
parser_tcov.add_argument(
'action', nargs='?', default='run',
help="Run test coverage (default action)",
action=sourceCovAction)
parser_fcov = subparsers.add_parser(
'fuzz-coverage',
help='Generate source-based fuzz coverage report '
'(HTML, LCOV, summary)')
parser_fcov.add_argument(
'--outdir', help='Output directory for coverage report data.',
default=tempfile.mkdtemp())
parser_fcov.add_argument(
'--max-total-time', type=int, default=10,
help='Maximum time in seconds to fuzz each target (default: 10)')
parser_fcov.add_argument(
'--targets',
help='Comma-separated list of fuzz target names to run')
parser_fcov.add_argument(
'--skip-build', action='store_true',
help='Skip the build step (use existing build)')
parser_fcov.add_argument(
'--skip-download', action='store_true',
help='Skip corpus download (use only local corpus data)')
parser_fcov.add_argument(
'--base-rev',
help='Base revision for diff coverage (auto-detected if omitted)')
parser_fcov.add_argument(
'--diff-fail-under', type=int, default=None,
help='Fail if diff coverage is below this percentage (e.g. 90)')
parser_fcov.add_argument(
'action', nargs='?', default='run',
help="Run fuzz coverage (default action)",
action=fuzzCovAction)
parser_doc_lint = subparsers.add_parser(
'doc-lint', help='Check RST documentation for lint errors using sphinx-build.')
parser_doc_lint.add_argument(
'doc_lint', nargs='*', action=docLintAction)
parser_commands = subparsers.add_parser(
'mach-completion',
help="list commands")
parser_commands.add_argument(
'mach-completion',
nargs='*',
action=commandsAction)
commandsAction.commands = [c for c in subparsers.choices]
return parser.parse_args()
def main():
parse_arguments()
if __name__ == '__main__':
main()