Source code

Revision control

Copy as Markdown

Other Tools

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import re
import signal
import subprocess
from collections import namedtuple
from mozboot.util import get_tools_dir
from mozfile import which
from mozlint import result
from mozlint.pathutils import expand_exclusions
from packaging.version import Version
# Printed by lint() when get_rustfmt_binary() does not return a binary.
RUSTFMT_NOT_FOUND = """
Could not find rustfmt! Install rustfmt and try again.
$ rustup component add rustfmt
And make sure that it is in the PATH
""".strip()
# Message for a failed rustfmt installation. NOTE(review): not referenced by
# any function visible in this part of the file.
RUSTFMT_INSTALL_ERROR = """
Unable to install correct version of rustfmt
Try to install it manually with:
$ rustup component add rustfmt
""".strip()
# Printed by lint() when the detected rustfmt version is lower than the
# configured minimum; "{version}" is filled in with that minimum version.
RUSTFMT_WRONG_VERSION = """
You are probably using an old version of rustfmt.
Expected version is {version}.
Try to update it:
$ rustup update stable
""".strip()
def parse_issues(config, output, paths):
    """Turn the output of ``rustfmt --check`` into lint results.

    The output is cut into hunks at every ``Diff in <file> at line <n>:`` or
    ``Diff in <file>:<n>:`` header. A hunk is reported only when one of
    ``paths`` (after normalisation) is a substring of the normalised name of
    the file that hunk belongs to.

    Args:
        config: Linter configuration, forwarded to ``result.from_config``.
        output: Raw rustfmt output. ``bytes`` are decoded as UTF-8 with
            invalid sequences replaced; ``str`` is used as is.
        paths: Paths that were requested for linting.

    Returns:
        A dict with two keys: ``"results"``, the list of issues built by
        ``result.from_config``, and ``"fixed"``, which is always 0 here.
    """
    RustfmtDiff = namedtuple("RustfmtDiff", ["file", "line", "diff"])
    # Accepts both header spellings: "... at line <n>:" and "...:<n>:".
    diff_header = re.compile(r"^Diff in (.*)(?: at line |:)([0-9]*):")

    # Decode once up front so the rest of the function only deals with str.
    if isinstance(output, bytes):
        output = output.decode("utf-8", "replace")

    issues = []
    current_file = None
    current_line = None
    hunk_lines = []
    for raw_line in output.split("\n"):
        text = raw_line.rstrip("\r")
        header = diff_header.match(text)
        if header is None:
            hunk_lines.append(text)
            continue
        # A new header closes the previous hunk. Text seen before the first
        # header belongs to no file and is dropped instead of being reported
        # as an issue with an empty path.
        if current_file is not None and hunk_lines:
            diff_text = "\n".join(hunk_lines).rstrip("\n")
            issues.append(RustfmtDiff(current_file, current_line, diff_text))
        hunk_lines = []
        current_file, current_line = header.groups()

    # The loop only closes a hunk when it meets the next header, so the last
    # hunk has to be closed here (with the same trailing-newline trimming).
    if current_file is not None and hunk_lines:
        diff_text = "\n".join(hunk_lines).rstrip("\n")
        issues.append(RustfmtDiff(current_file, current_line, diff_text))

    wanted = [os.path.normcase(os.path.normpath(path)) for path in paths]
    results = []
    for issue in issues:
        # rustfmt may report diffs for files other than the ones requested,
        # therefore each issue is checked against the supplied paths using
        # the issue's own file name (not the file of the last hunk parsed).
        issue_file = os.path.normcase(os.path.normpath(issue.file))
        if not any(path in issue_file for path in wanted):
            continue
        res = {
            "path": issue.file,
            "diff": issue.diff,
            "level": "warning",
            "lineno": issue.line,
        }
        results.append(result.from_config(config, **res))
    return {"results": results, "fixed": 0}
