Source code
Revision control
Copy as Markdown
Other Tools
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
import json
import os
import platform
import re
import signal
import subprocess
import sys
from pathlib import Path
from mozlint import result
here = os.path.abspath(os.path.dirname(__file__))
def default_bindir():
# We use sys.prefix to find executables as that gets modified with
# virtualenv's activate_this.py, whereas sys.executable doesn't.
if platform.system() == "Windows":
return os.path.join(sys.prefix, "Scripts")
else:
return os.path.join(sys.prefix, "bin")
def get_ruff_version(binary):
"""
Returns found binary's version
"""
try:
output = subprocess.check_output(
[binary, "--version"],
stderr=subprocess.STDOUT,
text=True,
)
except subprocess.CalledProcessError as e:
output = e.output
matches = re.match(r"ruff ([0-9\.]+)", output)
if matches:
return matches[1]
print("Error: Could not parse the version '{}'".format(output))
def run_process(config, cmd, **kwargs):
orig = signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True
)
signal.signal(signal.SIGINT, orig)
try:
output, _ = proc.communicate()
proc.wait()
except KeyboardInterrupt:
proc.kill()
return output
def lint(paths, config, log, **lintargs):
fixed = 0
results = []
if not paths:
return {"results": results, "fixed": fixed}
# Currently ruff only lints non `.py` files if they are explicitly passed
# in. So we need to find any non-py files manually. This can be removed
exts = [e for e in config["extensions"] if e != "py"]
non_py_files = []
for path in paths:
p = Path(path)
if not p.is_dir():
continue
for ext in exts:
non_py_files.extend([str(f) for f in p.glob(f"**/*.{ext}")])
args = ["ruff", "check", "--force-exclude"] + paths + non_py_files
if config.get("exclude"):
args.append(f"--extend-exclude={','.join(config['exclude'])}")
process_kwargs = {"processStderrLine": lambda line: log.debug(line)}
warning_rules = set(config.get("warning-rules", []))
if lintargs.get("fix"):
# Do a first pass with --fix-only as the json format doesn't return the
# number of fixed issues.
fix_args = args + ["--fix-only"]
if not lintargs.get("warning"):
# Don't fix warnings to limit unrelated changes sneaking into patches.
# except when --fix -W is passed
fix_args.append(f"--extend-ignore={','.join(warning_rules)}")
log.debug(f"Running --fix: {fix_args}")
output = run_process(config, fix_args, **process_kwargs)
matches = re.match(r"Fixed (\d+) errors?.", output)
if matches:
fixed = int(matches[1])
args += ["--output-format=json"]
log.debug(f"Running with args: {args}")
output = run_process(config, args, **process_kwargs)
if not output:
return []
try:
issues = json.loads(output)
except json.JSONDecodeError:
log.error(f"could not parse output: {output}")
return []
for issue in issues:
res = {
"path": issue["filename"],
"lineno": issue["location"]["row"],
"column": issue["location"]["column"],
"lineoffset": issue["end_location"]["row"] - issue["location"]["row"],
"message": issue["message"],
"rule": issue["code"],
"level": "error",
}
if any(issue["code"].startswith(w) for w in warning_rules):
res["level"] = "warning"
if issue["fix"]:
res["hint"] = issue["fix"]["message"]
results.append(result.from_config(config, **res))
return {"results": results, "fixed": fixed}