Source code
Revision control
Copy as Markdown
Other Tools
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
import os
import platform
import re
import signal
import subprocess
import sys
from mozlint import result
from mozlint.pathutils import expand_exclusions
here = os.path.abspath(os.path.dirname(__file__))
def default_bindir():
# We use sys.prefix to find executables as that gets modified with
# virtualenv's activate_this.py, whereas sys.executable doesn't.
if platform.system() == "Windows":
return os.path.join(sys.prefix, "Scripts")
else:
return os.path.join(sys.prefix, "bin")
def get_black_version(binary):
"""
Returns found binary's version
"""
try:
output = subprocess.check_output(
[binary, "--version"],
stderr=subprocess.STDOUT,
universal_newlines=True,
)
except subprocess.CalledProcessError as e:
output = e.output
try:
# Accept `black.EXE, version ...` on Windows.
# for old version of black, the output is
# black, version 21.4b2
# From black 21.11b1, the output is like
# black, 21.11b1 (compiled: no)
return re.match(r"black.*,( version)? (\S+)", output)[2]
except TypeError as e:
print(f"Could not parse the version '{output}'")
print(f"Error: {e}")
def parse_issues(config, output, paths, *, log):
would_reformat = re.compile("^would reformat (.*)$", re.I)
reformatted = re.compile("^reformatted (.*)$", re.I)
cannot_reformat = re.compile("^error: cannot format (.*?): (.*)$", re.I)
results = []
for l in output.split(b"\n"):
line = l.decode("utf-8").rstrip("\r\n")
if line.startswith("All done!") or line.startswith("Oh no!"):
break
match = would_reformat.match(line)
if match:
res = {"path": match.group(1), "level": "error"}
results.append(result.from_config(config, **res))
continue
match = reformatted.match(line)
if match:
res = {"path": match.group(1), "level": "warning", "message": "reformatted"}
results.append(result.from_config(config, **res))
continue
match = cannot_reformat.match(line)
if match:
res = {"path": match.group(1), "level": "error", "message": match.group(2)}
results.append(result.from_config(config, **res))
continue
log.debug(f"Unhandled line: {line}")
return results
def run_process(config, cmd):
orig = signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
signal.signal(signal.SIGINT, orig)
try:
output, _ = proc.communicate()
proc.wait()
except KeyboardInterrupt:
proc.kill()
return output
def run_black(config, paths, fix=None, *, log, virtualenv_bin_path):
fixed = 0
binary = os.path.join(virtualenv_bin_path or default_bindir(), "black")
log.debug(f"Black version {get_black_version(binary)}")
cmd_args = [binary]
if not fix:
cmd_args.append("--check")
if platform.system() == "Windows":
MAX_PATHS_PER_JOB = (
50 # set a max size to prevent command lines that are too long on Windows
)
chunked_paths = [
paths[i : i + MAX_PATHS_PER_JOB]
for i in range(0, len(paths), MAX_PATHS_PER_JOB)
]
else:
chunked_paths = [paths]
all_output = []
for paths_chunk in chunked_paths:
base_command = cmd_args + paths_chunk
log.debug("Command: {}".format(" ".join(base_command)))
output = parse_issues(
config, run_process(config, base_command), paths_chunk, log=log
)
all_output.extend(output)
# black returns an issue for fixed files as well
for eachIssue in output:
if eachIssue.message == "reformatted":
fixed += 1
return {"results": all_output, "fixed": fixed}
def lint(paths, config, fix=None, **lintargs):
files = list(expand_exclusions(paths, config, lintargs["root"]))
return run_black(
config,
files,
fix=fix,
log=lintargs["log"],
virtualenv_bin_path=lintargs.get("virtualenv_bin_path"),
)