def run_process(config, cmd):
    """Run ``cmd`` and return everything it wrote to stdout and stderr.

    Args:
        config: Linter configuration (not used by this function).
        cmd: Argument list of the command to execute.

    Returns:
        The combined stdout/stderr of the process as ``bytes``. If the run is
        interrupted with Ctrl-C the process is killed and ``b""`` is
        returned.
    """
    # SIGINT is ignored only while the child is being spawned; presumably so
    # the child starts with SIGINT ignored and Ctrl-C is handled by this
    # process alone -- TODO confirm. The previous handler is restored even if
    # Popen raises (for instance when the binary does not exist).
    orig = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        proc = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
    finally:
        signal.signal(signal.SIGINT, orig)

    try:
        output, _ = proc.communicate()
    except KeyboardInterrupt:
        proc.kill()
        # Reap the killed child and close its pipe. Whatever it managed to
        # print is discarded: a truncated diff would only mislead.
        proc.communicate()
        output = b""
    return output
def get_rustfmt_binary():
    """
    Returns the path of the rustfmt binary to use.

    A non-empty RUSTFMT environment variable is returned as is. Otherwise
    "rustfmt" is looked up with which(), first in <tools dir>/rustc/bin and
    then in the directories of PATH; the result of that lookup is returned
    (None when nothing is found).
    """
    binary = os.environ.get("RUSTFMT")
    if binary:
        return binary

    search_path = [os.path.join(get_tools_dir(), "rustc", "bin")]
    # PATH can be missing from the environment; only add it when it has a
    # value so the lookup neither raises KeyError nor gets an empty entry.
    env_path = os.environ.get("PATH")
    if env_path:
        search_path.append(env_path)
    return which("rustfmt", path=os.pathsep.join(search_path))
def get_rustfmt_version(binary):
    """
    Returns found binary's version

    Runs "<binary> --version" and builds a Version from the first
    "major.minor.patch" triple found in its output (stdout and stderr
    combined).

    Raises ValueError when the output contains no such triple.
    """
    try:
        output = subprocess.check_output(
            [binary, "--version"],
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as e:
        # A non-zero exit status still carries the captured output; use it.
        output = e.output

    # The dots are escaped so that they only match a literal ".", and every
    # component may span several digits (e.g. "10.4.38" is not cut down to
    # "0.4.38").
    match = re.search(r"\d+\.\d+\.\d+", output)
    if match is None:
        raise ValueError(
            "Could not find a version number in the output of "
            "'{} --version': {!r}".format(binary, output)
        )
    return Version(match.group(0))
def lint(paths, config, fix=None, **lintargs):
    """Check (and optionally reformat) the given paths with rustfmt.

    Args:
        paths: Paths to lint; exclusions are expanded with
            ``expand_exclusions`` before anything else happens.
        config: Linter configuration; ``min_rustfmt_version`` is read from it.
        fix: When truthy, rustfmt is run a second time without ``--check``
            so that it rewrites the files.
        **lintargs: Must provide ``log`` and ``root``.

    Returns:
        ``[]`` when there is nothing to lint, or when rustfmt is missing
        outside of automation; ``1`` when rustfmt is missing in automation or
        older than the configured minimum; otherwise the dict produced by
        ``parse_issues``. In fix mode the issues found are counted under
        ``"fixed"`` and ``"results"`` is emptied.
    """
    log = lintargs["log"]
    paths = list(expand_exclusions(paths, config, lintargs["root"]))

    # An empty path array can occur when the user passes in `-n`. Without
    # this early return rustfmt would attempt to read stdin and hang.
    if not paths:
        return []

    binary = get_rustfmt_binary()
    if not binary:
        print(RUSTFMT_NOT_FOUND)
        return 1 if "MOZ_AUTOMATION" in os.environ else []

    min_version_str = config.get("min_rustfmt_version")
    min_version = Version(min_version_str)
    actual_version = get_rustfmt_version(binary)
    log.debug(
        "Found version: {}. Minimal expected version: {}".format(
            actual_version, min_version
        )
    )
    if actual_version < min_version:
        print(RUSTFMT_WRONG_VERSION.format(version=min_version_str))
        return 1

    # First pass: only report what rustfmt would change.
    check_cmd = [binary, "--check"]
    log.debug("Command: {}".format(" ".join(check_cmd)))
    output = run_process(config, check_cmd + paths)
    issues = parse_issues(config, output, paths)
    if not fix:
        return issues

    # Second pass: everything reported above is about to be rewritten, so it
    # is counted as fixed rather than returned as a result.
    issues["fixed"] = len(issues["results"])
    issues["results"] = []
    fix_cmd = [binary]
    log.debug("Command: {}".format(" ".join(fix_cmd)))
    run_process(config, fix_cmd + paths)
    return issues